IoSession:在每一次連接建立成功之後都會創建一個IoSession對象出來,並且在創建該對象的時候創建一個IoFilter對象出來,通過IoSession的session id來為當前IoSession設置處理他的IoProcessor;
首先我們通過NioSocketAcceptor acceptor = new NioSocketAcceptor();創建了一個NioSocketAcceptor對象出來,那我們就得看看NioSocketAcceptor的構造函數裡面做了些什麼事了;
public NioSocketAcceptor() { super(new DefaultSocketSessionConfig(), NioProcessor.class); ((DefaultSocketSessionConfig) getSessionConfig()).init(this); }可以看到首先調用了父類的構造函數,也就是AbstractPollingIoAcceptor的構造函數,並且傳入了NioProcessor的Class對象,這裡我們可以想象一下後面肯定會用這個NioProcessor的Class對象進行一些與反射有關的操作;
protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class> processorClass) { this(sessionConfig, null, new SimpleIoProcessorPool(processorClass), true, null); }可以看到實際上調用的是5個參數的構造函數,在看這個構造函數之前,我們看到第三個參數利用我們從NioSocketAcceptor構造函數中傳進來的NioProcessor對象,創建了一個SimpleIoProcessorPool對象,我們來看看SimpleIoProcessorPool的構造函數;
public SimpleIoProcessorPool(Class> processorType) { this(processorType, null, DEFAULT_SIZE, null); }發現他接著調用的是SimpleIoProcessorPool四個參數的構造函數,並且添加了一個DEFAULT_SIZE參數,這個值的大小等於我們CPU的核數+1,這也是我們在創建NioSocketAcceptor的時候默認創建的NioProcessor的線程個數,來看看SimpleIoProcessorPool四個參數的構造函數:
public SimpleIoProcessorPool(Class> processorType, Executor executor, int size, SelectorProvider selectorProvider) { if (processorType == null) { throw new IllegalArgumentException("processorType"); } if (size <= 0) { throw new IllegalArgumentException("size: " + size + " (expected: positive integer)"); } // Create the executor if none is provided createdExecutor = (executor == null); if (createdExecutor) { this.executor = Executors.newCachedThreadPool(); // Set a default reject handler ((ThreadPoolExecutor) this.executor).setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); } else { this.executor = executor; } pool = new IoProcessor[size]; boolean success = false; Constructor> processorConstructor = null; boolean usesExecutorArg = true; try { // We create at least one processor try { try { processorConstructor = processorType.getConstructor(ExecutorService.class); pool[0] = processorConstructor.newInstance(this.executor); } catch (NoSuchMethodException e1) { // To the next step... try { if(selectorProvider==null) { processorConstructor = processorType.getConstructor(Executor.class); pool[0] = processorConstructor.newInstance(this.executor); } else { processorConstructor = processorType.getConstructor(Executor.class, SelectorProvider.class); pool[0] = processorConstructor.newInstance(this.executor,selectorProvider); } } catch (NoSuchMethodException e2) { // To the next step... try { processorConstructor = processorType.getConstructor(); usesExecutorArg = false; pool[0] = processorConstructor.newInstance(); } catch (NoSuchMethodException e3) { // To the next step... } } } } catch (RuntimeException re) { LOGGER.error("Cannot create an IoProcessor :{}", re.getMessage()); throw re; } catch (Exception e) { String msg = "Failed to create a new instance of " + processorType.getName() + ":" + e.getMessage(); LOGGER.error(msg, e); throw new RuntimeIoException(msg, e); } if (processorConstructor == null) { // Raise an exception if no proper constructor is found. String msg = String.valueOf(processorType) + " must have a public constructor with one " + ExecutorService.class.getSimpleName() + " parameter, a public constructor with one " + Executor.class.getSimpleName() + " parameter or a public default constructor."; LOGGER.error(msg); throw new IllegalArgumentException(msg); } // Constructor found now use it for all subsequent instantiations for (int i = 1; i < pool.length; i++) { try { if (usesExecutorArg) { if(selectorProvider==null) { pool[i] = processorConstructor.newInstance(this.executor); } else { pool[i] = processorConstructor.newInstance(this.executor, selectorProvider); } } else { pool[i] = processorConstructor.newInstance(); } } catch (Exception e) { // Won't happen because it has been done previously } } success = true; } finally { if (!success) { dispose(); } } }
public NioProcessor(Executor executor) { super(executor); try { // Open a new selector selector = Selector.open(); } catch (IOException e) { throw new RuntimeIoException("Failed to open a selector.", e); } }可以注意到的是在SimpleIoProcessorPool裡面有兩種通過反射創建NioProcessor對象的方式,就是我們上面代碼的第78和80這兩種方式,兩者的區別在於如果我們在創建SimpleIoProcessorPool的時候傳入了SelectorProvider對象,那麼NioProcessor裡面的Selector將直接調用SelectorProvider的openSelector來獲得,而如果沒有傳入SelectorProvider對象的話,NioProcessor裡面的Selector將通過Selector.open方法獲得;
private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor processor, boolean createdProcessor, SelectorProvider selectorProvider) { super(sessionConfig, executor); if (processor == null) { throw new IllegalArgumentException("processor"); } this.processor = processor; this.createdProcessor = createdProcessor; try { // Initialize the selector init(selectorProvider); // The selector is now ready, we can switch the // flag to true so that incoming connection can be accepted selectable = true; } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeIoException("Failed to initialize.", e); } finally { if (!selectable) { try { destroy(); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } } }首先執行了super構造函數,這個構造函數實際上執行的是AbstractIoService的構造函數;
protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) { if (sessionConfig == null) { throw new IllegalArgumentException("sessionConfig"); } if (getTransportMetadata() == null) { throw new IllegalArgumentException("TransportMetadata"); } if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) { throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: " + getTransportMetadata().getSessionConfigType() + ")"); } // Create the listeners, and add a first listener : a activation listener // for this service, which will give information on the service state. listeners = new IoServiceListenerSupport(this); listeners.add(serviceActivationListener); // Stores the given session configuration this.sessionConfig = sessionConfig; // Make JVM load the exception monitor before some transports // change the thread context class loader. ExceptionMonitor.getInstance(); if (executor == null) { this.executor = Executors.newCachedThreadPool(); createdExecutor = true; } else { this.executor = executor; createdExecutor = false; } threadName = getClass().getSimpleName() + '-' + id.incrementAndGet(); }
protected void init(SelectorProvider selectorProvider) throws Exception { this.selectorProvider = selectorProvider; if (selectorProvider == null) { selector = Selector.open(); } else { selector = selectorProvider.openSelector(); } }
public final void bind(Iterable localAddresses) throws IOException { if (isDisposing()) { throw new IllegalStateException("The Accpetor disposed is being disposed."); } if (localAddresses == null) { throw new IllegalArgumentException("localAddresses"); } List不管你調用的是哪個bind方法,最後執行的都是這個bind方法,在這個方法中首先會進行迭代,將所有需要綁定的地址存儲到localAddressesCopy裡面,隨後在第34行調用bindInternal方法進行綁定,這個方法在AbstractIoAcceptor裡面是沒有實現的,需要到他的子類AbstractPollingIoAcceptor查看,這個類中實現了該方法:localAddressesCopy = new ArrayList (); for (SocketAddress a : localAddresses) { checkAddressType(a); localAddressesCopy.add(a); } if (localAddressesCopy.isEmpty()) { throw new IllegalArgumentException("localAddresses is empty."); } boolean activate = false; synchronized (bindLock) { synchronized (boundAddresses) { if (boundAddresses.isEmpty()) { activate = true; } } if (getHandler() == null) { throw new IllegalStateException("handler is not set."); } try { Set addresses = bindInternal(localAddressesCopy); synchronized (boundAddresses) { boundAddresses.addAll(addresses); } } catch (IOException e) { throw e; } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeIoException("Failed to bind to: " + getLocalAddresses(), e); } } if (activate) { getListeners().fireServiceActivated(); } }
protected final SetbindInternal(List localAddresses) throws Exception { // Create a bind request as a Future operation. When the selector // have handled the registration, it will signal this future. AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses); // adds the Registration request to the queue for the Workers // to handle registerQueue.add(request); // creates the Acceptor instance and has the local // executor kick it off. startupAcceptor(); // As we just started the acceptor, we have to unblock the select() // in order to process the bind request we just have added to the // registerQueue. try { lock.acquire(); // Wait a bit to give a chance to the Acceptor thread to do the select() Thread.sleep(10); wakeup(); } finally { lock.release(); } // Now, we wait until this request is completed. request.awaitUninterruptibly(); if (request.getException() != null) { throw request.getException(); } // Update the local addresses. // setLocalAddresses() shouldn't be called from the worker thread // because of deadlock. Set newLocalAddresses = new HashSet (); for (H handle : boundHandles.values()) { newLocalAddresses.add(localAddress(handle)); } return newLocalAddresses; }
private void startupAcceptor() throws InterruptedException { // If the acceptor is not ready, clear the queues // TODO : they should already be clean : do we have to do that ? if (!selectable) { registerQueue.clear(); cancelQueue.clear(); } // start the acceptor if not already started Acceptor acceptor = acceptorRef.get(); if (acceptor == null) { lock.acquire(); acceptor = new Acceptor(); if (acceptorRef.compareAndSet(null, acceptor)) { executeWorker(acceptor); } else { lock.release(); } } }這個方法關鍵是第10行或者14行,創建一個Acceptor類型的對象,Acceptor實現了Runnable接口,並且在第17行執行executeWorker方法,這個方法在AbstractPollingIoAcceptor中並沒有實現,具體實現是在他的間接父類AbstractIoService中的,我們查看AbstractIoService中的executeWorker方法:
protected final void executeWorker(Runnable worker) { executeWorker(worker, null); } protected final void executeWorker(Runnable worker, String suffix) { String actualThreadName = threadName; if (suffix != null) { actualThreadName = actualThreadName + '-' + suffix; } executor.execute(new NamePreservingRunnable(worker, actualThreadName)); }可以看到實際上是首先將我們上面創建的Acceptor對象放到線程池executor裡面,這裡的executor線程池是我們在創建NioSocketAcceptor的時候創建的,他是CachedThreadPool類型的,隨後執行exexcute方法,將該線程池運行起來,那麼緊接著執行的就該是Acceptor的run方法了;
public void run() { assert (acceptorRef.get() == this); int nHandles = 0; // Release the lock lock.release(); while (selectable) { try { // Detect if we have some keys ready to be processed // The select() will be woke up if some new connection // have occurred, or if the selector has been explicitly // woke up int selected = select(); // this actually sets the selector to OP_ACCEPT, // and binds to the port on which this class will // listen on nHandles += registerHandles(); // Now, if the number of registred handles is 0, we can // quit the loop: we don't have any socket listening // for incoming connection. if (nHandles == 0) { acceptorRef.set(null); if (registerQueue.isEmpty() && cancelQueue.isEmpty()) { assert (acceptorRef.get() != this); break; } if (!acceptorRef.compareAndSet(null, this)) { assert (acceptorRef.get() != this); break; } assert (acceptorRef.get() == this); } if (selected > 0) { // We have some connection request, let's process // them here. processHandles(selectedHandles()); } // check to see if any cancellation request has been made. nHandles -= unregisterHandles(); } catch (ClosedSelectorException cse) { // If the selector has been closed, we can exit the loop ExceptionMonitor.getInstance().exceptionCaught(cse); break; } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { Thread.sleep(1000); } catch (InterruptedException e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } } } // Cleanup all the processors, and shutdown the acceptor. if (selectable && isDisposing()) { selectable = false; try { if (createdProcessor) { processor.dispose(); } } finally { try { synchronized (disposalLock) { if (isDisposing()) { destroy(); } } } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { disposalFuture.setDone(); } } } }可以看到第9行首先判斷selectable的值是true還是false,這個值是在什麼時候賦值的呢?就是在AbstractPollingIoAcceptor的構造函數裡面了,只要我們在NioSocketAcceptor裡面創建了Selector對象之後就會將selectable的值設置為true,那麼我們這裡run方法裡面的while循環將是死循環了,一直等待客戶端的連接請求;第15行的select方法將處於阻塞狀態,它實際上調用的就是我們Selector的select方法,一直等待著客戶端的接入,在有客戶端連接或者Selector被明確喚醒的情況下就會返回,返回結果大於0表示有客戶端連接接入;接著執行第20行的registerHandles方法
private int registerHandles() { for (;;) { // The register queue contains the list of services to manage // in this acceptor. AcceptorOperationFuture future = registerQueue.poll(); if (future == null) { return 0; } // We create a temporary map to store the bound handles, // as we may have to remove them all if there is an exception // during the sockets opening. MapregisterHandles方法主要用於創建ServerSocketChannel,為通道創建ServerSocket並且為其綁定端口號,創建接收緩存區,並且為Selector注冊OP_ACCEPT事件;newHandles = new ConcurrentHashMap (); List localAddresses = future.getLocalAddresses(); try { // Process all the addresses for (SocketAddress a : localAddresses) { H handle = open(a); newHandles.put(localAddress(handle), handle); } // Everything went ok, we can now update the map storing // all the bound sockets. boundHandles.putAll(newHandles); // and notify. future.setDone(); return newHandles.size(); } catch (Exception e) { // We store the exception in the future future.setException(e); } finally { // Roll back if failed to bind all addresses. if (future.getException() != null) { for (H handle : newHandles.values()) { try { close(handle); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } // TODO : add some comment : what is the wakeup() waking up ? wakeup(); } } } }
protected ServerSocketChannel open(SocketAddress localAddress) throws Exception { // Creates the listening ServerSocket ServerSocketChannel channel = null; if (selectorProvider != null) { channel = selectorProvider.openServerSocketChannel(); } else { channel = ServerSocketChannel.open(); } boolean success = false; try { // This is a non blocking socket channel channel.configureBlocking(false); // Configure the server socket, ServerSocket socket = channel.socket(); // Set the reuseAddress flag accordingly with the setting socket.setReuseAddress(isReuseAddress()); // and bind. try { socket.bind(localAddress, getBacklog()); } catch (IOException ioe) { // Add some info regarding the address we try to bind to the // message String newMessage = "Error while binding on " + localAddress + "\n" + "original message : " + ioe.getMessage(); Exception e = new IOException(newMessage); e.initCause(ioe.getCause()); // And close the channel channel.close(); throw e; } // Register the channel within the selector for ACCEPT event channel.register(selector, SelectionKey.OP_ACCEPT); success = true; } finally { if (!success) { close(channel); } } return channel; }可以看到這個open方法裡面其實就是我們使用NIO的經典步驟了,首先創建一個ServerSocketChannel對象,接著將ServerSocketChannel通道設置為非阻塞式,根據當前通道創建一個ServerSocket對象,並且為當前ServerSocket綁定我們傳入的參數SocketAddress,最後第42行把我們創建的通道注冊到Selector選擇器上面,同時注冊OP_ACCEPT事件;
protected Iterator可以看到實際上selectedHandles就是返回我們已經選中通道的集合而已了selectedHandles() { return new ServerSocketChannelIterator(selector.selectedKeys()); }
private void processHandles(Iterator這段代碼相對來說比較短,我們仔細看看裡面做了些什麼,首先迭代我們的ServerSocketChannel集合,從中取出一個ServerSocketChannel對象,我這裡把H的類型全部說成是ServerSocketChannel的原因在於我們主要分析的是MINA框架中關於Socket的這部分,因為MINA不僅僅支持Socket通信,同時支持UDP數據包通信,因而這裡使用的是泛型實現的,在獲得一個ServerSocketChannel對象之後,要注意將其從迭代器中刪除,避免進行重復多次處理,接著執行第8行,創建一個IoSession對象出來,具體來講我們這裡創建的是NioSocketSession對象,調用的方法是accept,這個方法的第一個參數就是我們之前在創建NioSocketAcceptor的時候創建的SimpleIoProcessorPool對象,默認情況下在他裡面是會創建CPU個數+1個NioProcessor的,這個方法在AbstractPollingIoAcceptor中是沒有實現的,因此我們查看他的子類NioSocketAcceptor裡面handles) throws Exception { while (handles.hasNext()) { H handle = handles.next(); handles.remove(); // Associates a new created connection to a processor, // and get back a session S session = accept(processor, handle); if (session == null) { continue; } initSession(session, null, null); // add the session to the SocketIoProcessor session.getProcessor().add(session); } } }
protected NioSession accept(IoProcessorprocessor, ServerSocketChannel handle) throws Exception { SelectionKey key = null; if (handle != null) { key = handle.keyFor(selector); } if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) { return null; } // accept the connection from the client SocketChannel ch = handle.accept(); if (ch == null) { return null; } return new NioSocketSession(this, processor, ch); }
public NioSocketSession(IoService service, IoProcessor首先執行的是super的構造函數,其實就是NioSession的構造函數了,我們來看看processor, SocketChannel channel) { super(processor, service, channel); config = new SessionConfigImpl(); this.config.setAll(service.getSessionConfig()); }
protected NioSession(IoProcessor首先執行super的構造函數,實際上執行的是AbstractIoSession的構造函數,裡面沒有做多少事,我們不再展開講,接著第5行創建了一個DefaultIoFilterChain對象出來,這個還是比較重要的,我們來看下裡面做了什麼事;processor, IoService service, Channel channel) { super(service); this.channel = channel; this.processor = processor; filterChain = new DefaultIoFilterChain(this); }
public DefaultIoFilterChain(AbstractIoSession session) { if (session == null) { throw new IllegalArgumentException("session"); } this.session = session; head = new EntryImpl(null, null, "head", new HeadFilter()); tail = new EntryImpl(head, null, "tail", new TailFilter()); head.nextEntry = tail; }這個構造函數中為我們創建了兩個EntryImpl類型的對象,分別封裝的是HeadFilter和TailFilter對象,這裡有必要說下DefaultIoFilterChain的作用了,在我們創建Session的時候,會為Session創建一個Filter責任鏈出來,那麼責任鏈主要是干什麼的呢?主要進行進行我們二進制與真正對象之間的轉換啦,因為我們都知道在網絡中傳輸的只能是字節,並不能傳遞對象,那麼我們就需要字節和對象之間的轉換,Filter鏈就是用來干這個的,當然你可以在客戶端將要發送的數據通過Filter鏈來進行加密,在服務端再通過Filter鏈來進行解密,這個是完全可以的,既然是鏈嘛,就需要鏈頭和鏈尾了;他們都會被封裝到EntryImpl中,至於EntryImpl裡面有什麼我們就不貼出來了,主要就是prevEntry,nextEntry,nextFilter從名字上就能明顯看出來主要是用於EntryImpl鏈拼接的實體罷了,有點類似於鏈表;
public IoProcessorgetProcessor() { return processor; }
public final void add(S session) { getProcessor(session).add(session); }可以看到在SimpleIoProcessor裡面的add方法裡,首先執行的是getProcessor,從SimpleIoProcessor裡面獲得一個Processor對象出來,具體來講這裡獲得到的Processor類型將是NioProcessor類型,我們看看getProcessor方法
private IoProcessor getProcessor(S session) { IoProcessor processor = (IoProcessor) session.getAttribute(PROCESSOR); if (processor == null) { if (disposed || disposing) { throw new IllegalStateException("A disposed processor cannot be accessed."); } processor = pool[Math.abs((int) session.getId()) % pool.length]; if (processor == null) { throw new IllegalStateException("A disposed processor cannot be accessed."); } session.setAttributeIfAbsent(PROCESSOR, processor); } return processor; }這個方法最關鍵的就是第9行,獲取到當前session的id,對其取絕對值,並且對我們創建SimpleIoProcessor的時候創建的NioProcessor數組進行取余運算,獲得數組中的一個NioProcessor對象,默認情況下這個數組的大小是CPU個數+1;最後第15行將當前Session的PROCESSOR屬性設置為獲取到的NioProcessor;
public final void add(S session) { if (disposed || disposing) { throw new IllegalStateException("Already disposed."); } // Adds the session to the newSession queue and starts the worker newSessions.add(session); startupProcessor(); }將當前NioSocketSession添加到newSession裡面,這裡的newSessions實際上就是NioSocketSession隊列,就是我們當前NioProcessor需要處理的NioSocketSession所組成的集合了,為什麼還要這個集合呢?道理很簡單嘛,剛剛你在通過getProcessor方法為NioSocketSession設置處理他的NioPrrocessor的時候,采用的方法是通過session的id對包含NioProcessor對象的數組進行取模運算的,這肯定就不能避免多個NioSocketSession同時都需要一個NioProcessor來處理的情況了,那麼為了保存這些需要NioProcessor處理的NioSocketSession,自然需要一個隊列來存儲了;
private void startupProcessor() { Processor processor = processorRef.get(); if (processor == null) { processor = new Processor(); if (processorRef.compareAndSet(null, processor)) { executor.execute(new NamePreservingRunnable(processor, threadName)); } } // Just stop the select() and start it again, so that the processor // can be activated immediately. wakeup(); }這個方法首先就是創建了一個Processor對象他實現了Runnable接口,隨後調用executor的execute方法,將封裝成NamePreservingRunnable的Processor放入線程池中,executor是CachedThreadPool類型的線程池,那麼接下來就是執行Processor線程的run方法了:
public void run() { assert (processorRef.get() == this); int nSessions = 0; lastIdleCheckTime = System.currentTimeMillis(); for (;;) { try { // This select has a timeout so that we can manage // idle session when we get out of the select every // second. (note : this is a hack to avoid creating // a dedicated thread). long t0 = System.currentTimeMillis(); int selected = select(SELECT_TIMEOUT); long t1 = System.currentTimeMillis(); long delta = (t1 - t0); if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) { // Last chance : the select() may have been // interrupted because we have had an closed channel. if (isBrokenConnection()) { LOG.warn("Broken connection"); } else { LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0)); // Ok, we are hit by the nasty epoll // spinning. // Basically, there is a race condition // which causes a closing file descriptor not to be // considered as available as a selected channel, // but // it stopped the select. The next time we will // call select(), it will exit immediately for the // same // reason, and do so forever, consuming 100% // CPU. // We have to destroy the selector, and // register all the socket on a new one. registerNewSelector(); } } // Manage newly created session first nSessions += handleNewSessions(); updateTrafficMask(); // Now, if we have had some incoming or outgoing events, // deal with them if (selected > 0) { // LOG.debug("Processing ..."); // This log hurts one of // the MDCFilter test... process(); } // Write the pending requests long currentTime = System.currentTimeMillis(); flush(currentTime); // And manage removed sessions nSessions -= removeSessions(); // Last, not least, send Idle events to the idle sessions notifyIdleSessions(currentTime); // Get a chance to exit the infinite loop if there are no // more sessions on this Processor if (nSessions == 0) { processorRef.set(null); if (newSessions.isEmpty() && isSelectorEmpty()) { // newSessions.add() precedes startupProcessor assert (processorRef.get() != this); break; } assert (processorRef.get() != this); if (!processorRef.compareAndSet(null, this)) { // startupProcessor won race, so must exit processor assert (processorRef.get() != this); break; } assert (processorRef.get() == this); } // Disconnect all sessions immediately if disposal has been // requested so that we exit this loop eventually. if (isDisposing()) { boolean hasKeys = false; for (Iterator i = allSessions(); i.hasNext();) { IoSession session = i.next(); if (session.isActive()) { scheduleRemove((S)session); hasKeys = true; } } if (hasKeys) { wakeup(); } } } catch (ClosedSelectorException cse) { // If the selector has been closed, we can exit the loop // But first, dump a stack trace ExceptionMonitor.getInstance().exceptionCaught(cse); break; } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { Thread.sleep(1000); } catch (InterruptedException e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } } } try { synchronized (disposalLock) { if (disposing) { doDispose(); } } } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } finally { disposalFuture.setValue(true); } } }和Acceptor的run方法類似,同樣存在一個死循環,第14行調用了Selector的select方法,但是和之前Acceptor中調用的select方法不同,我們這裡調用的是有參數的select方法,這種方式會讓我們的選擇器每隔SELECT_TIMEOUT被喚醒一次,讓他進行重新選擇,目的就是為了管理空閒的NioSocketSession,而使用無參的select的話會一直阻塞下去,直到出現需要的事件為止;接著第43行執行了handleNewSessions方法
private int handleNewSessions() { int addedSessions = 0; for (S session = newSessions.poll(); session != null; session = newSessions.poll()) { if (addNow(session)) { // A new session has been created addedSessions++; } } return addedSessions; }可以看到通過for循環不停的poll出隊列中存在的NioSocketSession對象,同時調用addNow方法對當前NioSocketSession中對應的SocketChannel進行OP_READ操作的注冊,具體我們可以看看addNow方法:
private boolean addNow(S session) { boolean registered = false; try { init(session); registered = true; // Build the filter chain of this session. IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder(); chainBuilder.buildFilterChain(session.getFilterChain()); // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here // in AbstractIoFilterChain.fireSessionOpened(). // Propagate the SESSION_CREATED event up to the chain IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners(); listeners.fireSessionCreated(session); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { destroy(session); } catch (Exception e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } finally { registered = false; } } return registered; }這個方法首先第5行執行了init方法,這個方法就是用來為當前NioSocketSession對應的SocketChannel注冊OP_READ事件的,具體實現是在NioProcessor裡面的:
@Override protected void init(NioSession session) throws Exception { SelectableChannel ch = (SelectableChannel) session.getChannel(); ch.configureBlocking(false); session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session)); }可以看到首先是獲得當前NioSocketSession對應的SocketChannel對應,他是SelectableChannel的子類,接著將當前獲得到的通道設置為非阻塞式,隨後為其注冊OP_READ事件;
