Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> 關於Android編程 >> 在 Android 上使用 RxNetty

在 Android 上使用 RxNetty

編輯:關於Android編程

在 Android 上使用 RxNetty

Netty是由JBOSS提供的一個Java開源框架,是一個支持TCP/UDP/HTTP等網絡協議的通信框架,和Mina類似,廣泛應用於RPC框架。RxNetty則是支持RxJava的Netty開源框架,現在我們來看一下在Android上如何使用RxNetty。

添加RxNetty

在 Android Studio 中添加 RxNetty 的依賴:
這裡寫圖片描述
把RxNetty的tcp包加入到依賴,直接這樣編譯會有兩個問題,第一個問題是jar重復:<喎?/kf/ware/vc/" target="_blank" class="keylink">vcD4NCjxwcmUgY2xhc3M9"brush:java;"> com.android.build.api.transform.TransformException: com.android.builder.packaging.DuplicateFileException: Duplicate files copied in APK THIRD-PARTY File1: C:\Users\XXX.gradle\caches\modules-2\files-2.1\org.openjdk.jmh\jmh-core\1.11.2\f4f8cd9874f5cdbc272b715a381c57e65f67ddf2\jmh-core-1.11.2.jar File2: C:\Users\XXX.gradle\caches\modules-2\files-2.1\org.openjdk.jmh\jmh-generator-annprocess\1.11.2\72d854bf76ba5e59596d4c887a6de48e7003bee2\jmh-generator-annprocess-1.11.2.jar

解決辦法:

dependencies {
  ...
  compile('io.reactivex:rxnetty-tcp:0.5.2-RC1') {
    exclude group: 'org.openjdk.jmh'
  }
  ...
}

另一個問題是引用的netty包中META-INF/下的部分文件重復。
解決辦法:

  packagingOptions {
    ...
    exclude 'META-INF/INDEX.LIST'
    exclude 'META-INF/BenchmarkList'
    exclude 'META-INF/io.netty.versions.properties'
    exclude 'META-INF/CompilerHints'
    ...
  }

到這裡RxNetty就成功添加到項目模塊中了。接下來看看到底如何使用RxNetty。

如何使用

拿TCP協議舉例,用過Netty的都清楚創建連接的步驟:

        workerGroup = new NioEventLoopGroup();
        Bootstrap boot = new Bootstrap();
        boot.group(workerGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer() {
              @Override public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline p = ch.pipeline();
                p.addLast("decoder", new MessageDecoder());
                p.addLast("encoder", new MessageEncoder());
                p.addLast("handler", new MessageHandler());
              }
            });
        ChannelFuture f =
            boot.connect("localhost", 8888).syncUninterruptibly();
        channel = f.channel();

自定義的協議需要我們自己實現編碼解碼Handler,還有最後處理數據的MessageHandler

@Sharable
public class MessageHandler extends SimpleChannelInboundHandler {
    @Override
    public void messageReceived(ChannelHandlerContext ctx, Message msg)
            throws Exception {
            //處理消息
    }
}

在RxNetty中可以不實現MessageHandler,因為通過注冊的觀察者可以得到最終解碼後的協議對象。
下面是RxNetty創建連接的方法:

  Connection mConnection;

  public Observable connect(final String url, final int port) {
    return Observable.create(new Observable.OnSubscribe() {
      @Override public void call(final Subscriber subscriber) {
        TcpClient.newClient(url, port).addChannelHandlerLast("decoder",
            new Func0() {
              @Override public ChannelHandler call() {
                return new StringDecoder();
              }
            }).addChannelHandlerLast("encoder", new Func0() {
          @Override public ChannelHandler call() {
            return new StringEncoder();
          }
        }).createConnectionRequest().subscribe(new Observer>() {
          @Override public void onCompleted() {
            subscriber.onCompleted();
          }

          @Override public void onError(Throwable e) {
            subscriber.onError(e);
          }

          @Override public void onNext(Connection connection) {
            mConnection = connection;
            subscriber.onNext(true);
          }
        });
      }
    });
  }

上面的TCP客戶端創建了一個字符串解碼器、一個字符串編碼器,然後創建鏈接,在鏈接創建成功後把鏈接對象connection保存到mConnection方便後面發送數據,同時通知訂閱者socket連接成功。

在Android中不能在UI線程創建網絡鏈接,就連InetSocketAddress類都不能在UI線程中創建,TcpClient.newClient(url, port)...createConnectionRequest()本身是一個Observable,但是由於方法newClient(url, port)中創建了InetSocketAddress類,Android嚴苛模式會報異常,所以上面創建鏈接的TcpClient方法在外層又包裹了一個Observable,讓它運行在IO線程等其它非UI線程才可以正常創建socket鏈接。

用來接收數據、發送數據的方法同樣返回一個Observable,代碼如下:

  public Observable receive() {
    if (mConnection != null) {
      return mConnection.getInput();
    }
    return null;
  }

  public Observable send(String s) {
    return mConnection.writeString(Observable.just(s));
  }

測試上面方法的客戶端代碼:

  public void rxNettyClientTest() {
    connect("localhost", 60000).subscribe(new Observer() {
      @Override public void onCompleted() {

      }

      @Override public void onError(Throwable e) {
        //reconnect
        Observable.timer(1, TimeUnit.SECONDS).subscribe(new Action1() {
          @Override public void call(Long aLong) {
            if (mConnection != null) mConnection.closeNow();
            rxNettyClientTest();
          }
        });
        System.out.println("reconnect");
      }

      @Override public void onNext(Boolean aBoolean) {
        //send data
        send("hello world!").subscribe(new Action1() {
          @Override public void call(Void aVoid) {
            System.out.println("send success!");
          }
        });
        //receive data
        receive().subscribe(new Observer() {
          @Override public void onCompleted() {

          }

          @Override public void onError(Throwable e) {
            //reconnect
            Observable.timer(1, TimeUnit.SECONDS).subscribe(new Action1() {
              @Override public void call(Long aLong) {
                if (mConnection != null) mConnection.closeNow();
                rxNettyClientTest();
              }
            });
            System.out.println("reconnect");
          }

          @Override public void onNext(String s) {
            System.out.println("receive:" + s);
          }
        });
      }
    });
  }

上面的代碼包涵了讀、寫數據和重連等主要功能。
然後是創建服務端的代碼:

  public void rxNettyServerTest() {
    TcpServer server;
    server = TcpServer.newServer(60000).addChannelHandlerLast("string-decoder",
        new Func0() {
          @Override public ChannelHandler call() {
            return new StringDecoder();
          }
        }).addChannelHandlerLast("string-encoder", new Func0() {
      @Override public ChannelHandler call() {
        return new StringEncoder();
      }
    }).start(new ConnectionHandler() {
      @Override public Observable handle(Connection newConnection) {
        return newConnection.writeStringAndFlushOnEach(
            newConnection.getInput().map(new Func1() {
              @Override public String call(String s) {
                System.out.println("receive:" + s);
                return "echo=> " + s;
              }
            }));
      }
    });
    server.awaitShutdown();
  }

服務端代碼比較簡單,直接echo客戶端發來的數據。
關於線程,在Android中處理網絡需要subscribeOn(Schedulers.io()),如果需要在UI線程展示則observeOn(AndroidSchedulers.mainThread())

最後,在Android上使用RxNetty大多數是因為沒有合適的socket客戶端框架,RxNetty也支持Http協議,Android上的Http協議的可選框架比較多,所以就不在這裡介紹了,想要了解的可以到這裡RxNetty。

  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved