Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> 關於Android編程 >> 基於XMPP協議的aSmack源碼分析

基於XMPP協議的aSmack源碼分析

編輯:關於Android編程

\

在研究如何實現Pushing功能期間,收集了很多關於Pushing的資料,其中有一個androidnp開源項目用的人比較多,但是由於長時間沒有什麼人去維護,聽說bug的幾率挺多的,為了以後自己的產品穩定些,所以就打算自己研究一下asmack的源碼,自己做一個插件,androidnp移動端的源碼中包含了一個叫做asmack的jar。

Reader和Writer

在asmack中有兩個非常重要的對象PacketReader和PacketWriter,那麼從類名上看Packet + (Reader/Wirter),而TCP/IP傳輸的數據,叫做Packet(包),asmack使用的是XMPP協議,XMPP簡單講就是使用TCP/IP協議 + XML流協議的組合。所以這個了對象的作用從字面上看應該是,寫包與讀包,作用為從服務端讀寫數據。

PacketWriter中一定含有一個Writer對象,這個Writer是一個輸出流,同樣的PacketReader對象中有一個Reader,而這個Reader是一個輸入流,Writer和Reader對象就是一個簡單的讀寫器,他們是從socket對象中獲取出來後,經過裝飾變成現在這個樣子。

1 reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"));
2 writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), "UTF-8"));

沒有什麼神奇的地方,主要看PacketWriter/Reader,這兩個對象分別把對應的Writer和Reader引用到自己的內部進行操作,下面就先看一個PacketWriter。

    /**
     * Creates a new packet writer with the specified connection.
     *
     * @param connection the connection.
     */
    protected PacketWriter(XMPPConnection connection) {
        this.queue = new ArrayBlockingQueue(500, true);
        this.connection = connection;
        init();
    }

還有就是PacketWriter初始化的時候將XMPPConnection對象傳了進來,因為在init方法中使用到了XMPPConnection對象的writer成員,我想說的是,為什麼不直接傳遞writer成員?而是將整個對象XMPPConnection傳了過來?其實這就是設計模式的好處,我們如果每次都傳遞的是自己的成員,那麼如果後期有改動,實現一個新的XMPPConnection與PacketWriter關聯,那麼老的代碼維護起來是很巨大的,如果這裡XMPPConnection和他的同事類PacketWriter都有相對應的接口,(XMPPConnection的接口是Connection)那就更完美了,而這裡用到的模式應該是中介者,不是絕對意義的中介者,由於形成中介者的條件比較高,所以實際開發中多是變形使用。PacketWriter對象在XMPPConnection中的connect方法中被初始化,它的最大作用是在其自身的內部創建了兩個消息循環,其中一個用30s的heartbeats向服務器發送空白字符,保持長連接。而第二個循環則時刻從隊列中主動取消息並發往服務器,而向外部提供的sendPacket方法則是向queue中添加消息,前面提到的循環機制都是在線程中工作,而消息的隊列用的是ArrayBlockingQueue,這個無邊界阻塞隊列可以存放任何對象,這裡存放的是Packet對象。

 1 public void sendPacket(Packet packet) {
 2         if (!done) {
 3             try {
 4                 queue.put(packet);
 5             }
 6             catch (InterruptedException ie) {
 7                 ie.printStackTrace();
 8                 return;
 9             }
10             synchronized (queue) {
11                 queue.notifyAll();
12             }
13         }
14     }
while (!done && (writerThread == thisThread)) {
                Packet packet = nextPacket();
                if (packet != null) {
                    synchronized (writer) {
                        writer.write(packet.toXML());
                        writer.flush();
                        // Keep track of the last time a stanza was sent to the server
                        lastActive = System.currentTimeMillis();
                    }
                }
            }

消息循環則是一個通過各種成員變量控制的while loop,第一行的nextPacket方法是向queue中獲取Packet消息,並且通過weiter將包發出去,這樣生產/消費的模型就搭建好了,這裡需要注意的是,我刪減了很多影響閱讀的代碼,並沒有全部貼上。關於heartbeats循環其實也是一個在線程中運行的while loop,也是通過一些成員控制。wirter向服務端寫了寫什麼?看下面的這個方法

void openStream() throws IOException {
        StringBuilder stream = new StringBuilder();
        stream.append("");
        writer.write(stream.toString());
        writer.flush();
    }

XML,沒錯,這也是符合XMPP協議規范的一種表現吧,至於更多XMPP協議的好處,由於本人的經驗有限,就不多做點評,希望後續會對其深入了解。

下面看一個PacketReader這個類都包含了什麼職責。

PacketReader

PacketReader所有的核心邏輯都在一個線程中完成的,PacketReader的工作很專注,同樣的在一個while loop中 不停的解析、刷新reader對象、同時作為事件源發送解析過後的各種Packet,解析這裡用的是Android獨特的Pull解析,Pull解析的特點事件驅動,在這裡被完全的利用了起來,隨著不同的標簽,PacketReader都會做出不同的處理,處理完這些數據用不同Pocket對象封裝,最後,分發出去,由監聽者做最後的業務處理。

readerThread = new Thread() {
public void run() {
parsePackets(this);
}
};

由於解析過程的代碼量過於多,我寫到什麼地方就分解什麼地方,大家有時間最好自己看源碼。

一、初始化/重置解析器

private void resetParser() {
try {
//用的是Pull解析
parser = XmlPullParserFactory.newInstance().newPullParser();
parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true);
parser.setInput(connection.reader);
}
catch (XmlPullParserException xppe) {
xppe.printStackTrace();
}
}

上面這個resetParser方法還會在解析的過程中碰到不同的業務需求會不斷的被調用,有用和業務邏輯比較緊密,沒什麼技術含量,關鍵是要看解析的方式和同時作為事件源發送解析過後的各種Packet,這兩部分的設計,是非常的迷人的。

二、解析

do {
if (eventType == XmlPullParser.START_TAG) {
if (parser.getName().equals("message")) {
processPacket(PacketParserUtils.parseMessage(parser));
}
else if (parser.getName().equals("iq")) {
processPacket(PacketParserUtils.parseIQ(parser, connection));
}
else if (parser.getName().equals("presence")) {
processPacket(PacketParserUtils.parsePresence(parser));
}

PacketParserUtils是一個工具類,各個靜態方法傳入的還是Parser對象,內部同樣的使用Pull的方式進行解析,但是由於Pull是驅動解析,不會無故的浪費資源只會加載感興趣的內容,試想一下,如果這裡用Dom解析……PacketParserUtils的這些靜態解析方法返回的實例對象也不一樣,從方法名可以看出有IQ、message、presence等,他們的父類為Packet,這些對象又被執行processPacket方法的時候傳入

private void processPacket(Packet packet) {
if (packet == null) {
return;
}

// Loop through all collectors and notify the appropriate ones.
for (PacketCollector collector: connection.getPacketCollectors()) {
collector.processPacket(packet);
}

// Deliver the incoming packet to listeners.
listenerExecutor.submit(new ListenerNotification(packet));
}

processPacket方法內部有一個循環來轉調collector.processPacket(packet);方法,前提是connection.getPacketCollectors()內部有貨,到目前位置都沒有涉及到PacketCollector這個接口的內容,他的作用其實是一個觀察者模式中的執行者的作用,也就是傳說中的監聽器,凡是注冊了它的對象,都可以通過processPacket這個抽象方法,監聽packet的變化。可是到現在任何對象都沒有注冊它,所以這個Loop還沒有作用,因為目前我們還處在連接的步驟(還沒繞出來)。

listenerExecutor.submit(new ListenerNotification(packet));其中ListenerNotification是個Runnable
/**
* A runnable to notify all listeners of a packet.
*/
private classListenerNotification implements Runnable {

privatePacket packet;

publicListenerNotification(Packet packet) {
this.packet = packet;
}

public voidrun() {
for(ListenerWrapper listenerWrapper : connection.recvListeners.values()) {
listenerWrapper.notifyListener(packet);
}
}
}

我們上面看到listenerExecutor是一個線程池,在線程池中執行了一個凡是注冊了ListenerWrapper的對象,都將接收到packet,同樣的,到目前為止沒有對象注冊,(在RegisterTask過程中ListenerWrapper被注冊)

else if(eventType ==XmlPullParser.END_TAG) {
if(parser.getName().equals("stream")) {
// Disconnect the connection
connection.disconnect();
}
}

當文檔讀取結束是將斷開連接

voidcleanup() {
connection.recvListeners.clear();
connection.collectors.clear();
}

看到了嗎,只是將監聽器接口集合清空而已,並沒有斷開連接,或者取消消息循環

PacketReader對象的startup方法比較復雜,大體上執行了讀取流,並將解析好的Packet對象發送給觀察者,由觀察者繼續後續操作,目前觀察者還沒有出現,還有就是使用了線程池和令牌來操作執行線程,而且維護了一個connectionID成員,這個成員的作用還需要再看,這就不多說了。
關於Packet對象,packet對象有很多子類,上面舉例了3個,其實還有很多,都是在parser時封裝的
AuthMechanism\Challenge\Failure\IQ\Message\Presence\Response\Success
還有就是Pull解析的優點體現了出來,可以一個parser對象包含了很多信息,但可能沒到一個時刻我們需要的信息只是一小部分,這樣用Pull解析的驅動式就大大減少了冗余的過程,PacketReader對象使用了2個監聽器集合對象,PacketCollector、listenerWrapper,還是那句話,還沒看到觀察者,所以還不知道什麼情況下需要注冊這兩個監聽。
到目前位置packetReader.startup()方法終於告一個段落了。

 

register過程分析

RegisterTask這個task在運行中,添加了一個監聽,上面說道的PacketReader中有一個消息機制,在不停的解析服務器返回的結果,然後將解析過後的包分發給各個監聽器(觀察者),而register中就注冊了一個監聽器,比較有意思的是,監聽器被注冊時還加了一個過濾器,這個過濾器的目的是監聽器只接收自己感興趣的內容,這個設計真的很贊。這樣就不必在數據源頭PacketReader中對數據進行過濾了,只要後期擴展自己Packet和自己的過濾器,就能達到排除自己不關心的信息的功能。

Registrationregistration =newRegistration();
PacketFilterpacketFilter =newAndFilter(newPacketIDFilter(registration.getPacketID()),newPacketTypeFilter(IQ.class));

其中Registration的類型其實一個IQ的子類,IQ是Packet的子類。
AndFilter是PacketFilter的子類,PacketFilter的種類型有很多,也可以自己擴展,AndFilter就是其中一個、PacketTypeFilter也是、PacketIDFilter也是,
其中PacketTypeFilter的構造方法傳入一個IQ.class,其實就是通過這個類文件來過濾packet,這個PacketTypeFilter就是要設置關心的Packet,這裡面它告訴監聽器,只接收類型為IQ的Packet,這些Filter中都有一個關鍵方法,accept(Packet packet).這個accept方法每個Filter的實現方式都不一樣,我們可可以擴展自己的Filter並且重寫這個方法,最有意思的是AndFilter這個類,他的構造方法傳入的是一個動態數組,類型為PacketFilter,你可以傳入你需要的過濾器,將他們當成組合條件使用來過濾Packet,這個就是典型的裝飾設計模式和職責鏈模式的組合使用。

注冊監聽器

 1 PacketListener packetListener = new PacketListener() {
 2     //這一部分就是監聽器接收到Packet後執行的後續操作
 3     public void processPacket(Packet packet) {
 4         Log.d("RegisterTask.PacketListener", "processPacket().....");
 5         Log.d("RegisterTask.PacketListener", "packet=" + packet.toXML());
 6 
 7         if (packet instanceof IQ) {
 8             IQ response = (IQ) packet;
 9             if (response.getType() == IQ.Type.ERROR) {
10                 if (!response.getError().toString().contains("409")) {
11                     Log.e(LOGTAG,
12                             "Unknown error while registering XMPP account! "
13                                     + response.getError()
14                                             .getCondition());
15                 }
16             } else if (response.getType() == IQ.Type.RESULT) {
17                 xmppManager.setUsername(newUsername);
18                 xmppManager.setPassword(newPassword);
19                 Log.d(LOGTAG, "username=" + newUsername);
20                 Log.d(LOGTAG, "password=" + newPassword);
21 
22                 Editor editor = sharedPrefs.edit();
23                 editor.putString(Constants.XMPP_USERNAME,
24                         newUsername);
25                 editor.putString(Constants.XMPP_PASSWORD,
26                         newPassword);
27                 editor.commit();
28                 Log
29                         .i(LOGTAG,
30                                 "Account registered successfully");
31                 //執行task
32                 xmppManager.runTask();
33             }
34         }
35     }
36 };

addPacketListener方法傳入一個監聽器和過濾器,看一下內部

/**
 * Registers a packet listener with this connection. A packet filter determines
 * which packets will be delivered to the listener. If the same packet listener
 * is added again with a different filter, only the new filter will be used.
 * 
 * @param packetListener the packet listener to notify of new received packets.
 * @param packetFilter   the packet filter to use.
 */
public void addPacketListener(PacketListener packetListener, PacketFilter packetFilter) {
    if (packetListener == null) {
        throw new NullPointerException("Packet listener is null.");
    }
    ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter);
    recvListeners.put(packetListener, wrapper);
}

可以看到,監聽器和過濾器被 ListenerWrapper 再次封裝,後續的recvListeners這個集合將ListenerWrapper收入囊中,好整個注冊過程完畢,就等待接收信息了,那麼發送信息的地方在什麼地方呢?分析connect過程時,上面的PacketReader中已經開始循環發送了,代碼如下

listenerExecutor.submit(new ListenerNotification(packet));其中ListenerNotification是個Runnable

/**
 * A runnable to notify all listeners of a packet.
 */
private class ListenerNotification implements Runnable {

    private Packet packet;

    public ListenerNotification(Packet packet) {
        this.packet = packet;
    }

    public void run() {
        for (ListenerWrapper listenerWrapper : connection.recvListeners.values()) {
            listenerWrapper.notifyListener(packet);
        }
    }
}

而listenerWrapper的notifyListener(packet)內部,使用了傳入的過濾器對Packet進行了過濾

/**
 * Notify and process the packet listener if the filter matches the packet.
 * 
 * @param packet the packet which was sent or received.
 */
public void notifyListener(Packet packet) {
    if (packetFilter == null || packetFilter.accept(packet)) {
        packetListener.processPacket(packet);
    }

而具體的過濾機制還是轉調了傳入的過濾器本身的過濾方式accept,非常的靈活。過濾完的Packet將被發送出去

這個方法connection.sendPacket(registration);將一個Registration對象發了出去,

public void sendPacket(Packet packet) {
    if (!isConnected()) {
        throw new IllegalStateException("Not connected to server.");
    }
    if (packet == null) {
        throw new NullPointerException("Packet is null.");
    }
    packetWriter.sendPacket(packet);
}

內部轉調的是 packetWriter.sendPacket(packet);以前提到過PacketWirter中有兩個循環機制,其中一個就是在不停的訪問隊列來獲取Packet,而這個sendPacket方法就是將消息寫入隊列中供消費者使用。

/**
 * Sends the specified packet to the server.
 *
 * @param packet the packet to send.
 */
public void sendPacket(Packet packet) {
    if (!done) {
        // Invoke interceptors for the new packet that is about to be sent. Interceptors
        // may modify the content of the packet.
        //內部執行了一個發送數據源的動作,也是為某些監聽器對象服務的interceptorWrapper.notifyListener(packet);
        connection.firePacketInterceptors(packet);

        try {
            //將一個Packet對象放入到阻塞隊列中,在上面的witerPacket方法中的wile循環中發送出去
            queue.put(packet);
        }
        catch (InterruptedException ie) {
            ie.printStackTrace();
            return;
        }
        synchronized (queue) {
            queue.notifyAll();
        }

        // Process packet writer listeners. Note that we're using the sending
        // thread so it's expected that listeners are fast.
        connection.firePacketSendingListeners(packet);
    }
}   

其實,注冊的過程就是在注冊監聽,這樣在有消息發出時,才可以根據業務需求對消息進行接收和處理。

  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved