Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> Android開發 >> 關於android開發 >> RxJava漫談-RxAndroid使用,rxjava-rxandroid

RxJava漫談-RxAndroid使用,rxjava-rxandroid

編輯:關於android開發

RxJava漫談-RxAndroid使用,rxjava-rxandroid


RxJava在github上的地址:https://github.com/ReactiveX/RxJava

RxAndroid在github上的地址:https://github.com/ReactiveX/RxAndroid

 

本文主要介紹RxAndroid的使用,如果對於RxJava還不熟悉的可以先看一下RxJava的介紹文章。

Android的程序是用Java書寫的,Android也有一些自己的線程模型,例如AsyncTask和Handler等。RxJava正是結合了前面的這幾項,在此基礎上推出了RxAndroid。下面介紹使用。

首先,我們在項目中引入RxAndroid,主要是在gradle腳本中引入下面兩句話即可。

dependencies {
    compile fileTree(dir: 'libs', include: ['*.jar'])
    testCompile 'junit:junit:4.12'
    compile 'com.android.support:appcompat-v7:23.1.1'
    compile 'com.android.support:design:23.1.1'
    //引入RxAndroid----begin
    compile 'io.reactivex:rxandroid:1.1.0'
    compile 'io.reactivex:rxjava:1.1.0'
    //引入RxAndroid----end
}

這樣就可以在Android代碼中使用RxAndroid了,下面的例子展示了一段使用RxAndroid書寫的代碼:

Observable.just("one", "two", "three", "four", "five")
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(/* an Observer */);

這一段代碼是在RxAndroid的官網上寫的一段示例的代碼。這裡可以看出來,與RxJava相比較,其主要增加了如下的幾個java文件:

    AndroidSchedulers
    BuildConfig
    HandlerScheduler
    MainThreadSubscription
    RxAndroidPlugins
    RxAndroidSchedulersHook

下面分別對這幾個文件的源碼看一下:

1.AndroidSchedulers類的源碼

public final class AndroidSchedulers {
    private AndroidSchedulers() {
        throw new AssertionError("No instances");
    }

    private static class MainThreadSchedulerHolder {
        static final Scheduler MAIN_THREAD_SCHEDULER =
                new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    public static Scheduler mainThread() {
        Scheduler scheduler =
                RxAndroidPlugins.getInstance().getSchedulersHook().getMainThreadScheduler();
        return scheduler != null ? scheduler : MainThreadSchedulerHolder.MAIN_THREAD_SCHEDULER;
    }
}

這個類主要提供了MainThread調度器,在RxAndroid中需要在主線程中處理的食物都需要用到這個類的mainThread。

2.BuildConfig類的源碼:

public final class BuildConfig {
  public static final boolean DEBUG = false;
  public static final String APPLICATION_ID = "rx.android";
  public static final String BUILD_TYPE = "release";
  public static final String FLAVOR = "";
  public static final int VERSION_CODE = -1;
  public static final String VERSION_NAME = "";
}

主要是一些常量配置。

3.HandlerScheduler類的源碼:

public final class HandlerScheduler extends Scheduler {

    public static HandlerScheduler from(Handler handler) { //從一個Handler中創建一個Scheduler
        if (handler == null) throw new NullPointerException("handler == null");
        return new HandlerScheduler(handler);
    }

    private final Handler handler;

    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Worker createWorker() {//覆蓋Scheduler的createWorker函數,創建基於Handler的Worker
        return new HandlerWorker(handler);
    }

    static class HandlerWorker extends Worker {

        private final Handler handler;

        private final CompositeSubscription compositeSubscription = new CompositeSubscription();

        HandlerWorker(Handler handler) {
            this.handler = handler;
        }

        @Override
        public void unsubscribe() {
            compositeSubscription.unsubscribe();
        }

        @Override
        public boolean isUnsubscribed() {
            return compositeSubscription.isUnsubscribed();
        }

        @Override
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {//覆蓋Worker的調度函數schedule
            if (compositeSubscription.isUnsubscribed()) {
                return Subscriptions.unsubscribed();
            }

            action = RxAndroidPlugins.getInstance().getSchedulersHook().onSchedule(action);

            final ScheduledAction scheduledAction = new ScheduledAction(action);
            scheduledAction.addParent(compositeSubscription);
            compositeSubscription.add(scheduledAction);

            handler.postDelayed(scheduledAction, unit.toMillis(delayTime));//使用Handler處理這個調度動作ScheduleAction

            scheduledAction.add(Subscriptions.create(new Action0() {
                @Override
                public void call() {
                    handler.removeCallbacks(scheduledAction);//這句話保證當調度動作被取消的時候,能夠及時把這個action從Handler中移除
                }
            }));

            return scheduledAction;
        }

        @Override
        public Subscription schedule(final Action0 action) {
            return schedule(action, 0, TimeUnit.MILLISECONDS);
        }
    }
}
HandlerScheduler 類就是使用Handler作為處理核心的Scheduler類。

4.MainThreadSubscription類的源碼:

public abstract class MainThreadSubscription implements Subscription {

  public static void verifyMainThread() { //靜態方法,判斷當前線程是否是主線程
    if (Looper.myLooper() != Looper.getMainLooper()) {
      throw new IllegalStateException(
          "Expected to be called on the main thread but was " + Thread.currentThread().getName());
    }
  }

  private final AtomicBoolean unsubscribed = new AtomicBoolean();

  @Override public final boolean isUnsubscribed() {
    return unsubscribed.get();
  }

  @Override public final void unsubscribe() {//主線程的取消訂閱
    if (unsubscribed.compareAndSet(false, true)) {
      if (Looper.myLooper() == Looper.getMainLooper()) {//如果是主線程直接進行
        onUnsubscribe();
      } else {
        AndroidSchedulers.mainThread().createWorker().schedule(new Action0() {//如果不是主線程,就創建創建一個Action放到主線程中去執行
          @Override public void call() {
            onUnsubscribe();
          }
        });
      }
    }
  }

  protected abstract void onUnsubscribe();
}
MainThreadSubscription類主要在意的就是unsubscribe的執行線程,這裡采取一切方式保證其在主線程中執行。

5.RxAndroidPlugins類的源碼:

public final class RxAndroidPlugins {//這個類的主要作用就是維護了一個RxAndroidSchedulersHook
    private static final RxAndroidPlugins INSTANCE = new RxAndroidPlugins();

    public static RxAndroidPlugins getInstance() {
        return INSTANCE;
    }

    private final AtomicReference<RxAndroidSchedulersHook> schedulersHook =
            new AtomicReference<RxAndroidSchedulersHook>();

    RxAndroidPlugins() {
    }

    @Beta
    public void reset() {
        schedulersHook.set(null);
    }

    public RxAndroidSchedulersHook getSchedulersHook() {
        if (schedulersHook.get() == null) {
            schedulersHook.compareAndSet(null, RxAndroidSchedulersHook.getDefaultInstance());//如果原來是null,就設置一個RxAndroidSchedulersHook
            // We don't return from here but call get() again in case of thread-race so the winner will
            // always get returned.
        }
        return schedulersHook.get();
    }

    public void registerSchedulersHook(RxAndroidSchedulersHook impl) {
        if (!schedulersHook.compareAndSet(null, impl)) {//如果原來的RxAndroidSchedulerHook是空,則直接持有,否則拋出異常
            throw new IllegalStateException(
                    "Another strategy was already registered: " + schedulersHook.get());
        }
    }
}

可以看出RxAndroidSchedulerHook必須在使用前注冊,一旦使用就不能再注冊了。

6.RxAndroidSchedulersHook類的源碼:

public class RxAndroidSchedulersHook {
    private static final RxAndroidSchedulersHook DEFAULT_INSTANCE = new RxAndroidSchedulersHook();

    public static RxAndroidSchedulersHook getDefaultInstance() {
      return DEFAULT_INSTANCE;
    }

    public Scheduler getMainThreadScheduler() {
        return null;
    }

    public Action0 onSchedule(Action0 action) {
        return action;
    }
}

這是RxAndroid提供的一個默認的RxAndroidSchedulerHook類,程序員也可以自己定義一個這樣的類注冊到RxAndroidPlugins中,但是必須在使用RxAndroidPlugins之前注冊。

自定義的RxAndroidSchedulerHook類可以覆蓋onSchedule函數,在這裡進行一些處理,例如日志記錄等。

 

以上只是說明了RxAndroid與RxJava中不一樣的地方,並沒有嘗試說明RxJava是什麼,在閱讀本文之前,讀者應該先弄明白這個問題。

現在再來看之前的兩個例子就很明白了:

public class ReactiveFragment extends Fragment {//在UI線程中的例子
    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        Observable.just("one", "two", "three", "four", "five")
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(/* an Observer */);
    }
new Thread(new Runnable() {//在其他線程中的例子
    @Override
    public void run() {
        final Handler handler = new Handler(); //綁定到這個線程的Handler
        Observable.just("one", "two", "three", "four", "five")
                .subscribeOn(Schedulers.newThread())
                .observeOn(HandlerScheduler.from(handler))
                .subscribe(/* an Observer */)

        // perform work, ...
    }
}, "custom-thread-1").start();

 

  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved