編輯:關於Android編程
注:原理講解可能會用到rx1.0的概念,但是代碼示例部分用rx2.0 來展示
很多做android開發朋友對rxjava都有熟悉,github上也出現了很多的基於rxjava的開源庫,比如 RxBus RxBinding RxPermission,如果我們了解了RxJava的原理,那麼我們也可以很輕松的通過RxJava來封裝我們自己的庫。後面會有簡單的例子來用RxJava來封裝Animation。
好了,開始我們的正文
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
RxJava是一個實現反應性擴展框架的Java虛擬機:用於通過使用觀察序列構成異步和基於事件的程序庫。
擴展了觀察者模式,以支持數據/事件序列,並增加了操作符,他可以將將序列清晰的組合在一起的。這些序列組合可以是抽象出來的某些數據/事件,如低級別的線程,同步,線程安全和並發數據結構。
概括的的文字剛開始一般是看不懂的,簡單來說RxJava就是一個實現異步操作的庫。
官方的概括中提到了擴展的觀察者模式,那麼我們先從此入手
在Java中通過Observable類和Observer接口實現了觀察者模式。一個Observer對象監視著一個Observable對象的變化,當Observable對象發生變化時,Observer得到通知,就可以進行相應的工作。
這裡Observable(被觀察者)對象的變化是采用注冊(Register)或者稱為訂閱(Subscribe)的方式告訴Observer(觀察者)。
RxJava 有四個基本概念:Observable (可觀察者,即被觀察者)、
Observer (觀察者)、
subscribe (訂閱)、事件。
Observable 和
Observer 通過
subscribe() 方法實現訂閱關系,從而
Observable 可以在需要的時候發出事件來通知
Observer。
與傳統觀察者模式不同,
RxJava 的事件回調方法除了普通事件
onNext() (相當於
onClick() /
onEvent())之外,還定義了兩個特殊的事件:
onCompleted() 和
onError()。
onCompleted(): 事件隊列完結。
RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。
RxJava 規定,當不會再有新的
onNext() 發出時,需要觸發
onCompleted() 方法作為標志。
onError(): 事件隊列異常。在事件處理過程中出異常時,
onError() 會被觸發,同時隊列自動終止,不允許再有事件發出。
在一個正確運行的事件序列中, onCompleted() 和
onError() 有且只有一個,並且是事件序列中的最後一個。需要注意的是,
onCompleted() 和
onError() 二者也是互斥的,即在隊列中調用了其中一個,就不應該再調用另一個。
備:
RxJava2.0還添加了一個新的回調方法:
onSubscribe(),這是為了解決
RxJava1.0的
backpressure問題,後面會講到
RxJava觀察者模式的圖如下
RxJava的基本實現
因為
RxJava2.0引入了很多新的接口,我們在講原理的時候,直接拿著2.0的代碼來做示例,但是有些得放用2.0不太好理解,我們就用1.0的代碼來理解原理吧
創建Subscriber(2.0)/Observer(2.0)
Subscriber subscriber = new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
Logger.i("hello onSubscribe");
}
@Override
public void onNext(String s) {
Logger.i("hello onNext-->" + s);
}
@Override
public void onError(Throwable t) {
Logger.i("hello onError");
}
@Override
public void onComplete() {
Logger.i("hello onComplete");
}
};
Observer observer = new Observer() {
@Override
public void onSubscribe(Disposable d) {
Logger.i("hello onSubscribe");
}
@Override
public void onNext(String value) {
Logger.i("hello onNext-->" + value);
}
@Override
public void onError(Throwable e) {
Logger.i("hello onError");
}
@Override
public void onComplete() {
Logger.i("hello onComplete");
}
};
Subscriber 和 Observer的接口是分別獨立的,Obsesrver用於訂閱Observable,而Subscriber用於訂閱Flowable
創建Flowable(2.0)/Observable(2.0)
Observable 即被觀察者,它決定什麼時候觸發事件以及觸發怎樣的事件。 RxJava 使用
create()方法來創建一個
Observable,並為它定義事件觸發規則
Flowable stringFlowable = Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter e) throws Exception {
Logger.i("---rxHelloFlowable---");
}
}, FlowableEmitter.BackpressureMode.BUFFER);
Observable stringObservable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext("Hello");
e.onNext("Inke");
e.onComplete();
}
});
可以看到,這裡傳入了一個
ObservableOnSubscribe對象作為參數,它的作用相當於一個計劃表,當
Observable被訂閱的時候,
ObservableOnSubscribe的
subscribe()方法會自動被調用,事件序列就會依照設定依次觸發(對於上面的代碼,就是觀察者
Subscriber 將會被調用兩次
onNext()和一次
onCompleted())。這樣,由被觀察者調用了觀察者的回調方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。
RxJava提供快捷創建事件隊列的方法
just()將傳入的參數依次發送出來 fromIterable() 將傳入的Iterable 拆分成具體對象後,依次發送出來 fromArray() 還沒研究明白
心細的朋友可以看到
Flowable在
create()的時候多了一個參數
BackpressureMode,是用來處理backpressure的發射器
一共有以下幾種模式
enum BackpressureMode {
/**
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
*
Useful when one applies one of the custom-parameter onBackpressureXXX operators. */ NONE, /** * Signals a MissingBackpressureException in case the downstream can't keep up. */ ERROR, /** * Buffers all onNext values until the downstream consumes it. */ BUFFER, /** * Drops the most recent onNext value if the downstream can't keep up. */ DROP, /** * Keeps only the latest onNext value, overwriting any previous value if the * downstream can't keep up. */ LATEST }
個人感覺BUFFER較為安全,api解釋為緩沖器存有onNext值,直到下游消費它。
因為
Observer不支持 backpressure,所以後面的代碼我們默認使用RxJava2.0的
Flowable和
Subscriber,但是為了便於理解,某些原理可能還會用RxJava1.0。
Subscribe (訂閱)
創建了 Flowable和 Subscriber 之後,再用
subscribe()方法將它們聯結起來,整條鏈子就可以工作了。代碼形式很簡單:
stringFlowable.subscribe(subscriber);
有人可能會注意到,
subscribe()這個方法有點怪:它看起來是『
observalbe訂閱了
observer/
subscriber』而不是『
observer /
subscriber 訂閱了
observalbe』,這看起來就像『雜志訂閱了讀者』一樣顛倒了對象關系。這讓人讀起來有點別扭,不過如果把 API 設計成
observer.subscribe(observable) /
subscriber.subscribe(observable) ,雖然更加符合思維邏輯,但對流式 API 的設計就造成影響了,比較起來明顯是得不償失的。
@Override
public final void subscribe(Subscriber s) {
ObjectHelper.requireNonNull(s, "s is null");
s = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(s, "Plugin returned null Subscriber");
subscribeActual(s);
}
/**注意:這不是 subscribe()的源碼,而是將源碼中與性能、兼容性、擴性有關的代碼剔除後的核心代碼。
*如果需要看源碼,可以去 RxJava 的 GitHub 倉庫下載。
*/
public Disposable onSubscribe(Subscriber subscriber) {
subscriber.onSubscribe();
flowableOnSubscribe.subscribe();
return subscriber;
}
訂閱過程做了三件事
調用 Subscriber.onSubscribe()。是Rx2.0新添加的方法,第一個執行 調用 FlowableOnSubscribe中的subscribe() 。在這裡,事件發送的邏輯開始運行。從這也可以看出,在 RxJava 中,Flowable並不是在創建的時候就立即開始發送事件,而是在它被訂閱的時候,即當 subscribe()方法執行的時候。
上面我們可以看到,通過
subscriber來訂閱返回的是void
在RxJava2.0 如果是直接訂閱或傳入消費者那麼會產生一個新的類
那就是Disposable
/**
* Represents a disposable resource.
*/
代表一個一次性的資源。
代碼如下
Disposable subscribe = stringFlowable.subscribe(new Consumer() {
@Override
public void accept(String s) throws Exception {
}
});
訂閱源碼如下
public final Disposable subscribe(Consumer onNext, Consumer onError,
Action onComplete, Consumer onSubscribe) {
LambdaSubscriber ls = new LambdaSubscriber(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
不過最終走的還是上面的邏輯,不過多返回了一個Disposable,
用於dispose();
線程控制
Scheduler
以下API來自RxJava1.0, 與RxJava2.0用法無區別
在RxJava 中,Scheduler ——調度器,相當於線程控制器,RxJava 通過它來指定每一段代碼應該運行在什麼樣的線程。RxJava 已經內置了幾個 Scheduler ,它們已經適合大多數的使用場景:
Schedulers.immediate(): 直接在當前線程運行,相當於不指定線程。這是默認的 Scheduler。 Schedulers.newThread(): 總是啟用新線程,並在新線程執行操作。
-Schedulers.io(): I/O 操作(讀寫文件、讀寫數據庫、網絡信息交互等)所使用的 Scheduler。行為模式和 new Thread() 差不多,區別在於 io() 的內部實現是是用一個無數量上限的線程池,可以重用空閒的線程,因此多數情況下 io() 比new Thread()更有效率。不要把計算工作放在 io() 中,可以避免創建不必要的線程。 Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在computation()中,否則 I/O 操作的等待時間會浪費 CPU。 另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運行。
有了這幾個 Scheduler ,就可以使用
subscribeOn()和
observeOn()兩個方法來對線程進行控制了。 *
subscribeOn(): 指定
subscribe()所發生的線程,即
Observable.OnSubscribe被激活時所處的線程。或者叫做事件產生的線程。 *
observeOn(): 指定 Subscriber 所運行在的線程。或者叫做事件消費的線程。
下面是一個獲取本地資源並顯示在控件上的例子
private void rxSchedulerMap() {
Flowable bitmapFlowable = Flowable.just(R.drawable.effect_icon001)
.subscribeOn(Schedulers.io())
.map(new Function() {
@Override
public Bitmap apply(Integer integer) throws Exception {
Logger.i("這是在io線程做的bitmap繪制圓形");
return BitmapUtils.createCircleImage(BitmapFactory.decodeResource(MainActivity.this.getResources(), integer));
}
})
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer() {
@Override
public void accept(Bitmap bitmap) throws Exception {
Logger.i("這是在main線程做的UI操作");
imageView.setImageBitmap(bitmap);
}
});
bitmapFlowable.subscribe();
}
想必大家已經看得很清楚了
獲取drawable資源我用的io線程
通過 subscribeOn(Schedulers.io())控制
轉變成bitmap並繪制成圓形也是在io線程,可以通過observeOn(Schedulers.io())也可以順著之前的流繼續執行
最後顯示在UI上是通過observeOn(AndroidSchedulers.mainThread())。
subscribeOn(Scheduler.io())和
observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常見,它適用於多數的 『後台線程取數據,主線程顯示』的程序策略。
轉換和Scheduler的原理
大家參考扔物線大神的文章吧,我沒必要再贅述一遍
常用操作符
在上一個章節
我很還通過just直接快捷的生成了Flowable
我們還通過將drwable通過map操作符轉換成了 bitmap進以下一流的操作
這些操作符使整個邏輯流程很美 很漂亮 很sexy~~~
比那些蜜汁縮進美了不是一點半點, 我們下面來個比較復雜的例子,大家對比一下(用的RxJava1.0,意思一下)
//-----------------------------------蜜汁縮進--嵌套循環--回調地獄 -----------------------------------------------------------
/**
* 實現的功能:獲取assets文件夾下所有文件夾中的jpg圖片,並且將所有的圖片畫到一個ImageView上,沒有實際的用處,只是為了說明問題--- 謎之縮進--嵌套循環--回調地獄
* 不使用RxJava的寫法-- 謎之縮進--回調地獄
*/
//思路:需要以下6個步驟完成
//1:遍歷獲取assets文件夾下所有的文件夾的名稱
//2:遍歷獲取獲取assets文件夾下某個文件夾中所有圖片路徑的集合
//3:過濾掉非JPG格式的圖片
//4:獲取某個路徑下圖片的bitmap
//5:將Bitmap繪制到畫布上
//6:循環結束後更新UI,給ImageView設置最後繪制完成後的Bitmap,隱藏ProgressBar
private void miZhiSuoJinAndNestedLoopAndCallbackHell() {
new Thread(new Runnable() {
@Override
public void run() {
runOnUiThread(new Runnable() {
@Override
public void run() {
mProgressBar.setVisibility(View.VISIBLE);
}
});
//1:遍歷獲取assets文件夾下所有的文件夾的名稱
ArrayList assetsFolderNameList = ImageNameFactory.getAssetImageFolderName();
for (String folderName : assetsFolderNameList) {
//2:遍歷獲取獲取assets文件夾下某個文件夾中所有圖片路徑的集合
ArrayList imagePathList = ImageUtils.getAssetsImageNamePathList(getApplicationContext(), folderName);
for (final String imagePathName : imagePathList) {
//3:過濾掉非JPG格式的圖片
if (imagePathName.endsWith(JPG)) {
//4:獲取某個路徑下圖片的bitmap
final Bitmap bitmap = ImageUtils.getImageBitmapFromAssetsFolderThroughImagePathName(getApplicationContext(), imagePathName, Constant.IMAGE_WITH, Constant.IMAGE_HEIGHT);
runOnUiThread(new Runnable() {
@Override
public void run() {
//Logger.d(mCounter + ":" + imagePathName);
//5:將Bitmap繪制到畫布上
createSingleImageFromMultipleImages(bitmap, mCounter);
mCounter++;
}
});
}
}
}
//6:循環結束後更新UI,給ImageView設置最後繪制完成後的Bitmap,隱藏ProgressBar
runOnUiThread(new Runnable() {
@Override
public void run() {
mImageView.setImageBitmap(mManyBitmapSuperposition);
mProgressBar.setVisibility(View.GONE);
}
});
}
}).start();
}
//-----------------------------------RxJava的實現--鏈式調用--十分簡潔 -----------------------------------------------------------
private void rxJavaSolveMiZhiSuoJinAndNestedLoopAndCallbackHell() {
//1:被觀察者:
//2:數據轉換
//3:設置事件的產生發生在IO線程
//4:設置事件的消費發生在主線程
//5:觀察者
//6:訂閱:被觀察者被觀察者訂閱
mGoToRecycleImageView = false;
Observable.from(ImageNameFactory.getAssetImageFolderName())
//assets下一個文件夾的名稱,assets下一個文件夾中一張圖片的路徑
.flatMap(new Func1>() {
@Override
public Observable call(String folderName) {
return Observable.from(ImageUtils.getAssetsImageNamePathList(getApplicationContext(), folderName));
}
})
//過濾,篩選出jpg圖片
.filter(new Func1() {
@Override
public Boolean call(String imagePathNameAll) {
return imagePathNameAll.endsWith(JPG);
}
})
//將圖片路徑轉換為對應圖片的Bitmap
.map(new Func1() {
@Override
public Bitmap call(String imagePathName) {
return ImageUtils.getImageBitmapFromAssetsFolderThroughImagePathName(getApplicationContext(), imagePathName, Constant.IMAGE_WITH, Constant.IMAGE_HEIGHT);
}
})
.map(new Func1() {
@Override
public Void call(Bitmap bitmap) {
createSingleImageFromMultipleImages(bitmap, mCounter);
mCounter++;
return null;
}
})
.subscribeOn(Schedulers.io())//設置事件的產生發生在IO線程
.doOnSubscribe(new Action0() {
@Override
public void call() {
mProgressBar.setVisibility(View.VISIBLE);
}
})
.observeOn(AndroidSchedulers.mainThread())//設置事件的消費發生在主線程
.subscribe(new Subscriber() {
@Override
public void onCompleted() {
mImageView.setImageBitmap(mManyBitmapSuperposition);
mProgressBar.setVisibility(View.GONE);
}
@Override
public void onError(Throwable e) {
//Toast.makeText(MainActivity.this, ""+e.getMessage(), Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(Void aVoid) {
}
});
}
操作符部分一覽 (基於Rxjava1.0)
Combining Obsercables(Observable的組合操作符)
操作符
功能
combineLatest
兩個Observable產生的結果合並成新Observable,任意Observable產生的結果和另一個Observable最後產生的結果按規則合並
join
like combineLatest 能控制每個Observable產生結果的生命周期,在每個結果的生命周期內,與另一個Observable產生的結果按規則合並
groupJoin
like join 暫不知其他區別
==merge==
==按照兩個Observable的提交結果的時間順序,對Observable合並。時間按某Observable完成的最小時間==
mergeDelayError
合並的某一個Observable中出現錯誤,把錯誤放到所有結果都合並完成之後,訂閱者回調執行onError。而merge會馬上停止合並
startWith
源Observable提交結果之前,插入指定數據
switchOnNext
把一組Observable轉換成Observable。這組Observable中取最後一個Observable提交的結果給訂閱者。
==zip==
==把兩個Observable提交的結果按照順序進行合並。==
Error Handing Operators(Observable的錯誤處理操作符)
操作符
功能
onErrorReturn
在Observable 發生錯誤或異常(即將回調onError)時,攔截錯誤並執行指定的邏輯,返回一個跟源Observable相同類型的結果,最後回調訂閱者的onComplete方法
onErrorResumeNext
like onErrorReturn 不同的是返回一個Observable 例:return Observable.just(5,2,0);
onExceptionResumeNext
like onErrorResumeNext 不同的是只有在exception的時候觸發
==retry==
==當Observable發生錯誤或異常,重新執行Observable的邏輯,如果經過n次重新執行後仍然出現錯誤或異常,則最後回調onError方法,若無錯誤或異常則按正常流程執行 例:observable.retry(2).subscribe();==
retryWhen
like retry 不同在於retryWhen是在源Observable出現錯誤或異常時,通過回調第二個Observable來判斷是否重新嘗試執行源Observable的邏輯;若第二個Observable沒錯誤或異常,則會重新嘗試執行源Observable的邏輯,否則就會直接回調執行訂閱者的onError();
其他常用
操作符
功能
map
對源Observable數據的加工處理,返回其他類型 例:return 520+”string data”;
flatMap
like map 不同的是返回一個Observable 擴展:使用了merge操作符 例:return Observable.from(…);
concatMap
like concatMap 不同的是concatMap操作遵循元素的順序 擴展:使用了concat操作符
compose
唯一一個能從流中獲取原生Observable的方法,因此,影響整個流的操作符(subscribeOn()和observeOn())需要用compose()。當你創建一個Observable流並且內聯了一堆操作符以後,compose()會立即執行,所以compose轉換的是整個流
compose與flagMap的區別
flatMap()一定是低效率的,因為他每次調用onNext()之後都需要創建一個新的Observable,compose()是操作在整個流上的
concat
按順序依次連接兩個或更多的 Observable
first
從序列中取第一個先完成的項
takeFirst
like first 區別是first()如果沒有釋放有效的數據源,那麼會throw NoSuchElementException;而takeFirst會complete沒有 exception
常用場景
我們前面已經大致理解RxJava和他的基本使用了,雖然是冰山一角,但夠我們入門了,現在我們來通過實際項目中常用的場景來進階學習。
因為RxJava2.0 是16年八九月份剛更新的,沒有時間來將1.0的代碼替換過來,但是主要的使用方法還是沒變的,所以下面的代碼大部分是基於RxJava1.0,看客請見諒
RxJava實現三級緩存(RxJava 1.0)
創建三個緩存的Observable對象
Observable memory = ...;
Observable disk = ...;
Observable network = ...;
獲取第一個源的數據
Observable source = Observable
.concat(memory, disk, network)
.first(new Func1() {//如果對象為空、說明沒有數據從下一層找
public Boolean call(Data data) {
return data!=null;
}
});
concat()訂閱了所有需要的Observable。
通過first()會因為取到數據後會停止序列
也就是說,如果memory返回了一個結果,那麼我們不會打擾disk 和 network
我們從網絡獲取到數據,記得存起來。
Observable networkWithSave = network.doOnNext(data -> {
saveToDisk(data);
cacheInMemory(data);
});
Observable diskWithCache = disk.doOnNext(data -> {
cacheInMemory(data);
});
RxJava實現心跳(RxJava 2.0)
private Disposable intervalInterval;//心跳
private void rxInterval() {
intervalInterval = Flowable.interval(1, TimeUnit.SECONDS)
.doOnNext(new Consumer() {
@Override
public void accept(Long aLong) throws Exception {
Logger.i("rxInterval---" + aLong);
}
})
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer() {
@Override
public void accept(Long aLong) throws Exception {
Logger.i("rxInterval---txt.setText---" + aLong);
txt.setText("----心跳---" + aLong);
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Consumer() {
@Override
public void accept(Long aLong) throws Exception {
}
});
}
/**
* 停止心跳
* @param v
*/
@Override
public void onClick(View v) {
switch (v.getId()) {
case R.id.btn:
if (intervalInterval != null) {
intervalInterval.dispose();
}
break;
}
}
@Override
protected void onDestroy() {
super.onDestroy();
if (intervalInterval != null) {
intervalInterval.dispose();
}
}
遍歷集合
Flowable.just(new ArrayList())
.doOnNext(new Consumer>() {
@Override
public void accept(ArrayList stringEntities) throws Exception {
for (int i = 0; i < 10; i++) {
stringEntities.add(new StringEntity("rxFromFilter--" + i, i));
}
}
})
.flatMap(new Function, Publisher>() {
@Override
public Publisher apply(ArrayList stringEntities) throws Exception {
return handleList(stringEntities);
}
})
.subscribe(new Subscriber
並發任務(RxJava 1.0)
/**
* 兩個耗時任務一起執行
*/
private static Observable createLivePlayerRoomPageOrDonePageObservable(final Context context, final int roomId, final String url) {
//獲取網絡資源的Observable
Observable rspLiveInfoObservable = LiveNetManager.liveInfo(roomId, null, false);
//獲取圖片高斯模糊的Observable
Observable glideBitmapDrawableObservable = RxGlide.afterGlideRequestListener(Global.getContext(), ImageWorker.buildBlurBitmapRequest(context, url));
return Observable.zip(rspLiveInfoObservable, glideBitmapDrawableObservable,
new Func2() {
@Override
public Intent call(RspLiveInfo rspLiveInfo, GlideBitmapDrawable glideBitmapDrawable) {
})
.observeOn(AndroidSchedulers.mainThread());
}
1.android 的UI線程阻超過5秒就會引發ANR(Application not responding)異常,如果等待超過3秒,你就會失去用戶。2.在android
項目簡介 該項目主要是使用SSH開發Android後端服務器程序和前端App代碼的實現,主要技術包含: Android AsyncTask 、常見自定義控件、客戶端高層類
最近做項目,有個一個需求,就是圓角進度條。效果圖如下。當時項目時間很緊,沒多去想怎麼實現最佳,就直接把美工給的圓角進度條裁剪成了四份。來做 Canvas 剪切繪制。這樣雖
新建項目前面,已經介紹了,系統相關配置,接下來就可以開始創建項目了。選擇新建—>構建一個自由風格的軟件項目,然後填寫項目名稱。項目如下:注意:項目配置源