編輯:關於Android編程
在addNow方法執行結束之後,我們已經為當前NioProcessor裡面所有的NioSocketSession對應的SocketChannel注冊了OP_READ事件,接下來繼續查看Processor的run方法,源碼在上一篇中有,執行到第49行,判斷如果selected大於0執行第52行的process方法,selected的值其實就是Selector的select方法返回值,表示客戶端存在和服務端交互的請求,那麼我們看看process做了些什麼事:
AbstractPollingIoProcessor$process()
private void process() throws Exception { for (Iterator i = selectedSessions(); i.hasNext();) { S session = i.next(); process(session); i.remove(); } }可以發現他就是遍歷那些已經發生注冊事件的NioSocketSession集合,並且調用process(S session)方法:
AbstractPollingIoProcessor$process()
private void process(S session) { // Process Reads if (isReadable(session) && !session.isReadSuspended()) { read(session); } // Process writes if (isWritable(session) && !session.isWriteSuspended()) { // add the session to the queue, if it's not already there if (session.setScheduledForFlush(true)) { flushingSessions.add(session); } } }首先通過isReadable方法判斷當前NioSocketSession對應的SocketChannel中是否注冊過OP_READ事件,如果注冊過的話,執行read(session)方法;
AbstractPollingIoProcessor$read()
private void read(S session) { IoSessionConfig config = session.getConfig(); int bufferSize = config.getReadBufferSize(); IoBuffer buf = IoBuffer.allocate(bufferSize); final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation(); try { int readBytes = 0; int ret; try { if (hasFragmentation) { while ((ret = read(session, buf)) > 0) { readBytes += ret; if (!buf.hasRemaining()) { break; } } } else { ret = read(session, buf); if (ret > 0) { readBytes = ret; } } } finally { buf.flip(); } if (readBytes > 0) { IoFilterChain filterChain = session.getFilterChain(); filterChain.fireMessageReceived(buf); buf = null; if (hasFragmentation) { if (readBytes << 1 < config.getReadBufferSize()) { session.decreaseReadBufferSize(); } else if (readBytes == config.getReadBufferSize()) { session.increaseReadBufferSize(); } } } if (ret < 0) { // scheduleRemove(session); IoFilterChain filterChain = session.getFilterChain(); filterChain.fireInputClosed(); } } catch (Exception e) { if (e instanceof IOException) { if (!(e instanceof PortUnreachableException) || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass()) || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) { scheduleRemove(session); } } IoFilterChain filterChain = session.getFilterChain(); filterChain.fireExceptionCaught(e); } }
這部分代碼比較長,我們了解主干就可以了;
首先第4行創建了一個IoBuffer對象,其實這就是我們java NIO中的Buffer角色,接著看到調用了read(session,buf)方法,這個方法返回值大於0表示讀取數據成功,具體這個方法裡面執行了些什麼我們可以到NioProcessor裡面的read方法看看:
protected int read(NioSession session, IoBuffer buf) throws Exception { ByteChannel channel = session.getChannel(); return channel.read(buf.buf()); }其實很簡單了,就是將通道中的數據寫到我們的緩存中罷了,這就是NIO本身的用法;
如果我們讀取到了數據,就會執行第33行的if語句,在if語句塊中會執行IoFilterChain的fireMessageReceived方法,其實呢,IoFilterChain就是我們的責任鏈,前面分析源碼的過程中我們知道在創建NioSocketSession的時候會創建一個DefaultIoFilterChain出來,並且會在它裡面創建一個EntryImpl鏈,默認情況下會創建一個HeadFilter鏈頭和TailFilter鏈尾,那麼這裡的IoFilterChain其實就是對DefaultIoFilterChain進行轉換過來的,默認情況下也就值存在鏈頭和鏈尾了,我們在使用MINA的時候可以通過NioSocketAcceptor的getFilterChain獲得其對應的IoFilterChain,其實getFilterChain的真正實現是在AbstarctIoService裡面的,有了這個IoFilterChain之後,我們可以調用他的addLast方法為其添加我們自定義或者MINA自帶的Filter對象,addLast的真正實現是在DefaultIoFilterChain裡面的,我們可以看看:
public synchronized void addLast(String name, IoFilter filter) { checkAddable(name); register(tail.prevEntry, name, filter); }間接調用了register方法,來看看register
private void register(EntryImpl prevEntry, String name, IoFilter filter) { EntryImpl newEntry = new EntryImpl(prevEntry, prevEntry.nextEntry, name, filter); try { filter.onPreAdd(this, name, newEntry.getNextFilter()); } catch (Exception e) { throw new IoFilterLifeCycleException("onPreAdd(): " + name + ':' + filter + " in " + getSession(), e); } prevEntry.nextEntry.prevEntry = newEntry; prevEntry.nextEntry = newEntry; name2entry.put(name, newEntry); try { filter.onPostAdd(this, name, newEntry.getNextFilter()); } catch (Exception e) { deregister0(newEntry); throw new IoFilterLifeCycleException("onPostAdd(): " + name + ':' + filter + " in " + getSession(), e); } }如果你鏈表操作很熟的話,會發現其實這裡進行的就是鏈表插入操作了,在第10行和11行可以體現出來,那麼我們這裡有個疑問了,鏈表操作的時候,我們只需要一個鏈頭就可以了,沒必要給鏈尾啊,這裡的鏈尾是干嘛的呀,我來告訴你答案吧,鏈尾其實就是用來鏈接我們的IoHandler對象的,IoHandler是我們整個責任鏈的結束部分,我們真正的業務邏輯的處理都是在它裡面完成的,所以你會發現在你使用MINA框架的時候,如果不給NioSocketAcceptor設置IoHandler的話是會報異常的,因為他是要進行業務邏輯處理的,沒有他你整個程序是沒法處理的,既然他是鏈接在鏈尾後面的,那麼我們就該看看TailFilter的實現了:
他是DefaultIoFilterChain的靜態內部類,代碼比較長,我就截取兩個方法,其他的方法類似啦:
private static class TailFilter extends IoFilterAdapter { @Override public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception { try { session.getHandler().sessionCreated(session); } finally { // Notify the related future. ConnectFuture future = (ConnectFuture) session.removeAttribute(SESSION_CREATED_FUTURE); if (future != null) { future.setSession(session); } } } @Override public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception { session.getHandler().sessionOpened(session); } }可以看到在TailFiler裡面執行的方法實際上都是執行的IoHandler中對應的方法啦,也就是這樣我們把Filter責任鏈和IoHandler聯系到了一起;
好了,扯得有點遠了,繼續回到我們的AbstractPollingIoProcessor裡面的read方法,第33行在我們獲取到數據之後首先會獲得我們的DefaultIoFilterChain責任鏈,並且調用fireMessageReceived方法,我們來看看fireMessageReceived方法:
這個方法位於DefaultIoFilterChain中
public void fireMessageReceived(Object message) { if (message instanceof IoBuffer) { session.increaseReadBytes(((IoBuffer) message).remaining(), System.currentTimeMillis()); } callNextMessageReceived(head, session, message); }可以看到他調用的是callNextMessageReceived方法
private void callNextMessageReceived(Entry entry, IoSession session, Object message) { try { IoFilter filter = entry.getFilter(); NextFilter nextFilter = entry.getNextFilter(); filter.messageReceived(nextFilter, session, message); } catch (Exception e) { fireExceptionCaught(e); } catch (Error e) { fireExceptionCaught(e); throw e; } }在callNextMessageReceived方法中首先會獲得當前Filter對象,接著獲得當前Filter的nextFilter對象,接著調用filter的messageReceived方法,這個方法其實上執行的是DefaultIoFilterChain的messageReceived方法:
public void messageReceived(IoSession session, Object message) { Entry nextEntry = EntryImpl.this.nextEntry; callNextMessageReceived(nextEntry, session, message); }可以看到他還是執行的callNextMessageReceived方法,這樣層層遞歸的執行,直到Filter的鏈尾,那麼接下來就是執行IoHandler裡面對應的messageReceived方法進行具體的業務邏輯操作喽!這樣的話,整個read過程中涉及到的關鍵部分就結束啦!
接下來分析下write過程,如果我們想要給服務端發送消息內容的話,首先我們需要獲取到IoSession對象,這裡我們以NioSocketSession為例,發送消息調用的將是他的write方法,查看NioSocketSession發現他裡面沒有write方法,到他的父類NioSession查看也不存在,最後在AbstractIoSession找到啦;
AbstractIoSession$write()
public WriteFuture write(Object message) { return write(message, null); }也就是說他執行的是兩個參數的write方法,
public WriteFuture write(Object message, SocketAddress remoteAddress) { if (message == null) { throw new IllegalArgumentException("Trying to write a null message : not allowed"); } // We can't send a message to a connected session if we don't have // the remote address if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) { throw new UnsupportedOperationException(); } // If the session has been closed or is closing, we can't either // send a message to the remote side. We generate a future // containing an exception. if (isClosing() || !isConnected()) { WriteFuture future = new DefaultWriteFuture(this); WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress); WriteException writeException = new WriteToClosedSessionException(request); future.setException(writeException); return future; } FileChannel openedFileChannel = null; // TODO: remove this code as soon as we use InputStream // instead of Object for the message. try { if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) { // Nothing to write : probably an error in the user code throw new IllegalArgumentException("message is empty. Forgot to call flip()?"); } else if (message instanceof FileChannel) { FileChannel fileChannel = (FileChannel) message; message = new DefaultFileRegion(fileChannel, 0, fileChannel.size()); } else if (message instanceof File) { File file = (File) message; openedFileChannel = new FileInputStream(file).getChannel(); message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size()); } } catch (IOException e) { ExceptionMonitor.getInstance().exceptionCaught(e); return DefaultWriteFuture.newNotWrittenFuture(this, e); } // Now, we can write the message. First, create a future WriteFuture writeFuture = new DefaultWriteFuture(this); WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress); // Then, get the chain and inject the WriteRequest into it IoFilterChain filterChain = getFilterChain(); filterChain.fireFilterWrite(writeRequest); // TODO : This is not our business ! The caller has created a // FileChannel, // he has to close it ! if (openedFileChannel != null) { // If we opened a FileChannel, it needs to be closed when the write // has completed final FileChannel finalChannel = openedFileChannel; writeFuture.addListener(new IoFutureListener這部分源碼比較長,我們挑重點看,在第45行創建了一個WriteFuture對象,接著把write的消息內容以及WriteFuture對象對象作為參數封裝出來一個WriteRequest對象,第49行獲得了我們的責任鏈,和read過程一樣,我們也可以通過DefaultIoFilterChain的addLast方法添加自己創建的Filter對象,接著第50行調用DefaultIoFilterChain的fireFilterWrite方法() { public void operationComplete(WriteFuture future) { try { finalChannel.close(); } catch (IOException e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } }); } // Return the WriteFuture. return writeFuture; }
public void fireFilterWrite(WriteRequest writeRequest) { callPreviousFilterWrite(tail, session, writeRequest); }可以看到這個方法執行的是callPreviousFilterWrite方法
private void callPreviousFilterWrite(Entry entry, IoSession session, WriteRequest writeRequest) { try { IoFilter filter = entry.getFilter(); NextFilter nextFilter = entry.getNextFilter(); filter.filterWrite(nextFilter, session, writeRequest); } catch (Exception e) { writeRequest.getFuture().setException(e); fireExceptionCaught(e); } catch (Error e) { writeRequest.getFuture().setException(e); fireExceptionCaught(e); throw e; } }和之前的read方法中責任鏈的執行過程一樣,也是首先獲取filter對象,同時獲取該filter對象的下一個nextFilter對象,調用他的filterWrite方法
@Override public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { nextFilter.filterWrite(session, writeRequest); }它裡面調用的是filterWrite方法,這個方法裡面會繼續調用filterWrite方法,這樣層層遞歸,直到到達責任鏈的鏈頭,也就是HeadFilter為止,調用HeadFilter的filterWrite方法,HeadFilter是DefaultIoFilterChain的靜態內部類:
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { AbstractIoSession s = (AbstractIoSession) session; // Maintain counters. if (writeRequest.getMessage() instanceof IoBuffer) { IoBuffer buffer = (IoBuffer) writeRequest.getMessage(); // I/O processor implementation will call buffer.reset() // it after the write operation is finished, because // the buffer will be specified with messageSent event. buffer.mark(); int remaining = buffer.remaining(); if (remaining > 0) { s.increaseScheduledWriteBytes(remaining); } } else { s.increaseScheduledWriteMessages(); } WriteRequestQueue writeRequestQueue = s.getWriteRequestQueue(); if (!s.isWriteSuspended()) { if (writeRequestQueue.isEmpty(session)) { // We can write directly the message s.getProcessor().write(s, writeRequest); } else { s.getWriteRequestQueue().offer(s, writeRequest); s.getProcessor().flush(s); } } else { s.getWriteRequestQueue().offer(s, writeRequest); } }在HeadFilter的filterWrite方法裡面,你會看到有這麼一句代碼s.getProcessor(),他其實上就是獲得處理我們當前NioSocketSession的NioProcessor對象而已,那麼接下來的write操作是包含兩個參數的,我們的NioProcessor裡面並沒有實現這個方法,需要到他的父類AbstractPollingIoProcessor查看,代碼如下:
public void write(S session, WriteRequest writeRequest) { WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue(); writeRequestQueue.offer(session, writeRequest); if (!session.isWriteSuspended()) { this.flush(session); } }做的事還是比較少的,就是將當前的寫請求加入到我們當前NioSocketSession的寫請求隊列中,同時通過AbstractPollingIoProcessor的flush方法將NioSocketSession放入到flushingSessions隊列中,這個隊列主要存儲的是那些將要被flush的IoSession集合;我們來看看flush方法
AbstractPollingIoProcessor$flush
public final void flush(S session) { // add the session to the queue if it's not already // in the queue, then wake up the select() if (session.setScheduledForFlush(true)) { flushingSessions.add(session); wakeup(); } }第5行執行了將當前NioSocketSession加入到flushingSession的操作,隨後調用了wakeup方法,wakeup方法會喚醒我們阻塞的select方法,這樣的話,我們的服務端就可以收到客戶端發送的消息了,接著讀取過程就和上面的源碼講解一樣了;
至此,MINA中主要的服務端源碼分析結束了,注意我們只分析了NioSocketAcceptor部分的源碼,沒有涉及NioSocketConnector部分,其實NioSocketConnector部分的源碼和NioSocketAcceptor部分基本上是類似的分析過程,在這裡我就不細細分析了,下一篇我會對MINA框架做一個小結,包括它裡面涉及到的一些線程模型結構;
1)首先先將gitlab上的開發項目clone到本地(可以使用命令行或者管理工具,具體操作在GitLab中已經涉及,這裡不再贅述),然後導入到AndroidStudio中
現在微信理財通相對於余額寶收益高一些,即使理財通的收益較高,但很多網友都在擔心微信理財通是否安全的問題,是否能保障用戶的錢不怕被盜。我們一起來探究下吧!微信
在android-sdk\tools目錄下,有一個名為emulator.exe的可執行程序,望名知義,emulator即為仿真器或模擬器,但很多人可能會發現,通過AVD
概述認識MVP模式MVP 模式實際上指的是 Model-View-Presenter 主要的目的是為了劃分各個模塊的負責區域,分工明確,使代碼清晰了很多。也是為了減少 A