編輯:關於Android編程
本章只分析Http11NioProtocol處理請求的過程,該方法也是目前我分析的版本默認的處理方式.
根據第一章的分析知道會在StandardService類的startInternal方法方法裡面啟動監聽,部分代碼如下:
@Override
protected void startInternal() throws LifecycleException {
......
synchronized (executors) {
for (Executor executor: executors) {
executor.start();
}
}
......
synchronized (connectorsLock) {
for (Connector connector: connectors) {
......
connector.start();
......
}
}
}
在這裡需要注意的是也啟動的線程池,該線程城池將用於處理tomcat下的所有連接請求.好了接著看到Connector 裡面的start方法,因為Connector繼承了LifecycleMBeanBase,所以最終會調用它的startInternal方法,部分代碼如下:
@Override
protected void startInternal() throws LifecycleException {
......
protocolHandler.start();
......
}
這裡的protocolHandler是什麼呢,好了接著看它的定義.找到Connector的構造方法,部分代碼如下:
public Connector() {
this(null);
}
public Connector(String protocol) {
setProtocol(protocol);
......
ProtocolHandler p = null;
try {
Class clazz = Class.forName(protocolHandlerClassName);
p = (ProtocolHandler) clazz.newInstance();
} catch (Exception e) {
......
} finally {
this.protocolHandler = p;
}
......
}
從構造函數this.protocolHandler = p;在這裡通過反射創建了protocolHandler 的實例,那麼現在要去找protocolHandlerClassName的值.看到setProtocol方法,部分代碼如下:
public void setProtocol(String protocol) {
......
if ("HTTP/1.1".equals(protocol) || protocol == null) {
if (aprConnector) {
setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol");
} else {
setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol");
}
} else if ("AJP/1.3".equals(protocol)) {
if (aprConnector) {
setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol");
} else {
setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol");
}
} else {
setProtocolHandlerClassName(protocol);
}
}
注意通過Server.xml創建的時候,
是沒有調用setProtocolHandlerClassName設置任何值的因此,Connector 的protocolHandlerClassName屬性使用的是默認值,
protected String protocolHandlerClassName =
"org.apache.coyote.http11.Http11NioProtocol";
好了接著分析setProtocol方法,因為該方法的protocol等於null,並且aprConnector的值為false,所以最後protocolHandlerClassName的值還是設置為org.apache.coyote.http11.Http11NioProtocol.
那麼現在就去看一下Http11NioProtocol的做了什麼吧.
看到該類的構造方法:
public Http11NioProtocol() {
super(new NioEndpoint());
}
好了回到Connector類的startInternal方法中,知道該方法調用了Http11NioProtocol的start方法,分析該類的時候它沒有start方法,於是乎找到了它的父類AbstractProtocol的start方法裡面,部分代碼如下:
@Override
public void start() throws Exception {
......
endpoint.start();
......
// 這個是1秒檢測servlet的異步請求有沒有死掉或者超時
asyncTimeout = new AsyncTimeout();
Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
timeoutThread.setPriority(endpoint.getThreadPriority());
timeoutThread.setDaemon(true);
timeoutThread.start();
}
注意到endpoint.start();剛才在分析Http11NioProtocol的構造方法的時候,有這麼一句super(new NioEndpoint());所以很容易知道,這個endpoint就是NioEndpoint類啦.那麼接著看到它的start方法,該方法的定義是在其父類AbstractEndpoint裡面定義的,代碼如下:
public final void start() throws Exception {
if (bindState == BindState.UNBOUND) {
bind();
bindState = BindState.BOUND_ON_START;
}
startInternal();
}
該方法主要調用了bind和startInternal方法.先看一下bind方法,bind方法是在NioEndpoint裡面定義的,部分代碼如下:
@Override
public void bind() throws Exception {
serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
serverSock.socket().bind(addr,getBacklog());
serverSock.configureBlocking(true); //mimic APR behavior
serverSock.socket().setSoTimeout(getSocketProperties().getSoTimeout());
if (acceptorThreadCount == 0) {
acceptorThreadCount = 1; // 說明accptor的線程只有一個
}
if (pollerThreadCount <= 0) {
//minimum one poller thread
pollerThreadCount = 1;
}
stopLatch = new CountDownLatch(pollerThreadCount);
// Initialize SSL if needed
initialiseSsl(); // 這個是支持SSL的
selectorPool.open(); // 開啟一個selector池
}
好啦,熟悉java socket編程的話,看到該方法是不是特別熟悉呢.其實該方法就是綁定監口的.還有這裡處理accept請求的線程只有一個,也許有人會問只有一條線程處理accept請求,性能會不會跟不上.其實,根據我分析多個有名網絡庫基本都是一條線程,更甚者把收發數據的工作也放在處理accept請求的線程裡面,但是性能還是超高的.其實只要了解socket收發數據的過程,就很容易理解為什麼要這麼做.好啦,這方面的知識,大家可以去搜索一下.回到正題.
selectorPool.open();這個open的代碼如下:
public void open() throws IOException {
enabled = true;
getSharedSelector();
if (SHARED) {
blockingSelector = new NioBlockingSelector();
blockingSelector.open(getSharedSelector());
}
}
這裡SHARED的意思是,如果為真的話,那麼全部的請求共用一個selector.繼續分析startInternal方法,
@Override
public void startInternal() throws Exception {
......
// 這部分主要創建一些類的緩存池的
processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getProcessorCache());
eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getEventCache());
nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getBufferPool());
// 創建線程池
if ( getExecutor() == null ) {
createExecutor();
}
initializeConnectionLatch();
......
// getPollerThreadCount()方法返回的值就是在bind方法裡面提到的pollerThreadCount
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i
從該方知道,根據pollerThreadCount 的值創建了對應數量的Poller線程,該線程主要是用來做什麼的呢,具體過程後面分析,在這裡就說一下它的作用:主要用來接收客戶端發送過來的數據的,也就是read事件.
接著看到startAcceptorThreads,該方法的部分代碼如下:
protected final void startAcceptorThreads() {
int count = getAcceptorThreadCount();
acceptors = new Acceptor[count];
for (int i = 0; i < count; i++) {
acceptors[i] = createAcceptor();
String threadName = getName() + "-Acceptor-" + i;
acceptors[i].setThreadName(threadName);
Thread t = new Thread(acceptors[i], threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}
從這裡知道在bind方法裡面知道acceptorThreadCount 的值為1,因此int count = getAcceptorThreadCount()的值為1,就是只創建了一條Acceptor的線程,那麼看到 createAcceptor方法,內部主要是創建了Acceptor的實例,Acceptor是Runnable的子類,那麼看到run方法,部分代碼如下:
@Override
public void run() {
......
while (running) {
......
state = AcceptorState.RUNNING;
......
SocketChannel socket = null;
......
socket = serverSock.accept();
......
if (running && !paused) {
if (!setSocketOptions(socket)) {// 在這裡處理accept請求的
countDownConnection();
closeSocket(socket);
}
} else {
countDownConnection();
closeSocket(socket);
}
......
}
}
是不是看到熟悉的代碼了呢.這個就是接受socket的連接的請求的了,接著看到setSocketOptions方法,因為該類主要處理的就是連接進來之後的處理,部分代碼如下:
protected boolean setSocketOptions(SocketChannel socket) {
......
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
NioChannel channel = nioChannels.pop();
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
getPoller0().register(channel); // 注冊到之前的poller
......
}
這個方法的實現,大家應該也很熟悉吧.在這裡看到NioChannel channel = nioChannels.pop,還記得startInternal方法裡面創建一些類的緩存吧,就是復用了裡面的實例了.接著看到getPoller0().register(channel),看一下getPoller0()方法,部分代碼如下:
public Poller getPoller0() {
int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
return pollers[idx];
}
該方法就是獲取startInternal創建的Poller實例,該方法目的就是把連接進來的socket均衡的分配到每一個Poller線程裡面去,接著看到Poller類的register方法,代碼如下:
public void register(final NioChannel socket) {
socket.setPoller(this);
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(ka);
ka.setPoller(this);
ka.setReadTimeout(getSocketProperties().getSoTimeout());
ka.setWriteTimeout(getSocketProperties().getSoTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
ka.setReadTimeout(getSoTimeout());
ka.setWriteTimeout(getSoTimeout());
PollerEvent r = eventCache.pop();
ka.interestOps(SelectionKey.OP_READ);// 這裡就是設置selector的監聽類型為read
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
addEvent(r);
}
其實該方法主要是做一些配置,並把請求包裝成一個PollerEvent 類,扔到Poller的處理任務隊列裡面.
執行完register方法之後,accptor就返回繼續監聽端口了,後面的socket的數據接收就交給Poller線程了.
PollerEvent 也是runnable的一個具體實現,看到run方法,部分代碼如下:
@Override
public void run() {
if (interestOps == OP_REGISTER) {
......
socket.getIOChannel().register(
socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
......
} else {
......
}
}
該方法主要作用就是把socket注冊到一個selector裡面去,並設置OP_READ.
到這Connector的startInternal方法就分析完啦.
下面接著分析有數據到來的時候是怎麼處理的.從Poller的構造方法知道,每一個Poller都會擁有自己的一個Selector,代碼如下
public Poller() throws IOException {
this.selector = Selector.open();
}
接著看到Poller的run方法,部分代碼如下:
@Override
public void run() {
while (true) {
......
hasEvents = events();// 這個event全部處理完所有的任務
if (wakeupCounter.getAndSet(-1) > 0) {
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0); // 任務隊列為0,後面添加的話需要喚醒一下
Iterator iterator =
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
if (attachment == null) {
iterator.remove();
} else {
iterator.remove();
processKey(sk, attachment);
}
}
......
}
}
在這裡events這個方法,就處理了register添加到該隊列裡面的PollerEvent 任務了,過程較簡單就不具體分析了.直接看到processKey的方法,部分代碼如下:
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
......
if ( isWorkerAvailable() ) {
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
if (sk.isReadable()) {
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
......
}
}
unreg(sk, attachment, sk.readyOps());
為什麼要注銷OP_READ的監聽呢,因為每次有數據來的時候,都會把接收數據的請求放在之前創建的線程池裡面接收數據,如果不注銷的時候,就會造成並發接收數據的情況.
看到processSocket的方法,部分代碼如下:
protected boolean processSocket(NioSocketWrapper attachment, SocketEvent status, boolean dispatch) {
......
SocketProcessor sc = processorCache.pop(); // 這裡是解析器
if ( sc == null ) sc = new SocketProcessor(attachment, status);
else sc.reset(attachment, status);
Executor executor = getExecutor(); // 這裡沒一個請求都會放在線程池裡面執行
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
......
}
在這裡看到了吧,接收數據的時候就把任務包裝成SocketProcessor 放到線程池裡面處理,並沒有在poller線程裡面處理.接著看到SocketProcessor 類的run方法,部分代碼如下:
@Override
public void run() {
NioChannel socket = ka.getSocket();
......
SelectionKey key = socket.getIOChannel().keyFor(
socket.getPoller().getSelector());
synchronized (socket) {
try {
......
if (handshake == 0) {
SocketState state = SocketState.OPEN;
if (status == null) {
state = getHandler().process(ka, SocketEvent.OPEN_READ); // 在這裡處理請求了
} else {
state = getHandler().process(ka, status);
}
......
}
......
}
}
}
因為handshake 第一次有數據的時候會為0的,因此看到getHandler().process的方法.getHandler獲取到的是ConnectionHandler的實例,看到該類的process方法,部分代碼如下:
@Override
public SocketState process(SocketWrapperBase wrapper, SocketEvent status) {
......
Processor processor = connections.get(socket);
try {
......
SocketState state = SocketState.CLOSED;
do {
state = processor.process(wrapper, status);//----------處理請求-------------------
......
} while ( state == SocketState.UPGRADING);
if (state == SocketState.LONG) {
longPoll(wrapper, processor);
if (processor.isAsync()) {
getProtocol().addWaitingProcessor(processor);
}
} else if (state == SocketState.OPEN) {
connections.remove(socket);
release(processor);
wrapper.registerReadInterest();
} else if (state == SocketState.SENDFILE) {
connections.remove(socket);
release(processor);
}
......
return state; // 返回LONG
}
......
}
這裡注意到processor.process(wrapper, status)的status的值為SocketEvent.OPEN_READ,接著分析process方法,部分代碼如下:
@Override
public SocketState process(SocketWrapperBase socketWrapper, SocketEvent status)
throws IOException {
SocketState state = SocketState.CLOSED;
Iterator dispatches = null;
do {
if (dispatches != null) {
DispatchType nextDispatch = dispatches.next();
state = dispatch(nextDispatch.getSocketStatus());
} else if (status == SocketEvent.DISCONNECT) {
} else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
state = dispatch(status);
if (state == SocketState.OPEN) {
state = service(socketWrapper);
}
} else if (status == SocketEvent.OPEN_WRITE) {
state = SocketState.LONG;
} else {
state = service(socketWrapper);
}
if (state != SocketState.CLOSED && isAsync()) {
state = asyncPostProcess(); // 判斷是不是異步處理額
}
......
if (dispatches == null || !dispatches.hasNext()) {
dispatches = getIteratorAndClearDispatches();
}
} while (state == SocketState.ASYNC_END ||dispatches != null && state != SocketState.CLOSED);
return state; // 返回LONG
}
在這裡因為傳進來的status 的值為SocketEvent.OPEN_READ,使用直接看到
else {
state = service(socketWrapper);
}
分析service方法,該方法的實現是在Http11Processor的,部分代碼如下:
@Override
public SocketState service(SocketWrapperBase socketWrapper)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
......
// 對發送和接收緩沖區做設置
setSocketWrapper(socketWrapper);
inputBuffer.init(socketWrapper);
outputBuffer.init(socketWrapper);
keepAlive = true;
openSocket = false;
readComplete = true;
boolean keptAlive = false;
while (!getErrorState().isError() && keepAlive && !isAsync() &&
upgradeToken == null && !endpoint.isPaused()) {
if (!inputBuffer.parseRequestLine(keptAlive)) { // 在這裡只要分析請求頭的第一行
if (inputBuffer.getParsingRequestLinePhase() == -1) {
return SocketState.UPGRADING;
} else if (handleIncompleteRequestLineRead()) {
break;
}
}
......
keptAlive = true;
request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
if (!inputBuffer.parseHeaders()) { // 該方法是解析請求頭第一行意外的其他請求
// 這兩個值需要注意一下
openSocket = true;
readComplete = false; // 第一次連接進來是false
break;
}
......
if (!getErrorState().isError()) {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
getAdapter().service(request, response); // 在這分配進去----主機匹配也是在這裡----------------
......
}
......
}
if (getErrorState().isError() || endpoint.isPaused()) {
return SocketState.CLOSED;
} else if (isAsync()) {
return SocketState.LONG;
} else if (isUpgrade()) {
return SocketState.UPGRADING;
} else {
if (sendfileData != null) {
return SocketState.SENDFILE;
} else {
if (openSocket) {
if (readComplete) {
return SocketState.OPEN;
} else {
return SocketState.LONG;// readComplete = false;
}
} else {
return SocketState.CLOSED;
}
}
}
}
該方法主要做了:
1,inputBuffer.parseRequestLine(keptAlive)解析請求頭的第一行信息,如GET 請求路徑如/index ,還有當前的http協議.如HTTP1.1等.
2,inputBuffer.parseHeaders()解析除第一行以外的請求信息.
3,getAdapter().service(request, response);這裡就是處理完整的請求了.
如果發送過來的數據不完整會怎麼辦呢.比如說解析第一行數據不完整的時候回怎麼辦
if (!inputBuffer.parseRequestLine(keptAlive)) { // 在這裡只要分析請求頭的第一行
if (inputBuffer.getParsingRequestLinePhase() == -1) {
return SocketState.UPGRADING;
} else if (handleIncompleteRequestLineRead()) {
break;
}
}
parseRequestLine該方法只要沒有解析完第一行數據就會返回false,至於該方法的具體分析過程,後面有時間的時候話再寫.因為第一次調用parseRequestLine的話getParsingRequestLinePhase()會返回1,接著就會執行handleIncompleteRequestLineRead方法了,部分代碼如下:
private boolean handleIncompleteRequestLineRead() {
openSocket = true;
if (inputBuffer.getParsingRequestLinePhase() > 1) {
if (endpoint.isPaused()) {
......
return false;
} else {
readComplete = false;
}
}
return true;
}
注意到該方法把 openSocket 設置為 true了,有因為如果是第一次的話getParsingRequestLinePhase()返回的值為1,使用該方法返回的值就是true了,接著就跳出while循環了,然後service方法的返回值是什麼呢?
if (getErrorState().isError() || endpoint.isPaused()) {
return SocketState.CLOSED;
} else if (isAsync()) {
return SocketState.LONG;
} else if (isUpgrade()) {
return SocketState.UPGRADING;
} else {
if (sendfileData != null) {
return SocketState.SENDFILE;
} else {
if (openSocket) {
if (readComplete) {
return SocketState.OPEN;
} else {
return SocketState.LONG;// readComplete = false;
}
} else {
return SocketState.CLOSED;
}
}
}
看到這裡,以為openSocket在handleIncompleteRequestLineRead裡面為設置為true了,然後readComplete為false,那返回的就是SocketState.LONG了.那麼回到ConnectionHandler類的process方法看到
state = processor.process(wrapper, status);
因此這時候state 的值為SocketState.LONG,使用
if (state == SocketState.LONG) {
longPoll(wrapper, processor);
if (processor.isAsync()) {
getProtocol().addWaitingProcessor(processor);
}
} else if (state == SocketState.OPEN) {
connections.remove(socket);
release(processor);
wrapper.registerReadInterest();
} else if (state == SocketState.SENDFILE) {
connections.remove(socket);
release(processor);
}
走的是
longPoll(wrapper, processor);
這個分支.
longPoll的代碼如下:
protected void longPoll(SocketWrapperBase socket, Processor processor) {
if (!processor.isAsync()) { //
socket.registerReadInterest();
}
}
接著看到registerReadInterest,代碼如下
@Override
public void registerReadInterest() {
getPoller().add(getSocket(), SelectionKey.OP_READ);
}
看到了吧,如果讀取的數據沒有讀完,那麼從新在之前的poller線程重新監聽數據.然後有數據的話就重新之前的動作.
對tomcat處理請求過程總結一下吧,就是在poller裡面監聽selector事件,如何有數據要讀的話就放在線程池裡面處理.
好了本章就先分析到這裡了,然後下一篇分析就是tomcat如何解析請求頭以及如何把對應的請求發送到對應的servlet裡面去.
一、首先說明:藍牙通信必須用手機測試,因為avd裡沒有相關的硬件,會報錯!好了,看看最後的效果圖:二、概述:1.判斷是否支持BluetoothBluetoothAdapt
按照大神的思路寫出了一個流式布局,所有的東西都是難者不會會者不難,當自己能自定義流式布局的時候就會覺得這東西原來很簡單了。如果各位小伙伴也看過那篇文章的話,應該知道自定義
1 背景Android系統提供了很多豐富的API去實現UI的2D與3D動畫,最主要的劃分可以分為如下幾類:View Animation: 視圖動畫在古老的Android版
Android NougatAndroid 7.0 經過5個開發者預覽版本的改善,終於在8.22日正式推送,並確定版本名為Nougat(牛軋糖)根據官方的介紹,Andro