編輯:關於android開發
Netty是由JBOSS提供的一個Java開源框架,是一個支持TCP/UDP/HTTP等網絡協議的通信框架,和Mina類似,廣泛應用於RPC框架。RxNetty則是支持RxJava的Netty開源框架,現在我們來看一下在Android上如何使用RxNetty。
在 Android Studio 中添加 RxNetty 的依賴:
把RxNetty的tcp包加入到依賴,直接這樣編譯會有兩個問題,第一個問題是jar重復:
com.android.build.api.transform.TransformException: com.android.builder.packaging.DuplicateFileException: Duplicate files copied in APK THIRD-PARTY
File1: C:\Users\XXX.gradle\caches\modules-2\files-2.1\org.openjdk.jmh\jmh-core\1.11.2\f4f8cd9874f5cdbc272b715a381c57e65f67ddf2\jmh-core-1.11.2.jar
File2: C:\Users\XXX.gradle\caches\modules-2\files-2.1\org.openjdk.jmh\jmh-generator-annprocess\1.11.2\72d854bf76ba5e59596d4c887a6de48e7003bee2\jmh-generator-annprocess-1.11.2.jar
解決辦法:
dependencies {
...
compile('io.reactivex:rxnetty-tcp:0.5.2-RC1') {
exclude group: 'org.openjdk.jmh'
}
...
}
另一個問題是引用的netty包中META-INF/下的部分文件重復。
解決辦法:
packagingOptions {
...
exclude 'META-INF/INDEX.LIST'
exclude 'META-INF/BenchmarkList'
exclude 'META-INF/io.netty.versions.properties'
exclude 'META-INF/CompilerHints'
...
}
到這裡RxNetty就成功添加到項目模塊中了。接下來看看到底如何使用RxNetty。
拿TCP協議舉例,用過Netty的都清楚創建連接的步驟:
workerGroup = new NioEventLoopGroup();
Bootstrap boot = new Bootstrap();
boot.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer() {
@Override public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast("decoder", new MessageDecoder());
p.addLast("encoder", new MessageEncoder());
p.addLast("handler", new MessageHandler());
}
});
ChannelFuture f =
boot.connect("localhost", 8888).syncUninterruptibly();
channel = f.channel();
自定義的協議需要我們自己實現編碼解碼Handler,還有最後處理數據的MessageHandler
:
@Sharable
public class MessageHandler extends SimpleChannelInboundHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, Message msg)
throws Exception {
//處理消息
}
}
在RxNetty中可以不實現MessageHandler
,因為通過注冊的觀察者可以得到最終解碼後的協議對象。
下面是RxNetty創建連接的方法:
Connection mConnection;
public Observable connect(final String url, final int port) {
return Observable.create(new Observable.OnSubscribe() {
@Override public void call(final Subscriber subscriber) {
TcpClient.newClient(url, port).addChannelHandlerLast("decoder",
new Func0() {
@Override public ChannelHandler call() {
return new StringDecoder();
}
}).addChannelHandlerLast("encoder", new Func0() {
@Override public ChannelHandler call() {
return new StringEncoder();
}
}).createConnectionRequest().subscribe(new Observer>() {
@Override public void onCompleted() {
subscriber.onCompleted();
}
@Override public void onError(Throwable e) {
subscriber.onError(e);
}
@Override public void onNext(Connection connection) {
mConnection = connection;
subscriber.onNext(true);
}
});
}
});
}
上面的TCP客戶端創建了一個字符串解碼器、一個字符串編碼器,然後創建鏈接,在鏈接創建成功後把鏈接對象connection
保存到mConnection
方便後面發送數據,同時通知訂閱者socket連接成功。
在Android中不能在UI線程創建網絡鏈接,就連
InetSocketAddress
類都不能在UI線程中創建,TcpClient.newClient(url, port)...createConnectionRequest()
本身是一個Observable
,但是由於方法newClient(url, port)
中創建了InetSocketAddress
類,Android嚴苛模式會報異常,所以上面創建鏈接的TcpClient方法在外層又包裹了一個Observable
,讓它運行在IO線程等其它非UI線程才可以正常創建socket鏈接。
用來接收數據、發送數據的方法同樣返回一個Observable,代碼如下:
public Observable receive() {
if (mConnection != null) {
return mConnection.getInput();
}
return null;
}
public Observable send(String s) {
return mConnection.writeString(Observable.just(s));
}
測試上面方法的客戶端代碼:
public void rxNettyClientTest() {
connect("localhost", 60000).subscribe(new Observer() {
@Override public void onCompleted() {
}
@Override public void onError(Throwable e) {
//reconnect
Observable.timer(1, TimeUnit.SECONDS).subscribe(new Action1() {
@Override public void call(Long aLong) {
if (mConnection != null) mConnection.closeNow();
rxNettyClientTest();
}
});
System.out.println("reconnect");
}
@Override public void onNext(Boolean aBoolean) {
//send data
send("hello world!").subscribe(new Action1() {
@Override public void call(Void aVoid) {
System.out.println("send success!");
}
});
//receive data
receive().subscribe(new Observer() {
@Override public void onCompleted() {
}
@Override public void onError(Throwable e) {
//reconnect
Observable.timer(1, TimeUnit.SECONDS).subscribe(new Action1() {
@Override public void call(Long aLong) {
if (mConnection != null) mConnection.closeNow();
rxNettyClientTest();
}
});
System.out.println("reconnect");
}
@Override public void onNext(String s) {
System.out.println("receive:" + s);
}
});
}
});
}
上面的代碼包涵了讀、寫數據和重連等主要功能。
然後是創建服務端的代碼:
public void rxNettyServerTest() {
TcpServer server;
server = TcpServer.newServer(60000).addChannelHandlerLast("string-decoder",
new Func0() {
@Override public ChannelHandler call() {
return new StringDecoder();
}
}).addChannelHandlerLast("string-encoder", new Func0() {
@Override public ChannelHandler call() {
return new StringEncoder();
}
}).start(new ConnectionHandler() {
@Override public Observable handle(Connection newConnection) {
return newConnection.writeStringAndFlushOnEach(
newConnection.getInput().map(new Func1() {
@Override public String call(String s) {
System.out.println("receive:" + s);
return "echo=> " + s;
}
}));
}
});
server.awaitShutdown();
}
服務端代碼比較簡單,直接echo客戶端發來的數據。
關於線程,在Android中處理網絡需要subscribeOn(Schedulers.io())
,如果需要在UI線程展示則observeOn(AndroidSchedulers.mainThread())
。
最後,在Android上使用RxNetty大多數是因為沒有合適的socket客戶端框架,RxNetty也支持Http協議,Android上的Http協議的可選框架比較多,所以就不在這裡介紹了,想要了解的可以到這裡RxNetty。
分享一個在程序中監聽Log信息的很實用的例子。 為什麼說它實用?原因是Android的開發廠商各種修改之後手機和手機之間以後存在很多差異。比如說魅
Android動態部署五:如何從插件apk中啟動Service 經過前面幾篇文章的分析,我們了解到了Google原生是如何拆分apk的,並且我們自己可以通過解析manif
開發首屏廣告(Android)簡述 作為一個成熟的應用, 必須要有廣告. 那麼, 如何優雅地開發廣告呢? 需要注意一些細節. 本文提供一個簡單的示例, 代碼僅供參考.
android WindowManager解析與騙取QQ密碼案例分析 最近在網上看見一個人在烏雲上提了一個漏洞,應用可以開啟一個後台Service,檢測當前頂部應用