編輯:關於Android編程
自從項目中使用RxJava以來,可以很方便的切換線程。至於是怎麼實現的,一直沒有深入的研究過!本篇文章就是分析RxJava的線程模型。
RxJava基本使用
先上一個平時使用RxJava切換線程的例子:
Observable observable = Observable.create(new Observable.OnSubscribe() { @Override public void call(Subscriber subscriber) { subscriber.onNext("hello"); subscriber.onCompleted(); } }); Subscriber subscriber = new Subscriber () { @Override public void onCompleted() { //TODO } @Override public void onError(Throwable e) { //TODO } @Override public void onNext(String o) { Log.e("RxJava", o); } }; observable .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber);
如果結合Lamda表達式,代碼會非常的簡潔,為了更直觀的分析代碼,就沒有使用Lamda。通過subscribeOn(Schedulers.io()),
observeOn(AndroidSchedulers.mainThread())輕松的實現線程的切換。非常的簡單。下面一步一步分析:
RxJava主要類
如果對RxJava有了解的話,都知道它實際是用觀察者模式實現的,我們這裡只是簡單的介紹一下主要的類。
Observable和Subscriber大家已經很熟悉了,分別是被觀察者和觀察者。在使用RxJava過程中我們一般都是三步,第一步創建Observable,第二部創建Subscriber,第三步通過subscribe()方法達到訂閱的目的。為了搞清楚線程切換的實現,必須先搞清楚RxJava內部調用的流程!
在上邊代碼中,Observable.create()對應於第一步,會產生一個Observable。那麼是怎麼創建的呢?而create()方法的源碼:
public staticObservable create(OnSubscribe f) { return new Observable (hook.onCreate(f)); }
create()會接收一個OnSubscribe實例,OnSubscribe繼承自Action1,有一個call()回調方法。在subscribe()方法執行時會被調用。這裡的hook.onCreate()其實其他的什麼也沒有做就是你傳進去什麼就返回什麼。最後可以看到產生了一個Observable的實例。
接著看一下subscribe(subscriber) , subscribe()方法會產生訂閱行為,接收一個Subscriber實例,現在有了被觀察者和觀察者,那麼我們就可以分析訂閱行為了。看subscribe()方法的最終實現:
staticSubscription subscribe(Subscriber subscriber, Observable observable) { // validate and proceed if (subscriber == null) { throw new IllegalArgumentException("subscriber can not be null"); } if (observable.onSubscribe == null) { throw new IllegalStateException("onSubscribe function can not be null."); /* * the subscribe function can also be overridden but generally that's not the appropriate approach * so I won't mention that in the exception */ } // new Subscriber so onStart it subscriber.onStart(); /* * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls * to user code from within an Observer" */ // if not already wrapped if (!(subscriber instanceof SafeSubscriber)) { // assign to `observer` so we return the protected version subscriber = new SafeSubscriber (subscriber); } // The code below is exactly the same an unsafeSubscribe but not used because it would // add a significant depth to already huge call stacks. try { // allow the hook to intercept and/or decorate hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types Exceptions.throwIfFatal(e); // in case the subscriber can't listen to exceptions anymore if (subscriber.isUnsubscribed()) { RxJavaPluginUtils.handleException(hook.onSubscribeError(e)); } else { // if an unhandled error occurs executing the onSubscribe we will propagate it try { subscriber.onError(hook.onSubscribeError(e)); } catch (Throwable e2) { Exceptions.throwIfFatal(e2); // if this happens it means the onError itself failed (perhaps an invalid function implementation) // so we are unable to propagate the error correctly and will just throw RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); // TODO could the hook be the cause of the error in the on error handling. hook.onSubscribeError(r); // TODO why aren't we throwing the hook's return value. throw r; } } return Subscriptions.unsubscribed(); } }
一般情況下會執行第31行,
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
hook.onSubscribeStart()什麼也沒有做,將observable.onSubscribe直接返回,那麼這句話就可以簡化成
observable.onSubscribe.call(subscriber);
如果我們沒有使用任何的操作符,那麼這裡的observable.onSubscribe就是我們在create()方法中傳入的onSubscribe實例。就會回調其call( subscriber)方法。在call()方法中,我們一般又會根據獲得的subscriber引用,去調用相應的onNext()和onComplete()方法。這就是調用的基本流程!
如果我們使用了例如map這樣的操作符,那麼基本的流程大致是一樣的,只不過是將Observable實例進行相應的變化後,向下傳遞。最終執行subscribe操作的是最後一個Observable,所以,每個變換後的Observable都會持有上一個Observable 中OnSubscribe對象的引用。
目前還是沒有分析subscribeOn()和observeOn(),接下來就看看他們內部怎麼實現的吧!
subscribeOn()源碼:
public final ObservablesubscribeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable )this).scalarScheduleOn(scheduler); } return create(new OperatorSubscribeOn (this, scheduler)); }
正如前邊所說,這裡會產生新的Observable,並且持有上一個Observable的OnSubscribe(我們的例子中就是我們在create()方法中傳入的)的引用。我們繼續看OperatorSubscribeOn這個類:
public final class OperatorSubscribeOnimplements OnSubscribe { final Scheduler scheduler; final Observable source; public OperatorSubscribeOn(Observable source, Scheduler scheduler) { this.scheduler = scheduler; this.source = source; } @Override public void call(final Subscriber subscriber) { final Worker inner = scheduler.createWorker(); subscriber.add(inner); inner.schedule(new Action0() { @Override public void call() { final Thread t = Thread.currentThread(); Subscriber s = new Subscriber (subscriber) { @Override public void onNext(T t) { subscriber.onNext(t); } @Override public void onError(Throwable e) { try { subscriber.onError(e); } finally { inner.unsubscribe(); } } @Override public void onCompleted() { try { subscriber.onCompleted(); } finally { inner.unsubscribe(); } } @Override public void setProducer(final Producer p) { subscriber.setProducer(new Producer() { @Override public void request(final long n) { if (t == Thread.currentThread()) { p.request(n); } else { inner.schedule(new Action0() { @Override public void call() { p.request(n); } }); } } }); } }; source.unsafeSubscribe(s); } }); } }
該類實現了OnSubscribe接口,並且實現了call()方法,也就意味著,我們之前所分析的在subscribe()時,會一步一步去執行observable.onSubscribe.call(),對於SubscribeOn來說他的call()方法的真正實現就是在這裡。那麼還要注意一點的是這個call()的回調時機,這裡要和ObserveOn()做比較,稍後再分析。回到這裡的call方法,首先通過外部傳入的scheduler創建Worker - inner對象,接著在inner中執行了一段代碼,Action0中call()方法這段代碼就在worker線程中執行了,也就是此刻線程進行了切換。
注意最後一句代碼source.unsafeSubscribe(s)就是將當前的Observable與上一個Observable通過onSubscribe關聯起來。那麼如果上一個Observable也是一個subscribeOn()產生的那麼會出現什麼情況?很顯然最終會切換到上一個subscribeOn指定的線程中。例如:
btn_map.setOnClickListener(v -> getObservable() .map(integer -> null) .subscribeOn(Schedulers.computation()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(integer1 -> Log.e("map", integer1 + "")));
map的轉換實際上會發生在computation線程而不是io線程,換句話說就是設置多個subscribeOn時,實際上只會切換到第一個subscribeOn指定的線程。這一點很重要!!!
到現在我們已經分析了subscribe()訂閱後,一直到回調到我們在create()方法中傳入的OnSubscribe的call()方法,subscribeOn()方法是在這個過程中產生作用的。那麼ObserveOn呢?看源碼:
public final ObservableobserveOn(Scheduler scheduler, boolean delayError, int bufferSize) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable )this).scalarScheduleOn(scheduler); } return lift(new OperatorObserveOn (scheduler, delayError, bufferSize)); }
這裡經過了一次lift變化,這是個啥玩意呢?
public finalObservable lift(final Operator operator) { return new Observable (new OnSubscribeLift (onSubscribe, operator)); }
實際上也是包裝了一層Observable。明白了這一點,在回到observeOn()源碼中的OperatorObserveOn。
public final class OperatorObserveOnimplements Operator { ...省略... /** * @param scheduler the scheduler to use * @param delayError delay errors until all normal events are emitted in the other thread? * @param bufferSize for the buffer feeding the Scheduler workers, defaults to {@code RxRingBuffer.MAX} if <= 0 */ public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) { this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE; } @Override public Subscriber call(Subscriber child) { if (scheduler instanceof ImmediateScheduler) { // avoid overhead, execute directly return child; } else if (scheduler instanceof TrampolineScheduler) { // avoid overhead, execute directly return child; } else { ObserveOnSubscriber parent = new ObserveOnSubscriber (scheduler, child, delayError, bufferSize); parent.init(); return parent; } } ...省略... /** Observe through individual queue per observer. */ private static final class ObserveOnSubscriber extends Subscriber implements Action0 { ...省略... @Override public void onNext(final T t) { if (isUnsubscribed() || finished) { return; } if (!queue.offer(on.next(t))) { onError(new MissingBackpressureException()); return; } schedule(); } @Override public void onCompleted() { if (isUnsubscribed() || finished) { return; } finished = true; schedule(); } @Override public void onError(final Throwable e) { if (isUnsubscribed() || finished) { RxJavaPlugins.getInstance().getErrorHandler().handleError(e); return; } error = e; finished = true; schedule(); } protected void schedule() { if (counter.getAndIncrement() == 0) { recursiveScheduler.schedule(this); } } ...省略... } }
安裝前文的分析,先看他的call方法,返回了個ObserveOnSubscriber實例,我們需要關注這個實例的onNext方法,很簡單,只是執行了schedule()方法,該方法中的recursiveScheduler是在構造方法中根據我們設置的schedule創建的Scheduler.Worker 。
this.recursiveScheduler = scheduler.createWorker();
線程再次切換了,並且這次是在OnNext()方法中切換的,注意是在OnNext()方法中切換,和subscribeOn()在call中切換是有區別的。這樣observeOn設置的線程會影響其後面的流程,直到出現下一次observeOn或者結束。
這樣最基本的線程切換已經搞清楚了,現在我們來分析一下加入例如map這樣的操作符的過程:
observable .map(str->str+"Rx") .map(str1->str1+"Java") .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber);
根據上邊的分析,上一張流程圖(不要在意,圖很挫):
總的來說,RxJava的處理順序像一條流水線,這不僅僅是代碼寫起來像一條鏈上,邏輯上也是如此,也就是說,當你切換流水的流向(線程),整條鏈都改變了方向,並不會進行分流。理解了這一點,對RxJava的線程切換也就不會感到困難了。
源碼與示例apkhttps://github.com/dengyuhan/android-adapters 快速開始Android Studio - 在buil
小米手環有一個免密碼支付功能,作為忠實米粉小米手環免密碼支付怎麼用都不會有點說不過去,那麼小米手環免密碼支付怎麼設置?小米手環免密碼支付怎麼用?下面為大家帶
轉帖請注明本文出自xiaanming的博客(http://blog.csdn.net/xiaanming/article/details/17483273),請尊重他人的
前言:github對開發者而言無疑是個寶藏,但想利用它可不是件簡單的事,用Android studio導入開源項目會遇到各種問題,今天我就以github上的一個圖片輪播項