Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> 關於Android編程 >> android中非阻塞socket通信

android中非阻塞socket通信

編輯:關於Android編程

1、什麼是同步與異步,阻塞與非阻塞

首先我們要明白搞明白:同步就等於阻塞?異步就等於非阻塞?這是不對的,同步不等於阻 塞,而異步也不等於非阻塞。

1)那什麼是同步編程?

什麼是同步,就是在發出一個功能調用時,在沒有得到結果之前,該調用就不返回。根據這個定義,android中絕大多數函數都是同步調用。但是一般而言,我們在談論同步、異步的時候,特指那些需要其他部件協作或者需要一定時間完成的任務。在android中,由於主線程(UI線程的不安全性),我們經常會用到handler的SendMessage函數,就是一個同步線程,它將數據傳送給某個窗口後,在對方處理完消息後,這個函數是不會返回的,當處理完畢的時候才返回相應的返回值。

2)那什麼是異步編程?

異步的概念和同步相反的。當一個調用者異步發出一個功能調用時,調用者不能立刻得到結果。實際處理這個調用的部件在完成後,通過狀態、通知和回調來通知調用者。以 android中AsyncTask類為例,顧名思義異步執行任務,在doInBackground 執行完成後,onPostExecute 方法將被UI 線程調用,後台的計算結果將通過該方法傳遞到UI 線程,並且在界面上展示給用戶.。在android或者java異步編程中需要注意以下幾個知識點:回調,監聽者模式,觀察者模式。這幾點在之後另外幾篇文章中會提及。

3)什麼是阻塞式編程?

阻塞調用是指調用結果返回之前,當前線程會被掛起。函數只有在 得到結果之後才會返回。因為這點定義跟同步編程的定義很相像,所以很多人認為同步編程就等阻塞式編程。對於同步調用來說,很多時候當前線程還是激活的,只是從邏輯上當前函數沒有返回而已。例如,我們在 socket編程中調用Receive函數,如果緩沖區中沒有數據,這個函數就會一直等待,直到有數據才返回。而此時,當前線程還會繼續處理各種各樣的消 息。如果主窗口和調用函數在同一個線程中,除非你在特殊的界面操作函數中調用,其實主界面還是應該可以刷新。但是在android中,由於主線程(UI線程)的不安全性,特別到4.0版本後,系統已經不允許在主線程中進行耗時的同步編程。所以android才出現了AsyncTask類用於異步編程。

4)什麼是非阻塞式編程?

非阻塞和阻塞的概念相對應,指在不能立刻得到結果之前,該函數不會阻塞當前線程,而會立刻返回。從這個定義上來說,非阻塞編程可以說是異步編程的一種,但是異步編程並不等於非阻塞式編程。

5)區別大概

我們用買票的案例去理解它,當我們去買票的時候,如果還在排隊,一直排著,直到買到票再離開,這個就是同步編程(所謂同步就是當一個進程發起一個函數(任務)調用的時候,一直會到函數(任務)完成)。那還有另外一方式,你可以叫一個人(監聽者,觀察者)幫你看著,直接你買票了,再通知你,你可以先去別的事情(而異步這不會這樣,異步情況下是當一個進程發 起一個函數(任務)調用的時候,不會等函數返回)。阻塞是就是等排隊,非阻塞就是直接走開。

2、幾個關鍵知識點

1)java.net.InetSocketAddress

此類實現 IP 套接字地址(IP 地址 + 端口號)。它還可以是一個對(主機名 + 端口號),在此情況下,將嘗試解析主機名。如果解析失敗,則該地址將被視為未解析 地址,但是其在某些情形下仍然可以使用,比如通過代理連接。
需注意接口: public InetSocketAddress(InetAddress addr,int port) 根據 IP 地址和端口號創建套接字地址。
有效端口值介於 0 和 65535 之間。端口號 zero 允許系統在 bind 操作中挑選暫時的端口。

2)java.nio.channels.Selector

可通過調用此類的 open 方法創建選擇器,該方法將使用系統的默認選擇器提供者創建新的選擇器。也可通過調用自定義選擇器提供者的 openSelector 方法來創建選擇器。通過選擇器的 close 方法關閉選擇器之前,它一直保持打開狀態。
需注意接口: public static Selector open()throws IOException
打開一個選擇器。
public abstract void close()throws IOException
關閉此選擇器。
如果某個線程目前正阻塞在此選擇器的某個選擇方法中,則中斷該線程,如同調用該選擇器的 wakeup 方法那樣。
所有仍與此選擇器關聯的未取消鍵已無效、其通道已注銷,並且與此選擇器關聯的所有其他資源已釋放。
如果此選擇器已經關閉,則調用此方法無效。
關閉選擇器後,除了調用此方法或 wakeup 方法外,以任何其他方式繼續使用它都將導致拋出 ClosedSelectorException。 注:選擇器的關閉是關鍵點,特別需要注意上述第二條

3)java.nio.channels.SocketChannel

針對面向流的連接套接字的可選擇通道。
套接字通道不是連接網絡套接字的完整抽象。必須通過調用 socket 方法所獲得的關聯 Socket 對象來完成對套接字選項的綁定、關閉和操作。不可能為任意的已有套接字創建通道,也不可能指定與套接字通道關聯的套接字所使用的 SocketImpl 對象。
通過調用此類的某個 open 方法創建套接字通道。新創建的套接字通道已打開,但尚未連接。試圖在未連接的通道上調用 I/O 操作將導致拋出 NotYetConnectedException。可通過調用套接字通道的 connect 方法連接該通道;一旦連接後,關閉套接字通道之前它會一直保持已連接狀態。可通過調用套接字通道的 isConnected 方法來確定套接字通道是否已連接。
套接字通道支持非阻塞連接:可創建一個套接字通道,並且通過 connect 方法可以發起到遠程套接字的連接,之後通過 finishConnect 方法完成該連接。可通過調用 isConnectionPending 方法來確定是否正在進行連接操作。
可單獨地關閉 套接字通道的輸入端和輸出端,而無需實際關閉該通道。調用關聯套接字對象的 shutdownInput 方法來關閉某個通道的輸入端將導致該通道上的後續讀取操作返回 -1(指示流的末尾)。調用關聯套接字對象的 shutdownOutput 方法來關閉通道的輸出端將導致該通道上的後續寫入操作拋出 ClosedChannelException。
套接字通道支持異步關閉,這與 Channel 類中所指定的異步 close 操作類似。如果一個線程關閉了某個套接字的輸入端,而同時另一個線程被阻塞在該套接字通道上的讀取操作中,那麼處於阻塞線程中的讀取操作將完成,而不讀取任何字節且返回 -1。I如果一個線程關閉了某個套接字的輸出端,而同時另一個線程被阻塞在該套接字通道上的寫入操作中,那麼阻塞線程將收到 AsynchronousCloseException。
多個並發線程可安全地使用套接字通道。盡管在任意給定時刻最多只能有一個線程進行讀取和寫入操作,但數據報通道支持並發的讀寫。connect 和 finishConnect 方法是相互同步的,如果正在調用其中某個方法的同時試圖發起讀取或寫入操作,則在該調用完成之前該操作被阻塞。

3、實例代碼演示

連接核心代碼:
Selector mSelector = null;
ByteBuffer sendBuffer = null;
SocketChannel client = null;
InetSocketAddress isa = null;
SocketEventListener mSocketEventListener = null;
private boolean Connect(String site, int port)
{
        if (mSocketEventListener != null)
        {
                mSocketEventListener.OnSocketPause();
        }
        boolean ret = false;
        try
        {
                mSelector = Selector.open();
                client = SocketChannel.open();
                client.socket().setSoTimeout(5000);
                isa = new InetSocketAddress(site, port);
                boolean isconnect = client.connect(isa);
                // 將客戶端設定為異步
                client.configureBlocking(false);
                // 在輪訊對象中注冊此客戶端的讀取事件(就是當服務器向此客戶端發送數據的時候)
                client.register(mSelector, SelectionKey.OP_READ);
                
                long waittimes = 0;

                if(!isconnect)
                {
                    while (!client.finishConnect())
                    {
                            EngineLog.redLog(TAG,  "等待非阻塞連接建立....");
                            Thread.sleep(50);
                            if(waittimes < 100)
                            {
                                    waittimes++;
                            }
                            else
                            {
                                    break;
                            }
                    }
                }
                Thread.sleep(500);
                haverepaired();
                startListener();
                ret = true;
        }
        catch (Exception e)
        {
                EngineLog.redLog(TAG + " - Connect error", e != null ? e.toString() : "null");
                try
                {
                        Thread.sleep(1000 * 10);
                }
                catch (Exception e1)
                {
                        EngineLog.redLog(TAG + " - Connect error", e1 != null ? e1.toString() : "null");
                }
                ret = false;
        }
        return ret;
}
在上述代碼中,我們可以看到有一個SocketEventListener監聽接口,這個接口用於監聽socket事件,將其回調給調用者 SocketEventListener接口:
public interface SocketEventListener
        {
                /**
                 * Socket正在接收數據
                 * */
                public void OnStreamRecive();
                /**
                 * Socket接收數據完成
                 * */
                public void OnStreamReciveFinish();
                /**
                 * Socket有新的消息返回
                 * */
                public void OnStreamComing(byte[] aStreamData);
                /**
                 * Socket出現異常
                 * */
                public void OnSocketPause();
                /**
                 * Socket已修復,可用
                 * */
                public void OnSocketAvaliable();
        }
監聽接口的使用:
rivate void startListener()
        {
                if (mReadThread == null || mReadThread.isInterrupted())
                {
                        mReadThread = null;
                        mReadThread = new Thread()
                        {
                                @Override
                                public void run()
                                {
                                        while (!this.isInterrupted() && mRunRead)
                                        {
                                                MyLineLog.redLog(TAG,"startListener:" + mSendMsgTime);
                                                try
                                                {
                                                     // 如果客戶端連接沒有打開就退出循環
                                                        if (!client.isOpen())
                                                                break;
                                                        // 此方法為查詢是否有事件發生如果沒有就阻塞,有的話返回事件數量
                                                        int eventcount = mSelector.select();
                                                        // 如果沒有事件返回循環
                                                        if (eventcount > 0)
                                                        {
                                                        	starttime = CommonClass.getCurrentTime();
                                                                // 遍例所有的事件
                                                                for (SelectionKey key : mSelector.selectedKeys())
                                                                {
                                                                        // 刪除本次事件
                                                                        mSelector.selectedKeys().remove(key);
                                                                        // 如果本事件的類型為read時,表示服務器向本客戶端發送了數據
                                                                        if (key.isValid() && key.isReadable())
                                                                        {
                                                                                if (mSocketEventListener != null)
                                                                                {
                                                                                        mSocketEventListener.OnStreamRecive();
                                                                                }
                                                                                boolean readresult = ReceiveDataBuffer((SocketChannel) key.channel());

                                                                                if (mSocketEventListener != null)
                                                                                {
                                                                                        mSocketEventListener.OnStreamReciveFinish();
                                                                                }
                                                                                
                                                                                if(readresult)
                                                                                {
                                                                                        key.interestOps(SelectionKey.OP_READ);
                                                                                        sleep(200);
                                                                                }
                                                                                else
                                                                                {
                                                                                        throw new Exception();
                                                                                }
                                                                        }
                                                                        key = null;
                                                                }
                                                                mSelector.selectedKeys().clear();
                                                        }
                                                }
                                                catch (Exception e)
                                                {
                                                        mRunRead = false;
                                                        mReadThread = null;
                                                        if(e instanceof InterruptedException)
                                                        {
                                                                MyLineLog.redLog(TAG, "startListener:" + e.toString());
                                                        }
                                                        else
                                                        {
                                                                break;
                                                        }
                                                }
                                        }
                                }
                        };
                        mReadThread.setName(TAG + " Listener, " + CommonClass.getCurrentTime());
                        mRunRead = true;
                        mReadThread.start();
                }
        }

連接完之後就是發送數據和接收數據,下面是發送數據的核心代碼:
public boolean SendSocketMsg(byte[] aMessage) throws IOException
        {
                boolean ret = false;
                try
                {
                        sendBuffer.clear();
                        sendBuffer = ByteBuffer.wrap(aMessage);
                        int sendsize = client.write(sendBuffer);
                        sendBuffer.flip();
                        sendBuffer.clear();
                        mSendMsgTime = CommonClass.getCurrentTime();
                        MyLineLog.redLog(TAG, "SendSocketMsg:" + mSendMsgTime + ", sendsize:" + sendsize);
                        ret = true;
                }
                catch (Exception e)
                {
                        MyLineLog.redLog(TAG,  "發送數據失敗。");

                        if (mSocketEventListener != null)
                        {
                                mSocketEventListener.OnSocketPause();
                        }
//                        crash();
                }
                return ret;
        }

因為實際工作需要,我們需要經常會碰到兩個問題,無效數據和大數據,如何去解決這個問題呢,無效數據用過濾,大數據用分塊接收,下面是接收數據的方法:
private boolean ReceiveDataBuffer(SocketChannel aSocketChannel)
        {
//              n 有數據的時候返回讀取到的字節數。
//              0 沒有數據並且沒有達到流的末端時返回0。
//              -1 當達到流末端的時候返回-1。
                boolean ret = false;
                
                ByteArrayBuffer bab = new ByteArrayBuffer(8*1024);
                while(true)
                {
                        try
                        {
                                ByteBuffer readBuffer = ByteBuffer.allocate(1024 * 1);
                                readBuffer.clear();
                                int readsize = aSocketChannel.read(readBuffer);
                                
                                if(readsize > 0)
                                {
                                    MyLineLog.redLog(TAG, "aSocketChannel.read=>" + readsize);
                                        byte[] readbytes = readBuffer.array();
                                        bab.append(readbytes, 0, readsize);
                                        readBuffer.clear();
                                        readBuffer.flip();
                                        ret = true;
                                }
                                else if(readsize == 0)
                                {
                                        int buffersize = bab.length();                                        
                                        byte[] readdata = bab.buffer();
                                        int readdataoffset = 0;
                                        boolean parsedata = true;
                                        
                                        while(readdataoffset < buffersize && parsedata)
                                        {
                                                byte datatype = readdata[readdataoffset];
                                                if (datatype == PushUtils.PACKAGETYPE_HEARTBEAT || datatype == PushUtils.PACKAGETYPE_HEARTBEAR_NODATA)
                                                {
                                                        byte[] blockdata = new byte[] { datatype };
                                                        ReceiveData(blockdata);
                                                        readdataoffset += 1;
                                                        blockdata = null;            
                                                }
                                                else
                                                {
                                                        byte[] blocklength = new byte[4];
                                                        System.arraycopy(readdata, readdataoffset + 5, blocklength, 0, 4);
                                                        int blocksize = CommonClass.bytes2int(CommonClass.LitteEndian_BigEndian(blocklength));
                                                        blocklength = null;
                                                        
                                                        int blockdatasize = 5 + blocksize + 4;
                                                        
                                                        if(blockdatasize <= buffersize)
                                                        {
                                                                MyLineLog.redLog(TAG, "塊數據大小:" + blockdatasize);
                                                                byte[] blockdata = new byte[blockdatasize];
                                                                System.arraycopy(readdata, readdataoffset, blockdata, 0, blockdatasize);

                                                                long starttime = CommonClass.getCurrentTime();
                                                                ReceiveData(blockdata);
                                                                long endtime = CommonClass.getCurrentTime();
                                                                MyLineLog.redLog(TAG, "解析數據用時:" + (endtime - starttime) + "ms");
                                                                readdataoffset += blockdatasize;
                                                                blockdata = null;
                                                        }
                                                        else if(blockdatasize < 10240)
                                                        {//小於10k,則屬於正常包
                                                                MyLineLog.redLog(TAG, "塊數據大小:" + blockdatasize + ",小於10k,說明數據不完整,繼續獲取。");
                                                                //將未解析數據存到臨時buffer
                                                                int IncompleteSize = buffersize - readdataoffset;
                                                                if(IncompleteSize > 0)
                                                                {
                                                                        byte[] Incompletedata = new byte[IncompleteSize];
                                                                        System.arraycopy(readdata, readdataoffset, Incompletedata, 0, IncompleteSize);
                                                                        bab.clear();
                                                                        bab.append(Incompletedata, 0, IncompleteSize);
                                                                        parsedata = false;
                                                                        Incompletedata = null;
                                                                }
                                                        }
                                                        else
                                                        {//異常包
                                                                MyLineLog.yellowLog(TAG, "塊數據錯誤大小:" + blockdatasize);
                                                                MyLineLog.redLog(TAG,"blockdatasize error:" + blockdatasize);
                                                                ret = true;
                                                                break;
                                                        }
                                                }
                                        }  
                                        
                                        if(parsedata)
                                        {
                                                ret = true;
                                                break;
                                        }
                                }
                                else if(readsize == -1)
                                {
                                        ret = false;
                                        break;
                                }
                                else
                                {
                                        ret = true;
                                        break;
                                }
                        }
                        catch (IOException e)
                        {
                            MyLineLog.redLog(TAG, "aSocketChannel IOException=>" + e.toString());
                                ret = false;
                                break;
                        }
                }
                bab.clear();
                bab = null;
                return ret;
        }

如果數據量過大的話,還會使用壓縮方法進行傳輸,那應該如何接收呢,下面是一段接收壓縮數據的方法:
private void ReceiveData(byte[] aDataBlock)
        {
                try
                {
                        MyLineLog.redLog(TAG, "ReceiveData:" + mSendMsgTime);
                        if (mSendMsgTime != 0)
                        {
                                mSendMsgTime = 0;
                        }
                        
                        byte[] ret = null;
                        
                        int offset = 0;

                        byte datatype = aDataBlock[offset];
                        offset += 1;

                        if (datatype != -1)
                        {
                                if (datatype == PushUtils.PACKAGETYPE_HEARTBEAT)
                                {
                                        ret = new byte[] { datatype };
                                }
                                else if (datatype == PushUtils.PACKAGETYPE_HEARTBEAR_NODATA)
                                {
                                        ret = new byte[] { datatype };
                                }
                                else if (datatype == PushUtils.PACKAGETYPE_NORMAL || datatype == PushUtils.PACKAGETYPE_HEARTBEAR_HAVEDATA)
                                {
                                        byte[] databytelength = new byte[4];
                                        System.arraycopy(aDataBlock, offset, databytelength, 0, 4);
                                        offset += 4;
                                        int header = CommonClass.bytes2int(CommonClass.LitteEndian_BigEndian(databytelength));
                                        databytelength = null;

                                        if (header == PushUtils.PACKAGEHEADER)
                                        {
                                                byte[] datalengthbyte = new byte[4];
                                                System.arraycopy(aDataBlock, offset, datalengthbyte, 0, 4);
                                                offset += 4;

                                                int datalength = CommonClass.bytes2int(CommonClass.LitteEndian_BigEndian(datalengthbyte));
                                                datalengthbyte = null;

                                                if (datalength > 4)
                                                {
                                                        // compressed bit 暫時不壓縮
                                                        byte compressed = aDataBlock[offset];
                                                        offset += 1;

                                                        if (compressed == 1)
                                                        {//解壓縮
                                                                //跳過頭4個字節,此處用於解壓縮後的數據大小,暫時不需要
                                                                offset += 4;
                                                                int contentlength = datalength - 1 - 4;
                                                                byte[] datacontentbyte = new byte[contentlength];
                                                                System.arraycopy(aDataBlock, offset, datacontentbyte, 0, contentlength);
                                                                offset += contentlength;                                                               

                                                                byte[] compressdata = new byte[contentlength - 4];
                                                                System.arraycopy(datacontentbyte, 0, compressdata, 0, contentlength - 4);

                                                                long starttime = CommonClass.getCurrentTime();
                                                                byte[] decompressdatacontentbyte = CommonClass.decompress(compressdata);
                                                                long endtime = CommonClass.getCurrentTime();
                                                                MyLineLog.redLog(TAG, "解壓縮數據用時:" + (endtime - starttime) + "ms");
                                                                int decompressdatacontentbytelength = decompressdatacontentbyte.length;
                                                                compressdata = null;
                                                                int footer = PushUtils.getInt(datacontentbyte, contentlength - 4);

                                                                if (footer == PushUtils.PACKAGEFOOTER)
                                                                {
                                                                        ret = new byte[decompressdatacontentbytelength + 1];
                                                                        ret[0] = datatype;
                                                                        System.arraycopy(decompressdatacontentbyte, 0, ret, 1, decompressdatacontentbytelength);
                                                                        datacontentbyte = null;
                                                                        decompressdatacontentbyte = null;
                                                                }
                                                        }
                                                        else
                                                        {//數據未壓縮
                                                                int contentlength = datalength - 1;
                                                                byte[] datacontentbyte = new byte[contentlength];
                                                                System.arraycopy(aDataBlock, offset, datacontentbyte, 0, contentlength);
                                                                offset += contentlength;

                                                                int footer = PushUtils.getInt(datacontentbyte, contentlength - 4);

                                                                if (footer == PushUtils.PACKAGEFOOTER)
                                                                {
                                                                        ret = new byte[contentlength + 1 - 4];
                                                                        ret[0] = datatype;
                                                                        System.arraycopy(datacontentbyte, 0, ret, 1, contentlength - 4);
                                                                        datacontentbyte = null;
                                                                }                                                                
                                                        }
                                                }
                                        }
                                }

                                if (mSocketEventListener != null)
                                {
                                        mSocketEventListener.OnStreamComing(ret);
                                }
                        } 
                }
                catch (Exception e)
                {
                        MyLineLog.redLog(TAG + " - ReceiveData error", e.toString());
                }
        }

在介紹SocketChannel的時候,api提到關閉需要注意事項,下面一段關閉SocketChannel的示例代碼:
public void closeSocket()
        {
                mRunRead = false;
                if (mReadThread != null)
                {
                        if (!mReadThread.isInterrupted())
                        {
                                mReadThread.interrupt();
                                mReadThread = null;
                        }
                }

                if (mSelector != null && mSelector.isOpen())
                {
                        try
                        {
                                mSelector.close();
                        }
                        catch (IOException e)
                        {
                                MyLineLog.redLog(TAG + " - closeSocket error", e.toString());
                        }
                        mSelector = null;
                }
                
                if (client != null)
                {
                        try
                        {
                                client.close();
                                client = null;
                        }
                        catch (IOException e)
                        {
                                MyLineLog.redLog(TAG + " - closeSocket2 error", e.toString());
                        }
                }

                System.gc();
        }

這篇文章講解部分大量參照JavaApi,其實很多問題的答案就在Api裡面,當你不知道如何去做的時候,回頭看一下Api,仔細思考一下,就能解決大部分問題。 Ps:感謝我大學捨友阿鋼為我提供的代碼





  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved