Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> 關於Android編程 >> Android RxJava使用介紹(四) RxJava的操作符

Android RxJava使用介紹(四) RxJava的操作符

編輯:關於Android編程

本篇文章繼續介紹以下類型的操作符

Observable Utility Operators(Observable的功能性操作符) Conditional and Boolean Operators(Observable的條件操作符)

Observable Utility Operators(Observable的功能性操作符)

combineLatest操作符

顧名思義,Delay操作符就是讓發射數據的時機延後一段時間,這樣所有的數據都會依次延後一段時間發射。在Rxjava中將其實現為Delay和DelaySubscription。不同之處在於Delay是延時數據的發射,而DelaySubscription是延時注冊Subscriber。流程圖如下:

這裡寫圖片描述

下面我們使用Delay和DelaySubscribtion操作符來延遲兩個Observable數據的發射,調用例子如下:

private Observable delayObserver() {
        return createObserver(2).delay(2000, TimeUnit.MILLISECONDS);
    }

    private Observable delaySubscriptionObserver() {
        return createObserver(2).delaySubscription(2000, TimeUnit.MILLISECONDS);
    }

    private Observable createObserver(int index) {
        return Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                log("subscrib:" + getCurrentTime());
                for (int i = 1; i <= index; i++) {
                    subscriber.onNext(getCurrentTime());
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).subscribeOn(Schedulers.newThread());
    }

    private long getCurrentTime() {
        return System.currentTimeMillis()/1000;
    }

分別對其進行注冊

mLButton.setText("delay");
        mLButton.setOnClickListener(e -> {
            log("start subscrib:" + getCurrentTime());
            delayObserver().subscribe(i -> log("delay:" + (getCurrentTime() - i)));
        });
        mRButton.setText("delaySubscription");
        mRButton.setOnClickListener(e -> {
            log("start subscrib:" + getCurrentTime());
            delaySubscriptionObserver().subscribe(i -> log("delaySubscription:" + i));
        });

運行結果如下。可以看到兩個操作符都讓我們達到了延遲2秒後再發射數據的目的
這裡寫圖片描述

二、Do

Do操作符就是給Observable的生命周期的各個階段加上一系列的回調監聽,當Observable執行到這個階段的時候,這些回調就會被觸發。在Rxjava實現了很多的doxxx操作符。

DoOnEach可以給Observable加上這樣的樣一個回調:Observable每發射一個數據的時候就會觸發這個回調,不僅包括onNext還包括onError和onCompleted。
DoOnNext則只有onNext的時候才會被觸發。
這裡寫圖片描述這裡寫圖片描述

doOnSubscribe和doOnUnSubscribe則會在Subscriber進行訂閱和反訂閱的時候觸發回調。當一個Observable通過OnError或者OnCompleted結束的時候,會反訂閱所有的Subscriber。
這裡寫圖片描述這裡寫圖片描述
DoOnErrZ喎?/kf/ware/vc/" target="_blank" class="keylink">vcrvh1NpPbkVycm9yt6LJ+rXEyrG68rSlt6K72LX3o6yyor2rVGhyb3dhYmxlttTP89f3zqqyzsr9tKu9+LvYtfe6r8r9wO+ju0RvT25Db21wbGV0Zbvh1NpPbkNvbXBsZXRlZLeiyfq1xMqxuvK0pbeiu9i196GjPGJyIC8+DQo8aW1nIGFsdD0="這裡寫圖片描述" src="/uploadfile/Collfiles/20160525/20160525091227419.png" title="\" />這裡寫圖片描述
DoOnTerminate會在Observable結束前觸發回調,無論是正常還是異常終止;finallyDo會在Observable結束後觸發回調,無論是正常還是異常終止。
這裡寫圖片描述這裡寫圖片描述
好了,介紹了這麼多do的操作符,我們接下來創建兩個Observable對象,並分別用上面的一系列do操作符進行注冊回調

private Observable doOnEachObserver() {
        return Observable.just(1, 2, 3)
                .doOnEach(notification -> log("doOnEach send " + notification.getValue() + " type:" + notification.getKind()))
                .doOnNext(aInteger -> log("doOnNext send " + aInteger))
                .doOnSubscribe(() -> log("on subscribe"))
                .doOnUnsubscribe(() -> log("on unsubscribe\n"))
                .doOnCompleted(() -> log("onCompleted"));

    }

    private Observable doOnErrorObserver() {
        return createObserver()
                .doOnEach(notification -> log("doOnEach send " + notification.getValue() + " type:" + notification.getKind()))
                .doOnError(throwable -> log("OnError:" + throwable.getMessage()))
                .doOnTerminate(() -> log("OnTerminate"))
                .finallyDo(() -> log("finallyDo"));
    }

    private Observable createObserver() {
        return Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                for (int i = 1; i <= 5; i++) {
                    if (i <= 3) {
                        subscriber.onNext(i);
                    } else {
                        subscriber.onError(new Throwable("num>3"));
                    }
                }
            }
        });
    }

分別進行訂閱

mLButton.setText("do");
        mLButton.setOnClickListener(e -> doOnEachObserver().subscribe(i -> log("do:" + i)));
        mRButton.setText("doOnError");
        mRButton.setOnClickListener(e -> doOnErrorObserver().subscribe(new Subscriber() {
            @Override
            public void onCompleted() {
                log("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                log("subscriber onError:" + e.getMessage());
            }

            @Override
            public void onNext(Integer integer) {
                log("subscriber onNext:" + integer);
            }
        }));

運行結果如下所示。可以看到各個回調的監聽都被依次觸發。
這裡寫圖片描述

三、Meterialize

Meterialize操作符將OnNext/OnError/OnComplete都轉化為一個Notification對象並按照原來的順序發射出來,而DeMeterialize則是執行相反的過程。

下面我們使用這兩個操作符來處理兩個Observable對象

private Observable> meterializeObserver() {
        return Observable.just(1, 2, 3).materialize();
    }

    private Observable deMeterializeObserver() {
        return eterializeObserver().dematerialize();
    }

分別進行訂閱

mLButton.setText("meterialize");
mLButton.setOnClickListener(e -> meterializeObserver().subscribe(i -> log("meterialize:" + i.getValue() + " type" + i.getKind())));
mRButton.setText("deMeterialize");
mRButton.setOnClickListener(e -> deMeterializeObserver().subscribe(i->log("deMeterialize:"+i)));

運行結果如下所示,可以看到onComplete也被meterialize包裝後發射了出來,onError也同樣。
這裡寫圖片描述

四、SubscribOn/ObserverOn

這兩個操作符在前面的例子中我們已經使用過多次了,使用起來十分方便。在android開發中,相信大家一定都遇到過不能在主線程修改UI的問題,所以不得不使用Handler、AsyncTask等來更新UI界面。使用SubscribOn和ObserverOn操作符,各種線程的問題都將變得十分地簡單。

SubscribOn用來指定Observable在哪個線程上運行,我們可以指定在IO線程上運行也可以讓其新開一個線程運行,當然也可以在當前線程上運行。一般來講會指定在各種後台線程而不是主線程上運行,就如同AsyncTask的doInBackground一樣。
這裡寫圖片描述
ObserverOn用來指定觀察者所運行的線程,也就是發射出的數據在那個線程上使用。在android中,如果我們要修改UI界面,觀察者就必須在主線程上運行,就如同AsyncTask的onPostExecute。
這裡寫圖片描述
下面創建兩個Observable並使用ObserverOn和SubscribOn使Observable和觀察者運行在不同的線程上。

private Observable observerOnObserver() {
        return createObserver()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.newThread());
    }

    private Observable subscribeOnObserver() {
        return createObserver()
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.immediate());
    }

    private Observable createObserver() {
        return Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                log("on subscrib:" + Thread.currentThread().getName());
                subscriber.onNext(1);
                subscriber.onCompleted();
            }
        });
    }

分別進行訂閱

mLButton.setText("observerOn");
mLButton.setOnClickListener(e -> observerOnObserver().subscribe(i -> log("observerOn:" + Thread.currentThread().getName())));
mRButton.setText("subscribeOn");
mRButton.setOnClickListener(e -> subscribeOnObserver().subscribe(i -> log("subscribeOn:" + Thread.currentThread().getName())));

運行結果如下
這裡寫圖片描述

五、TimeInterval\TimeStamp

TimeInterval會攔截發射出來的數據,取代為前後兩個發射兩個數據的間隔時間。對於第一個發射的數據,其時間間隔為訂閱後到首次發射的間隔。
這裡寫圖片描述
TimeStamp會將每個數據項給重新包裝一下,加上了一個時間戳來標明每次發射的時間
這裡寫圖片描述
下面使用這兩個操作符來處理兩個Observable對象

 private Observable> timeIntervalObserver() {
        return createObserver().timeInterval();
    }

    private Observable> timeStampObserver() {
        return createObserver().timestamp();
    }

    private Observable createObserver() {
        return Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                for (int i = 0; i <= 3; i++) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.newThread());
    }
mLButton.setText("timeInterval");
mLButton.setOnClickListener(e -> timeIntervalObserver().subscribe(i -> log("timeInterval:" + i.getValue()+"-"+i.getIntervalInMilliseconds())));
mRButton.setText("timeStamp");
mRButton.setOnClickListener(e -> timeStampObserver().subscribe(i -> log("timeStamp:" + i.getValue()+"-"+i.getTimestampMillis())));

運行結果如下所示。
這裡寫圖片描述

六、Timeout
Timeout操作符給Observable加上超時時間,每發射一個數據後就重置計時器,當超過預定的時間還沒有發射下一個數據,就拋出一個超時的異常。
Rxjava將Timeout實現為很多不同功能的操作符,比如說超時後用一個備用的Observable繼續發射數據等。
這裡寫圖片描述這裡寫圖片描述
下面我們創建一個Observable,逐漸加大間隔地發射數據,並使用timeout加上超時的限制。

private Observable timeoutObserver() {
        return createObserver().timeout(200, TimeUnit.MILLISECONDS);
    }

    private Observable timeoutobserverObserver() {
        return createObserver().timeout(200, TimeUnit.MILLISECONDS, Observable.just(5, 6));
    }

    private Observable createObserver() {
        return Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                for (int i = 0; i <= 3; i++) {
                    try {
                        Thread.sleep(i * 100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    subscriber.onNext(i);
                }
                subscriber.onCompleted();
            }
        });
    }
mLButton.setText("timeout");
        mLButton.setOnClickListener(e -> timeoutObserver().subscribe(new Subscriber() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {
                log(e);
            }

            @Override
            public void onNext(Integer integer) {
                log("timeout:" + integer);
            }
        }));
        mRButton.setText("timeoutobserver");
        mRButton.setOnClickListener(e -> timeoutobserverObserver().subscribe(i -> log(i)));

運行結果
這裡寫圖片描述

七、Using

Using操作符創建一個在Observable生命周期內存活的資源,也可以這樣理解:我們創建一個資源並使用它,用一個Observable來限制這個資源的使用時間,當這個Observable終止的時候,這個資源就會被銷毀。

Using需要使用三個參數,分別是:
1.創建這個一次性資源的函數
2.創建Observable的函數
這裡寫圖片描述
下面我們定義了一個Animal類,並使用Using來控制其創建和釋放。

private Observable usingObserver() {
        return Observable.using(() -> new Animal(), i -> Observable.timer(5000,TimeUnit.MILLISECONDS), o -> o.relase());
    }

    private class Animal {
        Subscriber subscriber = new Subscriber() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Object o) {
                log("animal eat");
            }
        };

        public Animal() {
            log("create animal");
            Observable.interval(1000, TimeUnit.MILLISECONDS)
                    .subscribe(subscriber);
        }

        public void relase() {
            log("animal released");
            subscriber.unsubscribe();
        }
    }

訂閱

(此處)折疊或打開
Observable observable = usingObserver();
        Subscriber subscriber = new Subscriber() {
            @Override
            public void onCompleted() {
                log("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                log("onError");
            }

            @Override
            public void onNext(Object o) {
                log("onNext"+o);
            }
        };
        mLButton.setText("using");
        mLButton.setOnClickListener(e -> observable.subscribe(subscriber));
        mRButton.setText("unSubscrib");
        mRButton.setOnClickListener(e -> subscriber.unsubscribe());

運行結果如下。在訂閱了幾秒之後,對其進行反訂閱,Observable就會終止從而觸發Animal的釋放。
這裡寫圖片描述
關於輔助操作符就到這裡了,本文中的demo程序見JavaDemo">https://github.com/Chaoba/RxJavaDemo

上面內容參考:http://blog.chinaunix.net/uid-20771867-id-5206187.html


Conditional and Boolean Operators(Observable的條件操作符)

一、All/Amb

All操作符根據一個函數對源Observable發射的所有數據進行判斷,最終返回的結果就是這個判斷結果。這個函數使用發射的數據作為參數,內部判斷所有的數據是否滿足我們定義好的判斷條件,如果全部都滿足則返回true,否則就返回false。
這裡寫圖片描述
Amb操作符可以將至多9個Observable結合起來,讓他們競爭。哪個Observable首先發射了數據(包括onError和onComplete)就會繼續發射這個Observable的數據,其他的Observable所發射的數據都會別丟棄。
這裡寫圖片描述
下面使用這兩個操作符,對於all操作符我們做了這樣的限制,初次使用的時候tag為false,會創建6個數字的Observable,以後都會創建5個數字的Observable。

private Observable allObserver() {
        Observable just;
        if (tag) {
            just = Observable.just(1, 2, 3, 4, 5);
        } else {
            just = Observable.just(1, 2, 3, 4, 5, 6);
        }
        tag = true;
        return just.all(integer -> integer < 6);
    }

    private Observable ambObserver() {
        Observable delay3 = Observable.just(1, 2, 3).delay(3000, TimeUnit.MILLISECONDS);
        Observable delay2 = Observable.just(4, 5, 6).delay(2000, TimeUnit.MILLISECONDS);
        Observable delay1 = Observable.just(7, 8, 9).delay(1000, TimeUnit.MILLISECONDS);
        return Observable.amb(delay1, delay2, delay3);
    }

分別進訂閱

mLButton.setText("all");
mLButton.setOnClickListener(e -> allObserver().subscribe(i -> log("all:" + i)));
mRButton.setText("amb");
mRButton.setOnClickListener(e -> ambObserver().subscribe(i -> log("amb:" + i)));

運行結果如下所示。第一次返回的6個數據的Observable不滿足所有都小於6的條件,所以結果是false,以後的都滿足條件,所以結果是true。使用amb操作符的Observable,第一個發射的數據的是7,所以輸出了7,8,9,其他的數據都丟棄了。
這裡寫圖片描述

二、Contains、IsEmpty
Contains操作符用來判斷源Observable所發射的數據是否包含某一個數據,如果包含會返回true,如果源Observable已經結束了卻還沒有發射這個數據則返回false。
IsEmpty操作符用來判斷源Observable是否發射過數據,如果發射過就會返回false,如果源Observable已經結束了卻還沒有發射這個數據則返回true。
這裡寫圖片描述這裡寫圖片描述
用這兩個操作符來判斷一下兩個Observable對象是否包含某個數據及是否為空

private Observable containsObserver() {
        if (tag) {
            return Observable.just(1, 2, 3).contains(3);
        }
        tag = true;
        return Observable.just(1, 2, 3).contains(4);
    }

    private Observable defaultObserver() {
        return Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                subscriber.onCompleted();
            }
        }).isEmpty();
    }

分別進行訂閱

mLButton.setText("contains");
mLButton.setOnClickListener(e -> containsObserver().subscribe(i -> log("contains:" + i)));
mRButton.setText("isEmpty");
mRButton.setOnClickListener(e -> defaultObserver().subscribe(i -> log("isEmpty:" + i)));

運行結果如下
這裡寫圖片描述

三、DefaultIfEmpty
DefaultIfEmpty操作符會判斷源Observable是否發射數據,如果源Observable發射了數據則正常發射這些數據,如果沒有則發射一個默認的數據
這裡寫圖片描述
下面我們用這個操作符來處理一個空的和一個非空的Observable,如果為空的話就返回默認值10

private Observable emptyObserver() {
        return Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                subscriber.onCompleted();
            }
        }).defaultIfEmpty(10);
    }

    private Observable notEmptyObserver() {
        return Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                subscriber.onNext(1);
                subscriber.onCompleted();
            }
        }).defaultIfEmpty(10);
    }

分別進行訂閱

mLButton.setText("empty");
mLButton.setOnClickListener(e -> emptyObserver().subscribe(i -> log("empty:" + i)));
mRButton.setText("notEmpty");
mRButton.setOnClickListener(e -> notEmptyObserver().subscribe(i -> log("notEmpty:" + i)));

運行結果如下
這裡寫圖片描述

四、SequenceEqual
SequenceEqual操作符用來判斷兩個Observable發射的數據序列是否相同(發射的數據相同,數據的序列相同,結束的狀態相同),如果相同返回true,否則返回false
這裡寫圖片描述
下面用SequenceEqual分別來判斷兩個相同的和不相同的Observable

private Observable equalObserver() {
        return Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3));
    }

    private Observable notEqualObserver() {
        return Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2));
    }

分別進行訂閱

mLButton.setText("equal");
mLButton.setOnClickListener(e -> equalObserver().subscribe(i -> log("equal:" + i)));
mRButton.setText("notequal");
mRButton.setOnClickListener(e -> notEqualObserver().subscribe(i -> log("notequal:" + i)));

運行結果如下
這裡寫圖片描述

五、SkipUntil、SkipWhile

這兩個操作符都是根據條件來跳過一些數據,不同之處在於SkipUnitl是根據一個標志Observable來判斷的,當這個標志Observable沒有發射數據的時候,所有源Observable發射的數據都會被跳過;當標志Observable發射了一個數據,則開始正常地發射數據。SkipWhile則是根據一個函數來判斷是否跳過數據,當函數返回值為true的時候則一直跳過源Observable發射的數據;當函數返回false的時候則開始正常發射數據。
這裡寫圖片描述這裡寫圖片描述
下面使用這兩個操作符來跳過一些數據項。

private Observable skipUntilObserver() {
        return Observable.interval(1, TimeUnit.SECONDS).skipUntil(Observable.timer(3, TimeUnit.SECONDS));
    }

    private Observable skipWhileObserver() {
        return Observable.interval(1, TimeUnit.SECONDS).skipWhile(aLong -> aLong < 5);
    }

分別進行訂閱

mLButton.setText("skipUntil");
mLButton.setOnClickListener(e -> skipUntilObserver().subscribe(i -> log("skipUntil:" + i)));
mRButton.setText("skipWhile");
mRButton.setOnClickListener(e -> skipWhileObserver().subscribe(i -> log("skipWhile:" + i)));

運行結果如下
這裡寫圖片描述

六、TakeUntil、TakeWhile

TakeUntil和TakeWhile操作符可以說和SkipUnitl和SkipWhile操作符是完全相反的功能。TakeUntil也是使用一個標志Observable是否發射數據來判斷,當標志Observable沒有發射數據時,正常發射數據,而一旦標志Observable發射過了數據則後面的數據都會被丟棄。TakeWhile則是根據一個函數來判斷是否發射數據,當函數返回值為true的時候正常發射數據;當函數返回false的時候丟棄所有後面的數據。

這裡寫圖片描述這裡寫圖片描述

下面使用這兩個操作符來take兩個Observable發射的數據

private Observable takeUntilObserver() {
        return Observable.interval(1, TimeUnit.SECONDS).takeUntil(Observable.timer(3, TimeUnit.SECONDS));
    }

    private Observable takeWhileObserver() {
        return Observable.interval(1, TimeUnit.SECONDS).takeWhile(aLong -> aLong < 5);
    }

分別進行訂閱

mLButton.setText("takeUntil");
mLButton.setOnClickListener(e -> takeUntilObserver().subscribe(i -> log("takeUntil:" + i)));
mRButton.setText("takeWhile");
mRButton.setOnClickListener(e -> takeWhileObserver().subscribe(i -> log("takeWhile:" + i)));

運行結果如下
這裡寫圖片描述

關於條件和布爾操作符就到這了,本文中所有的demo程序見:https://github.com/Chaoba/RxJavaDemo

  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved