Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> 關於Android編程 >> RxJava菜鳥驿站(一)

RxJava菜鳥驿站(一)

編輯:關於Android編程

前言

終究沒有經受住RxJava的誘惑,只恨自己來的比較晚,走起~

RxJava 是什麼?

一個在 Java VM 上使用可觀測的序列來組成異步的、基於事件的程序的庫~

Rx Java 有什麼優勢?

邏輯簡潔 異步

RxJava 能做什麼?

EventBus事件中心 與Retrofit結合進行網絡處理 RxBinding
….

初始RxJava

我們先來看一下這坨代碼:

    new Thread() {
        @Override
        public void run() {
            super.run();
            for (File folder : folders) {
                File[] files = folder.listFiles();
                for (File file : files) {
                    if (file.getName().endsWith(".png")) {
                        final Bitmap bitmap = getBitmapFromFile(file);
                        getActivity().runOnUiThread(new Runnable() {
                            @Override
                            public void run() {
                                imageCollectorView.addImage(bitmap);
                            }
                        });
                    }
                }
            }
        }}.start();

上面就是一個圖片的遍歷、過濾、加載過程,可是有強迫症的程序員都會很難過,因為又看到了影響心情的代碼。

用RxJava如何實現呢?

    Observable.from(folders)
        .flatMap(new Func1>() {
            @Override
            public Observable call(File file) {
                return Observable.from(file.listFiles());
            }
        })
        .filter(new Func1() {
            @Override
            public Boolean call(File file) {
                return file.getName().endsWith(".png");
            }
        })
        .map(new Func1() {
            @Override
            public Bitmap call(File file) {
                return getBitmapFromFile(file);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1() {
            @Override
            public void call(Bitmap bitmap) {
                imageCollectorView.addImage(bitmap);
            }
        });

吾皇勿怒,臣妾知道代碼並沒有減少,可是臣妾說的是邏輯簡單,鏈式結構非常清晰不是嗎?

相信您在頓悟了5s中之後也看到了其好處~

使用介紹與原理分析

基本概念

1、擴展的觀察者模式
RxJava 的異步實現,是通過一種擴展的觀察者模式來實現的,A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應。

2、RxJava的四個基本概念:

Observable (可觀察者,即被觀察者) Observer (觀察者) subscribe (訂閱) 事件 : onNext()、onCompleted() 和 onError()

事件

onNext(): 普通事件,每個事件執行之後的event。

onCompleted(): 事件隊列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。RxJava 規定,當不會再有新的onNext() 發出時,需要觸發 onCompleted() 方法作為標志。

onError(): 事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發,同時隊列自動終止,不允許再有事件發出。

注意 :

在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個,並且是事件序列中的最後一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在隊列中調用了其中一個,就不應該再調用另一個。

RxJava觀察者模式

基本實現

基於以上的概念, RxJava 的基本實現主要有三點:

1) 創建 Observer

Observer 即觀察者,它決定事件觸發的時候將有怎樣的行為。 RxJava 中的 Observer 接口的實現方式:

Observer observer = new Observer() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }};

除了 Observer 接口之外,RxJava 還內置了一個實現了 Observer 的抽象類:Subscriber。 Subscriber 對 Observer 接口進行了一些擴展,但他們的基本使用方式是完全一樣的:

Subscriber subscriber = new Subscriber() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }};

不僅基本使用方式一樣,實質上,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉換成一個 Subscriber 再使用。所以如果你只想使用基本功能,選擇 Observer 和 Subscriber 是完全一樣的。它們的區別對於使用者來說主要有兩點:

onStart(): 這是 Subscriber 增加的方法。它會在 subscribe 剛開始,而事件還未發送之前被調用,可以用於做一些准備工作,例如數據的清零或重置。這是一個可選方法,默認情況下它的實現為空。需要注意的是,如果對准備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執行), onStart() 就不適用了,因為它總是在 subscribe 所發生的線程被調用,而不能指定線程。要在指定的線程來做准備工作,可以使用 doOnSubscribe() 方法,具體可以在後面的文中看到。

unsubscribe(): 這是 Subscriber 所實現的另一個接口 Subscription 的方法,用於取消訂閱。在這個方法被調用後,Subscriber 將不再接收事件。一般在這個方法調用前,可以使用 isUnsubscribed() 先判斷一下狀態。 unsubscribe() 這個方法很重要,因為在 subscribe() 之後, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放,將有內存洩露的風險。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如 onPause() onStop() 等方法中)調用unsubscribe() 來解除引用關系,以避免內存洩露的發生。

2) 創建 Observable

Observable 即被觀察者,它決定什麼時候觸發事件以及觸發怎樣的事件。 RxJava 使用 create() 方法來創建一個 Observable ,並為它定義事件觸發規則:

Observable observable = Observable.create(new Observable.OnSubscribe() {
    @Override
    public void call(Subscriber subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }});

可以看到,這裡傳入了一個 OnSubscribe 對象作為參數。OnSubscribe 會被存儲在返回的 Observable 對象中,它的作用相當於一個計劃表,當 Observable 被訂閱的時候,OnSubscribe 的 call() 方法會自動被調用,事件序列就會依照設定依次觸發(對於上面的代碼,就是觀察者Subscriber 將會被調用三次 onNext() 和一次 onCompleted())。這樣,由被觀察者調用了觀察者的回調方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。
這個例子很簡單:事件的內容是字符串,而不是一些復雜的對象;事件的內容是已經定好了的,而不像有的觀察者模式一樣是待確定的(例如網絡請求的結果在請求返回之前是未知的);所有事件在一瞬間被全部發送出去,而不是夾雜一些確定或不確定的時間間隔或者經過某種觸發器來觸發的。總之,這個例子看起來毫無實用價值。但這是為了便於說明,實質上只要你想,各種各樣的事件發送規則你都可以自己來寫。至於具體怎麼做,後面都會講到,但現在不行。只有把基礎原理先說明白了,上層的運用才能更容易說清楚。

create() 方法是 RxJava 最基本的創造事件序列的方法。基於這個方法, RxJava 還提供了一些方法用來快捷創建事件隊列,例如:
just(T…): 將傳入的參數依次發送出來。

Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會依次調用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();

from(T[]) / from(Iterable

線程控制 —— Scheduler基礎

在不指定線程的情況下, RxJava 遵循的是線程不變的原則,即:在哪個線程調用 subscribe(),就在哪個線程生產事件;在哪個線程生產事件,就在哪個線程消費事件。如果需要切換線程,就需要用到 Scheduler (調度器)。

1) Scheduler 基礎的 API

在RxJava 中,Scheduler ——調度器,相當於線程控制器,RxJava 通過它來指定每一段代碼應該運行在什麼樣的線程。RxJava 已經內置了幾個 Scheduler ,它們已經適合大多數的使用場景:

Schedulers.immediate(): 直接在當前線程運行,相當於不指定線程。這是默認的 Scheduler。 Schedulers.newThread(): 總是啟用新線程,並在新線程執行操作。 Schedulers.io(): I/O 操作(讀寫文件、讀寫數據庫、網絡信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區別在於 io() 的內部實現是是用一個無數量上限的線程池,可以重用空閒的線程,因此多數情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免創建不必要的線程。

Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。

另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運行。

有了這幾個 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 兩個方法來對線程進行控制了。

subscribeOn(): 指定 subscribe() 所發生的線程,即 Observable.OnSubscribe 被激活時所處的線程。或者叫做事件產生的線程。 observeOn(): 指定 Subscriber 所運行在的線程。或者叫做事件消費的線程。

文字敘述總歸難理解,上代碼:

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
    .subscribe(new Action1() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });

上面這段代碼中,由於 subscribeOn(Schedulers.io()) 的指定,被創建的事件的內容 1、2、3、4 將會在IO線程發出; 而由於observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 數字的打印將發生在主線程 。事實上,這種在subscribe() 之前寫上兩句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常見,它適用於多數的 『後台線程取數據,主線程顯示』的程序策略。

而前面提到的由圖片 id 取得圖片並顯示的例子,如果也加上這兩句:

int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe() {
    @Override
    public void call(Subscriber subscriber) {
        Drawable drawable = getTheme().getDrawable(drawableRes));
        subscriber.onNext(drawable);
        subscriber.onCompleted();
    }
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發生在主線程
.subscribe(new Observer() {
    @Override
    public void onNext(Drawable drawable) {
        imageView.setImageDrawable(drawable);
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
    }
});

那麼,加載圖片將會發生在 IO 線程,而設置圖片則被設定在了主線程。這就意味著,即使加載圖片耗費了幾十甚至幾百毫秒的時間,也不會造成絲毫界面的卡頓。

2) Scheduler 的原理基礎

RxJava 的 Scheduler API 很方便,也很神奇(加了一句話就把線程切換了,怎麼做到的?而且 subscribe() 不是最外層直接調用的方法嗎,它竟然也能被指定線程?)。然而 Scheduler 的原理需要放在後面講,因為它的原理是以下一節《變換》的原理作為基礎的。

好吧這一節其實我屁也沒說,只是為了讓你安心,讓你知道我不是忘了講原理,而是把它放在了更合適的地方。

變換

RxJava 提供了對事件序列進行變換的支持,這是它的核心功能之一,也是大多數人說『RxJava 真是太好用了』的最大原因。所謂變換,就是將事件序列中的對象或整個序列進行加工處理,轉換成不同的事件或事件序列。概念說著總是模糊難懂的,來看 API。

1) API

首先看一個 map() 的例子:

Observable.just("images/logo.png") // 輸入類型 String
    .map(new Func1() {
        @Override
        public Bitmap call(String filePath) { // 參數類型 String
            return getBitmapFromPath(filePath); // 返回類型 Bitmap
        }
    })
    .subscribe(new Action1() {
        @Override
        public void call(Bitmap bitmap) { // 參數類型 Bitmap
            showBitmap(bitmap);
        }
    });

這裡出現了一個叫做 Func1 的類。它和 Action1 非常相似,也是 RxJava 的一個接口,用於包裝含有一個參數的方法。 Func1 和Action 的區別在於, Func1 包裝的是有返回值的方法。另外,和 ActionX 一樣, FuncX 也有多個,用於不同參數個數的方法。FuncX和 ActionX 的區別在 FuncX 包裝的是有返回值的方法。

可以看到,map() 方法將參數中的 String 對象轉換成一個 Bitmap 對象後返回,而在經過 map() 方法後,事件的參數類型也由 String轉為了 Bitmap。這種直接變換對象並返回的,是最常見的也最容易理解的變換。不過 RxJava 的變換遠不止這樣,它不僅可以針對事件對象,還可以針對整個事件隊列,這使得 RxJava 變得非常靈活。我列舉幾個常用的變換:

map(): 事件對象的直接變換,具體功能上面已經介紹過。它是 RxJava 最常用的變換。
map() 示意圖:
map 示意圖vcq9uty88rWlo7o8L2NvZGU+PC9jb2RlPjwvY29kZT48L2NvZGU+PC9jb2RlPjwvY29kZT48L2NvZGU+PC9wPg0KPHByZSBjbGFzcz0="brush:java;"> Student[] students = ...; Subscriber subscriber = new Subscriber() { @Override public void onNext(String name) { Log.d(tag, name); } ... }; Observable.from(students) .map(new Func1() { @Override public String call(Student student) { return student.getName(); } }) .subscribe(subscriber);

很簡單。那麼再假設:如果要打印出每個學生所需要修的所有課程的名稱呢?(需求的區別在於,每個學生只有一個名字,但卻有多個課程。)首先可以這樣實現:

Student[] students = ...;
Subscriber subscriber = new Subscriber() {
    @Override
    public void onNext(Student student) {
        List courses = student.getCourses();
        for (int i = 0; i < courses.size(); i++) {
            Course course = courses.get(i);
            Log.d(tag, course.getName());
        }
    }
    ...
};
Observable.from(students)
    .subscribe(subscriber);

依然很簡單。那麼如果我不想在 Subscriber 中使用 for 循環,而是希望 Subscriber 中直接傳入單個的 Course 對象呢(這對於代碼復用很重要)?用 map() 顯然是不行的,因為 map() 是一對一的轉化,而我現在的要求是一對多的轉化。那怎麼才能把一個 Student 轉化成多個 Course 呢?

這個時候,就需要用 flatMap() 了:

Student[] students = ...;
Subscriber subscriber = new Subscriber() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1>() {
        @Override
        public Observable call(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);

從上面的代碼可以看出, flatMap() 和 map() 有一個相同點:它也是把傳入的參數轉化之後返回另一個對象。但需要注意,和 map()不同的是, flatMap() 中返回的是個 Observable 對象,並且這個 Observable 對象並不是被直接發送到了 Subscriber 的回調方法中。
flatMap() 的原理是這樣的:
1. 使用傳入的事件對象創建一個 Observable 對象;
2. 並不發送這個 Observable, 而是將它激活,於是它開始發送事件;
3. 每一個創建出來的 Observable 發送的事件,都被匯入同一個 Observable ,而這個 Observable 負責將這些事件統一交給 Subscriber 的回調方法。這三個步驟,把事件拆成了兩級,通過一組新創建的 Observable 將初始的對象『鋪平』之後通過統一路徑分發了下去。而這個『鋪平』就是 flatMap() 所謂的 flat。

flatMap() 示意圖:

flatMap 示意圖

擴展:由於可以在嵌套的 Observable 中添加異步代碼, flatMap() 也常用於嵌套的異步操作,例如嵌套的網絡請求。示例代碼(Retrofit + RxJava):

networkClient.token() // 返回 Observable,在訂閱時請求 token,並在響應後發送 token
    .flatMap(new Func1>() {
        @Override
        public Observable call(String token) {
            // 返回 Observable,在訂閱時請求消息列表,並在響應後發送請求到的消息列表
            return networkClient.messages();
        }
    })
    .subscribe(new Action1() {
        @Override
        public void call(Messages messages) {
            // 處理顯示消息列表
            showMessages(messages);
        }
    });

傳統的嵌套請求需要使用嵌套的 Callback 來實現。而通過 flatMap() ,可以把嵌套的請求寫在一條鏈中,從而保持程序邏輯的清晰。

throttleFirst(): 在每次事件觸發後的一定時間間隔內丟棄新的事件。常用作去抖動過濾,例如按鈕的點擊監聽器:

RxView.clickEvents(button) // RxBinding 代碼,後面的文章有解釋
    .throttleFirst(500, TimeUnit.MILLISECONDS) // 設置防抖間隔為 500ms
    .subscribe(subscriber);

媽媽再也不怕我的用戶手抖點開兩個重復的界面啦。

此外, RxJava 還提供很多便捷的方法來實現事件序列的變換,這裡就不一一舉例了。

2) 變換的原理:lift()

這些變換雖然功能各有不同,但實質上都是針對事件序列的處理和再發送。而在 RxJava 的內部,它們是基於同一個基礎的變換方法:lift(Operator)。首先看一下 lift() 的內部實現(僅核心代碼):

// 注意:這不是 lift() 的源碼,而是將源碼中與性能、兼容性、擴展性有關的代碼剔除後的核心代碼。
// 如果需要看源碼,可以去 RxJava 的 GitHub 倉庫下載。
public  Observable lift(Operator operator) {
    return Observable.create(new OnSubscribe() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);
        }
    });
}

這段代碼很有意思:它生成了一個新的 Observable 並返回,而且創建新 Observable 所用的參數 OnSubscribe 的回調方法 call() 中的實現竟然看起來和前面講過的 Observable.subscribe() 一樣!然而它們並不一樣喲~不一樣的地方關鍵就在於第二行onSubscribe.call(subscriber) 中的 onSubscribe 所指代的對象不同(高能預警:接下來的幾句話可能會導致身體的嚴重不適)——

subscribe() 中這句話的 onSubscribe 指的是 Observable 中的 onSubscribe 對象,這個沒有問題,但是 lift() 之後的情況就復雜了點。

