```java int b = 1; int c = 2 int a = b + c; ```

Reactive Programming - 1

Mobile Feb 01, 2021
Phần 1: Tổng quan về Reactive Programing

Mở đầu

Trong lập trình mobile, làm quá nhiều việc trên luồng chính(main thread) là điều cấm kỵ vì nó sẽ gây cho ứng dụng bị đơ, lag dẫn đến trải nghiệm người dùng không tốt, chính vì vậy lập trình viên nên giảm tải những task vụ bên trong luồng chính sang những luồng khác. Và hôm nay mình sẽ giới thiệu cho mọi người một cách để thực hiện điều đó, nó được gọi là Reactive Programming(ReactiveX). Vậy nó là gì? Chúng ta sẽ tìm hiểu nó ở ngay bên dưới nhé.

Reactive Programming là gì?

  • Reactive Programming(ReactiveX) là lập trình với các luồng dữ liệu bất đồng bộ. Mọi thứ bạn thấy là một luồng dữ liệu không đồng bộ (asynchronous data stream), cái mà có thể quan sát được và một hành động sẽ được thực hiện khi nó phát ra các giá trị.
  • ReactiveX là sự kết hợp của 3 mô hình Observer, Iterator và Functional programming
Observer pattern: Observer pattern là một software design pattern mà trong đó mỗi Object được gọi là một subject, duy trì một danh sách những object phụ thuộc vào nó được gọi là observer. Và nó sẽ thông báo khi có bất cứ sự thay đổi nào trong trạng thái của object.
Iterator pattern : truy cập những phần tử của một đối tượng một cách tuần tự mà không làm lộ cách thức thể hiện của chúng.
Functional Programming:  hay lập trình chức năng là kiểu lập trình mà function được chọn làm đơn vị thao tác cơ bản. Functional programming tập trung vào sử dụng tính năng của các hàm và tránh sử dụng các biến cũng như thay đổi giá trị của chúng.

Lấy một ví dụ đơn giản:

int b = 1;
int c = 2
int a = b + c;

Khi áp dụng ReactiveX vào thì khi b hoặc c thay đổi thì a cũng sẽ thay đổi theo, đây là điều khác biệt giữa cách lập trình hiện tại và ReactiveX.

Các thành phần

RX = Observable + Observer + Scheduler (Operator/ Trasformation)
  • Observable : Là luồng dữ liệu thực hiện một số công việc và phát ra dữ liệu, bao gồm 2 loại:
HOT Observable
COLD Observable
  • Observers + Subscribe: Là thành phần đi kèm không thể thiếu của Observable. Nó nhận dữ liệu được phát ra bởi Observable.
onSubscribe(): Phương thức sẽ được gọi khi một Observer đăng ký vào Observable.
onNext(): Phương thức này sẽ được gọi khi Observable bắt đầu phát ra dữ liệu.
onError(): Trong trường hợp có lỗi, phương thức onError() sẽ được gọi.
onComplete(): Khi một Observable hoàn thành việc phát ra dữ liệu, onComplete() sẽ được gọi.
  • Schedulers : Scheduler quyết định thread mà Observable sẽ phát ra dữ liệu và trên thread nào Observer sẽ nhận dữ liệu
Schedulers.io: Không dùng đến CPU, nó thực hiện các công việc chuyên sâu như networks call, đọc đĩa/file, database, … Nó duy trì được pool của thread.
AndroidSchedulers.mainThread: Nó cung cấp quyền truy cập đến Main Thread/UI Thread. Thông thường cập nhật giao diện hay tương tác với người dùng sẽ xảy ra trên luồng này. Chúng ta không thực hiện bất kì công việc chuyên sâu trên luồng này vì nó sẽ làm cho ứng dụng bị crash hoặc ANR.
Schedulers.newThread: Sử dụng cái này thì mỗi thread sẽ được tạo ra mỗi lần nhiệm vụ được xếp lịch. Thường thì không khuyến cáo sử dụng cách này trừ khi công việc rất dài. Thread được tạo qua newThread() sẽ không được dùng lại.
Schedulers.computation: Đòi hỏi nhiều CPU như xử lý dữ liệu lớn, xử lý bitmap, … Số lượng các thread được tạo ra bằng cách sử dụng Scheduler này hoàn toàn phụ thuộc vào số lõi CPU.
Schedulers.single: Scheduler này sẽ thực hiện tất cả các nhiệm vụ theo thứ tự tuần tự mà chúc được add vào. Việc này có thể cần thiết trong một số trường hợp cần tuần tự.
Schedulers.immediate: Thực hiện nhiệm vụ ngay lập tức một cách đồng bộ bằng cách chặn main thread.
Schedulers.trampoline: Nó thực hiện các nhiệm vụ theo Last In - First Out. Tất cả các nhiệm vụ được xếp lịch sẽ được thực hiện từng cái một bằng cách giới hạn số lượng các background thread thành một.
Schedulers.from: Cách này cho phép tạo ra một Scheduler từ một Executor bởi giới hạn số lượng các thread được tạo ra. Khi thread pool bị full, các nhiệm vụ sẽ xếp hàng đợi.
  • Operator: Hỗ trợ cho việc sửa đổi dữ liệu được phát ra bởi Observable trước khi observer nhận chúng. Phần này mình sẽ nói chi tiết hơn ở mục Subject
