网络通信

Soul Lv2

今天我们来聊聊网络通信的部分,我们知道的是Spring框架使用的是一个名为DispatchServlet的类作为网络通信的处理器,而这个类实际上来自于TomCat中的Servlet,我们先一层层推进看看这些东西到底是怎么实现的

DispatchServlet分析

从流程上看,首先存在一个简单的服务器用于监听端口,一般是TomCat,在TomCat发现请求后会转发给DisPatchServlet中的doService方法,其实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
protected void doService(HttpServletRequest request, HttpServletResponse response) throws Exception {  
this.logRequest(request);
Map<String, Object> attributesSnapshot = null;
if (WebUtils.isIncludeRequest(request)) {
attributesSnapshot = new HashMap();
Enumeration<?> attrNames = request.getAttributeNames();
while(attrNames.hasMoreElements()) {
String attrName = (String)attrNames.nextElement();
if (this.cleanupAfterInclude || attrName.startsWith("org.springframework.web.servlet")) {
attributesSnapshot.put(attrName, request.getAttribute(attrName));
}
}
}
request.setAttribute(WEB_APPLICATION_CONTEXT_ATTRIBUTE, this.getWebApplicationContext());
request.setAttribute(LOCALE_RESOLVER_ATTRIBUTE, this.localeResolver);
request.setAttribute(THEME_RESOLVER_ATTRIBUTE, this.themeResolver);
request.setAttribute(THEME_SOURCE_ATTRIBUTE, this.getThemeSource());
if (this.flashMapManager != null) {
FlashMap inputFlashMap = this.flashMapManager.retrieveAndUpdate(request, response);
if (inputFlashMap != null) {
request.setAttribute(INPUT_FLASH_MAP_ATTRIBUTE, Collections.unmodifiableMap(inputFlashMap));
}
request.setAttribute(OUTPUT_FLASH_MAP_ATTRIBUTE, new FlashMap());
request.setAttribute(FLASH_MAP_MANAGER_ATTRIBUTE, this.flashMapManager);
}
RequestPath previousRequestPath = null;
if (this.parseRequestPath) {
previousRequestPath = (RequestPath)request.getAttribute(ServletRequestPathUtils.PATH_ATTRIBUTE);
ServletRequestPathUtils.parseAndCache(request);
}
//上面的一大堆都是缓存机制和纠错机制,有兴趣的可以自己学习,其实很简单
try {
this.doDispatch(request, response);//这里是重点,开始分发
} finally {
if (!WebAsyncUtils.getAsyncManager(request).isConcurrentHandlingStarted() && attributesSnapshot != null) {
this.restoreAttributesAfterInclude(request, attributesSnapshot);
}
if (this.parseRequestPath) {
ServletRequestPathUtils.setParsedRequestPath(previousRequestPath, request);
}
}
}

也就是说真正的分发出现在doDispatch方法中,我们再去这个方法看一看发生了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception {  
HttpServletRequest processedRequest = request;
HandlerExecutionChain mappedHandler = null;
boolean multipartRequestParsed = false;
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
try {
try {
ModelAndView mv = null;
Exception dispatchException = null;
try {
processedRequest = this.checkMultipart(request);//检查是否是Multipart类型的请求
multipartRequestParsed = processedRequest != request;
//在这一步获得一个包含了我们自定义的Controller和拦截器的处理器
mappedHandler = this.getHandler(processedRequest);
if (mappedHandler == null) {
this.noHandlerFound(processedRequest, response);
return;
}
HandlerAdapter ha = this.getHandlerAdapter(mappedHandler.getHandler());
String method = request.getMethod();
boolean isGet = HttpMethod.GET.matches(method);//判断是否的Get类型的请求
if (isGet || HttpMethod.HEAD.matches(method)) {
//对方法为Get的请求,使用getLastModified方法检查锁请求的资源是否发生变动,如果资源没有发生修改则返回304,说明资源没有发生变动,要求使用客户端缓存的资源
long lastModified = ha.getLastModified(request, mappedHandler.getHandler());
if ((new ServletWebRequest(request, response)).checkNotModified(lastModified) && isGet) {
return;
}
}
//这里开始调用拦截器的prehandle方法,如果方法返回false那么直接返回拒绝执行
if (!mappedHandler.applyPreHandle(processedRequest, response)) {
return;
}
//在这里真正执行了处理器中的内容,获得了一个ModeAndView对象
mv = ha.handle(processedRequest, response, mappedHandler.getHandler());
if (asyncManager.isConcurrentHandlingStarted()) {
return;
}
//这一步用于检查是否包含View视图,如果包含那么将视图用request命名
this.applyDefaultViewName(processedRequest, mv);
//这一步开始再次调用拦截器,处理拦截器中的postHandle
mappedHandler.applyPostHandle(processedRequest, response, mv);
} catch (Exception ex) {
dispatchException = ex;
} catch (Throwable err) {
dispatchException = new ServletException("Handler dispatch failed: " + err, err);
}
//这一步实际上将结果返回
this.processDispatchResult(processedRequest, response, mappedHandler, mv, dispatchException);
} catch (Exception ex) {
triggerAfterCompletion(processedRequest, response, mappedHandler, ex);
} catch (Throwable err) {
triggerAfterCompletion(processedRequest, response, mappedHandler, new ServletException("Handler processing failed: " + err, err));
}
} finally {
if (asyncManager.isConcurrentHandlingStarted()) {
if (mappedHandler != null) {
mappedHandler.applyAfterConcurrentHandlingStarted(processedRequest, response);
}
} else if (multipartRequestParsed) {
this.cleanupMultipart(processedRequest);
}
}
}

好的,我们现在发现追到可processDispatchResult方法,我们继续查看,看看这个方法干了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void processDispatchResult(HttpServletRequest request, HttpServletResponse response, @Nullable HandlerExecutionChain mappedHandler, @Nullable ModelAndView mv, @Nullable Exception exception) throws Exception {  
boolean errorView = false;
//公共的异常处理流程
if (exception != null) {
if (exception instanceof ModelAndViewDefiningException) {
ModelAndViewDefiningException mavDefiningException = (ModelAndViewDefiningException)exception;
this.logger.debug("ModelAndViewDefiningException encountered", exception);
mv = mavDefiningException.getModelAndView();
} else {
Object handler = mappedHandler != null ? mappedHandler.getHandler() : null;
mv = this.processHandlerException(request, response, handler, exception);
errorView = mv != null;
}
}
//尝试对可能存在的视图进行渲染
if (mv != null && !mv.wasCleared()) {
//调用渲染方法,如果存在视图名则进行渲染否则什么都不做,将渲染结果写到response中
this.render(mv, request, response);
if (errorView) {
WebUtils.clearErrorRequestAttributes(request);
}
} else if (this.logger.isTraceEnabled()) {
this.logger.trace("No view rendering, null ModelAndView returned.");
}
//在不存在异步处理机制的情况下执行,异步处理有一套独立的机制
if (!WebAsyncUtils.getAsyncManager(request).isConcurrentHandlingStarted()) {
if (mappedHandler != null) {
//调用拦截器中的AfterCompletion方法
mappedHandler.triggerAfterCompletion(request, response, (Exception)null);
}
}
}

到这里Spring的处理流程就已经结束了,剩下的部分由TomCat完成,将对应的相应返回。

TomCat机制分析

通过上面的那些分析,我们已经走完了Spring在网络通信中负责的任务,接下来我们就要去TomCat中看一看了,研究一下在实际的网络通信流程中TomCat到底在干什么。

接下来的内容需要你对 java 的异步 IO 机制有一定的了解,可以参考
NIO 机制

TomCat有几个核心组件,分别是Connector,Server,Service.当我们通过脚本启动TomCat时这几个组件都会开始运行,其中Connecter负责监听端口,所以我们先从这个组件开始

在启动这个组件时会调用下面的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected void startInternal() throws LifecycleException {  
String id = this.protocolHandler != null ? this.protocolHandler.getId() : null;
//一连串的异常检查
if (id == null && this.getPortWithOffset() < 0) {
throw new LifecycleException(sm.getString("coyoteConnector.invalidPort", new Object[]{this.getPortWithOffset()}));
} else {
this.setState(LifecycleState.STARTING);
if (this.protocolHandler != null && this.service != null) {
this.protocolHandler.setUtilityExecutor(this.service.getServer().getUtilityExecutor());//获取一个用于处理非核心任务的线程池
}
try {
this.protocolHandler.start();//启动协议处理器
} catch (Exception e) {
throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
}
}
}

我们注意到这个方法的实质就是在进行了几个基本的检查之后启动了一个叫做 protocolHandler 的对象,这个对象被称为协议处理器,这个处理器有多个实现用来适配不同的协议如HTTP1.0,HTTP1.1,AJP等,我们这里以当下比较常用的HTTP1.1为例,HTTP1.1对应的协议处理器叫做 HTTP11NIOProtocol,但你会发现里面并没有start方法的实现,往上找几层继承后你会来到一个叫做 AbstractProtocol 的类,这个类中实现了start方法

1
2
3
4
5
6
7
8
public void start() throws Exception {  
if (this.getLog().isInfoEnabled()) {
this.getLog().info(sm.getString("abstractProtocolHandler.start", new Object[]{this.getName()}));
this.logPortOffset();
}//获取日志对象记录日志
this.endpoint.start();//endpoint是TomCat自己实现的线程池,启动线程池
this.monitorFuture = this.getUtilityExecutor().scheduleWithFixedDelay(() -> this.startAsyncTimeout(), 0L, 60L, TimeUnit.SECONDS);//调用之前Connecter设置的非核心任务的线程池,添加一个定时任务,作用是每60秒启动检查是否有异步任务超时
}

这部分的核心是启动了线程池,我们继续前进看看这个线程池的启动方法

1
2
3
4
5
6
7
8
//start方法实现在AbstractEndPoint类
public final void start() throws Exception {
if (this.bindState == AbstractEndpoint.BindState.UNBOUND) {//检查端口绑定状态,如果等于没有绑定
this.bindWithCleanup();//进行绑定并执行必要的清理工作
this.bindState = AbstractEndpoint.BindState.BOUND_ON_START;
}
this.startInternal();//内部启动
}

我们先看看这个绑定端口的方法,这个方法的实现有两个版本,分别是NioEndPointNio2EndPoint,这两个类分别使用了java的1.0和2.0的NIO API,我们这里统一看2.0版本的API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//上面那个有清理的bindWithCleanup就是在下面这个方法外套了一个异常捕获
public void bind() throws Exception {
if (this.getExecutor() == null) {
this.createExecutor();//如果线程池不存在那么创建一个新的线程池
}
if (this.getExecutor() instanceof ExecutorService) {
this.threadGroup = AsynchronousChannelGroup.withThreadPool((ExecutorService)this.getExecutor());
}//创建一个异步服务套接字组共享上面创建的线程池,负责接下来的实际网络通信
if (!this.internalExecutor) {
log.warn(sm.getString("endpoint.nio2.exclusiveExecutor"));//检查线程池是否是TomCat内部创建的,如果不是则做出警告
}
this.serverSock = AsynchronousServerSocketChannel.open(this.threadGroup);//打开上面的套接字组
this.socketProperties.setProperties(this.serverSock);//配置套接字相关的属性
InetSocketAddress addr = new InetSocketAddress(this.getAddress(), this.getPortWithOffset());//创建监听端口的相关信息
this.serverSock.bind(addr, this.getAcceptCount());//绑定端口
this.initialiseSsl();//初始化ssl相关内容
}

我们暂时就到这一步,有兴趣的话可以自己进一步看看异步套接字组的实现,接下来我们看看另一个内部启动方法,我们继续选择NIO2版本的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void startInternal() throws Exception {  
if (!this.running) {
this.allClosed = false;
this.running = true;
this.paused = false;
if (this.socketProperties.getProcessorCache() != 0) {
this.processorCache = new SynchronizedStack(128, this.socketProperties.getProcessorCache());
}//创建一个线程安全的栈作为处理器缓存
int actualBufferPool = this.socketProperties.getActualBufferPool(this.isSSLEnabled() ? this.getSniParseLimit() * 2 : 0);
if (actualBufferPool != 0) {
this.nioChannels = new SynchronizedStack(128, actualBufferPool);
}//创建缓存池来缓存网络IO中的建立的通道
if (this.getExecutor() == null) {
this.createExecutor();
}
this.initializeConnectionLatch();//创建一个连接计数器
this.startAcceptorThread();//初始化监听线程
}
}

我们继续往下追查监听线程的任务,

1
2
3
4
5
6
7
8
protected void startAcceptorThread() {  
if (this.acceptor == null) {
this.acceptor = new Nio2Acceptor(this);
this.acceptor.setThreadName(this.getName() + "-Acceptor");
}
this.acceptor.state = AcceptorState.RUNNING;
this.getExecutor().execute(this.acceptor);
}//看来下一个目标是acceptor

我直接贴对应的run方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void run() {  
if (!Nio2Endpoint.this.isPaused()) {
try {
Nio2Endpoint.this.countUpOrAwaitConnection();//计数或等待连接,如果连接数没有达到最大那么计数器加一,如果达到最大则当前线程等待
} catch (InterruptedException var2) {
}
if (!Nio2Endpoint.this.isPaused()) {
Nio2Endpoint.this.serverSock.accept((Object)null, this);// 监听端口然后处理信息
} else {
this.state = AcceptorState.PAUSED;
}
} else {
this.state = AcceptorState.PAUSED;
}
}

好吧,我们的下一步是accept方法,这个方法在叠了几层继承后有这样的实现,看起来有点复杂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
Future<AsynchronousSocketChannel> implAccept(Object att, CompletionHandler<AsynchronousSocketChannel, Object> handler) {  
if (!this.isOpen()) {//如果没有打开,那么直接进入异常处理流程
Throwable e = new ClosedChannelException();
if (handler == null) {
return CompletedFuture.withFailure(e);
} else {
Invoker.invoke(this, handler, att, (Object)null, e);
return null;
}
} else if (this.localAddress == null) {
throw new NotYetBoundException();
} else if (this.isAcceptKilled()) {
throw new RuntimeException("Accept not allowed due cancellation");
} else if (!this.accepting.compareAndSet(false, true)) {
throw new AcceptPendingException();
} else {//从这里开始进入正常情况的处理流程
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
Throwable exc = null;
try {
this.begin();//这里只是简单的为接下来的操作加了一个读锁
int n = Net.accept(this.fd, newfd, isaa);//这个方法是使用C++实现的,看不了源码,比较遗憾,但就是这个方法实现了对端口的监听,其中this.fd就是对之前绑定的端口的描述,返回的n是接收到的可用连接数
if (n == -2) {//如果n是-2,当前不存在可用的连接事件
PendingFuture<AsynchronousSocketChannel, Object> result = null;
synchronized(this.updateLock) {
if (handler == null) {
this.acceptHandler = null;
result = new PendingFuture(this);
this.acceptFuture = result;
} else {
this.acceptHandler = handler;
this.acceptAttachment = att;
}
this.acceptAcc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.acceptPending = true;
}
this.port.startPoll(this.fdVal, Net.POLLIN);
//这里是通过对系统底层的异步IO机制注册事件监听,相当于向选择器注册对对应事件的监听
PendingFuture var8 = result;
//既然不存在可用的监听事件,那么返回一个PendingFuture表示待处理并结束当前线程的运行
return var8;
}
} catch (Throwable var17) {
Throwable x = var17;
if (var17 instanceof ClosedChannelException) {
x = new AsynchronousCloseException();
}
exc = x;
} finally {
this.end();//释放读锁
}
AsynchronousSocketChannel child = null;
if (exc == null) {
try {
child = this.finishAccept(newfd, isaa[0], (AccessControlContext)null);
//这里创建了一个新的异步通道,将由这个异步通道负责接下来对连接的处理
} catch (Throwable x) {
exc = x;
}
}
this.enableAccept();//重置标志位,运行继续创建连接
if (handler == null) {//如果没有提供完成处理器,那么返回一个表示完成的future对象
return CompletedFuture.withResult(child, exc);
} else {
//如果存在回调处理器那么调用处理器进行处理,注意,这里的回调并不是发生在当前线程
Invoker.invokeIndirectly(this, handler, att, child, exc);
return null;
}
}
}

这里要特别说明两个关键点

  • 文件描述符:上面的方法中使用了文件描述符来指代一个网络通信,这来自于unix系统的万物皆文件的思想,注意,这个类本身就叫做UnixAsynchronousServerSocketChannelImpl,在操作系统中所有的文件都有一个非负的整数作为标识,而万物皆文件,自然而然的网络连接也是一个文件,所以我们通过这个数字来访问网络通信(注意,这是 Linux 版本的实现方式,在 Windows 上存在另一套实现方式)
  • 第二个问题是我们一直捋到这里都没有发现一个循环机制,那么为什么可以持续监听端口?我们这里选择的是nio2版本的处理流程,如果你选择去查看nio1.0版本的流程,你会发现一个while循环,但在新的版本中使用了另一种的办法:回调机制
    我们可以从头捋一捋这一套机制,首先回到 Nio2EndPointAcceptor 这个类,这个类的声明如下
1
2
protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel>  
implements CompletionHandler<AsynchronousSocketChannel, Void>

可以看到这个类实现了一个叫做 CompletionHandler 的接口,也就是我们前面提到的完成处理器,结构如下

1
2
3
4
5
public interface CompletionHandler<V, A> {  
void completed(V var1, A var2);

void failed(Throwable var1, A var2);
}

这个接口的实现规定了当某种操作结束时针对是否失败的不同情况的不同处理方法,而 Acceptor 中对 Completed 方法的实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
 public void completed(AsynchronousSocketChannel socket,  
Void attachment) {
// Successful accept, reset the error delay
errorDelay = 0;
// Continue processing the socket on the current thread
// Configure the socket if (isRunning() && !isPaused()) {
if (getMaxConnections() == -1) {
serverSock.accept(null, this);
} else if (getConnectionCount() < getMaxConnections()) {
try {
// This will not block
countUpOrAwaitConnection();//如果当前连接数小于最大值则当前连接数加1,否则阻塞
} catch (InterruptedException e) {
// Ignore
}
serverSock.accept(null, this); //看,这里有一次调用了accept方法
} else {
// Accept again on a new thread since countUpOrAwaitConnection may block
getExecutor().execute(this);
}
//
if (!setSocketOptions(socket)) {
closeSocket(socket);
}
} else {
if (isRunning()) {
state = AcceptorState.PAUSED;
}
destroySocket(socket);
}
}

还记得吗,Acceptor 的 run 方法实现其实也是调用了 serverSock.accept(null, this) 方法,注意,这个方法的两个参数分别如下

1
public abstract <A> void accept(A var1, CompletionHandler<AsynchronousSocketChannel, ? super A> var2);

也就是说这里将自己做回完成处理器传入,进一步的,这个方法的实现是上面的 implAccept,这个方法在通道正常打开的情况下执行下面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
FileDescriptor newfd = new FileDescriptor();  
InetSocketAddress[] isaa = new InetSocketAddress[1];
Throwable exc = null;
try {
this.begin();//这里只是简单的为接下来的操作加了一个读锁
int n = Net.accept(this.fd, newfd, isaa);
/*
*这个方法时是真正的连接方法,根据文件描述符去查询是否有可用的连接,这个方法时基于异步的NIO机制实现的,不会发生阻
*塞,其中返回的n是发现的可用的连接事件,及OP_Accept事件
*/
if (n == -2) {//如果n是-2,当前不存在可用的连接事件
PendingFuture<AsynchronousSocketChannel, Object> result = null;
synchronized(this.updateLock) {
if (handler == null) {
this.acceptHandler = null;
result = new PendingFuture(this);
this.acceptFuture = result;
} else {
this.acceptHandler = handler;
this.acceptAttachment = att;
}
this.acceptAcc = System.getSecurityManager() == null ? null : AccessController.getContext();
this.acceptPending = true;
}
this.port.startPoll(this.fdVal, Net.POLLIN);
//这个方法是阻塞的,直到出现对应的事件才会继续执行
PendingFuture var8 = result;
//既然不存在可用的监听事件,那么返回一个PendingFuture表示待处理并结束当前线程的运行
return var8;
}

这里有一点需要解释:既然 startPoll 方法是阻塞的,然后又返回了一个 PendingFuture,但在更上层的 completeed 和 run 方法中没有对返回值有任何处理呢?这源于对操作系统底层的回调机制的的处理,在 startPoll 方法中,向操作系统注册了对对应事件的监听,如果发生了对应事件,操作系统将会调用对应的回调方法将连接加入通道组,这个过程发生在操作系统中,所以在代码中不可见。
当然,也存在当前有可用连接的情况,此时向下执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
AsynchronousSocketChannel child = null;  
if (exc == null) {
try {
child = this.finishAccept(newfd, isaa[0], (AccessControlContext)null);
} catch (Throwable x) {
exc = x;
}
}

this.enableAccept();
if (handler == null) {
return CompletedFuture.withResult(child, exc);
} else {
Invoker.invokeIndirectly(this, handler, att, child, exc);
return null;
}

我们可以先看看这个 finishAccept 方法到底做了些什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
private AsynchronousSocketChannel finishAccept(FileDescriptor newfd, final InetSocketAddress remote, AccessControlContext acc) throws IOException, SecurityException {  
AsynchronousSocketChannel ch = null;

try {
ch = new UnixAsynchronousSocketChannelImpl(this.port, newfd, remote);
//创建一个新的异步通道,三个参数分别为绑定的端口,表示新连接的文件描述符,以及连接的相关信息remote
} catch (IOException x) {
nd.close(newfd);
throw x;
}
//下面是对安全管理的一些设置,我们暂且略过
try {
if (acc != null) {
AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Void run() {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkAccept(remote.getAddress().getHostAddress(), remote.getPort());
}

return null;
}
}, acc);
} else {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkAccept(remote.getAddress().getHostAddress(), remote.getPort());
}
}

return ch;
} catch (SecurityException var8) {
try {
ch.close();
} catch (Throwable suppressed) {
var8.addSuppressed(suppressed);
}

throw var8;
}
}

然后还有一个值得一看的是 Invoker 的执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static <V, A> void invokeIndirectly(AsynchronousChannel channel, final CompletionHandler<V, ? super A> handler, final A attachment, final V result, final Throwable exc) {  
try {
((Groupable)channel).group().executeOnPooledThread(new Runnable() {
public void run() {//可以看到在这里使用了对应的线程池在新的线程中执行了下面的任务
GroupAndInvokeCount thisGroupAndInvokeCount = (GroupAndInvokeCount)Invoker.myGroupAndInvokeCount.get();
if (thisGroupAndInvokeCount != null) {
thisGroupAndInvokeCount.setInvokeCount(1);
}

Invoker.invokeUnchecked(handler, attachment, result, exc);
//这个方法就是直接调用了completed方法
}
});
} catch (RejectedExecutionException var6) {
throw new ShutdownChannelGroupException();
}
}

走到这一步,一个连接就被建立了起来,那么接下来了,是谁去处理连接后的任务呢,我们需要再往回倒一倒,看看 completed 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override  
public void completed(AsynchronousSocketChannel socket,
Void attachment) {
// Successful accept, reset the error delay
errorDelay = 0;
// Continue processing the socket on the current thread
// Configure the socket if (isRunning() && !isPaused()) {
if (getMaxConnections() == -1) {
serverSock.accept(null, this);
} else if (getConnectionCount() < getMaxConnections()) {
try {
// This will not block
countUpOrAwaitConnection();
} catch (InterruptedException e) {
// Ignore
}
serverSock.accept(null, this);
} else {
// Accept again on a new thread since countUpOrAwaitConnection may block
getExecutor().execute(this);
}
//注意这里,setSocketOptions中的参数为刚刚建立的连接,也就是说这里有对刚建立的连接的进一步处理
if (!setSocketOptions(socket)) {
closeSocket(socket);
}
} else {
if (isRunning()) {
state = AcceptorState.PAUSED;
}
destroySocket(socket);
}
}

所以我们看看 setSocketOption 方法的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Override  
protected boolean setSocketOptions(AsynchronousSocketChannel socket) {
Nio2SocketWrapper socketWrapper = null;
try {
// Allocate channel and wrapper
Nio2Channel channel = null;
if (nioChannels != null) {
channel = nioChannels.pop();
}//这个nioChannels就是在EndPoint启动时创建的通道缓存
//如果没可用的通道就创建一个,如果有那么直接用
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNio2Channel(bufhandler, this);
} else {

channel = new Nio2Channel(bufhandler);
}
}
//创建一个SocketWrapper
Nio2SocketWrapper newWrapper = new Nio2SocketWrapper(channel, this);
//将刚刚创建的连接通道打包进去
channel.reset(socket, newWrapper);
//connections是一个Map,以当前连接的异步通道为键,新打包的Wrapper为值放进去
connections.put(socket, newWrapper);
socketWrapper = newWrapper;

// Set socket properties
socketProperties.setProperties(socket);

socketWrapper.setReadTimeout(getConnectionTimeout());
socketWrapper.setWriteTimeout(getConnectionTimeout());
socketWrapper.setKeepAliveLeft(Nio2Endpoint.this.getMaxKeepAliveRequests());
// Continue processing on the same thread as the acceptor is async
//开始对通道中的信息进行处理,监听可用的读事件
return processSocket(socketWrapper, SocketEvent.OPEN_READ, false);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.socketOptionsError"), t);
if (socketWrapper == null) {
destroySocket(socket);
}
}
// Tell to close the socket if needed
return false;
}

顺其自然的我们继续找 processSocket 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public boolean processSocket(SocketWrapperBase<S> socketWrapper,  
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
//获取缓存的处理器Processor,如果不存在那么创建一个
SocketProcessorBase<S> sc = null;
if (processorCache != null) {
sc = processorCache.pop();
}
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
//如果处理器存在那么重新将处理器与当前的wrapper关联
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
//如果要求分发,在线程池执行,否则直接在当前线程执行
if (dispatch && executor != null) {
executor.execute(sc);
} else {
//刚刚的setSocketOption用的是false,直接在当前线程执行
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}

那么刚刚 run 的是什么呢

1
2
3
4
5
6
7
8
9
10
11
12
public final void run() {  
Lock lock = socketWrapper.getLock();
lock.lock();
try {
if (socketWrapper.isClosed()) {
return;
}
doRun();
} finally {
lock.unlock();
}
}

好吧,加个锁,然后继续 run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
    @Override  
protected void doRun() {
boolean launch = false;
try {
int handshake;

try {
//这里时对TCP连接三次握手机制的检查,0表示完成了握手过程
if (socketWrapper.getSocket().isHandshakeComplete()) {
// No TLS handshaking required. Let the handler
// process this socket / event combination.
handshake = 0;
} else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
event == SocketEvent.ERROR) {
// Unable to complete the TLS handshake. Treat it as
// if the handshake failed. handshake = -1;
} else {
handshake = socketWrapper.getSocket().handshake();
event = SocketEvent.OPEN_READ;
}
} catch (IOException x) {
handshake = -1;
if (logHandshake.isDebugEnabled()) {
logHandshake.debug(sm.getString("endpoint.err.handshake",
socketWrapper.getRemoteAddr(), Integer.toString(socketWrapper.getRemotePort())), x);
}
}
if (handshake == 0) {
SocketState state;
// Process the request from this socket
//这里真正的发生了处理
state = getHandler().process(socketWrapper, Objects.requireNonNullElse(event, SocketEvent.OPEN_READ));
//检查处理完后的状态
if (state == SocketState.CLOSED) {
// Close socket and pool
socketWrapper.close();
//如果状态标识为需要升级协议,将launch变为true
} else if (state == SocketState.UPGRADING) {
launch = true;
}
} else if (handshake == -1 ) {
getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
socketWrapper.close();
}
} catch (VirtualMachineError vme) {
ExceptionUtils.handleThrowable(vme);
} catch (Throwable t) {
log.error(sm.getString("endpoint.processing.fail"), t);
if (socketWrapper != null) {
socketWrapper.close();
}
} finally {
if (launch) {
try {
//新创建一个处理器处理进一步处理
getExecutor().execute(new SocketProcessor(socketWrapper, SocketEvent.OPEN_READ));
} catch (NullPointerException npe) {
if (running) {
log.error(sm.getString("endpoint.launch.fail"),
npe);
}
}
}
socketWrapper = null;
event = null;
//return to cache
//然后将当前的处理器重新放到缓存中
if (running && processorCache != null) {
processorCache.push(this);
}
}
}
}

