Kết hợp Flow trong Coroutine Android

Android Mar 18, 2021

Giới thiệu

Trong Kotlin, Coroutine chỉ là một phần lập lịch của RxJava nhưng giờ đây với sự kết hợp với Flow, nó có thể thay thế cho RxJava trong Android.

1. Flow là gì?

Flow trong Kotlin là cách tốt nhất xử lý luồng dữ liệu bất đồng bộ nhưng thực thi một cách tuần tự.

Tích hợp Flow vào project Android

Thêm vào build.gradle của ứng dụng

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.3"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.3"

và trong phần bổ sung build.gradle của dự án

classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:1.3.61"
Flow là nguồn dữ liệu lạnh

Trong RxJava, mỗi Observables đại diện cho một luồng dữ liệu, và phần thân của nó không được thực thi cho đến khi nó được đăng ký (subcribed) bởi một người đăng ký (subcriber) và sẽ nhận được dữ liệu khi nguồn phát ra dữ liệu.

Flow hoạt động tương tự như vậy, nó cũng không nhận được dữ liệu cho đến khi collect dữ liệu.

runBlocking {
   coroutinesFlow.foo()
   println("Delay 2s")
   delay(2000)
   coroutinesFlow.foo().collect {
       println(it)
   }
}

fun foo(): Flow<Int> = flow {
   println("Flow started")
   emit(1)
   delay(500)
   emit(2)
}
Delay 2s
Flow started
1
2
Flow cancellation

Flow tuân thủ việc các nguyên tắc cancellation chung của coroutines. Việc collect của flow chỉ có thể bị hủy khi và chỉ khi flow đang bị suspend (chẳng hạn như gặp hàm delay) và ngược lại flow không thể bị hủy.

Khi chúng ta sẽ sử dụng scope launch{} sẽ trả về 1 Job, từ Job này chúng ta có thể cancel scope đó, và nếu chúng ta đặt flow bên scope và khi cancel thì flow cũng bị cancel theo.

Ví dụ 1.

fun cancelFlow() = runBlocking {
   val job = launch(Dispatchers.Default) {
       flow {
           (1..9).forEach {
               delay(1000)
               emit(it)
           }
       }.collect {
           println("value $it")
       }
   }
   delay(5000)
   job.cancel()
}
value 1
value 2
value 3
value 4

Kết quả chính xác sẽ là in ra từ value 1 -> value 9, nhưng do chúng ta delay 5s rồi cancel job nên flow cũng bị cancel từ đó luôn.

Từ ví dụ này chúng ta biết được các scope lồng nhau thì khi scope cha bị cancel thì các scope con cũng bị cancel theo

Ví dụ 2

fun cancelScope() = runBlocking {
   val startTime = System.currentTimeMillis()
   val job = launch(Dispatchers.Default) {
       var nextPrintTime = startTime
       var i = 0
       while (i < 5) {
           if (System.currentTimeMillis() >= nextPrintTime) {
               println("job: I'm sleeping ${i++} ...")
               nextPrintTime += 1000L
           }
       }
   }
   delay(1300L)
   println("main: I'm tired of waiting!")
   job.cancel()
   println("main: Now I can quit.")
}
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
main: I'm tired of waiting!
main: Now I can quit.
job: I'm sleeping 2 ...
job: I'm sleeping 3 ...
job: I'm sleeping 4 ...

Mặc dù đã gọi cancel để huỷ coroutine rồi nhưng vòng while kia vẫn chạy bất chấp, đó là bởi vì khi gọi cancel thì nó sẽ set lại 1 property là isActive từ true sang false, và mọi hàm support từ coroutine như delay, emit đều check xem isActive còn bằng true hay không? Nếu bằng  false thì nó sẽ huỷ bỏ tiến trình đó luôn.

Vậy sửa đoạn code trên ta chỉ cần thay điều kiện while (i < 5) sang while (isActive) là được.

Qua ví dụ này chúng ta biết được để xem 1 coroutine bị cancel hay chưa, chỉ cần check property isActive

Các cách tạo Flow
  • flowOf () - Nó được sử dụng để tạo luồng từ một tập giá trị nhất định.
flowOf(4, 2, 5, 1, 7).onEach { delay(400) }
  • asFlow () - Đây là một hàm mở rộng giúp chuyển đổi kiểu thành các luồng.
(1..5).asFlow().onEach{ delay(300)}
  • flow {} - Đây là một scope trình tạo để xây dựng các luồng tùy ý như các ví dụ trên.
  • channelFlow {} (cold stream) - Cách này tạo ra luồng dữ liệu bằng cách sử dụng hàm send, giống như onNext trong RxJava Ví dụ:
            channelFlow {
                send(1)
            }.flowOn(Dispatchers.Default)

2. Các operator trong Flow

Phần này chúng ta sẽ đi sau vào sức mạnh thực sự của Flow, đó là các toán tử (operators), về cơ bản thì các toán tử này khá giống với các toán tử bên RxJava, nên các bạn đã có kinh nghiệm với RxJava rồi thì phần này coi như các bạn đã nắm chắc luôn rồi.