subscribeOn, observeOn
map, flatMap, concatMap
throttle, debounce
filter, take…
merge, combineLaster, zip...

Các loại Observable

Như ở trên mình có nhắc tới, Observable được chia thành 2 loại:

  • HOT Observable:
Dữ liệu được tạo bên ngoài Observable
Không quan tâm có ai subscribe nó vẫn thực hiện công việc
  • COLD
Dữ liệu được tạo bên trong Observable
Phải có subscribe thì nó mới thực công việc
  • Ví dụ: Hãy so sánh xem một 1 film trên Netflix và xem chiếu rạp và bạn là 1 người quan sát (Observer)

Bạn đi xem ở rạp và xuất chiếu là lúc 4 giờ và rạp chiếu phim sẽ luôn chiếu phim lúc 4 giờ cho dù bạn có đến hay không? Bạn hoặc ai đó đến muộn thì chỉ xem được từ thời điểm bạn đến => HOT Observable

Bạn xem film trên netflix, bộ phim chỉ được phát khi bạn nhấn nút play và bạn sẽ nhận được toàn bộ nội dung bộ film => COLD Observable

Mình sẽ demo 1 đoạn code nho nhỏ để giải thích rõ hơn về phần này:

fun randomNumber(): Int {
	println("Start random")
	return Random.nextInt();
}

println("===From Hot===")
val hotObservable = Observable.just(randomNumber())
println("Start subscribing")
hotObservable.subscribe { println("hot1: $it") }
hotObservable.subscribe { println("hot2: $it") }

println("===From Cold===")
val coldObservable = Observable.fromCallable { randomNumber() }
println("Start subscribing")
coldObservable.subscribe { println("cold1: $it") }
coldObservable.subscribe { println("cold2: $it") }

Kết quả

===From Hot===
Start random
Start subscribing
hot1: 1122774128
hot2: 1122774128
===From Cold===
Start subscribing
Start random
cold1: -198463554
Start random
cold2: 396841398

Trong thực tế chúng ta thường sử dụng 1 số Observable sau

  • Observable/ Observer: Được sử dụng nhiều nhất, có thể phát ra item hoặc không.
  • Single/ SingleObserver: Chỉ phát ra duy nhất một item hoặc một error
  • Maybe/ MaybeObserver: Có thể phát ra một hoặc không item nào.
  • Flowable/ Observer: Được sử dụng khi số lượng item lớn (10k+)
  • Completable/ CompletableObserver: Không phát ra item nào, chỉ thông báo hoàn thành hay chưa.

Các cách tạo Observables

Như trên trang document của ReactiveX thì có rất nhiều cách để khởi tạo 1 Observable. Tuy nhiên trong phạm vi bài viết này mình không đi chi tiết vào từng cách khởi tạo. Mình chỉ lấy 1 vài cách trong thực tế thường hay sử dụng

  • create: Tạo Observable có thể phát ra dữ liệu trong quá trình xử lý bằng cách gọi onNext() tới Observer
  • just: Tạo một Observable phát ra item cụ thể
  • error: Tạo ra một Observable lỗi
  • interval: Tạo một Observable lặp lại vô hạn với thời gian định sẵn .
  • fromCallable: Khi có observer đăng ký, Callable truyền vào được gọi và trả về giá trị hoặc ném ra ngoại lệ.

ví dụ:

var emitter: ObservableEmitter<Int>? = null
val observableCreate = Observable.create<Int> {
		emitter = it
		it.onNext(10)
}
observableCreate
		.map { it * 2 }
		.subscribe {
		 	println("subscribe $it")
		}
emitter!!.onNext(20)

Kết quả của đoạn code trên là:

subscribe: 20
subscribe: 40

Lý thuyết của ReactiveX mình xin được kết thúc ở đây. Phần này sẽ trang bị cho các bạn những lý thuyết cốt lõi của ReactiveX và hy vọng các bạn sẽ đọc được đến đây để chúng ta sẽ tiếp tục hành trình chinh phục ReactiveX.

Phần 2: Chinh phục Reactive Programming

Tags

Great! You've successfully subscribed.
Great! Next, complete checkout for full access.
Welcome back! You've successfully signed in.
Success! Your account is fully activated, you now have access to all content.
int b = 1; int c = 2 int a = b + c;