Android教程網
  1. 首頁
  2. Android 技術
  3. Android 手機
  4. Android 系統教程
  5. Android 游戲
 Android教程網 >> Android技術 >> 關於Android編程 >> tomcat源碼解析(三)--請求過程之數據的接收

tomcat源碼解析(三)--請求過程之數據的接收

編輯:關於Android編程

本章只分析Http11NioProtocol處理請求的過程,該方法也是目前我分析的版本默認的處理方式.
根據第一章的分析知道會在StandardService類的startInternal方法方法裡面啟動監聽,部分代碼如下:

   @Override
    protected void startInternal() throws LifecycleException {

        ......

        synchronized (executors) {
            for (Executor executor: executors) {
                executor.start();
            }
        }

        ......

        synchronized (connectorsLock) {
            for (Connector connector: connectors) {
                ......
                connector.start();
                ......
            }
        }
    }

在這裡需要注意的是也啟動的線程池,該線程城池將用於處理tomcat下的所有連接請求.好了接著看到Connector 裡面的start方法,因為Connector繼承了LifecycleMBeanBase,所以最終會調用它的startInternal方法,部分代碼如下:

  @Override
    protected void startInternal() throws LifecycleException {
        ......
        protocolHandler.start();
        ......
    }

這裡的protocolHandler是什麼呢,好了接著看它的定義.找到Connector的構造方法,部分代碼如下:

  public Connector() {
        this(null);
    }
    public Connector(String protocol) {
        setProtocol(protocol);
        ......
        ProtocolHandler p = null;
        try {
            Class clazz = Class.forName(protocolHandlerClassName);
            p = (ProtocolHandler) clazz.newInstance();
        } catch (Exception e) {
            ......
        } finally {
            this.protocolHandler = p;
        }
        ......
    }

從構造函數this.protocolHandler = p;在這裡通過反射創建了protocolHandler 的實例,那麼現在要去找protocolHandlerClassName的值.看到setProtocol方法,部分代碼如下:

   public void setProtocol(String protocol) {
       ......

        if ("HTTP/1.1".equals(protocol) || protocol == null) {
            if (aprConnector) {
                setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol");
            } else {
                setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol");
            }
        } else if ("AJP/1.3".equals(protocol)) {
            if (aprConnector) {
                setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol");
            } else {
                setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol");
            }
        } else {
            setProtocolHandlerClassName(protocol);
        }

    }

注意通過Server.xml創建的時候,

是沒有調用setProtocolHandlerClassName設置任何值的因此,Connector 的protocolHandlerClassName屬性使用的是默認值,

protected String protocolHandlerClassName =
        "org.apache.coyote.http11.Http11NioProtocol";

好了接著分析setProtocol方法,因為該方法的protocol等於null,並且aprConnector的值為false,所以最後protocolHandlerClassName的值還是設置為org.apache.coyote.http11.Http11NioProtocol.

那麼現在就去看一下Http11NioProtocol的做了什麼吧.
看到該類的構造方法:

public Http11NioProtocol() {
        super(new NioEndpoint());
    }

好了回到Connector類的startInternal方法中,知道該方法調用了Http11NioProtocol的start方法,分析該類的時候它沒有start方法,於是乎找到了它的父類AbstractProtocol的start方法裡面,部分代碼如下:

 @Override
    public void start() throws Exception {
       ......
        endpoint.start();
        ......

        // 這個是1秒檢測servlet的異步請求有沒有死掉或者超時
        asyncTimeout = new AsyncTimeout();
        Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
        timeoutThread.setPriority(endpoint.getThreadPriority());
        timeoutThread.setDaemon(true);
        timeoutThread.start();
    }

注意到endpoint.start();剛才在分析Http11NioProtocol的構造方法的時候,有這麼一句super(new NioEndpoint());所以很容易知道,這個endpoint就是NioEndpoint類啦.那麼接著看到它的start方法,該方法的定義是在其父類AbstractEndpoint裡面定義的,代碼如下:

public final void start() throws Exception {
        if (bindState == BindState.UNBOUND) {
            bind();
            bindState = BindState.BOUND_ON_START;
        }
        startInternal();
    }

