編輯:關於Android編程
上一篇我們通過實例學習了MINA框架的用法,發現用起來還是挺方便的,就那麼幾步就可以了,本著學東西必知其原理的觀念,決定看看MINA的源碼實現,好了,我們開始吧!
MINA源碼對於客戶端和服務端來說基本上差別不是很大的,所以我計劃主要還是分析服務端的源碼,在正式分析之前,我們需要對MINA有一個整體的了解;
MINA中涉及到了這麼幾個對象:
IoService:用於提供連接,他是IoAcceptor和IoConnector的父接口;
IoBuffer:消息緩存區;
IoSession:在每一次連接建立成功之後都會創建一個IoSession對象出來,並且在創建該對象的時候創建一個IoFilter對象出來,通過IoSession的session id來為當前IoSession設置處理他的IoProcessor;
IoProcessor:用於檢查是否有數據在通道上面進行讀寫,在我們創建Acceptor或者Connector的時候,默認會創建一個線程池,裡面存儲的就是IoProcessor線程,該線程裡面是擁有自己的Selector的,這個是MINA為我們做的一點優化,我們通常使用NIO的話是只有一個Selector的,而MINA中的
IoFilter:用於定義攔截器,這些攔截器可以包括日志輸出、數據編解碼等等,只要用於二進制數據和對象之間的轉換;
IoHandler:處於IoFilter的尾部,用於真正的業務邏輯處理,所以我們在使用MINA的時候是必須要提供IoHandler對象的,因為是靠他來進行真正業務處理的;
接下來我們看看上篇博客中我們用到的MINA中涉及到的這幾個對象的類結構圖:
NioSocketAcceptor類結構圖:
NioSocketConnector類結構圖:
NioSocketSession類結構圖:
NioProcessor類結構圖:
好了,開始我們真正的源碼分析了(服務端);
首先我們通過NioSocketAcceptor acceptor = new NioSocketAcceptor();創建了一個NioSocketAcceptor對象出來,那我們就得看看NioSocketAcceptor的構造函數裡面做了些什麼事了;
NioSocketAcceptor$NioSocketAcceptor()
public NioSocketAcceptor() { super(new DefaultSocketSessionConfig(), NioProcessor.class); ((DefaultSocketSessionConfig) getSessionConfig()).init(this); }可以看到首先調用了父類的構造函數,也就是AbstractPollingIoAcceptor的構造函數,並且傳入了NioProcessor的Class對象,這裡我們可以想象一下後面肯定會用這個NioProcessor的Class對象進行一些與反射有關的操作;
AbstractPollingIoAcceptor$AbstractPollingIoAcceptor()
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class> processorClass) { this(sessionConfig, null, new SimpleIoProcessorPool(processorClass), true, null); }可以看到實際上調用的是5個參數的構造函數,在看這個構造函數之前,我們看到第三個參數利用我們從NioSocketAcceptor構造函數中傳進來的NioProcessor對象,創建了一個SimpleIoProcessorPool對象,我們來看看SimpleIoProcessorPool的構造函數;
public SimpleIoProcessorPool(Class> processorType) { this(processorType, null, DEFAULT_SIZE, null); }發現他接著調用的是SimpleIoProcessorPool四個參數的構造函數,並且添加了一個DEFAULT_SIZE參數,這個值的大小等於我們CPU的核數+1,這也是我們在創建NioSocketAcceptor的時候默認創建的NioProcessor的線程個數,來看看SimpleIoProcessorPool四個參數的構造函數:
public SimpleIoProcessorPool(Class> processorType, Executor executor, int size, SelectorProvider selectorProvider) { if (processorType == null) { throw new IllegalArgumentException("processorType"); } if (size <= 0) { throw new IllegalArgumentException("size: " + size + " (expected: positive integer)"); } // Create the executor if none is provided createdExecutor = (executor == null); if (createdExecutor) { this.executor = Executors.newCachedThreadPool(); // Set a default reject handler ((ThreadPoolExecutor) this.executor).setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); } else { this.executor = executor; } pool = new IoProcessor[size]; boolean success = false; Constructor> processorConstructor = null; boolean usesExecutorArg = true; try { // We create at least one processor try { try { processorConstructor = processorType.getConstructor(ExecutorService.class); pool[0] = processorConstructor.newInstance(this.executor); } catch (NoSuchMethodException e1) { // To the next step... try { if(selectorProvider==null) { processorConstructor = processorType.getConstructor(Executor.class); pool[0] = processorConstructor.newInstance(this.executor); } else { processorConstructor = processorType.getConstructor(Executor.class, SelectorProvider.class); pool[0] = processorConstructor.newInstance(this.executor,selectorProvider); } } catch (NoSuchMethodException e2) { // To the next step... try { processorConstructor = processorType.getConstructor(); usesExecutorArg = false; pool[0] = processorConstructor.newInstance(); } catch (NoSuchMethodException e3) { // To the next step... } } } } catch (RuntimeException re) { LOGGER.error("Cannot create an IoProcessor :{}", re.getMessage()); throw re; } catch (Exception e) { String msg = "Failed to create a new instance of " + processorType.getName() + ":" + e.getMessage(); LOGGER.error(msg, e); throw new RuntimeIoException(msg, e); } if (processorConstructor == null) { // Raise an exception if no proper constructor is found. String msg = String.valueOf(processorType) + " must have a public constructor with one " + ExecutorService.class.getSimpleName() + " parameter, a public constructor with one " + Executor.class.getSimpleName() + " parameter or a public default constructor."; LOGGER.error(msg); throw new IllegalArgumentException(msg); } // Constructor found now use it for all subsequent instantiations for (int i = 1; i < pool.length; i++) { try { if (usesExecutorArg) { if(selectorProvider==null) { pool[i] = processorConstructor.newInstance(this.executor); } else { pool[i] = processorConstructor.newInstance(this.executor, selectorProvider); } } else { pool[i] = processorConstructor.newInstance(); } } catch (Exception e) { // Won't happen because it has been done previously } } success = true; } finally { if (!success) { dispose(); } } }
這段代碼相對來說比較長,可以看到在第14行判斷傳入SimpleIoProcessorPool的executor是否為null,為null的話執行第15行,創建一個CachedThreadPool類型的線程池,隨後在第32行通過反射獲取到processorType參數為ExecutorService的構造函數,我們這裡的processType實際上就是NioProcessor,隨後33行通過反射創建一個NioProcessor對象出來,調用的是他的下面這個構造函數:
public NioProcessor(Executor executor) { super(executor); try { // Open a new selector selector = Selector.open(); } catch (IOException e) { throw new RuntimeIoException("Failed to open a selector.", e); } }可以注意到的是在SimpleIoProcessorPool裡面有兩種通過反射創建NioProcessor對象的方式,就是我們上面代碼的第78和80這兩種方式,兩者的區別在於如果我們在創建SimpleIoProcessorPool的時候傳入了SelectorProvider對象,那麼NioProcessor裡面的Selector將直接調用SelectorProvider的openSelector來獲得,而如果沒有傳入SelectorProvider對象的話,NioProcessor裡面的Selector將通過Selector.open方法獲得;
到此,我們創建出來了CPU個數+1個NioProcessor,每個NioProcessor裡面都會有一個Selector對象;
我們回到AbstractPollingIoAcceptor的構造函數
private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor processor, boolean createdProcessor, SelectorProvider selectorProvider) { super(sessionConfig, executor); if (processor == null) { throw new IllegalArgumentException("processor"); } this.processor = processor; this.createdProcessor = createdProcessor; try { // Initialize the selector init(selectorProvider); // The selector is now ready, we can switch the // flag to true so that incoming connection can be accepted selectable = true; } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeIoException("Failed to initialize.", e); } finally { if (!selectable) { try { destroy(); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } } }首先執行了super構造函數,這個構造函數實際上執行的是AbstractIoService的構造函數;
protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) { if (sessionConfig == null) { throw new IllegalArgumentException("sessionConfig"); } if (getTransportMetadata() == null) { throw new IllegalArgumentException("TransportMetadata"); } if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) { throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: " + getTransportMetadata().getSessionConfigType() + ")"); } // Create the listeners, and add a first listener : a activation listener // for this service, which will give information on the service state. listeners = new IoServiceListenerSupport(this); listeners.add(serviceActivationListener); // Stores the given session configuration this.sessionConfig = sessionConfig; // Make JVM load the exception monitor before some transports // change the thread context class loader. ExceptionMonitor.getInstance(); if (executor == null) { this.executor = Executors.newCachedThreadPool(); createdExecutor = true; } else { this.executor = executor; createdExecutor = false; } threadName = getClass().getSimpleName() + '-' + id.incrementAndGet(); }
這個構造函數會判斷我們的executor是否為null,為null的話會創建一個CachedThreadPool出來,這裡我們傳入給AbstractPollingIoAcceptor的參數值為null,因此會創建一個Executor出來;
可以看到第14行執行了init方法,傳入的參數是SelectorProvider類型對象,這個方法在AbstractPollingIoAcceptor裡面並沒有實現,因此查看AbstractPollingIoAcceptor的子類NioSocketAcceptor的init(SelectorProvider)方法
protected void init(SelectorProvider selectorProvider) throws Exception { this.selectorProvider = selectorProvider; if (selectorProvider == null) { selector = Selector.open(); } else { selector = selectorProvider.openSelector(); } }
這個方法所做的事還是比較簡單的,就是創建根據SelectorProvider是否為空創建Selector對象而已,注意這個Selector對象是屬於NioSocketAcceptor的;
在init執行結束之後,AbstractPollingIoAcceptor構造函數第18行會將selectable設置為true,表示我們NioSocketAcceptor裡面的Selector對象已經創建結束了,我們可以處理隨後客戶端到來的連接請求了;
至此,NioSocketAcceptor的構造方法執行結束了,在這個構造方法中為我們創建出了CPU個數+1個NioProcess對象,每個對象裡面都包含一個Selector對象,同時也為NioSocketAcceptor創建了一個Selector對象,同時從上面可以發現我們的NioSocketAcceptor和SimpleIoProcessorPool裡的線程池可以是同一個也可以不是同一個,具體就在你創建NioSocketAcceptor和SimpleIoProcessorPool是否傳入同一個Executor就可以啦;
有了NioSocketAcceptor對象之後,我們通過有了NioSocketAcceptor的bind方法將他和某一個端口綁定起來,因此查看NioSocketAcceptor的bind方法,你會發現根本不存在,那麼根據前面NioSocketAcceptor的類結構圖,去他的父類AbstractPollingIoAcceptor查找,還是沒有,那只能繼續向上找,找到AbstractIoAcceptor裡面,終於找到了;
AbstractIoAcceptor$bind()
public final void bind(Iterable localAddresses) throws IOException { if (isDisposing()) { throw new IllegalStateException("The Accpetor disposed is being disposed."); } if (localAddresses == null) { throw new IllegalArgumentException("localAddresses"); } List不管你調用的是哪個bind方法,最後執行的都是這個bind方法,在這個方法中首先會進行迭代,將所有需要綁定的地址存儲到localAddressesCopy裡面,隨後在第34行調用bindInternal方法進行綁定,這個方法在AbstractIoAcceptor裡面是沒有實現的,需要到他的子類AbstractPollingIoAcceptor查看,這個類中實現了該方法:localAddressesCopy = new ArrayList (); for (SocketAddress a : localAddresses) { checkAddressType(a); localAddressesCopy.add(a); } if (localAddressesCopy.isEmpty()) { throw new IllegalArgumentException("localAddresses is empty."); } boolean activate = false; synchronized (bindLock) { synchronized (boundAddresses) { if (boundAddresses.isEmpty()) { activate = true; } } if (getHandler() == null) { throw new IllegalStateException("handler is not set."); } try { Set addresses = bindInternal(localAddressesCopy); synchronized (boundAddresses) { boundAddresses.addAll(addresses); } } catch (IOException e) { throw e; } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeIoException("Failed to bind to: " + getLocalAddresses(), e); } } if (activate) { getListeners().fireServiceActivated(); } }
AbstractPollingIoAcceptor$bindInternal
protected final SetbindInternal(List localAddresses) throws Exception { // Create a bind request as a Future operation. When the selector // have handled the registration, it will signal this future. AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses); // adds the Registration request to the queue for the Workers // to handle registerQueue.add(request); // creates the Acceptor instance and has the local // executor kick it off. startupAcceptor(); // As we just started the acceptor, we have to unblock the select() // in order to process the bind request we just have added to the // registerQueue. try { lock.acquire(); // Wait a bit to give a chance to the Acceptor thread to do the select() Thread.sleep(10); wakeup(); } finally { lock.release(); } // Now, we wait until this request is completed. request.awaitUninterruptibly(); if (request.getException() != null) { throw request.getException(); } // Update the local addresses. // setLocalAddresses() shouldn't be called from the worker thread // because of deadlock. Set newLocalAddresses = new HashSet (); for (H handle : boundHandles.values()) { newLocalAddresses.add(localAddress(handle)); } return newLocalAddresses; }
首先創建了一個AcceptorOperationFuture類型的對象,當NioSocketAcceptor裡面的Selector已經處理了該注冊請求後,就會給AcceptorOperationFuture對象發送一個信號,至於什麼地方會發送信號後面會講到,接著會將創建的AcceptorOperationFuture對象添加到registerQueue中,他是一個AcceptorOperationFuture類型的隊列,保存著我們所有注冊到NioSocketAcceptor上面的服務端address組成的AcceptorOperationFuture,也就是說上面的requestQueue實際上存儲的是服務端需要注冊到NioSocketAcceptor裡面的Selector的集合;接著第12行執行了startupAcceptor方法,我們來看看這個方法做了些什麼;
private void startupAcceptor() throws InterruptedException { // If the acceptor is not ready, clear the queues // TODO : they should already be clean : do we have to do that ? if (!selectable) { registerQueue.clear(); cancelQueue.clear(); } // start the acceptor if not already started Acceptor acceptor = acceptorRef.get(); if (acceptor == null) { lock.acquire(); acceptor = new Acceptor(); if (acceptorRef.compareAndSet(null, acceptor)) { executeWorker(acceptor); } else { lock.release(); } } }這個方法關鍵是第10行或者14行,創建一個Acceptor類型的對象,Acceptor實現了Runnable接口,並且在第17行執行executeWorker方法,這個方法在AbstractPollingIoAcceptor中並沒有實現,具體實現是在他的間接父類AbstractIoService中的,我們查看AbstractIoService中的executeWorker方法:
AbstractIoService$executeWorker
protected final void executeWorker(Runnable worker) { executeWorker(worker, null); } protected final void executeWorker(Runnable worker, String suffix) { String actualThreadName = threadName; if (suffix != null) { actualThreadName = actualThreadName + '-' + suffix; } executor.execute(new NamePreservingRunnable(worker, actualThreadName)); }可以看到實際上是首先將我們上面創建的Acceptor對象放到線程池executor裡面,這裡的executor線程池是我們在創建NioSocketAcceptor的時候創建的,他是CachedThreadPool類型的,隨後執行exexcute方法,將該線程池運行起來,那麼緊接著執行的就該是Acceptor的run方法了;
AbstractPollingIoAcceptor$Acceptor$run()
public void run() { assert (acceptorRef.get() == this); int nHandles = 0; // Release the lock lock.release(); while (selectable) { try { // Detect if we have some keys ready to be processed // The select() will be woke up if some new connection // have occurred, or if the selector has been explicitly // woke up int selected = select(); // this actually sets the selector to OP_ACCEPT, // and binds to the port on which this class will // listen on nHandles += registerHandles(); // Now, if the number of registred handles is 0, we can // quit the loop: we don't have any socket listening // for incoming connection. if (nHandles == 0) { acceptorRef.set(null); if (registerQueue.isEmpty() && cancelQueue.isEmpty()) { assert (acceptorRef.get() != this); break; } if (!acceptorRef.compareAndSet(null, this)) { assert (acceptorRef.get() != this); break; } assert (acceptorRef.get() == this); } if (selected > 0) { // We have some connection request, let's process // them here. processHandles(selectedHandles()); } // check to see if any cancellation request has been made. nHandles -= unregisterHandles(); } catch (ClosedSelectorException cse) { // If the selector has been closed, we can exit the loop ExceptionMonitor.getInstance().exceptionCaught(cse); break; } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { Thread.sleep(1000); } catch (InterruptedException e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } } } // Cleanup all the processors, and shutdown the acceptor. if (selectable && isDisposing()) { selectable = false; try { if (createdProcessor) { processor.dispose(); } } finally { try { synchronized (disposalLock) { if (isDisposing()) { destroy(); } } } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { disposalFuture.setDone(); } } } }可以看到第9行首先判斷selectable的值是true還是false,這個值是在什麼時候賦值的呢?就是在AbstractPollingIoAcceptor的構造函數裡面了,只要我們在NioSocketAcceptor裡面創建了Selector對象之後就會將selectable的值設置為true,那麼我們這裡run方法裡面的while循環將是死循環了,一直等待客戶端的連接請求;第15行的select方法將處於阻塞狀態,它實際上調用的就是我們Selector的select方法,一直等待著客戶端的接入,在有客戶端連接或者Selector被明確喚醒的情況下就會返回,返回結果大於0表示有客戶端連接接入;接著執行第20行的registerHandles方法
AbstractPollingIoAcceptor$registerHandles
private int registerHandles() { for (;;) { // The register queue contains the list of services to manage // in this acceptor. AcceptorOperationFuture future = registerQueue.poll(); if (future == null) { return 0; } // We create a temporary map to store the bound handles, // as we may have to remove them all if there is an exception // during the sockets opening. MapregisterHandles方法主要用於創建ServerSocketChannel,為通道創建ServerSocket並且為其綁定端口號,創建接收緩存區,並且為Selector注冊OP_ACCEPT事件;newHandles = new ConcurrentHashMap (); List localAddresses = future.getLocalAddresses(); try { // Process all the addresses for (SocketAddress a : localAddresses) { H handle = open(a); newHandles.put(localAddress(handle), handle); } // Everything went ok, we can now update the map storing // all the bound sockets. boundHandles.putAll(newHandles); // and notify. future.setDone(); return newHandles.size(); } catch (Exception e) { // We store the exception in the future future.setException(e); } finally { // Roll back if failed to bind all addresses. if (future.getException() != null) { for (H handle : newHandles.values()) { try { close(handle); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } // TODO : add some comment : what is the wakeup() waking up ? wakeup(); } } } }
首先第5行從我們的registerQueue服務端請求注冊隊隊列中取出隊首元素,第14行創建了一個臨時的Map來存儲我們已經綁定的請求地址對應的SocketAddress,為什麼要這個臨時的Map呢?原因就在於如果我們在Socket開啟的狀態下發生異常的話,我們需要移出掉這些已經綁定的請求地址,有點類似於數據庫中的事務操作,如果有一個失敗,那麼就需要全部回滾,具體我們可以看到發生異常之後執行的是第33行代碼,為future設置了異常,隨後finally中進行了回滾操作;緊接著第15行獲得可該AcceptorOperationFuture裡面對應的SocketAddress列表,接著執行了第20行的open方法,為我們的每個SocketAddress創建一個ServerSocketChannel及其對應的ServerSocket,同時將通道注冊到Selector上面,並且為當前通道注冊OP_ACCEPT事件;我們來看看open方法,這個方法是在AbstractPollingIoAcceptor的子類NioSocketAcceptor中實現的;
NioSocketAcceptor$open()
protected ServerSocketChannel open(SocketAddress localAddress) throws Exception { // Creates the listening ServerSocket ServerSocketChannel channel = null; if (selectorProvider != null) { channel = selectorProvider.openServerSocketChannel(); } else { channel = ServerSocketChannel.open(); } boolean success = false; try { // This is a non blocking socket channel channel.configureBlocking(false); // Configure the server socket, ServerSocket socket = channel.socket(); // Set the reuseAddress flag accordingly with the setting socket.setReuseAddress(isReuseAddress()); // and bind. try { socket.bind(localAddress, getBacklog()); } catch (IOException ioe) { // Add some info regarding the address we try to bind to the // message String newMessage = "Error while binding on " + localAddress + "\n" + "original message : " + ioe.getMessage(); Exception e = new IOException(newMessage); e.initCause(ioe.getCause()); // And close the channel channel.close(); throw e; } // Register the channel within the selector for ACCEPT event channel.register(selector, SelectionKey.OP_ACCEPT); success = true; } finally { if (!success) { close(channel); } } return channel; }可以看到這個open方法裡面其實就是我們使用NIO的經典步驟了,首先創建一個ServerSocketChannel對象,接著將ServerSocketChannel通道設置為非阻塞式,根據當前通道創建一個ServerSocket對象,並且為當前ServerSocket綁定我們傳入的參數SocketAddress,最後第42行把我們創建的通道注冊到Selector選擇器上面,同時注冊OP_ACCEPT事件;
open方法執行結束之後,registerHandles也算結束了,registerHandles中其他部分代碼可以略過,至此,我們將服務端需要創建的ServerSocketChannel及其對應綁定了指定SocketAddress的ServerSocket注冊到了Selector選擇器中,同時注冊了OP_ACCEPT事件;
回到我們Acceptor裡面的run方法,注意registerHandles方法的返回值實際上就是我們已經創建ServerSocketChannel的個數,接著就是執行第25行,如果我們創建的ServerSocketChannel個數為0的話,就會退出這個while死循環,因為我們沒有任何ServerSocket來監聽客戶端連接的到來,避免資源的浪費;隨後就是第41行,當有通道被選擇的時候,selected的值將會是大於0的,那麼就會執行第44行的processHandles方法,這個方法的參數是由selectedHandles獲得的,他的實現是在NioSocketAcceptor裡面的
NioSocketAcceptor$selectedHandles
protected Iterator可以看到實際上selectedHandles就是返回我們已經選中通道的集合而已了selectedHandles() { return new ServerSocketChannelIterator(selector.selectedKeys()); }
接下來我們看看processHandles做了些什麼
AbstractPollingIoAcceptor$processHandles
private void processHandles(Iterator這段代碼相對來說比較短,我們仔細看看裡面做了些什麼,首先迭代我們的ServerSocketChannel集合,從中取出一個ServerSocketChannel對象,我這裡把H的類型全部說成是ServerSocketChannel的原因在於我們主要分析的是MINA框架中關於Socket的這部分,因為MINA不僅僅支持Socket通信,同時支持UDP數據包通信,因而這裡使用的是泛型實現的,在獲得一個ServerSocketChannel對象之後,要注意將其從迭代器中刪除,避免進行重復多次處理,接著執行第8行,創建一個IoSession對象出來,具體來講我們這裡創建的是NioSocketSession對象,調用的方法是accept,這個方法的第一個參數就是我們之前在創建NioSocketAcceptor的時候創建的SimpleIoProcessorPool對象,默認情況下在他裡面是會創建CPU個數+1個NioProcessor的,這個方法在AbstractPollingIoAcceptor中是沒有實現的,因此我們查看他的子類NioSocketAcceptor裡面handles) throws Exception { while (handles.hasNext()) { H handle = handles.next(); handles.remove(); // Associates a new created connection to a processor, // and get back a session S session = accept(processor, handle); if (session == null) { continue; } initSession(session, null, null); // add the session to the SocketIoProcessor session.getProcessor().add(session); } } }
NioSocketAcceptor$accept()
protected NioSession accept(IoProcessorprocessor, ServerSocketChannel handle) throws Exception { SelectionKey key = null; if (handle != null) { key = handle.keyFor(selector); } if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) { return null; } // accept the connection from the client SocketChannel ch = handle.accept(); if (ch == null) { return null; } return new NioSocketSession(this, processor, ch); }
這個方法裡首先獲得被選中ServerSocketChannel的key,接著對該key進行一系列的判斷,接著第14行獲取到和當前ServerSocketChannel有關聯的SocketChannel,這裡需要補充一點的就是ServerSocketChannel和Selector是通過SelectionKey來發生關聯的,SelectionKey標志了我們當前ServerSocketChannel的狀態,而如果說某一客戶端想要和服務器某一端口服務發生關聯的話,那麼它實際上是和與該端口綁定的ServerSocketChannel發生聯系的,因此我們就可以通過ServerSocketChannel獲取與他有關聯了客戶端SocketChannel啦;最後執行第20行創建一個NioSocketSession對象,我們來看看他的構造函數;
NioSocketSession$NioSocketSession()
public NioSocketSession(IoService service, IoProcessor首先執行的是super的構造函數,其實就是NioSession的構造函數了,我們來看看processor, SocketChannel channel) { super(processor, service, channel); config = new SessionConfigImpl(); this.config.setAll(service.getSessionConfig()); }
protected NioSession(IoProcessor首先執行super的構造函數,實際上執行的是AbstractIoSession的構造函數,裡面沒有做多少事,我們不再展開講,接著第5行創建了一個DefaultIoFilterChain對象出來,這個還是比較重要的,我們來看下裡面做了什麼事;processor, IoService service, Channel channel) { super(service); this.channel = channel; this.processor = processor; filterChain = new DefaultIoFilterChain(this); }
DefaultIoFilterChain$DefaultIoFilterChain()
public DefaultIoFilterChain(AbstractIoSession session) { if (session == null) { throw new IllegalArgumentException("session"); } this.session = session; head = new EntryImpl(null, null, "head", new HeadFilter()); tail = new EntryImpl(head, null, "tail", new TailFilter()); head.nextEntry = tail; }這個構造函數中為我們創建了兩個EntryImpl類型的對象,分別封裝的是HeadFilter和TailFilter對象,這裡有必要說下DefaultIoFilterChain的作用了,在我們創建Session的時候,會為Session創建一個Filter責任鏈出來,那麼責任鏈主要是干什麼的呢?主要進行進行我們二進制與真正對象之間的轉換啦,因為我們都知道在網絡中傳輸的只能是字節,並不能傳遞對象,那麼我們就需要字節和對象之間的轉換,Filter鏈就是用來干這個的,當然你可以在客戶端將要發送的數據通過Filter鏈來進行加密,在服務端再通過Filter鏈來進行解密,這個是完全可以的,既然是鏈嘛,就需要鏈頭和鏈尾了;他們都會被封裝到EntryImpl中,至於EntryImpl裡面有什麼我們就不貼出來了,主要就是prevEntry,nextEntry,nextFilter從名字上就能明顯看出來主要是用於EntryImpl鏈拼接的實體罷了,有點類似於鏈表;
到此呢,我們的NioSocketSession就創建成功啦,創建NioSocketSession其實主要就是在它裡面創建一個IoFilter責任鏈出來,用於處理當前Session的一些編解碼工作,這樣我們的NioSocketAcceptor的accept方法就執行結束了,返回了一個NioSocketSession對象,繼續回到AbstractPollingIoAcceptor裡面的processHandles方法,在第8行創建完NioSocketSession之後,執行第17行,將我們的NioSocketSession對象放到NioProcessor中,具體實現過程見下:
首先執行的是session的getProcessor方法,這裡的session類型是NioSocketSession,所以我們去NioSocketSession裡面查看getProcessor,你會發現它裡面不存在這個方法,那就要去他的父類NioSession裡面找了,果然我們找到了:
public IoProcessorgetProcessor() { return processor; }
getProcessor裡面的方法體非常簡單,就是返回processor而已了,那麼這個processor是在哪裡賦值的呢?就是在創建NioSession的構造函數裡面,我們在創建NioSocketSession的時候是會調用super來調用NioSession的構造函數的,也就是我們這裡的processor就是我們在創建NioSocketAcceptor的時候創建的SimpleIoProcessorPool對象,接下來調用的就是它裡面的add方法啦:
public final void add(S session) { getProcessor(session).add(session); }可以看到在SimpleIoProcessor裡面的add方法裡,首先執行的是getProcessor,從SimpleIoProcessor裡面獲得一個Processor對象出來,具體來講這裡獲得到的Processor類型將是NioProcessor類型,我們看看getProcessor方法
private IoProcessor getProcessor(S session) { IoProcessor processor = (IoProcessor) session.getAttribute(PROCESSOR); if (processor == null) { if (disposed || disposing) { throw new IllegalStateException("A disposed processor cannot be accessed."); } processor = pool[Math.abs((int) session.getId()) % pool.length]; if (processor == null) { throw new IllegalStateException("A disposed processor cannot be accessed."); } session.setAttributeIfAbsent(PROCESSOR, processor); } return processor; }這個方法最關鍵的就是第9行,獲取到當前session的id,對其取絕對值,並且對我們創建SimpleIoProcessor的時候創建的NioProcessor數組進行取余運算,獲得數組中的一個NioProcessor對象,默認情況下這個數組的大小是CPU個數+1;最後第15行將當前Session的PROCESSOR屬性設置為獲取到的NioProcessor;
那麼到這裡,實際上add操作執行的就是NioProcessor的add操作啦,我們查看NioProcessor裡面會發現不存在這個方法,那麼需要去他的父類AbstractPollingIoProcessor查看,代碼見下:
AbstractPollingIoProcessor$add()
public final void add(S session) { if (disposed || disposing) { throw new IllegalStateException("Already disposed."); } // Adds the session to the newSession queue and starts the worker newSessions.add(session); startupProcessor(); }將當前NioSocketSession添加到newSession裡面,這裡的newSessions實際上就是NioSocketSession隊列,就是我們當前NioProcessor需要處理的NioSocketSession所組成的集合了,為什麼還要這個集合呢?道理很簡單嘛,剛剛你在通過getProcessor方法為NioSocketSession設置處理他的NioPrrocessor的時候,采用的方法是通過session的id對包含NioProcessor對象的數組進行取模運算的,這肯定就不能避免多個NioSocketSession同時都需要一個NioProcessor來處理的情況了,那麼為了保存這些需要NioProcessor處理的NioSocketSession,自然需要一個隊列來存儲了;
緊接著執行了startupProcessor方法,如果你還記得上面的源碼分析過程的話,會發現上面有調用過startupAcceptor方法,這兩個方法不同之處在於一個是用於開啟Processor線程執行它裡面NioSocketSession請求的,一個是用於開啟Acceptor來進行ServerSocketChannel的事件注冊的,並且startupAcceptor只會執行一次,而startupProcessor會執行多次,默認情況下最多執行CPU個數+1次;
我們來看看startupProcessor方法:
AbstractPollingIoProcessor$startupProcessor
private void startupProcessor() { Processor processor = processorRef.get(); if (processor == null) { processor = new Processor(); if (processorRef.compareAndSet(null, processor)) { executor.execute(new NamePreservingRunnable(processor, threadName)); } } // Just stop the select() and start it again, so that the processor // can be activated immediately. wakeup(); }這個方法首先就是創建了一個Processor對象他實現了Runnable接口,隨後調用executor的execute方法,將封裝成NamePreservingRunnable的Processor放入線程池中,executor是CachedThreadPool類型的線程池,那麼接下來就是執行Processor線程的run方法了:
public void run() { assert (processorRef.get() == this); int nSessions = 0; lastIdleCheckTime = System.currentTimeMillis(); for (;;) { try { // This select has a timeout so that we can manage // idle session when we get out of the select every // second. (note : this is a hack to avoid creating // a dedicated thread). long t0 = System.currentTimeMillis(); int selected = select(SELECT_TIMEOUT); long t1 = System.currentTimeMillis(); long delta = (t1 - t0); if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) { // Last chance : the select() may have been // interrupted because we have had an closed channel. if (isBrokenConnection()) { LOG.warn("Broken connection"); } else { LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0)); // Ok, we are hit by the nasty epoll // spinning. // Basically, there is a race condition // which causes a closing file descriptor not to be // considered as available as a selected channel, // but // it stopped the select. The next time we will // call select(), it will exit immediately for the // same // reason, and do so forever, consuming 100% // CPU. // We have to destroy the selector, and // register all the socket on a new one. registerNewSelector(); } } // Manage newly created session first nSessions += handleNewSessions(); updateTrafficMask(); // Now, if we have had some incoming or outgoing events, // deal with them if (selected > 0) { // LOG.debug("Processing ..."); // This log hurts one of // the MDCFilter test... process(); } // Write the pending requests long currentTime = System.currentTimeMillis(); flush(currentTime); // And manage removed sessions nSessions -= removeSessions(); // Last, not least, send Idle events to the idle sessions notifyIdleSessions(currentTime); // Get a chance to exit the infinite loop if there are no // more sessions on this Processor if (nSessions == 0) { processorRef.set(null); if (newSessions.isEmpty() && isSelectorEmpty()) { // newSessions.add() precedes startupProcessor assert (processorRef.get() != this); break; } assert (processorRef.get() != this); if (!processorRef.compareAndSet(null, this)) { // startupProcessor won race, so must exit processor assert (processorRef.get() != this); break; } assert (processorRef.get() == this); } // Disconnect all sessions immediately if disposal has been // requested so that we exit this loop eventually. if (isDisposing()) { boolean hasKeys = false; for (Iterator i = allSessions(); i.hasNext();) { IoSession session = i.next(); if (session.isActive()) { scheduleRemove((S)session); hasKeys = true; } } if (hasKeys) { wakeup(); } } } catch (ClosedSelectorException cse) { // If the selector has been closed, we can exit the loop // But first, dump a stack trace ExceptionMonitor.getInstance().exceptionCaught(cse); break; } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { Thread.sleep(1000); } catch (InterruptedException e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } } } try { synchronized (disposalLock) { if (disposing) { doDispose(); } } } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { disposalFuture.setValue(true); } } }和Acceptor的run方法類似,同樣存在一個死循環,第14行調用了Selector的select方法,但是和之前Acceptor中調用的select方法不同,我們這裡調用的是有參數的select方法,這種方式會讓我們的選擇器每隔SELECT_TIMEOUT被喚醒一次,讓他進行重新選擇,目的就是為了管理空閒的NioSocketSession,而使用無參的select的話會一直阻塞下去,直到出現需要的事件為止;接著第43行執行了handleNewSessions方法
private int handleNewSessions() { int addedSessions = 0; for (S session = newSessions.poll(); session != null; session = newSessions.poll()) { if (addNow(session)) { // A new session has been created addedSessions++; } } return addedSessions; }可以看到通過for循環不停的poll出隊列中存在的NioSocketSession對象,同時調用addNow方法對當前NioSocketSession中對應的SocketChannel進行OP_READ操作的注冊,具體我們可以看看addNow方法:
AbstractPollingIoProcessor$addNow()
private boolean addNow(S session) { boolean registered = false; try { init(session); registered = true; // Build the filter chain of this session. IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder(); chainBuilder.buildFilterChain(session.getFilterChain()); // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here // in AbstractIoFilterChain.fireSessionOpened(). // Propagate the SESSION_CREATED event up to the chain IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners(); listeners.fireSessionCreated(session); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { destroy(session); } catch (Exception e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } finally { registered = false; } } return registered; }這個方法首先第5行執行了init方法,這個方法就是用來為當前NioSocketSession對應的SocketChannel注冊OP_READ事件的,具體實現是在NioProcessor裡面的:
NioProcessor$init()
@Override protected void init(NioSession session) throws Exception { SelectableChannel ch = (SelectableChannel) session.getChannel(); ch.configureBlocking(false); session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session)); }可以看到首先是獲得當前NioSocketSession對應的SocketChannel對應,他是SelectableChannel的子類,接著將當前獲得到的通道設置為非阻塞式,隨後為其注冊OP_READ事件;
這樣的話,addNow方法執行結束了,由於這篇篇幅已經比較長了,所以決定在下一篇繼續分析,未完,請繼續查看下一篇;
我們總是出門後才發現忘了關電腦,或是忘了從電腦中拷貝重要數據,難道還得跑回家一趟?嘿嘿,想在外網環境下遠程控制PC,其實沒有那麼復雜。如果某位童鞋擔心遇到上
本文主要介紹使用Google自帶的FaceDetectionListener進行人臉檢測,並將檢測到的人臉用矩形框繪制出來。本文代碼基於PlayCameraV1.0.0,
下面正式開始Fragment使用講解:一.准備工作1.引入類庫(導包)步驟:復制jar包--->project視圖下--->工程--->app---&g
效果圖:使用了 一個時間相關的工具類 package com.yqy.yqy_date;import android.util.Log;import java.