編輯:關於Android編程
前段時間閱讀了RxJava1.x的源碼,剛好RxJava2.x也發布了RC版,為了迎接10月底的正式版,趁熱打鐵,本篇將對RxJava2.x進行一個簡單的剖析。
在RxJava1.x中,最熟悉的莫過於Observable這個類了,筆者剛使用RxJava2.x時,創建一個Observable時,頓時是懵逼的。因為我們熟悉的Subscriber居然沒影了。取而代之的是ObservableEmitter,俗稱發射器。此外,由於沒有了Subscriber的蹤影,我們創建觀察者時需使用Observer。而Observer也不是我們熟悉的那個Observer,回調的Disposable參數更是讓人摸不到頭腦。
廢話不多說,從會用開始,還記得使用RxJava的三部曲嗎?
第一步:初始化一個Observable
Observableobservable=Observable.create(new ObservableOnSubscribe () { @Override public void subscribe(ObservableEmitter e) throws Exception { e.onNext(1); e.onNext(2); e.onComplete(); } });
第二步:初始化一個Observer
Observerobserver= new Observer () { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }
第三部:建立訂閱關系
observable.subscribe(observer); //建立訂閱關系
不難看出,與RxJava1.x還是存在著一些區別的。首先,創建Observable時,回調的是ObservableEmitter,字面意思即發射器,用於發射數據(onNext)和通知(onError/onComplete)。其次,創建的Observer中多了一個回調方法onSubscribe,傳遞參數為Disposable ,Disposable 相當於RxJava1.x中的Subscription,用於解除訂閱。你可能納悶為什麼不像RxJava1.x中訂閱時返回Disposable,而是選擇回調出來呢。官方說是為了設計成Reactive-Streams架構。不過仔細想想這麼一個場景還是很有用的,假設Observer需要在接收到異常數據項時解除訂閱,在RxJava2.x中則非常簡便,如下操作即可。
Observerobserver = new Observer () { private Disposable disposable; @Override public void onSubscribe(Disposable d) { disposable = d; } @Override public void onNext(Integer value) { Log.d("JG", value.toString()); if (value > 3) { // >3 時為異常數據,解除訂閱 disposable.dispose(); } } @Override public void onError(Throwable e) { } @Override public void onComplete() { } };
此外,RxJava2.x中仍然保留了其他簡化訂閱方法,我們可以根據需求,選擇相應的簡化訂閱。只不過傳入的對象改為了Consumer。`
Disposable disposable = observable.subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { //這裡接收數據項 } }, new Consumer () { @Override public void accept(Throwable throwable) throws Exception { //這裡接收onError } }, new Action() { @Override public void run() throws Exception { //這裡接收onComplete。 } });
不同於RxJava1.x,RxJava2.x中沒有了一系列的Action/Func接口,取而代之的是與Java8命名類似的函數式接口,如下圖:
其中Action類似於RxJava1.x中的Action0,區別在於Action允許拋出異常。
public interface Action { /** * Runs the action and optionally throws a checked exception * @throws Exception if the implementation wishes to throw a checked exception */ void run() throws Exception; }
而Consumer即消費者,用於接收單個值,BiConsumer則是接收兩個值,Function用於變換對象,Predicate用於判斷。這些接口命名大多參照了Java8,熟悉Java8新特性的應該都知道意思,這裡也就不再贅述了。
關於線程切換這點,RxJava1.x和RxJava2.x的實現思路是一樣的。這裡就簡單看下相關源碼。
同RxJava1.x一樣,subscribeOn用於指定subscribe()時所發生的線程,從源碼角度可以看出,內部線程調度是通過ObservableSubscribeOn來實現的。
public final ObservablesubscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn (this, scheduler)); }
ObservableSubscribeOn的核心源碼在subscribeActual方法中,通過代理的方式使用SubscribeOnObserver包裝Observer後,設置Disposable來將subscribe切換到Scheduler線程中
@Override public void subscribeActual(final Observer s) { final SubscribeOnObserverparent = new SubscribeOnObserver (s); s.onSubscribe(parent); //回調Disposable parent.setDisposable(scheduler.scheduleDirect(new Runnable() { //設置`Disposable` @Override public void run() { source.subscribe(parent); //使Observable的subscribe發生在Scheduler線程中 } })); }
observeOn方法用於指定下游Observer回調發生的線程。
public final ObservableobserveOn(Scheduler scheduler, boolean delayError, int bufferSize) { //.. //驗證安全 return RxJavaPlugins.onAssembly(new ObservableObserveOn (this, scheduler, delayError, bufferSize)); }
主要實現在ObservableObserveOn中的subscribeActual,可以看出,不同於subscribeOn,沒有將suncribe操作全部切換到Scheduler中,而是通過ObserveOnSubscriber與Scheduler配合,通過schedule()達到切換下游Observer回調發生的線程,這一點與RxJava1.x實現幾乎相同。關於ObserveOnSubscriber的源碼這裡不再重復描述了,有興趣的可以查看本人RxJava源碼解讀這篇文章
@Override protected void subscribeActual(Observer observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnSubscriber(observer, w, delayError, bufferSize)); } }
Flowable是RxJava2.x中新增的,專門用於應對背壓(Backpressure)問題。所謂背壓,即生產者的速度大於消費者的速度帶來的問題,比如在Android中常見的點擊事件,點擊過快則經常會造成點擊兩次的效果。其中,Flowable默認隊列大小為128.
通過Flowable我們可以自定義背壓處理策略。
測試Flowable例子如下:
Flowable.create(new FlowableOnSubscribe() { @Override public void subscribe(FlowableEmitter e) throws Exception { for(int i=0;i<10000;i++){ e.onNext(i); } e.onComplete(); } }, FlowableEmitter.BackpressureMode.ERROR) //指定背壓處理策略,拋出異常 .subscribeOn(Schedulers.computation()) .observeOn(Schedulers.newThread()) .subscribe(new Consumer () { @Override public void accept(Integer integer) throws Exception { Log.d("JG", integer.toString()); Thread.sleep(1000); } }, new Consumer () { @Override public void accept(Throwable throwable) throws Exception { Log.d("JG",throwable.toString()); } });
其中還需要注意的一點在於,Flowable並不是訂閱就開始發送數據,而是需等到執行Subscription#request才能開始發送數據。當然,使用簡化subscribe訂閱方法會默認指定Long.MAX_VALUE,而無需手動指定。手動指定的例子如下:
Flowable.range(1,10).subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE);//設置請求數 } @Override public void onNext(Integer integer) { } @Override public void onError(Throwable t) { } @Override public void onComplete() { } });
不同於RxJava1.x中的SingleSubscriber,RxJava2中的SingleObserver多了一個回調方法onSubscribe。
interface SingleObserver{ void onSubscribe(Disposable d); void onSuccess(T value); void onError(Throwable error); }
同Single,Completable也被重新設計為Reactive-Streams架構,RxJava1.x的CompletableSubscriber改為CompletableObserver,源碼如下:
interface CompletableObserver{ void onSubscribe(Disposable d); void onComplete(); void onError(Throwable error); }
Processor和Subject的作用是相同的。關於Subject部分,RxJava1.x與RxJava2.x在用法上沒有顯著區別,這裡就不介紹了。其中Processor是RxJava2.x新增的,繼承自Flowable,所以支持背壓控制。而Subject則不支持背壓控制。
關於操作符,RxJava1.x與RxJava2.x在命名和行為上大多數保持了一致,具體區別請查閱文檔。
先看看效果圖:ImageLockActivitypackage com.example.imagelock; import com.example.view.NineP
前言相信大家都知道Android滾動控件的實現方式有很多, 使用RecyclerView也比較簡單. 做了一個簡單的年齡滾動控件, 讓我們來看看RecyclerView的
前言:在Android開發中,對於圖片的加載可以說是個老生常談的問題了,圖片加載是一個比較坑的地方,處理不好,會有各種奇怪的問題,比如 加載導致界面卡頓,程序crash。
為了讓界面可以在平板上更好地展示,Android在3.0版本引入了Fragment(碎片)功能。首先需要注意,Fragment是在3.0版本引入的,如果你使用的是3.0之