該方法主要調用了bind和startInternal方法.先看一下bind方法,bind方法是在NioEndpoint裡面定義的,部分代碼如下:

 @Override
    public void bind() throws Exception {

        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
        serverSock.socket().bind(addr,getBacklog());
        serverSock.configureBlocking(true); //mimic APR behavior
        serverSock.socket().setSoTimeout(getSocketProperties().getSoTimeout());

        if (acceptorThreadCount == 0) {
            acceptorThreadCount = 1; // 說明accptor的線程只有一個
        }
        if (pollerThreadCount <= 0) {
            //minimum one poller thread
            pollerThreadCount = 1;
        }
        stopLatch = new CountDownLatch(pollerThreadCount);

        // Initialize SSL if needed
        initialiseSsl();  // 這個是支持SSL的

        selectorPool.open(); // 開啟一個selector池
    }

好啦,熟悉java socket編程的話,看到該方法是不是特別熟悉呢.其實該方法就是綁定監口的.還有這裡處理accept請求的線程只有一個,也許有人會問只有一條線程處理accept請求,性能會不會跟不上.其實,根據我分析多個有名網絡庫基本都是一條線程,更甚者把收發數據的工作也放在處理accept請求的線程裡面,但是性能還是超高的.其實只要了解socket收發數據的過程,就很容易理解為什麼要這麼做.好啦,這方面的知識,大家可以去搜索一下.回到正題.
selectorPool.open();這個open的代碼如下:

public void open() throws IOException {
        enabled = true;
        getSharedSelector();
        if (SHARED) {
            blockingSelector = new NioBlockingSelector();
            blockingSelector.open(getSharedSelector());
        }

    }

這裡SHARED的意思是,如果為真的話,那麼全部的請求共用一個selector.繼續分析startInternal方法,

    @Override
    public void startInternal() throws Exception {
        ......
            // 這部分主要創建一些類的緩存池的
            processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getProcessorCache());
            eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                            socketProperties.getEventCache());
            nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getBufferPool());

            // 創建線程池
            if ( getExecutor() == null ) {
                createExecutor();
            }
            initializeConnectionLatch();

           ......
           // getPollerThreadCount()方法返回的值就是在bind方法裡面提到的pollerThreadCount 
            pollers = new Poller[getPollerThreadCount()]; 
            for (int i=0; i

從該方知道,根據pollerThreadCount 的值創建了對應數量的Poller線程,該線程主要是用來做什麼的呢,具體過程後面分析,在這裡就說一下它的作用:主要用來接收客戶端發送過來的數據的,也就是read事件.
接著看到startAcceptorThreads,該方法的部分代碼如下:

 protected final void startAcceptorThreads() {
        int count = getAcceptorThreadCount();
        acceptors = new Acceptor[count];

        for (int i = 0; i < count; i++) {
            acceptors[i] = createAcceptor();
            String threadName = getName() + "-Acceptor-" + i;
            acceptors[i].setThreadName(threadName);
            Thread t = new Thread(acceptors[i], threadName);
            t.setPriority(getAcceptorThreadPriority());
            t.setDaemon(getDaemon());
            t.start();
        }
    }

從這裡知道在bind方法裡面知道acceptorThreadCount 的值為1,因此int count = getAcceptorThreadCount()的值為1,就是只創建了一條Acceptor的線程,那麼看到 createAcceptor方法,內部主要是創建了Acceptor的實例,Acceptor是Runnable的子類,那麼看到run方法,部分代碼如下:

@Override
        public void run() {
           ......
            while (running) {

                ......
                state = AcceptorState.RUNNING;

                ......
                SocketChannel socket = null;
                ......

                socket = serverSock.accept();
                ......

                if (running && !paused) {

                    if (!setSocketOptions(socket)) {// 在這裡處理accept請求的
                        countDownConnection();
                        closeSocket(socket);
                    }
                } else {
                    countDownConnection();
                    closeSocket(socket);
                }
            ......
            }
        }

是不是看到熟悉的代碼了呢.這個就是接受socket的連接的請求的了,接著看到setSocketOptions方法,因為該類主要處理的就是連接進來之後的處理,部分代碼如下:

  protected boolean setSocketOptions(SocketChannel socket) {
        ......
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);
        NioChannel channel = nioChannels.pop();
        if (channel == null) {
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
            getPoller0().register(channel); // 注冊到之前的poller
        ......
    }

這個方法的實現,大家應該也很熟悉吧.在這裡看到NioChannel channel = nioChannels.pop,還記得startInternal方法裡面創建一些類的緩存吧,就是復用了裡面的實例了.接著看到getPoller0().register(channel),看一下getPoller0()方法,部分代碼如下:

  public Poller getPoller0() {
        int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
        return pollers[idx];
    }

該方法就是獲取startInternal創建的Poller實例,該方法目的就是把連接進來的socket均衡的分配到每一個Poller線程裡面去,接著看到Poller類的register方法,代碼如下:

public void register(final NioChannel socket) {
            socket.setPoller(this);
            NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
            socket.setSocketWrapper(ka);
            ka.setPoller(this);
            ka.setReadTimeout(getSocketProperties().getSoTimeout());
            ka.setWriteTimeout(getSocketProperties().getSoTimeout());
            ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            ka.setSecure(isSSLEnabled());
            ka.setReadTimeout(getSoTimeout());
            ka.setWriteTimeout(getSoTimeout());
            PollerEvent r = eventCache.pop();
            ka.interestOps(SelectionKey.OP_READ);// 這裡就是設置selector的監聽類型為read
            if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
            else r.reset(socket,ka,OP_REGISTER);
            addEvent(r);
        }

其實該方法主要是做一些配置,並把請求包裝成一個PollerEvent 類,扔到Poller的處理任務隊列裡面.

執行完register方法之後,accptor就返回繼續監聽端口了,後面的socket的數據接收就交給Poller線程了.

PollerEvent 也是runnable的一個具體實現,看到run方法,部分代碼如下:

   @Override
        public void run() {
            if (interestOps == OP_REGISTER) {
                ......
                socket.getIOChannel().register(
                socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
                ......
            } else {
               ......
            }
        }

該方法主要作用就是把socket注冊到一個selector裡面去,並設置OP_READ.
到這Connector的startInternal方法就分析完啦.
下面接著分析有數據到來的時候是怎麼處理的.從Poller的構造方法知道,每一個Poller都會擁有自己的一個Selector,代碼如下

 public Poller() throws IOException {
            this.selector = Selector.open();
        }

接著看到Poller的run方法,部分代碼如下:

  @Override
        public void run() {
            while (true) {
                ......
                hasEvents = events();//  這個event全部處理完所有的任務
                if (wakeupCounter.getAndSet(-1) > 0) {

                    keyCount = selector.selectNow();
                } else { 
                    keyCount = selector.select(selectorTimeout);
                }
                wakeupCounter.set(0); // 任務隊列為0,後面添加的話需要喚醒一下

                Iterator iterator =

                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();

                    if (attachment == null) {
                        iterator.remove();
                    } else {
                        iterator.remove();
                        processKey(sk, attachment);
                    }
                }

                ......
            }
        }

在這裡events這個方法,就處理了register添加到該隊列裡面的PollerEvent 任務了,過程較簡單就不具體分析了.直接看到processKey的方法,部分代碼如下:

  protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
            ......
            if ( isWorkerAvailable() ) {
                unreg(sk, attachment, sk.readyOps());
                boolean closeSocket = false;

               if (sk.isReadable()) {
                    if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                        closeSocket = true;
                    }
                }
                if (!closeSocket && sk.isWritable()) {
                    if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                        closeSocket = true;
                    }
                }
                ......
            }
    }
unreg(sk, attachment, sk.readyOps());

為什麼要注銷OP_READ的監聽呢,因為每次有數據來的時候,都會把接收數據的請求放在之前創建的線程池裡面接收數據,如果不注銷的時候,就會造成並發接收數據的情況.
看到processSocket的方法,部分代碼如下:

  protected boolean processSocket(NioSocketWrapper attachment, SocketEvent status, boolean dispatch) {    
        ......
        SocketProcessor sc = processorCache.pop(); // 這裡是解析器
        if ( sc == null ) sc = new SocketProcessor(attachment, status);
        else sc.reset(attachment, status);
        Executor executor = getExecutor(); // 這裡沒一個請求都會放在線程池裡面執行
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
       ......
    }

在這裡看到了吧,接收數據的時候就把任務包裝成SocketProcessor 放到線程池裡面處理,並沒有在poller線程裡面處理.接著看到SocketProcessor 類的run方法,部分代碼如下:

 @Override
        public void run() {
            NioChannel socket = ka.getSocket();
            ......
            SelectionKey key = socket.getIOChannel().keyFor(
                    socket.getPoller().getSelector());
            synchronized (socket) {
                try {
                    ......
                    if (handshake == 0) {
                        SocketState state = SocketState.OPEN;
                        if (status == null) {
                            state = getHandler().process(ka, SocketEvent.OPEN_READ); // 在這裡處理請求了
                        } else {
                            state = getHandler().process(ka, status);
                        }
                        ......
                    } 
                    ......
            }
        }
    }