take(): sử dụng để lấy 1 số lượng item nhất định
(1..5).asFlow()
   .take(2)
   .collect {
       println("$it")
   }
1 
2
transform(): dùng để biến đổi, bỏ qua hoặc tăng số lượng item được phát ra từ nguồn (filter + map)
(1..2).asFlow()
   .transform { emit(it + 1) }
   .collect {
       println("$it")
   }
2
3
map(): Biến đổi giá trị hoặc kiểu trá trị từ nguồn
(1..2).asFlow()
   .map { it.toString() }
   .collect {
       println(it)
   }
//biến đổi kiểu Int -> String
1
2
filter(): lọc các điều kiện thoả mãn và loại các giá trị không thoả mãn
(1..2).asFlow()
   .filter { it > 1 }
   .collect {
       println("$it")
   }
2
onEach(): khi ta muốn một action gì đó trước khi giá trị được emit
(1..2).asFlow()
   .onEach { delay(1000) }
   .collect {
       println("$it")
   }

Mỗi phần tử sẽ bị delay 1s trước khi được in ra

reduce(): Cộng dồn các giá trị được emit ra với giá trị khởi tạo ban đầu bằng item đầu tiên, chú ý nếu list giá trị bằng rỗng sẽ throw ra exception
val sum = (1..2).asFlow()
   .reduce(operation = { a, b ->
       a + b
   })
println("SUM = $sum")
SUM = 3
fold(): hàm này khá giống với reduce() bên trên, duy chỉ khác một chỗ là hàm này cho phép init giá trị khởi tạo đầu vào.
val sum = (1..2).asFlow()
   .fold(initial = 3, operation = { a, b ->
       a + b
   })
println("SUM = $sum")
SUM = 6
toList(), toSet(): chuyển từ flow sang List, Set
first(): lấy phần từ đầu tiên, hoặc có thể truyền điều kiện vào để lấy phần từ đầu tiên thoả mãn. Cũng chú ý là sẽ throw exception khi nguồn rỗng.
listOf<Int>().asFlow().first()
single(), singleOrNull(): Kiểm tra nguồn chắc chắc chỉ có 1 giá trị và lấy ra giá trị đó.

single(): sẽ throw nếu có nhiều hơn 1 giá trị

singleOrNull(): sẽ không throw nếu có nhiều hơn 1 giá trị

combine(): gộp các giá trị của 2 flow vào với nhau với điều kiện bắt buộc 2 flow đã từng emit giá trị
val flow1 = (1..2).asFlow()
val flow2 = listOf<Int>().asFlow()
flow1.combine(flow2) { d1, d2 ->
   Pair(d1, d2)
}.collect {
   println(it)
}

Kết quả: Không in ra giá trị nào vì flow2 chưa emit item nào cả

val flow1 = (1..2).asFlow()
val flow2 = listOf<Int>(3).asFlow()
flow1.combine(flow2) { d1, d2 ->
   Pair(d1, d2)
}.collect {
   println(it)
}
(1, 3)
(2, 3)
zip(): Giống như combile(), nhưng zip sẽ gộp các giá trị của 2 flow dựa vào thứ tự emit
val flow1 = (1..2).asFlow()
val flow2 = listOf<Int>(3).asFlow()
flow1.zip(flow2) { d1, d2 ->
   Pair(d1, d2)
}.collect {
   println(it)
}
(1,3)
flatMapConcat(): xử lý các giá trị nhận được từ flow một cách đồng bộ
private fun flatConcat() = runBlocking{
   fun requestFlow(i: Int): Flow<String> = flow {
       emit("$i: First")
       delay(500)
       emit("$i: Second")
   }

   (1..3).asFlow().onEach { delay(100) }
       .flatMapConcat { requestFlow(it) }
       .collect { value ->
           println(value)
       }
}
1: First
1: Second
2: First
2: Second
flatMapLastest(): khi có một emit giá trị mới thì nó sẽ cancel flow trước đó.
private fun flatMapLatsted() = runBlocking{
   fun requestFlow(i: Int): Flow<String> = flow {
       emit("$i: First")
       delay(500)
       emit("$i: Second")
   }

   (1..2).asFlow().onEach { delay(100) }
       .flatMapLatest { requestFlow(it) }
       .collect { value ->
           println(value)
       }
}
1: First
2: First
2: Second
flatMapMerge(): item nào đến trước sẽ được emit trước
private fun flatMapMerge() = runBlocking{
   fun requestFlow(i: Int): Flow<String> = flow {
       emit("$i: First")
       delay(500)
       emit("$i: Second")
   }

   (1..2).asFlow().onEach { delay(100) }
       .flatMapMerge { requestFlow(it) }
       .collect { value ->
           println(value)
       }
}
1: First
2: First
1: Second
2: Second

Trên đây là những operator mọi người sẽ thường xuyên sử dụng nhất, hy vọng các bạn sẽ hiểu hết về nó để có thể áp dụng được vào thực tế.

3. Context trong Flow

Code trong khổi flow{} sẽ chạy trên context của scope chứa flow

private fun contextFlow() = runBlocking {
   flow {
       println("[${Thread.currentThread().name}] emit value")
       emit(1)
   }.collect {
       println("[${Thread.currentThread().name}]collect value")
   }
}
[main] emit value
[main] collect value

