SpringBoot Tomcat 请求处理全流程详解
SpringBoot Tomcat 请求处理全流程详解
首先要对文章的一些概念进行简单解释:
Tomcat 整体架构可以参考这篇文章:https://blog.csdn.net/qq_56517253/article/details/143100177?spm=1001.2014.3001.5501
- 连接器:主要用于接收连接,封装请求数据,解析应用层协议,将数据转换为符合 Servlet 规范的格式,并传递给容器进行请求处理
- ProtocolHandler:应用层协议与IO模型结合的抽象,例如:Http11NioProtocol,Http11Nio2Protocol
- Endpoint:请求端点,监听请求,生成 socket
- Acceptor:监听 Socket 连接请求,对请求进行封装
- SocketProcessor:处理接收到的 Socket 请求,它实现 Runnable 接口,在 Run 方法里调用协议处理组件 Processor 进行处理
- Processor:解析应用层协议,接收来自 EndPoint 的 Socket,读取字节流解析成 Tomcat Request 和 Response 对象
- Endpoint:请求端点,监听请求,生成 socket
- Adapter: 适配器,将 Tomcat Request/Response 转化为 Servlet Request/Response,并转发给容器处理
- ProtocolHandler:应用层协议与IO模型结合的抽象,例如:Http11NioProtocol,Http11Nio2Protocol
- 多层容器:通过多层容器提供隔离环境,利用责任链和 Value 管道进行链式处理,最终把请求转发到对应的 Servlet
以 Tomcat 默认请求 Http11Nio 为例:
Acceptor 处理逻辑
Acceptor (org.apache.tomcat.util.net.Acceptor) 实现了 Runnable 接口,在 run 方法里 while 循环获取新的 socket,(省略非核心代码)
public void run() {
try {
// 循环 直到收到了停止指令
while (!stopCalled) {
try {
// 如果已经到了最大连接数 等待
endpoint.countUpOrAwaitConnection();
U socket = null;
try {
// 接受来自 socket 的下一个连接
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
}
// Configure the socket
if (!stopCalled && !endpoint.isPaused()) {
// setSocketOptions() will hand the socket off to an appropriate processor if successful
// setSocketOptions 方法将 socket 封装注册到 poller 中
if (!endpoint.setSocketOptions (socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
}
}
}
}
在 setSocketOptions() 方法中,将 socket 封装为 socketWrapper,注册到 poller 中
protected boolean setSocketOptions(SocketChannel socket) {
NioSocketWrapper socketWrapper = null;
try {
// 分配 channel 和 socket 包装器
NioChannel channel = "这里经过一些判断,对 channel 进行了赋值";
NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
channel.reset(socket, newWrapper);
connections.put(socket, newWrapper);
// 给 socketWrapper 赋值
socketWrapper = newWrapper;
// 设置 socket 属性为非阻塞,由 selector 轮询
socket.configureBlocking(false);
// 配置一些设置项 超时时间等
// 注册到 poller 中,注册的是一个 PollerEvent 对象
poller.register(socketWrapper);
return true;
}
// Tell to close the socket if needed
return false;
}
poller 实现了 Runnable 接口,内部有 selector 和 PollerEvent 的一个队列,socketWrapper 最终就是注册到了这个队列中
Poller 处理逻辑
Poller 实现了 Runnable 接口,其 run 方法如下:
public void run() {
// 循环直到 destory 方法被调用
while (true) {
boolean hasEvents = false;
// 进行一些逻辑判断和处理
Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
// 遍历已就绪的集合并调度活动事件
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
iterator.remove();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
// 如果另一个线程调用了 cancelledKey() ,则 attachment 可能为 null
if (socketWrapper != null) {
// 进行处理
processKey(sk, socketWrapper);
}
}
// Process timeouts
timeout(keyCount,hasEvents);
}
getStopLatch().countDown();
}
在 processKey 方法中,进行一些逻辑判断,确定事件类型,进行下一步处理:
protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
try {
if (close) {
cancelledKey(sk, socketWrapper);
} else if (sk.isValid()) {
if (sk.isReadable() || sk.isWritable()) {
if (socketWrapper.getSendfileData() != null) {
processSendfile(sk, socketWrapper, false);
} else {
unreg(sk, socketWrapper, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) {
// 省略前置判断,进行读事件处理
} else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
//省略前置判断, 进行写事件处理
} else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
// 处理完成,撤销 key
if (closeSocket) {
cancelledKey(sk, socketWrapper);
}
}
}
}
}
}
在 processSocket 方法中,根据参数和执行器,判断是否放入执行器调度:
public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) {
try {
// 省略前置判断
SocketProcessorBase<S> sc = null;
if (processorCache != null) {
sc = processorCache.pop();
}
// 注意这里将 socketWrapper 封装成了 SocketProcessorBase<S>
// SocketProcessorBase<S> 实现了 Runnable 接口,在调度时会执行 run 方法
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
// 获取执行器进行调度
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
}
return true;
}
Processor 处理逻辑
Poller 将 sockerWrapper 封装为 SocketProcessorBase 对象,并放入执行器中执行,查看 SocketProcessorBase 类:
public abstract class SocketProcessorBase<S> implements Runnable {
protected SocketWrapperBase<S> socketWrapper;
protected SocketEvent event;
public SocketProcessorBase(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
reset(socketWrapper, event);
}
public void reset(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
Objects.requireNonNull(event);
this.socketWrapper = socketWrapper;
this.event = event;
}
@Override
public final void run() {
synchronized (socketWrapper) {
// 可能会同时触发读取和写入处理。synchronized 可确保处理不会并行进行
// 如果要处理的第一个事件导致套接字关闭,则不再处理后续事件
if (socketWrapper.isClosed()) {
return;
}
doRun();
}
}
protected abstract void doRun();
}
SocketProcessorBase 是一个抽象类,在 run 方法里调用了抽象方法 doRun,交给了子类实现:
查看 Nio 的实现:
protected void doRun() {
Poller poller = NioEndpoint.this.poller;
try {
// 进行逻辑判断,确定 handshake 和 event 的值
int handshake = -1;
try {
if (socketWrapper.getSocket().isHandshakeComplete()) {
// No TLS handshaking required. Let the handler
// process this socket / event combination.
handshake = 0;
} else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
event == SocketEvent.ERROR) {
// Unable to complete the TLS handshake. Treat it as
// if the handshake failed.
handshake = -1;
} else {
handshake = socketWrapper.getSocket().handshake(event == SocketEvent.OPEN_READ, event == SocketEvent.OPEN_WRITE);
event = SocketEvent.OPEN_READ;
}
}
if (handshake == 0) {
SocketState state = SocketState.OPEN;
// 处理这个 socket 中的请求
if (event == null) {
state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
} else {
// 我测试时,普通的一个 Get 请求走的这里
state = getHandler().process(socketWrapper, event);
}
if (state == SocketState.CLOSED) {
poller.cancelledKey(getSelectionKey(), socketWrapper);
}
} else if (handshake == -1 ) {
getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
poller.cancelledKey(getSelectionKey(), socketWrapper);
} else if (handshake == SelectionKey.OP_READ){
socketWrapper.registerReadInterest();
} else if (handshake == SelectionKey.OP_WRITE){
socketWrapper.registerWriteInterest();
}
}
}
这是一个接口,有唯一实现类:
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
// 进行前置判断,主要是校验合法性
// 获取 socket 对象
S socket = wrapper.getSocket();
// 获取 processor 对象
Processor processor = (Processor) wrapper.takeCurrentProcessor();
// 进行一些超时等判断
try {
// 进行一些判断
SocketState state = SocketState.CLOSED;
do {
// 调用对应的 processor 进行下一步请求处理
state = processor.process(wrapper, status);
// 进行一些其他处理,比如有的请求是要升级为其他协议的
if (state == SocketState.UPGRADING) {
} else {
}
}
} while ( state == SocketState.UPGRADING);
// 进行后续处理
return SocketState.CLOSED;
}
processor.process() 还是一个接口,有唯一抽象实现类:
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status) throws IOException {
SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
// 根据 SocketEvent 的类型进行不同的操作
if (dispatches != null) {
DispatchType nextDispatch = dispatches.next();
if (getLog().isDebugEnabled()) {
getLog().debug("Processing dispatch type: [" + nextDispatch + "]");
}
state = dispatch(nextDispatch.getSocketStatus());
if (!dispatches.hasNext()) {
state = checkForPipelinedData(state, socketWrapper);
}
} else if (status == SocketEvent.DISCONNECT) {
// Do nothing here, just wait for it to get recycled
} else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
state = dispatch(status);
state = checkForPipelinedData(state, socketWrapper);
} else if (status == SocketEvent.OPEN_WRITE) {
// Extra write event likely after async, ignore
state = SocketState.LONG;
} else if (status == SocketEvent.OPEN_READ) {
// 我所测试的 Get 请求走的这里,这其实是个抽象方法,具体走的是子类的实现,默认就是 Http11Processor
state = service(socketWrapper);
} else if (status == SocketEvent.CONNECT_FAIL) {
logAccess(socketWrapper);
} else {
state = SocketState.CLOSED;
}
// 如果是异步请求
if (isAsync()) {
state = asyncPostProcess();
if (getLog().isDebugEnabled()) {
getLog().debug("Socket: [" + socketWrapper +
"], State after async post processing: [" + state + "]");
}
}
} while (state == SocketState.ASYNC_END ||
dispatches != null && state != SocketState.CLOSED);
return state;
}
service(SocketWrapperBase<?> socketWrapper) 其实是个抽象方法,具体走的是子类的实现,正常情况是 Http11Processor:
Http11Processor 处理逻辑
public SocketState service(SocketWrapperBase<?> socketWrapper) throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
// Setting up the I/O
setSocketWrapper(socketWrapper);
// Flags
keepAlive = true;
openSocket = false;
readComplete = true;
boolean keptAlive = false;
SendfileState sendfileState = SendfileState.DONE;
while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
sendfileState == SendfileState.DONE && !protocol.isPaused()) {
// 解析请求头,进行一些处理
// 判断是否升级协议了
// 设置一些参数
int maxKeepAliveRequests = protocol.getMaxKeepAliveRequests();
if (maxKeepAliveRequests == 1) {
keepAlive = false;
} else if (maxKeepAliveRequests > 0 &&
socketWrapper.decrementKeepAlive() <= 0) {
keepAlive = false;
}
// 调用适配器进行处理
if (getErrorState().isIoAllowed()) {
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
// 获取适配器进行处理
getAdapter().service(request, response);
// Handle when the response was committed before a serious
// error occurred. Throwing a ServletException should both
// set the status to 500 and set the errorException.
// If we fail here, then the response is likely already
// committed, so we can't try and set headers.
if(keepAlive && !getErrorState().isError() && !isAsync() &&
statusDropsConnection(response.getStatus())) {
setErrorState(ErrorState.CLOSE_CLEAN, null);
}
}
}
// 对异步、请求错误等进行处理
}
Adapter 处理逻辑
Adapter 将传入的 Tomcat Request/Response 转化为 Servlet Request/Response 并转发给多层容器的顶级容器,即 Engine 处理:
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
throws Exception {
// 封装新对象
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
// 为新对象赋一些值,如请求头等
// 设置异步标记
boolean async = false;
boolean postParseSuccess = false;
req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());
req.setRequestThread();
try {
// Parse and set Catalina and configuration specific
// request parameters
postParseSuccess = postParseRequest(req, request, res, response);
if (postParseSuccess) {
//check valves if we support async
request.setAsyncSupported(
connector.getService().getContainer().getPipeline().isAsyncSupported());
// 调用 Engine 容器 第一个 Value 对象的 invoke 方法
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
}
// 进行异步请求的处理,设置标记位等
if (request.isAsync()) {
async = true;
} else {
// 不是非异步,直接返回响应
request.finishRequest();
response.finishResponse();
}
} catch (IOException e) {
// Ignore
} finally {
// 如果不是异步请求,回收请求和响应
if (!async) {
updateWrapperErrorCount(request, response);
request.recycle();
response.recycle();
}
}
}
多层容器处理逻辑
直接来看 StandardWrapper 的 invoke 方法实现:
public final void invoke(Request request, Response response)
throws IOException, ServletException {
// 进行一些前置判断和赋值
// 分配一个 servlet 实例来处理这个请求
try {
if (!unavailable) {
servlet = wrapper.allocate();
}
}
// 创建 filter 链
ApplicationFilterChain filterChain =
ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);
// 开始调用 filter 链,链的最后会调用到这个 servlet 的方法
Container container = this.container;
try {
if ((servlet != null) && (filterChain != null)) {
// Swallow output if needed
if (context.getSwallowOutput()) {
try {
// 是否是异步
if (request.isAsyncDispatching()) {
request.getAsyncContextInternal().doInternalDispatch();
} else {
// doFilter
filterChain.doFilter(request.getRequest(), response.getResponse());
}
}
} else {
// 是否是异步
if (request.isAsyncDispatching()) {
request.getAsyncContextInternal().doInternalDispatch();
} else {
// doFilter
filterChain.doFilter(request.getRequest(), response.getResponse());
}
}
}
}
// 异常处理
}
doFilter 最后会调用到 internalDoFilter 方法,这里写了在调用完成后调用 servlet 的实现:
private void internalDoFilter(ServletRequest request,
ServletResponse response)
throws IOException, ServletException {
// 当前链所处位置 < 链的长度
if (pos < n) {
ApplicationFilterConfig filterConfig = filters[pos++];
try {
Filter filter = filterConfig.getFilter();
if( Globals.IS_SECURITY_ENABLED ) {
} else {
// 调用下一个链,把当前对象传进去
filter.doFilter(request, response, this);
}
}
return;
}
// 已经到了链的尽头,可以调用 servlet 处理了
try {
// Use potentially wrapped request from this point
if ((request instanceof HttpServletRequest) &&
(response instanceof HttpServletResponse) &&
Globals.IS_SECURITY_ENABLED ) {
} else {
// 调用 servlet 处理
servlet.service(request, response);
}
}
}
HttpServlet 处理逻辑
service(ServletRequest req, ServletResponse res)是一个接口,被 HttpServlet 实现:
public void service(ServletRequest req, ServletResponse res)
throws ServletException, IOException {
HttpServletRequest request;
HttpServletResponse response;
// 将参数转为 HttpServletRequest 和 HttpServletResponse
try {
request = (HttpServletRequest) req;
response = (HttpServletResponse) res;
} catch (ClassCastException e) {
throw new ServletException(lStrings.getString("http.non_http"));
}
// 调用内部 service 方法
service(request, response);
}
内部 service 方法根据请求方法的不同走不同的处理逻辑
protected void service(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
String method = req.getMethod();
// 根据不同请求方法走不同的处理逻辑
if (method.equals(METHOD_GET)) {
long lastModified = getLastModified(req);
if (lastModified == -1) {
// 走 Get 请求
doGet(req, resp);
} else {
long ifModifiedSince;
try {
ifModifiedSince = req.getDateHeader(HEADER_IFMODSINCE);
} catch (IllegalArgumentException iae) {
// Invalid date header - proceed as if none was set
ifModifiedSince = -1;
}
if (ifModifiedSince < (lastModified / 1000 * 1000)) {
// If the servlet mod time is later, call doGet()
// Round down to the nearest second for a proper compare
// A ifModifiedSince of -1 will always be less
maybeSetLastModified(resp, lastModified);
doGet(req, resp);
} else {
resp.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
}
}
} else if (method.equals(METHOD_HEAD)) {
long lastModified = getLastModified(req);
maybeSetLastModified(resp, lastModified);
doHead(req, resp);
} else if (method.equals(METHOD_POST)) {
doPost(req, resp);
} else if (method.equals(METHOD_PUT)) {
doPut(req, resp);
} else if (method.equals(METHOD_DELETE)) {
doDelete(req, resp);
} else if (method.equals(METHOD_OPTIONS)) {
doOptions(req,resp);
} else if (method.equals(METHOD_TRACE)) {
doTrace(req,resp);
} else {
// 未实现的 HttpMethod
}
}
doGet方法又被 FrameworkServlet(org.springframework.web.servlet.FrameworkServlet)所重写,此时已进入 Spring 的实现:
protected final void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
processRequest(request, response);
}
protected final void processRequest(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
long startTime = System.currentTimeMillis();
Throwable failureCause = null;
LocaleContext previousLocaleContext = LocaleContextHolder.getLocaleContext();
LocaleContext localeContext = buildLocaleContext(request);
RequestAttributes previousAttributes = RequestContextHolder.getRequestAttributes();
ServletRequestAttributes requestAttributes = buildRequestAttributes(request, response, previousAttributes);
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
asyncManager.registerCallableInterceptor(FrameworkServlet.class.getName(), new RequestBindingInterceptor());
initContextHolders(request, localeContext, requestAttributes);
try {
// 这里的 doService 大家应该都熟悉了
doService(request, response);
}
catch (ServletException | IOException ex) {
failureCause = ex;
throw ex;
}
catch (Throwable ex) {
failureCause = ex;
throw new NestedServletException("Request processing failed", ex);
}
finally {
resetContextHolders(request, previousLocaleContext, previousAttributes);
if (requestAttributes != null) {
requestAttributes.requestCompleted();
}
logResult(request, response, failureCause, asyncManager);
publishRequestHandledEvent(request, response, startTime, failureCause);
}
}
其中 doService 的唯一实现,就是大名鼎鼎的 DispatcherServlet 了
后续就是寻找最匹配路径的 Bean,一系列操作调用到最后的方法上了
注:本文梳理的只是 Tomcat 相关的请求处理逻辑,后续的 DispatcherServlet 转发到对应的 Controller 就不梳理了。以上均以默认情况(HttpNio)下的某 Get 请求为例,且大部分代码段都省略了非关键代码。