因為handshake 第一次有數據的時候會為0的,因此看到getHandler().process的方法.getHandler獲取到的是ConnectionHandler的實例,看到該類的process方法,部分代碼如下:

   @Override 
        public SocketState process(SocketWrapperBase wrapper, SocketEvent status) {
           ......
            Processor processor = connections.get(socket); 

            try {
               ......
                SocketState state = SocketState.CLOSED;
                do {
                    state = processor.process(wrapper, status);//----------處理請求-------------------
                    ......
                } while ( state == SocketState.UPGRADING);

                if (state == SocketState.LONG) {
                    longPoll(wrapper, processor); 
                    if (processor.isAsync()) { 
                        getProtocol().addWaitingProcessor(processor);   
                    }
                } else if (state == SocketState.OPEN) {
                    connections.remove(socket);
                    release(processor);
                    wrapper.registerReadInterest(); 
                } else if (state == SocketState.SENDFILE) {
                    connections.remove(socket);
                    release(processor);
                } 
                ......
                return state; // 返回LONG
            } 
            ......
        }

這裡注意到processor.process(wrapper, status)的status的值為SocketEvent.OPEN_READ,接著分析process方法,部分代碼如下:

 @Override
    public SocketState process(SocketWrapperBase socketWrapper, SocketEvent status)
            throws IOException {

        SocketState state = SocketState.CLOSED;
        Iterator dispatches = null;
        do { 
            if (dispatches != null) {
                DispatchType nextDispatch = dispatches.next();
                state = dispatch(nextDispatch.getSocketStatus());
            } else if (status == SocketEvent.DISCONNECT) {
            } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {      
                state = dispatch(status);
                if (state == SocketState.OPEN) {
                    state = service(socketWrapper);
                }
            } else if (status == SocketEvent.OPEN_WRITE) {
                state = SocketState.LONG;
            } else {
                state = service(socketWrapper);
            }

            if (state != SocketState.CLOSED && isAsync()) {
                state = asyncPostProcess(); // 判斷是不是異步處理額
            }
           ......
            if (dispatches == null || !dispatches.hasNext()) {
                dispatches = getIteratorAndClearDispatches();
            }
        } while (state == SocketState.ASYNC_END ||dispatches != null && state != SocketState.CLOSED);

        return state; // 返回LONG
    }

在這裡因為傳進來的status 的值為SocketEvent.OPEN_READ,使用直接看到

 else {
        state = service(socketWrapper);
}

分析service方法,該方法的實現是在Http11Processor的,部分代碼如下:

 @Override
    public SocketState service(SocketWrapperBase socketWrapper)
        throws IOException {
        RequestInfo rp = request.getRequestProcessor();
        ......

        // 對發送和接收緩沖區做設置
        setSocketWrapper(socketWrapper);
        inputBuffer.init(socketWrapper);
        outputBuffer.init(socketWrapper);


        keepAlive = true;
        openSocket = false;
        readComplete = true;
        boolean keptAlive = false;

        while (!getErrorState().isError() && keepAlive && !isAsync() &&
             upgradeToken == null && !endpoint.isPaused()) {

            if (!inputBuffer.parseRequestLine(keptAlive)) { // 在這裡只要分析請求頭的第一行
                if (inputBuffer.getParsingRequestLinePhase() == -1) {
                    return SocketState.UPGRADING;
                } else if (handleIncompleteRequestLineRead()) { 
                   break;
                }
            }

            ......
            keptAlive = true; 

            request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
            if (!inputBuffer.parseHeaders()) { // 該方法是解析請求頭第一行意外的其他請求
                // 這兩個值需要注意一下
                openSocket = true; 
                readComplete = false; // 第一次連接進來是false
                break;
            }

            ......
            if (!getErrorState().isError()) {
                rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                getAdapter().service(request, response); // 在這分配進去----主機匹配也是在這裡----------------
               ......

            }

            ......

        }

        if (getErrorState().isError() || endpoint.isPaused()) {
            return SocketState.CLOSED;
        } else if (isAsync()) {
            return SocketState.LONG;
        } else if (isUpgrade()) {
            return SocketState.UPGRADING;
        } else {
            if (sendfileData != null) {
                return SocketState.SENDFILE;
            } else {
                if (openSocket) { 
                    if (readComplete) {
                        return SocketState.OPEN;
                    } else {
                        return SocketState.LONG;// readComplete = false;
                    }
                } else {
                    return SocketState.CLOSED;
                }
            }
        }
    }