Giả sử chúng ta truyền Dispatcher.IO vào runBlocking thì kết quả sẽ chạy trên luồng IO.

Giả sử chúng ta muốn thay đổi luồng chạy trong flow{} thì sẽ làm như nào?

Có thể sử dụng withContext hoặc launch với Dispatcher chúng ta muốn chạy và kết quả là sẽ có exception xảy ra như đoạn code bên dưới.

flow {
   emit(1)
   withContext(Dispatchers.IO){
       emit(2)
   }
}.collect {
   println(it)
}

Bởi vì code trong khối flow sẽ giữ và bảo toàn context, có nghĩa là nó đã chạy với context nào rồi sẽ chỉ chạy với context đó và chúng ta không thể thay đổi nó.

Nhưng đừng lo chúng ta sẽ giải quyết được vấn đề này với toán tử flowOn

Toán tử flowOn

Toán tử flowOn sẽ cho phép code trong khối flow được chạy trên bất kỳ context nào ta muốn.

private fun contextFlow() = runBlocking {
   flow {
       println("[${Thread.currentThread().name}] emit value")
       emit(1)
   }.flowOn(Dispatchers.IO)
       .collect {
           println("[${Thread.currentThread().name}] collect value")
       }
}
[DefaultDispatcher-worker-1] emit value
[main] collect value

Chú ý: flowOn không thay đổi context của coroutine đang chạy mà nó tạo ra 1 coroutine khác trên context truyền vào flowOn

Flow Exceptions

Code trong khối flow có thể throw exception nếu xảy ra exception trong khối.

private fun throwException() = runBlocking {
   flow {
       emit(3)
       emit(3 / 0)
       emit(3 / 1)
   }.flowOn(Dispatchers.IO)
       .collect {
           println(it)
       }
}
3
Exception in thread "main" java.lang.ArithmeticException: / by zero.

Cũng giống như RxJava khi xảy ra Exception, flow cũng dừng tiến trình lại và để giải quyết vấn đề này chúng ta có thể sử dụng try/catch hoặc sử dụng catch trong flow

private fun throwException() = runBlocking {
   flow {
       emit(3)
       emit(3 / 0)
       emit(3 / 1)
   }.flowOn(Dispatchers.IO)
       .catch { ex ->
           if (ex is ArithmeticException) {
               emit(-1)
           }
       }
       .collect {
           println(it)
       }
}
3
-1
Flow LauchIn

Mặc định, khi run một flow trong 1 scope thì những dòng code phải đợi flow chạy xong thì mới được thực thi.

private fun launcIn() = runBlocking {
   (1..2).asFlow()
       .onEach {
           delay(1000)
       }.collect {
           println(it)
       }
   println("DONE")
}
1
2
DONE

Giả sử flow trên chạy mất rất nhiều thời gian thì các công việc phía sau flow sẽ phải đợi cho đến khi flow kết thúc. Điều này thực sự không tốt nếu các công việc phía sau không cần quan tâm kết quả của flow.

Chính vì điều này mà launchIn ra đời để giải quyết bài toán trên. Khi sử dụng toán tử này nó sẽ chạy các công việc trong scope chứa flow song song, ngoài ra toán tử này sẽ return Job, thứ mà ta có thể cancel được flow.

rivate fun launcIn() = runBlocking {
   val job = (1..20).asFlow()
       .onEach {
           delay(1000)
           println(it)
       }.launchIn(this)
   println("delay 5s")
   delay(5000)
   println("cancel")
   job.cancel()
}
delay 5s
1
2
3
4
cancel
Flow completion

Đôi khi chúng ta muốn biết khi nào thì flow kết thúc tiến trình thì 2 cách dưới đây sẽ giúp mọi người.
Cách 1. sử dụng khối finally

fun completeWithFinally() = runBlocking {
   try {
       (1..2).asFlow().collect { value -> println(value) }
   } finally {
       println("Done")
   }
}
1
2
Done

Cách 2. Toán tử onComplete

fun completeWithOperator() = runBlocking {
   (1..2).asFlow()
       .onCompletion {
           println("Done")
       }
       .collect { value -> println(value) }
}
Tương tự như trên

Ngoài ra khi sử dụng onComplete thì có một tham số để chúng ta biết được tiến trình có xảy ra exception hay không? Nếu tham số này là null thì không có lỗi xảy ra và ngược lại thì có.

Bài viết cũng khá dài rồi, mình xin dừng tại đây, hy vọng các bạn đã đọc đến đây có một cách khác, hướng khác để giải quyết bài toán bất đồng bộ trong Android.
Bài viết này vẫn chưa thực sự trọn vẹn khi còn thiếu phần về Channels nữa,
Phần tiếp theo mình sẽ nói thêm về Channels để mọi người hiểu thêm về nó.

Còn bây giờ xin chào và hẹn gặp lại.

Channels (Phần tiếp theo)

Tags

Great! You've successfully subscribed.
Great! Next, complete checkout for full access.
Welcome back! You've successfully signed in.
Success! Your account is fully activated, you now have access to all content.