Reactive Programming - 2

Mobile Feb 01, 2021
Phần 2: Chinh phục Reactive Programing

Phần 1 mình có giới thiệu về ReactiveX bằng các khái niệm và giới thiệu sơ qua về nó, bạn có thể đọc lại ở link.

Sang phần 2 này, chúng ta sẽ đi sâu hơn về ReactiveX, trong đó sẽ tập trung vào những phần mà trong thực tế mọi người thường xuyên sử dụng nhất. Bây giờ thì bắt đầu thôi.

Subject

  • Subject là 1 kiểu đặc biệt của Observable khi nó vừa có thể là lắng nghe dữ liệu hoặc là phát ra dữ liệu.
Các loại subject
PublishSubject:
BehaviorSubject
ReplaySubject
AsyncSubject

PublishSubject: Nó emits tất cả subsequent items của nguồn Observable tại thời điểm subscription.

val publishSubject = PublishSubject.create<Int>()
publishSubject.onNext(2)
publishSubject.subscribe {
	  println("subject1: $it")
}
publishSubject.onNext(3)
publishSubject.onNext(4)
publishSubject.subscribe {
	  println("subject2: $it")
}
publishSubject.onNext(5)

Kết quả

subject1: 3
subject1: 4
subject1: 5
subject2: 5

Từ kết quả có thể thấy subject1 và subject1 sẽ không nhận được giá trị 1 vì tại thời điểm emit giá trị 1 subject1 và subject2 chưa subscribe.

BehaviorSubject: Nó emits item gần nhất được emit khi một observer subscribe đến nó.

val behaviorSubject = BehaviorSubject.createDefault(1)
    behaviorSubject.onNext(2)
    behaviorSubject
        .subscribe {
            println("behaviorSubject $it")
        }
    behaviorSubject.onNext(3)
    behaviorSubject.onNext(4)

Kết quả

behaviorSubject 2
behaviorSubject 3
behaviorSubject 4

Từ kết quả có thấy thấy BehaviorSubject sẽ nhận được giá trị được emit gần nhất. Trong trường hợp không có giá trị cuối cùng thì sẽ không vào onNext().

ReplaySubject: Nó emits tất cả subsequent items của Observable, mặc cho subscriber subscribe khi nào.

val replaySubject = ReplaySubject.create<Int>()
    replaySubject.subscribe {
        println("replaySubject1: $it")
    }
    replaySubject.onNext(1)
    replaySubject.onNext(2)
    replaySubject.onNext(3)
    replaySubject.subscribe {
    println("replaySubject2: $it")
}

Kết quả:

replaySubject1: 1
replaySubject1: 2
replaySubject1: 3
replaySubject2: 1
replaySubject2: 2
replaySubject2: 3

Kết quả cho thấy subscribe 1 và subscribe 2 đều nhận được toàn bộ giá trị cho dù trước hay sau khi subscribe.

AsyncSubject: Nó chỉ emits giá trị cuối cùng của source Observable (chỉ giá trị cuối cùng), để giá trị cuối cùng được emit thì điều kiện là Observable gọi onComplete.

val asyncSubject = AsyncSubject.create<Int>()
    asyncSubject.onNext(1)
    asyncSubject.onNext(2)
    asyncSubject.subscribe {
    	println("relaySubject: $it")
    }
    asyncSubject.onNext(3)
    asyncSubject.onComplete()

Kết quả

relaySubject: 3

Chú ý: Nếu chúng ta không gọi onComplete thì kết quả bên trên sẽ không được in ra.

=> Vậy là chúng ta đã tìm hiểu xong Subject trong ReactiveX Java. Về bản chất thì các subject này đều giống nhau cho các ngôn ngữ hỗ trợ ReactiveX, tuy nhiên vẫn có 1 chút khác biệt nho nhỏ giữa các ngôn ngữ.

Ví dụ: ReplaySubject bên swift khi khởi tạo sẽ truyền 1 bộ nhớ đệm vào và nó chỉ replay số item bằng với số bộ nhớ đệm này.

Operator

  • subscribeOn: Operator này sẽ chỉ định Observable chạy trên đâu (main, io, computation, newThread….) và nó chỉ nhận duy nhất 1 lần. Nếu chúng ta thực hiện operator này nhiều thì nó sẽ chỉ nhận thằng đầu tiên.
  • observeOn: Operator này sẽ báo cho Operator tiếp theo nhận kết quả ở đâu. Không giống như subscribeOn thì ObserveOn sẽ sử dụng được nhiều.
  • throttle: Throttle giới hạn số lần gọi hàm trong một khoảng thời gian. Ví dụ khi một hàm dùng throttle, throttle sẽ gọi hàm này nhiều nhất 1 lần mỗi x mili giây với x là khoảng thời gian mà ta cài đặt.
val observable = Observable.just(1, 2, 3, 4)
    .concatMap { Observable.just(it).delay(300, TimeUnit.MILLISECONDS) }
    .throttleFirst(1, TimeUnit.SECONDS)
    .subscribe {
    	println("result: $it")
    }

Kết quả sẽ chỉ in ra giá trị 1

  • debounce: Không giống như Throttle, Debounce sẽ giữ trigger rate của event listener là 0 kể cả khi event được thực hiện. Và sau một khoảng thời gian mà event không được thực hiện, event listener mới được trigger và hàm được gọi.
Observable.just(1, 2, 3, 4)
    .concatMap { Observable.just(it) }
    .debounce(1, TimeUnit.SECONDS)
    .subscribe {
    	println("result: $it")
    }

Kết quả sẽ chỉ in ra giá trị 4 sau 1s

  • filter: Kiểm tra điều kiện đầu vào, nếu đúng thì chạy các  operator tiếp theo, ngược lại thì sẽ không làm gỉ cả hoặc vào onComplete.
Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.computation())
    .filter { it % 2 == 0 }
    .subscribe {
    	println("result: $it")
    }

Kết quả:

result: 2
result: 4
  • take/ skip
Take định nghĩa số item được lấy
Observable.just(1, 2, 3, 4)
    .take(1)
    .subscribeOn(Schedulers.computation())
    .filter { it % 2 == 0 }
    .subscribe {
   		println("result: $it")
    }

Kết quả sẽ là giá trị 1

Skip định nghĩa số item được bỏ qua
Observable.just(1, 2, 3, 4)
    .skip(2)
    .subscribeOn(Schedulers.computation())
    .filter { it % 2 == 0 }
    .subscribe {
    	println("result: $it")
    }

Kết quả sẽ in ra là:  3, 4

  • map: Biến đổi mỗi item được phát ra (ví dụ từ Int -> String...)
Observable.just(1, 2, 3, 4)
    .map { it * it }
    .map { it.toString() }
    .subscribe {
    	println("result: $it")
    }

Chú ý là operator này chạy đồng bộ

  • flatMap: Biến đổi danh sách những items từ Observable vào Observables khác.
Observable.just(1, 2, 3, 4)
    .flatMap { Observable.just(it).delay(300,TimeUnit.MILLISECONDS) }
    .subscribe {
    	println("result: $it")
    }

Kết quả sẽ in ra là: 1,4,3,2 hoặc 1 bộ nào khác vì operator này chạy bất đồng bộ.

  • concatMap: Operator này là sự kết hợp của Map và Flat map khi nó sẽ chạy đồng bộ và biến đổi item từ Observable này sang Observable khác.
Observable.just(1, 2, 3, 4)
    .concatMap { Observable.just(it).delay(300,TimeUnit.MILLISECONDS) }
    .subscribe {
    	println("result: $it")
    }

Kết quả sẽ là: 1, 2, 3, 4

  • switchMap/ flatMapLatest(Swift): Khi một phần tử mới được emit, thì nó sẽ huỷ (unsubscribe) Observable được tạo ra trước đó và sẽ chạy Observable mới.
Observable.just(1, 2, 3, 4)
    .switchMap { Observable.just(it).delay(300,TimeUnit.MILLISECONDS) }
    .subscribe {
    	println("result: $it")
    }

Kết quả sẽ là: 4

  • merge/ combineLatest/ zip
Merge: Hàm merge giúp chúng ta thực hiện đồng thời nhiều Observable và trả về riêng lẻ các kết quả của Observable sau khi thực hiện xong Observable đó.

Kết quả sẽ là 1, 5, 2, 6, 3, 4, 7 hoặc 1 thứ tự khác.

combineLatest: Hàm này sẽ gộp 2 hay nhiều nguồn dữ liệu lại và phát ra tính hiệu bất kể khi nào có giá trị mới được cập nhật từ các nguồn dữ liệu.

Kết quả là: (1: 5), (2: 5), (3, 5), (3, 6), (3, 7)

Sự khác nhau giữa combineLatest và merge là thằng combineLatest yêu cầu lần đầu các source đầu vào phải emit item, từ lần sau trở đi chỉ cần 1 trong các source emit item là được.

zip: Gộp 2 hay nhiều source với nhau theo thứ tự emit item.

Kết quả là: (1, 5), (2, 6), (3, 7)

Vậy tạo sao giá trị 8 không được in ra? Câu trả lời là giá trị 8 là index thứ 4, nhưng source1 chỉ có index thứ 3. Nên giá trị 8 sẽ không được in ra.

CompositeDisposable

  • Phía bên trên mình quên không nói với mọi người là mỗi khi subscribe tới Observable thì sẽ trả cho chúng ta 1 Disposable, thằng này sẽ giúp chúng ta unsubscribe Observable bằng việc gọi dispose()

Vậy có 1 câu hỏi là nếu trong ứng dụng của chúng ta có rất nhiều Disposable thì chúng ta sẽ xử lý như thế nào?

Câu trả lời là CompositeDisposable, nó sẽ quản lý các Disposable đơn lẻ, vì nếu chúng ta muốn unsubscribe toàn bộ chỉ cần sử dụng composite.dispose() là xong.

Tổng kết

Trên đây là những kinh nghiệm mình tìm hiểu được về ReactiveX trong quá trình làm dự án, mặc dù nó không thể bao quát hết được khả năng cũng như độ rộng lớn của ReactiveX nhưng nếu các bạn nắm được những kiến thức trên cũng đã giảm được kha khá công sức code cũng như giải pháp khi gặp các vấn đề liên quan đến bất đồng bộ.

Bài viết của mình đến đây là hết, cảm ơn các bạn đã theo dõi.

Hẹn gặp lại!

Tags

Great! You've successfully subscribed.
Great! Next, complete checkout for full access.
Welcome back! You've successfully signed in.
Success! Your account is fully activated, you now have access to all content.