該方法主要做了:
1,inputBuffer.parseRequestLine(keptAlive)解析請求頭的第一行信息,如GET 請求路徑如/index ,還有當前的http協議.如HTTP1.1等.
2,inputBuffer.parseHeaders()解析除第一行以外的請求信息.
3,getAdapter().service(request, response);這裡就是處理完整的請求了.
如果發送過來的數據不完整會怎麼辦呢.比如說解析第一行數據不完整的時候回怎麼辦

if (!inputBuffer.parseRequestLine(keptAlive)) { // 在這裡只要分析請求頭的第一行
                if (inputBuffer.getParsingRequestLinePhase() == -1) {
                    return SocketState.UPGRADING;
                } else if (handleIncompleteRequestLineRead()) { 
                   break;
                }
            }

parseRequestLine該方法只要沒有解析完第一行數據就會返回false,至於該方法的具體分析過程,後面有時間的時候話再寫.因為第一次調用parseRequestLine的話getParsingRequestLinePhase()會返回1,接著就會執行handleIncompleteRequestLineRead方法了,部分代碼如下:

private boolean handleIncompleteRequestLineRead() {
        openSocket = true;
        if (inputBuffer.getParsingRequestLinePhase() > 1) {
            if (endpoint.isPaused()) {
               ......
                return false;
            } else {
                readComplete = false;
            }
        }
        return true;
    }

注意到該方法把 openSocket 設置為 true了,有因為如果是第一次的話getParsingRequestLinePhase()返回的值為1,使用該方法返回的值就是true了,接著就跳出while循環了,然後service方法的返回值是什麼呢?

if (getErrorState().isError() || endpoint.isPaused()) {
            return SocketState.CLOSED;
        } else if (isAsync()) {
            return SocketState.LONG;
        } else if (isUpgrade()) {
            return SocketState.UPGRADING;
        } else {
            if (sendfileData != null) {
                return SocketState.SENDFILE;
            } else {
                if (openSocket) { 
                    if (readComplete) {
                        return SocketState.OPEN;
                    } else {
                        return SocketState.LONG;// readComplete = false;
                    }
                } else {
                    return SocketState.CLOSED;
                }
            }
        }

看到這裡,以為openSocket在handleIncompleteRequestLineRead裡面為設置為true了,然後readComplete為false,那返回的就是SocketState.LONG了.那麼回到ConnectionHandler類的process方法看到

state = processor.process(wrapper, status);

因此這時候state 的值為SocketState.LONG,使用

if (state == SocketState.LONG) {
                    longPoll(wrapper, processor); 
                    if (processor.isAsync()) { 
                        getProtocol().addWaitingProcessor(processor);   
                    }
                } else if (state == SocketState.OPEN) {
                    connections.remove(socket);
                    release(processor);
                    wrapper.registerReadInterest(); 
                } else if (state == SocketState.SENDFILE) {
                    connections.remove(socket);
                    release(processor);
                } 

走的是

longPoll(wrapper, processor);

這個分支.
longPoll的代碼如下:

protected void longPoll(SocketWrapperBase socket, Processor processor) {
            if (!processor.isAsync()) { // 
                socket.registerReadInterest();
            }
        }

接著看到registerReadInterest,代碼如下

 @Override
        public void registerReadInterest() {
            getPoller().add(getSocket(), SelectionKey.OP_READ);
        }

看到了吧,如果讀取的數據沒有讀完,那麼從新在之前的poller線程重新監聽數據.然後有數據的話就重新之前的動作.

對tomcat處理請求過程總結一下吧,就是在poller裡面監聽selector事件,如何有數據要讀的話就放在線程池裡面處理.

好了本章就先分析到這裡了,然後下一篇分析就是tomcat如何解析請求頭以及如何把對應的請求發送到對應的servlet裡面去.

  1. 上一頁:
  2. 下一頁:
熱門文章
閱讀排行版
Copyright © Android教程網 All Rights Reserved