接下来看看 process 方法在干什么,这个方法不太好找,位于 AbstractProtocol 类中,而且比较长,所以我只截取部分

1
2
3
Processor processor = (Processor) wrapper.takeCurrentProcessor();
//在这里获取处理器
state = processor.process(wrapper, status);//调用处理器的process方法

这个 process 方法的实现也不好找,位于AbstractProcessorLight,看一眼具体的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Override  
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status) throws IOException {

SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
if (dispatches != null) {
DispatchType nextDispatch = dispatches.next();
if (getLog().isTraceEnabled()) {
getLog().trace("Processing dispatch type: [" + nextDispatch + "]");
}
state = dispatch(nextDispatch.getSocketStatus());
if (!dispatches.hasNext()) {
state = checkForPipelinedData(state, socketWrapper);
}
} else if (status == SocketEvent.DISCONNECT) {
// Do nothing here, just wait for it to get recycled
} else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
state = dispatch(status);
state = checkForPipelinedData(state, socketWrapper);
} else if (status == SocketEvent.OPEN_WRITE) {
// Extra write event likely after async, ignore
state = SocketState.LONG;
} else if (status == SocketEvent.OPEN_READ) {
//当时传入的状态是读,所以我们直接看这里
state = service(socketWrapper);
} else if (status == SocketEvent.CONNECT_FAIL) {
logAccess(socketWrapper);
} else {
// Default to closing the socket if the SocketEvent passed in
// is not consistent with the current state of the Processor
state = SocketState.CLOSED;
}

if (getLog().isTraceEnabled()) {
getLog().trace(
"Socket: [" + socketWrapper + "], Status in: [" + status + "], State out: [" + state + "]");
}

/*
* If state is already CLOSED don't call asyncPostProcess() as that will likely change the state to some
* other value causing processing to continue when it should cease. The AsyncStateMachine will be recycled
*
as part of the Processor clean-up on CLOSED so it doesn't matter what state it is left in at this point.
*/
if (isAsync() && state != SocketState.CLOSED) {
state = asyncPostProcess();
if (getLog().isTraceEnabled()) {
getLog().trace(
"Socket: [" + socketWrapper + "], State after async post processing: [" + state + "]");
}
}

if (dispatches == null || !dispatches.hasNext()) {
// Only returns non-null iterator if there are
// dispatches to process.
dispatches = getIteratorAndClearDispatches();
}
} while (state == SocketState.ASYNC_END || dispatches != null && state != SocketState.CLOSED);

return state;
}

所以接下来我们要看看 service 方法,这个方法也比较长,所以我们只看看关键部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
setSocketWrapper(socketWrapper);//直接将打包好的连接放进处理器,之后直接通过处理器来获得对应的信息
//首先是这部分,开始尝试对HTTP请求进行解析
if (!inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(),
protocol.getKeepAliveTimeout())) {
if (inputBuffer.getParsingRequestLinePhase() == -1) {
return SocketState.UPGRADING;
} else if (handleIncompleteRequestLineRead()) {
break;
}
}

// Process the Protocol component of the request line
// Need to know if this is an HTTP 0.9 request before trying to
// parse headers.
//开始获取请求的协议
prepareRequestProtocol();

if (protocol.isPaused()) {
// 503 - Service unavailable
response.setStatus(503);
setErrorState(ErrorState.CLOSE_CLEAN, null);
} else {
keptAlive = true;
// Set this every time in case limit has been changed via JMX
request.getMimeHeaders().setLimit(protocol.getMaxHeaderCount());
// Don't parse headers for HTTP/0.9
if (!http09 && !inputBuffer.parseHeaders()) {
// We've read part of the request, don't recycle it
// instead associate it with the socket
openSocket = true;
readComplete = false;
break;
}
if (!protocol.getDisableUploadTimeout()) {
socketWrapper.setReadTimeout(protocol.getConnectionUploadTimeout());
}
}

我们今天只讨论网络通信本身,不会深入的探究具体的对请求的解析流程,所以如果对请求的解析实现有兴趣可以自己看看。
接下来会运行到这里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
if (getErrorState().isIoAllowed()) {  
// Setting up filters, and parse some request headers
rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
try {

prepareRequest();
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
if (log.isDebugEnabled()) {
log.debug(sm.getString("http11processor.request.prepare"), t);
}
// 500 - Internal Server Error
response.setStatus(500);
setErrorState(ErrorState.CLOSE_CLEAN, t);
}
}

prepareRequest 方法负责将收到的请求信息转换为能够使用的键值对形式,还是和上面的一样,HTTP 信息解析不属于我们今天讨论的内容,有需要可以自己了解
接下来就是正式的请求处理了

1
2
3
4
5
6
7
8
if (getErrorState().isIoAllowed()) {//如果运行进行IO  
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);//将状态切换为正在处理
getAdapter().service(request, response);//调用Adapter进行处理,此时请求的信息已经被封装到了request中
if (keepAlive && !getErrorState().isError() && !isAsync() &&
statusDropsConnection(response.getStatus())) {
setErrorState(ErrorState.CLOSE_CLEAN, null);
}

在这里,我们终于见到了 Tomcat 的下一个重要组件:Adapter。这个组件时一个转换组件,负责将获得的 HTTP 请求信息转换为容器能够处理的形式。

我们在上一部分中看到了 Adapter 组件被调用,adapter 会尝试着对信息做分离,然后将需要处理的信息投送到对应的 Container 组件中进行处理,我们使用 Tomcat 时提供的 Servlet 就是容器的一种,负责具体的业务处理
上面我们看到了 adapter 方法被调用,在完成几个异常检查之后会有下面的流程

1
2
3
4
5
6
7
8
9
10
//对请求中的信息做必要的处理
postParseSuccess = postParseRequest(req, request, res, response);
if (postParseSuccess) {
// check valves if we support async
//检查是否支持异步
request.setAsyncSupported(connector.getService().getContainer().getPipeline().isAsyncSupported());
// Calling the container
//开始获取正确的容器然后对请求进行处理
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
}

解释一下最后一行的一长串流式调用,首先获得当前的 connector 对应的 Service,然后从 service 中获取容器,此处的获取容器方法固定返回 Engine 容器,Engine 一般在每个 Service 中只有一个,是顶层容器,负责将请求分发给真正负责处理的容器。然后从 Engine 中获得处理管线,处理管线由一个基本容器 BaseContainer 和一系列可选包装器(Valves)组成,这些包装器负责在正式的请求处理前的检查工作与请求结束后的资源清理工作。Tomcat 原生实现了大量的 Valve,具体可以看一下这张图

在默认的情况下 Engine 中的 Valve 是一个 StandardEngineValve(当然可以通过配置加一些实现),实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void invoke(Request request, Response response) throws IOException, ServletException {  

// Select the Host to be used for this Request
Host host = request.getHost();
if (host == null) {
if (!response.isError()) {
response.sendError(404);
}
return;
}
if (request.isAsyncSupported()) {
request.setAsyncSupported(host.getPipeline().isAsyncSupported());
}

// Ask this Host to process this request
host.getPipeline().getFirst().invoke(request, response);
}

我们可以看到这里实际上是将请求分发到了 HOST 容器,HOST 容器包含一个 StandardHostValve,实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public void invoke(Request request, Response response) throws IOException, ServletException {  

// Select the Context to be used for this Request
Context context = request.getContext();
//尝试获取请求的上下文容器
if (context == null) {
// Don't overwrite an existing error
if (!response.isError()) {
response.sendError(404);
}
return;
}

if (request.isAsyncSupported()) {
request.setAsyncSupported(context.getPipeline().isAsyncSupported());
}

boolean asyncAtStart = request.isAsync();

try {
context.bind(Globals.IS_SECURITY_ENABLED, MY_CLASSLOADER);

if (!asyncAtStart && !context.fireRequestInitEvent(request.getRequest())) {
return;
//异步请求检查
}

. try {
if (!response.isErrorReportRequired()) {
//尝试将请求分发给context容器
context.getPipeline().getFirst().invoke(request, response);
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
container.getLogger().error(sm.getString("standardHostValve.exception", request.getRequestURI()), t);
. if (!response.isErrorReportRequired()) {
request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, t);
throwable(request, response, t);
}
}
//现在请求又回到了Host的控制下
response.setSuspended(false);

Throwable t = (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
//如果context容器刚刚已经销毁了,直接返回
if (!context.getState().isAvailable()) {
return;
}

//如果程序需要进行错误报告
if (response.isErrorReportRequired()) {
//接下来检测是否运行更进一步的IO
AtomicBoolean result = new AtomicBoolean(false);
response.getCoyoteResponse().action(ActionCode.IS_IO_ALLOWED, result);
//如果允许IO,那么渲染一个错误页面
if (result.get()) {
if (t != null) {
throwable(request, response, t);
} else {
status(request, response);
}
}
}

if (!request.isAsync() && !asyncAtStart) {
context.fireRequestDestroyEvent(request.getRequest());
}
} finally {
//刷新长连接的最后一次访问的时间
if (context.getAlwaysAccessSession()) {
request.getSession(false);
}

context.unbind(Globals.IS_SECURITY_ENABLED, MY_CLASSLOADER);
}
}

接下来是 context 容器的 invoke 方法,这个方法我就不展示了,在 StandardContextValve 中实现,就是做了一点处理后移交给 Wrapper 容器,我们可以看看 Wrapper 在干什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//在跳过错误检查后开始分配Servlet
// Allocate a servlet instance to process this request
try {
if (!unavailable) {
servlet = wrapper.allocate();
}
} catch (UnavailableException e) {
container.getLogger().error(sm.getString("standardWrapper.allocateException", wrapper.getName()), e);
checkWrapperAvailable(response, wrapper);
} catch (ServletException e) {
container.getLogger().error(sm.getString("standardWrapper.allocateException", wrapper.getName()),
StandardWrapper.getRootCause(e));
throwable = e;
exception(request, response, e);
} catch (Throwable e) {
ExceptionUtils.handleThrowable(e);
container.getLogger().error(sm.getString("standardWrapper.allocateException", wrapper.getName()), e);
throwable = e;
exception(request, response, e);
// servlet = null; is set here
}

对,在上面漫长的兜圈子后我们终于看到了我们熟悉的 Servlet,allocate 方法就不看了,就是将 Wrapper 内携带的单例容器返回来而已,如果没有创建单例就自己创建一个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
if ((servlet != null) && (filterChain != null)) {  
// Swallow output if needed
if (context.getSwallowOutput()) {
//判断是否需要捕获系统输出,如System.in或System.err
try {
SystemLogHandler.startCapture();
if (request.isAsyncDispatching()) {//异步请求处理
request.getAsyncContextInternal().doInternalDispatch();
} else {
//调用过滤链
filterChain.doFilter(request.getRequest(), response.getResponse());
}
} finally {
//最后记录刚刚捕获的信息
String log = SystemLogHandler.stopCapture();
if (log != null && !log.isEmpty()) {
context.getLogger().info(log);
}
}
} else {
//不需要捕获输出那么直接处理
if (request.isAsyncDispatching()) {
request.getAsyncContextInternal().doInternalDispatch();
} else {
filterChain.doFilter(request.getRequest(), response.getResponse());
}
}

}

实质上 Servlet 容器中方法的调用发生在 doFilter 方法中,我们可以看一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
private void internalDoFilter(ServletRequest request, ServletResponse response)  
throws IOException, ServletException {

// Call the next filter if there is one
if (pos < n) {
//顺次获取下一个过滤器
ApplicationFilterConfig filterConfig = filters[pos++];
try {
Filter filter = filterConfig.getFilter();

if (request.isAsyncSupported() && !(filterConfig.getFilterDef().getAsyncSupportedBoolean())) {
request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR, Boolean.FALSE);
}
if (Globals.IS_SECURITY_ENABLED) {
final ServletRequest req = request;
final ServletResponse res = response;
Principal principal = ((HttpServletRequest) req).getUserPrincipal();

Object[] args = new Object[] { req, res, this };
SecurityUtil.doAsPrivilege("doFilter", filter, classType, args, principal);
} else {
//再次调用相同的方法,调用下一个过滤器,由于一般情况下过滤器是我们自己实现的,我们会在
//doFilter中进行过滤,然后手动调用filterChain.doFilter来实现连续的过滤
filter.doFilter(request, response, this);
}
} catch (IOException | ServletException | RuntimeException e) {
throw e;
} catch (Throwable e) {
e = ExceptionUtils.unwrapInvocationTargetException(e);
ExceptionUtils.handleThrowable(e);
throw new ServletException(sm.getString("filterChain.filter"), e);
}
return;
}

// We fell off the end of the chain -- call the servlet instance
//运行到此处说明走完了所有过滤器,接下来调用Servlet容器
try {
if (dispatcherWrapsSameObject) {
lastServicedRequest.set(request);
lastServicedResponse.set(response);
}

if (request.isAsyncSupported() && !servletSupportsAsync) {
request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR, Boolean.FALSE);
}
// Use potentially wrapped request from this point
if ((request instanceof HttpServletRequest) && (response instanceof HttpServletResponse) &&
Globals.IS_SECURITY_ENABLED) {
final ServletRequest req = request;
final ServletResponse res = response;
Principal principal = ((HttpServletRequest) req).getUserPrincipal();
Object[] args = new Object[] { req, res };
SecurityUtil.doAsPrivilege("service", servlet, classTypeUsedInService, args, principal);
} else {
//一般的HTTP请求直接运行到这里,然后调用Service方法
servlet.service(request, response);
}
} catch (IOException | ServletException | RuntimeException e) {
throw e;
} catch (Throwable e) {
e = ExceptionUtils.unwrapInvocationTargetException(e);
ExceptionUtils.handleThrowable(e);
throw new ServletException(sm.getString("filterChain.servlet"), e);
} finally {
if (dispatcherWrapsSameObject) {
lastServicedRequest.set(null);
lastServicedResponse.set(null);
}
}
}

显然,到这一步请求对应的相应已经有了,该返回对应的结果了,那么返回怎么走,来,回忆上面的流程,我们实际是调用了 Adapter 组件的 service 方法,然后一层一层的调用容器,现在我们继续返回 service 方法,看看接下来该干什么,比较关键的是这个

1
2
3
4
5
6
// Recycle the wrapper request and response  
if (!async) {
updateWrapperErrorCount(request, response);
request.recycle();
response.recycle();
}

我们注意到分别调用了请求和响应的 recycle 方法,这个方法会释放对象内所有的引用,即清理对象,使对象可以循环使用。
再往回是 processor 的 service 方法,最后部分的实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
if (getErrorState().isError() || (protocol.isPaused() && !isAsync())) {  
return SocketState.CLOSED;//如果存在错误或者协议处理器中标注被暂停且不是异步请求
} else if (isAsync()) {
return SocketState.LONG;
} else if (isUpgrade()) {
return SocketState.UPGRADING;
} else {
if (sendfileState == SendfileState.PENDING) {
return SocketState.SENDFILE; //可能的状态为Pending,即等待处理,此时需要发送文件
} else {
if (openSocket) {
if (readComplete) {
return SocketState.OPEN;
} else {
return SocketState.LONG;
}
} else {
return SocketState.CLOSED; //对于一个非异步的,没有出错的,不要求长连接的请求,返回关闭信号
}
}
}

接下来是协议处理器的 process 方法,我就不展示了,检测到 closed 状态直接返回,最后一路收到 closed 状态一路关闭与回收,最终关闭 Socket 通道

我们可以大概的梳理一下上面的过程,当 Tomcat 启动时 Service 组件启动Connector ,Connector组件启动线程池与监听端口的 Acceptor,监听到的请求被 SocketWrapper,SocketWrapper 实现了 Runnable 接口,自己将自己传递给 Protocol 协议处理器,协议处理器内部通过 Adapter 将请求转发给 Container,在经历了 Engine,Host,Context,Wrapper 容器的 Valve 后被传递给负责实际业务逻辑的 Servlet,在 Servlet 内发生实际的请求处理,将相应结果写回。

在最后的最后,我们再回到一个问题,Spring 框架与 Tomcat 时怎么整合的?答案非常简单,DisPatchServlet 是 HttpServlet 的一个子类,然后在启动时相 Tomcat 只注册了这一个 Servlet,然后由 DisPatchServlet 来调用不同的 Controller 实现。

结语

终于写完了,心累啊。不过讲了这么多,还是存在不少问题,比如我们实际上没有去关注 TCP 协议,HTTP 协议等内容,只是泛泛的谈了一下 Tomcat 的思路,下期吧,下一篇我们直接从网络架构开讲,好好说说各个协议的实现。

  • 标题: 网络通信
  • 作者: Soul
  • 创建于 : 2025-05-08 10:59:41
  • 更新于 : 2025-05-13 19:36:16
  • 链接: https://soulmate.org.cn/posts/3e3a64ad/
  • 版权声明: 本文章采用 CC BY-NC 4.0 进行许可。