溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RxJava線程模型是什么

發布時間:2021-12-01 11:29:30 來源:億速云 閱讀:116 作者:iii 欄目:移動開發

本篇內容介紹了“RxJava線程模型是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

入手體驗

RxJava 中切換線程非常簡單,例如最常見的異步線程處理,主線程回調的模型,可以很優雅的用如下代碼來做處理:

Observable.just("magic")         .map(str -> doExpensiveWork(str))         .subscribeOn(Schedulers.io())         .observeOn(AndroidSchedulers.mainThread())         .subscribe(obj -> System.out.print(String.valueOf(obj)));

如上,subscribeOn(Schedulers.io())保證了doExpensiveWork  函數發生在io線程,observeOn(AndroidSchedulers.mainThread())保證了subscribe 回調發生在Android  的主線程。所以,這自然而然的引出了本文的關鍵點,subscribeOn與observeOn到底區別在哪里?

流程淺析

要想回答上面的問題,我們首先需要對RxJava的流程有大體了解,一個Observable從產生,到最終執行subscribe,中間可以經歷n個變換,每次變換會產生一個新的Observable,就像奧運開幕的傳遞火炬一樣,每次火炬都會傳遞到下一個人,最終點燃圣火的是***一個火炬手,即最終執行subscribe操作的是***一個Observable,所以,每個Observable之間必須有聯系,這種關系在代碼中的體現就是,每個變換后的Observable都會持有上一個Observable  中OnSubscribe對象的引用(Observable.create 函數所需的參數),最終  Observable的subscribe函數中的關鍵代碼是這一句:

observable.onSubscribe.call(subscriber)

這個observable就是***一個變換后的observable,那這個onSubscribe對象是誰呢?如何一個observable沒有經過任何變換,直接執行了subscribe,當然就是我們在create中傳入的onSubscribe,  但如果中間經過map、reduce等變換,這個onSubscribe顯然就應該是創建變換后的observable傳入的參數,大部分變換最終都交由lift函數:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {     return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator)); }

所以,上文所提到的onSubscribe對象應該是OnSubscribeLift的實例,而這個OnSubscribeLift所接收的兩個參數,一個是前文提到的,上一個Observable中的OnSubscribe對象,而operator則是每種變換的一個抽象接口。再來看這個OnSubscribeLift對象的call方法:

public void call(Subscriber<? super R> o) { Subscriber<? super T> st = operator.call(o); parent.call(st); }

operator與parent就是前文提到的兩個參數,可見,operator接口會擁有call方法,接收一個Subscriber,  并返回一個新的Subscriber對象,而接下來的parent.call(st)是回調上一層observable的onSubscribe的call方法,這樣如此繼續,一直到一個onSubscribe截止。這樣我們首先理清了一條線路,就是從***一個observable的subscribe后,OnSubscribe調用的順序是從后向前的。

這就帶來了另外一個疑問,從上面的代碼可以看到,在執行parent.call(st)之前已經執行了operator.call(o)方法,如果call方法里就把變換的操作執行了的話,那似乎變換也會是從后向前傳遞的呀?所以這個operator.call方法絕對不是我們想象的那么簡單。這里以map操作符為例,看源碼:

public Subscriber<? super T> call(final Subscriber<? super R> s) {     MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);     o.add(parent);     return parent; }

這里果然沒有執行變換操作,而是生成一個MapSubscriber對象,這里需要注意MapSubscriber構造函數的兩個參數,transformer是真正要執行變換的Func1對象,這很好理解,那對于o這個Subscriber是哪一個呢?什么意思?舉個?:

o1 -> o2 -> subscribe(Subscriber s0);

o1 經過map操作變為o2, o2執行subscribe操作,如果你理解上文可以知道,這段流程的執行順序為s0會首先傳遞給o2,  o2的lift操作會將s0轉換為s1傳遞給o1, 那么在生成o2 這個map操作的 call(final Subscriber<? super R>  s)方法中,s值得是誰呢?是s0還是s1呢?答案應該是s0,也就是它的下一級Subscriber,原因很簡單,call方法中返回的MapSubscriber對象parent才是s1.

所以,我們來看一下MapSubscriber的onNext方法做了什么呢?

public void onNext(T t) {     R result;     result = transformer.call(t);     s.onNext(result); }

很明了,首先執行變換,然后回調下一級的onNext函數。

至此,一個observable從初始,到變換,再到subscribe,我們已經對整個流程有了大體了解。簡單來講一個o1經過map變為o2,可以理解為o2對o1做了一層hook,會經歷兩次流程,首先是onSubscribe對象的call流程會從o2流向o1,我們簡稱流程a,到達o1后,o1又會出發Subscriber的onNext系列流程,簡稱流程b,流程b才是真正執行變換的流程,其走向是從o1流向o2.理解了這個,我們就可以更近一步的理解RxJava中線程的模型了。

tip: 一定要深刻理解流程a與流程b的區別。這對下文理解線程切換至關重要。

切換方式

RxJava對線程模型的抽象是Scheduler,這是一個抽象類,包含一個抽象方法:

public abstract Worker createWorker();

這個Worker是何方神圣呢?它其實是Scheduler的抽象內部類,主要 包含兩個抽象方法:

1) public abstract Subscription schedule(Action0 action); 2) public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);

可見,Worker才是線程執行的主力,兩個方法一個用與立即執行任務,另一個用與執行延時任務。而Scheduler是Worker的工廠,用于對外提供Worker。

RxJava中共有兩種常見的方式來切換線程,分別是subscribeOn變換與observeOn變換,這兩者接收的參數都是Scheduler。接下來從源碼層面來對比這兩者的差別。

subscribeOn

首先看subscribeOn的部分

public final Observable<T> subscribeOn(Scheduler scheduler) {     return create(new OperatorSubscribeOn<T>(this, scheduler)); }

create一個新的Observable,傳入的參數是OperatorSubscribeOn,很明顯這應該是OnSubscribe的一個實現,關注這個OperatorSubscribeOn的call實現方法:

public void call(final Subscriber<? super T> subscriber) {      final Worker inner = scheduler.createWorker();              inner.schedule(new Action0() {            @Override             public void call() {                 final Thread t = Thread.currentThread();                                  Subscriber<T> s = new Subscriber<T>(subscriber) {                     @Override                     public void onNext(T t) {                         subscriber.onNext(t);                     }                                          ...                                      };                                  source.unsafeSubscribe(s);             }     }); }

這里比較關鍵了,上文提到了流程a與流程b,首先明確一點,這個call方法的執行時機是流程a,也就是說這個call發生在流程b之前,call方法里首先通過外部傳入的scheduler創建Worker  &ndash;  inner對象,接著在inner中執行了一段代碼,神奇了,Action0中call方法這段代碼就在worker線程中執行了,也就是此刻程進行了切換。注意***一句代碼source.unsafeSubscribe(s),source  代表創建OperatorSubscribeOn對象是傳進來的上一個Observable, 這句的源碼如下:

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {             return onSubscribe.call(subscriber); }

和上文提到的lift方法中OnSubscribeLift對象的call方法中parent.call(st)作用類似,就是將當前的Observable與上一個Observable通過onSubscribe關聯起來。

至此,我們可以大致了解了subscribeOn的原理,它會在流程a就進行了線程切換,但由于流程a上實際上都是Observable之間串聯關系的代碼,并且是從后面的Observable流向前面的Observable,這帶來的一個隱含意思就是,對于流程b而言,最早的subscribeOn會屏蔽其后面的subscribeOn!  比如:

Observable.just("magic")           .map(file -> doExpensiveWork(file))           .subscribeOn(Schedulers.io())           .subscribeOn(AndroidSchedulers.mainThread())           .subscribe(obj -> doAction(obj)));

這段代碼中無論是doExpensiveWork函數還是doAction函數,都會在io線程出觸發。

observeOn

理解了subscribeOn,那理解observeOn就會更容易一下,observeOn函數最終會轉換到這個函數:

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {         return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize)); }

很明顯,這是做了一次lift操作,我們需要關注OperatorObserveOn這個Operator,查看其call方法:

public Subscriber<? super T> call(Subscriber<? super T> child) {     ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);     parent.init();     return parent; }

這里返回的是一個ObserveOnSubscriber對象,我們關注這個Subscriber的onNext函數,

public void onNext(final T t) {     schedule(); }

它只是簡單的執行了schedule函數,來看下這個schedule:

protected void schedule() {         recursiveScheduler.schedule(this); }

這里亂入的recursiveScheduler.schedule是什么鬼?它并不神奇,它就是ObserveOnSubscriber構造函數傳進來的scheduler創建的worker:

this.recursiveScheduler = scheduler.createWorker();

所以,magic再次產生,observeOn在其onNext中進行了線程的切換,那這個onNext是在什么時候執行的呢?通過上文可知,是在流程b中。所以observeOn會影響其后面的流程,直到出現下一次observeOn或者結束。

周邊技巧

線程模型的選擇

RxJava為我們內置了幾種線程模型,主要區別如下:

  • computation內部是一個線程,線程池的大小cpu核數:Runtime.getRuntime().availableProcessors(),這種線程比較適合做純cpu運算,如求100億以內的斐波那契數列的和之類。

  • newThread每次createWorker都會生成一個新的線程。

  • io與newThread類似,但內部是一個沒有上線的線程池,一般來講,使用io會比newThread好一些,因為其內部的線程池可以重用線程。

  • immediate在當前線程立即執行

  • trampoline

在當前線程執行,與immediate不同的是,它并不會立即執行,而是將其存入隊列,等待當前Scheduler中其它任務執行完畢后執行,這個在我們時常使用的并不多,它主要服務于repeat  ,retry這類特殊的變換操作。

  • from接收一個Executor,允許我們自定義Scheduler。

Scheduler.Worker強勢搶鏡

其實RxJava中的Worker完全可以抽出來為我所用,如下面這種寫法,就是新開線程執行了一個action。

Scheduler.Worker worker = Schedulers. newThread().createWorker(); worker.schedule(new Action0() {             @Override             public void call() {                 throw new RuntimeException("surprise");             }         });

當然,你要選擇合適的時機去關閉(unsubscribe)worker來釋放資源。

自帶光環的操作符

某些操作符是有默認的線程模型的,比如前文提到的repeat 與retry會默認在trampoline線程模型下執行, buffer  ,debounce之類會默認切換到computation。這里不做深入探討,只是當你遇到某些問題時記得,有些人物是自帶裝備與光環的。

“RxJava線程模型是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

亚洲午夜精品一区二区_中文无码日韩欧免_久久香蕉精品视频_欧美主播一区二区三区美女