當含有 lift() 時: 
1.lift() 創建了一個 Observable 後,加上之前的原始 Observable,已經有兩個 Observable 了; 
2.而同樣地,新 Observable 裡的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了兩個OnSubscribe; 
3.當用戶調用經過 lift() 後的 Observable 的 subscribe() 的時候,使用的是 lift() 所返回的新的 Observable ,於是它所觸發的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那個 OnSubscribe; 
4.而這個新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在這個 call()方法裡,新 OnSubscribe 利用 operator.call(subscriber) 生成了一個新的 Subscriber(Operator 就是在這裡,通過自己的call() 方法將新 Subscriber 和原始 Subscriber 進行關聯,並插入自己的『變換』代碼以實現變換),然後利用這個新Subscriber 向原始 Observable 進行訂閱。 
這樣就實現了 lift() 過程,有點像一種代理機制,通過事件攔截和處理實現事件序列的變換。

精簡掉細節的話,也可以這麼說:在 Observable 執行了 lift(Operator) 方法之後,會返回一個新的 Observable,這個新的Observable 會像一個代理一樣,負責接收原始的 Observable 發出的事件,並在處理後發送給 Subscriber。

如果你更喜歡具象思維,可以看圖:

lift() 原理圖

lift 原理圖

兩次和多次的 lift() 同理,如下圖:

兩次和多次的 lift

舉一個具體的 Operator 的實現。下面這是一個將事件中的 Integer 對象轉換成 String 的例子,僅供參考:

observable.lift(new Observable.Operator() {
    @Override
    public Subscriber call(final Subscriber subscriber) {
        // 將事件序列中的 Integer 對象轉換為 String 對象
        return new Subscriber() {
            @Override
            public void onNext(Integer integer) {
                subscriber.onNext("" + integer);
            }

            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }
        };
    }
});

講述 lift() 的原理只是為了讓你更好地了解 RxJava ,從而可以更好地使用它。然而不管你是否理解了 lift() 的原理,RxJava 都不建議開發者自定義 Operator 來直接使用 lift(),而是建議盡量使用已有的 lift() 包裝方法(如map() flatMap() 等)進行組合來實現需求,因為直接使用 lift() 非常容易發生一些難以發現的錯誤。

3) compose: 對 Observable 整體的變換

除了 lift() 之外, Observable 還有一個變換方法叫做 compose(Transformer)。它和 lift() 的區別在於, lift() 是針對事件項和事件序列的,而 compose() 是針對 Observable 自身進行變換。舉個例子,假設在程序中有多個 Observable ,並且他們都需要應用一組相同的 lift() 變換。你可以這麼寫:

observable1
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);
observable2
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber2);
observable3
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber3);
observable4
    .lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);

你覺得這樣太不軟件工程了,於是你改成了這樣:

private Observable liftAll(Observable observable) {
    return observable
        .lift1()
        .lift2()
        .lift3()
        .lift4();
}
...
liftAll(observable1).subscribe(subscriber1);
liftAll(observable2).subscribe(subscriber2);
liftAll(observable3).subscribe(subscriber3);
liftAll(observable4).subscribe(subscriber4);

可讀性、可維護性都提高了。可是 Observable 被一個方法包起來,這種方式對於 Observale 的靈活性似乎還是增添了那麼點限制。怎麼辦?這個時候,就應該用 compose() 來解決了:

public class LiftAllTransformer implements Observable.Transformer {
    @Override
    public Observable call(Observable observable) {
        return observable
            .lift1()
            .lift2()
            .lift3()
            .lift4();
    }
}
...
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);

像上面這樣,使用 compose() 方法,Observable 可以利用傳入的 Transformer 對象的 call 方法直接對自身進行處理,也就不必被包在方法的裡面了。

compose() 的原理比較簡單,不附圖喽。

線程控制:Scheduler高級

除了靈活的變換,RxJava 另一個牛逼的地方,就是線程的自由控制。

1) Scheduler 高級的 API

前面講到了,可以利用 subscribeOn() 結合 observeOn() 來實現線程控制,讓事件的產生和消費發生在不同的線程。可是在了解了map() flatMap() 等變換方法後,有些好事的(其實就是當初剛接觸 RxJava 時的我)就問了:能不能多切換幾次線程?

答案是:能。因為 observeOn() 指定的是 Subscriber 的線程,而這個 Subscriber 並不是(嚴格說應該為『不一定是』,但這裡不妨理解為『不是』)subscribe() 參數中的 Subscriber ,而是 observeOn() 執行時的當前 Observable 所對應的 Subscriber ,即它的直接下級 Subscriber 。換句話說,observeOn() 指定的是它之後的操作所在的線程。因此如果有多次切換線程的需求,只要在每個想要切換線程的位置調用一次 observeOn() 即可。上代碼:

Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread())
    .map(mapOperator) // 新線程,由 observeOn() 指定
    .observeOn(Schedulers.io())
    .map(mapOperator2) // IO 線程,由 observeOn() 指定
    .observeOn(AndroidSchedulers.mainThread) 
    .subscribe(subscriber);  // Android 主線程,由 observeOn() 指定

如上,通過 observeOn() 的多次調用,程序實現了線程的多次切換。

不過,不同於 observeOn() , subscribeOn() 的位置放在哪裡都可以,但它是只能調用一次的。

又有好事的(其實還是當初的我)問了:如果我非要調用多次 subscribeOn() 呢?會有什麼效果?

這個問題先放著,我們還是從 RxJava 線程控制的原理說起吧。

2) Scheduler 的原理(二)

其實, subscribeOn() 和 observeOn() 的內部實現,也是用的 lift()。具體看圖(不同顏色的箭頭表示不同的線程):

subscribeOn() 原理圖:

subscribeOn 原理圖

observeOn() 原理圖:

observeOn 原理

從圖中可以看出,subscribeOn() 和 observeOn() 都做了線程切換的工作(圖中的 “schedule…” 部位)。不同的是, subscribeOn()的線程切換發生在 OnSubscribe 中,即在它通知上一級 OnSubscribe 時,這時事件還沒有開始發送,因此 subscribeOn() 的線程控制可以從事件發出的開端就造成影響;而 observeOn() 的線程切換則發生在它內建的 Subscriber 中,即發生在它即將給下一級Subscriber 發送事件時,因此 observeOn() 控制的是它後面的線程。

最後,我用一張圖來解釋當多個 subscribeOn() 和 observeOn() 混合使用時,線程調度是怎麼發生的(由於圖中對象較多,相對於上面的圖對結構做了一些簡化調整):

線程控制綜合調用

線程控制綜合調用

圖中共有 5 處含有對事件的操作。由圖中可以看出,①和②兩處受第一個 subscribeOn() 影響,運行在紅色線程;③和④處受第一個observeOn() 的影響,運行在綠色線程;⑤處受第二個 onserveOn() 影響,運行在紫色線程;而第二個 subscribeOn() ,由於在通知過程中線程就被第一個 subscribeOn() 截斷,因此對整個流程並沒有任何影響。這裡也就回答了前面的問題:當使用了多個subscribeOn() 的時候,只有第一個 subscribeOn() 起作用。

3) 延伸:doOnSubscribe()

然而,雖然超過一個的 subscribeOn() 對事件處理的流程沒有影響,但在流程之前卻是可以利用的。

在前面講 Subscriber 的時候,提到過 Subscriber 的 onStart() 可以用作流程開始前的初始化。然而 onStart() 由於在subscribe() 發生時就被調用了,因此不能指定線程,而是只能執行在 subscribe() 被調用時的線程。這就導致如果 onStart() 中含有對線程有要求的代碼(例如在界面上顯示一個 ProgressBar,這必須在主線程執行),將會有線程非法的風險,因為有時你無法預測subscribe() 將會在什麼線程執行。

而與 Subscriber.onStart() 相對應的,有一個方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同樣是在subscribe() 調用後而且在事件發送前執行,但區別在於它可以指定線程。默認情況下, doOnSubscribe() 執行在 subscribe() 發生的線程;而如果在 doOnSubscribe() 之後有 subscribeOn() 的話,它將執行在離它最近的 subscribeOn() 所指定的線程。

示例代碼:

Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            progressBar.setVisibility(View.VISIBLE); // 需要在主線程執行
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

如上,在 doOnSubscribe()的後面跟一個 subscribeOn() ,就能指定准備工作的線程了。

結語

此篇博文是RxJava最基本的介紹了,相信大家對RxJava有了初步的認識,接下來會對RXJava的應用場景進行分析~

  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved