当前位置: 首页 > article >正文

SpringBoot Tomcat 请求处理全流程详解

SpringBoot Tomcat 请求处理全流程详解

首先要对文章的一些概念进行简单解释:

Tomcat 整体架构可以参考这篇文章:https://blog.csdn.net/qq_56517253/article/details/143100177?spm=1001.2014.3001.5501

  • 连接器:主要用于接收连接,封装请求数据,解析应用层协议,将数据转换为符合 Servlet 规范的格式,并传递给容器进行请求处理
    • ProtocolHandler:应用层协议与IO模型结合的抽象,例如:Http11NioProtocol,Http11Nio2Protocol
      • Endpoint:请求端点,监听请求,生成 socket
        • Acceptor:监听 Socket 连接请求,对请求进行封装
        • SocketProcessor:处理接收到的 Socket 请求,它实现 Runnable 接口,在 Run 方法里调用协议处理组件 Processor 进行处理
      • Processor:解析应用层协议,接收来自 EndPoint 的 Socket,读取字节流解析成 Tomcat Request 和 Response 对象
    • Adapter: 适配器,将 Tomcat Request/Response 转化为 Servlet Request/Response,并转发给容器处理
  • 多层容器:通过多层容器提供隔离环境,利用责任链和 Value 管道进行链式处理,最终把请求转发到对应的 Servlet

以 Tomcat 默认请求 Http11Nio 为例:

Acceptor 处理逻辑

Acceptor (org.apache.tomcat.util.net.Acceptor) 实现了 Runnable 接口,在 run 方法里 while 循环获取新的 socket,(省略非核心代码)

    public void run() {
        try {
            // 循环 直到收到了停止指令
            while (!stopCalled) {

                try {
                    // 如果已经到了最大连接数 等待
                    endpoint.countUpOrAwaitConnection();

                    U socket = null;
                    try {
                        // 接受来自 socket 的下一个连接
                        socket = endpoint.serverSocketAccept();
                    } catch (Exception ioe) {

                    }

                    // Configure the socket
                    if (!stopCalled && !endpoint.isPaused()) {
                        // setSocketOptions() will hand the socket off to an appropriate processor if successful
                        // setSocketOptions 方法将 socket 封装注册到 poller 中
                        if (!endpoint.setSocketOptions (socket)) {
                            endpoint.closeSocket(socket);
                        }
                    } else {
                        endpoint.destroySocket(socket);
                    }
                } 
            }
        }
    }

在 setSocketOptions() 方法中,将 socket 封装为 socketWrapper,注册到 poller 中

    protected boolean setSocketOptions(SocketChannel socket) {
        NioSocketWrapper socketWrapper = null;
        try {
            // 分配 channel 和 socket 包装器
            NioChannel channel = "这里经过一些判断,对 channel 进行了赋值";

            NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
            channel.reset(socket, newWrapper);
            connections.put(socket, newWrapper);
            
            // 给 socketWrapper 赋值 
            socketWrapper = newWrapper;

            // 设置 socket 属性为非阻塞,由 selector 轮询
            socket.configureBlocking(false);

            // 配置一些设置项 超时时间等
            
            // 注册到 poller 中,注册的是一个 PollerEvent 对象
            poller.register(socketWrapper);
            return true;
        }
        // Tell to close the socket if needed
        return false;
    }

poller 实现了 Runnable 接口,内部有 selector 和 PollerEvent 的一个队列,socketWrapper 最终就是注册到了这个队列中

Poller 处理逻辑

Poller 实现了 Runnable 接口,其 run 方法如下:

        public void run() {
            // 循环直到 destory 方法被调用
            while (true) {

                boolean hasEvents = false;

                // 进行一些逻辑判断和处理

                Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // 遍历已就绪的集合并调度活动事件
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    iterator.remove();
                    NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
                    
					// 如果另一个线程调用了 cancelledKey() ,则 attachment 可能为 null
                    if (socketWrapper != null) {
                        // 进行处理
                        processKey(sk, socketWrapper);
                    }
                }

                // Process timeouts
                timeout(keyCount,hasEvents);
            }

            getStopLatch().countDown();
        }

在 processKey 方法中,进行一些逻辑判断,确定事件类型,进行下一步处理:

        protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) {
            try {
                if (close) {
                    cancelledKey(sk, socketWrapper);
                } else if (sk.isValid()) {
                    if (sk.isReadable() || sk.isWritable()) {
                        if (socketWrapper.getSendfileData() != null) {
                            processSendfile(sk, socketWrapper, false);
                        } else {
                            unreg(sk, socketWrapper, sk.readyOps());
                            boolean closeSocket = false;
                            // Read goes before write
                            if (sk.isReadable()) {
                
                                // 省略前置判断,进行读事件处理   
                                } else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) {
                                    closeSocket = true;
                                }
                            }
                            if (!closeSocket && sk.isWritable()) {
                
                                //省略前置判断, 进行写事件处理   
                                } else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) {
                                    closeSocket = true;
                                }
                            }
                    
                    		// 处理完成,撤销 key
                            if (closeSocket) {
                                cancelledKey(sk, socketWrapper);
                            }
                        }
                    }
                }
            }
        }

在 processSocket 方法中,根据参数和执行器,判断是否放入执行器调度:

    public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) {
        try {
            
           	// 省略前置判断

            SocketProcessorBase<S> sc = null;
            if (processorCache != null) {
                sc = processorCache.pop();
            }
            
            // 注意这里将 socketWrapper 封装成了 SocketProcessorBase<S>
            // SocketProcessorBase<S> 实现了 Runnable 接口,在调度时会执行 run 方法
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            
            // 获取执行器进行调度
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
        }
        return true;
    }

Processor 处理逻辑

Poller 将 sockerWrapper 封装为 SocketProcessorBase 对象,并放入执行器中执行,查看 SocketProcessorBase 类:

public abstract class SocketProcessorBase<S> implements Runnable {

    protected SocketWrapperBase<S> socketWrapper;
    protected SocketEvent event;

    public SocketProcessorBase(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
        reset(socketWrapper, event);
    }


    public void reset(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
        Objects.requireNonNull(event);
        this.socketWrapper = socketWrapper;
        this.event = event;
    }


    @Override
    public final void run() {
        synchronized (socketWrapper) {
			// 可能会同时触发读取和写入处理。synchronized 可确保处理不会并行进行
            // 如果要处理的第一个事件导致套接字关闭,则不再处理后续事件
            if (socketWrapper.isClosed()) {
                return;
            }
            doRun();
        }
    }


    protected abstract void doRun();
}

SocketProcessorBase 是一个抽象类,在 run 方法里调用了抽象方法 doRun,交给了子类实现:

image-20241112161837289

查看 Nio 的实现:

        protected void doRun() {

            Poller poller = NioEndpoint.this.poller;
   
            try {
                // 进行逻辑判断,确定 handshake 和 event 的值
                int handshake = -1;
                try {
                    if (socketWrapper.getSocket().isHandshakeComplete()) {
                        // No TLS handshaking required. Let the handler
                        // process this socket / event combination.
                        handshake = 0;
                    } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                            event == SocketEvent.ERROR) {
                        // Unable to complete the TLS handshake. Treat it as
                        // if the handshake failed.
                        handshake = -1;
                    } else {
                        handshake = socketWrapper.getSocket().handshake(event == SocketEvent.OPEN_READ, event == SocketEvent.OPEN_WRITE);
                        event = SocketEvent.OPEN_READ;
                    }
                }
                
                if (handshake == 0) {
                    SocketState state = SocketState.OPEN;
                    // 处理这个 socket 中的请求
                    if (event == null) {
                        state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
                    } else {
                        // 我测试时,普通的一个 Get 请求走的这里
                        state = getHandler().process(socketWrapper, event);
                    }
                    if (state == SocketState.CLOSED) {
                        poller.cancelledKey(getSelectionKey(), socketWrapper);
                    }
                } else if (handshake == -1 ) {
                    getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL);
                    poller.cancelledKey(getSelectionKey(), socketWrapper);
                } else if (handshake == SelectionKey.OP_READ){
                    socketWrapper.registerReadInterest();
                } else if (handshake == SelectionKey.OP_WRITE){
                    socketWrapper.registerWriteInterest();
                }
            }
        }

这是一个接口,有唯一实现类:

        public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
            
            // 进行前置判断,主要是校验合法性

            // 获取 socket 对象
            S socket = wrapper.getSocket();

            // 获取 processor 对象
            Processor processor = (Processor) wrapper.takeCurrentProcessor();

            // 进行一些超时等判断
            
            try {

                // 进行一些判断
                
                SocketState state = SocketState.CLOSED;
                do {
                   
                    // 调用对应的 processor 进行下一步请求处理
                    state = processor.process(wrapper, status);

                    // 进行一些其他处理,比如有的请求是要升级为其他协议的
                    if (state == SocketState.UPGRADING) {

                        } else {
                            
                            
                        }
                    }
                } while ( state == SocketState.UPGRADING);

  			// 进行后续处理
                
            return SocketState.CLOSED;
        }

processor.process() 还是一个接口,有唯一抽象实现类:

    public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status) throws IOException {

        SocketState state = SocketState.CLOSED;
        Iterator<DispatchType> dispatches = null;
        do {
            
            // 根据 SocketEvent 的类型进行不同的操作
            if (dispatches != null) {
                DispatchType nextDispatch = dispatches.next();
                if (getLog().isDebugEnabled()) {
                    getLog().debug("Processing dispatch type: [" + nextDispatch + "]");
                }
                state = dispatch(nextDispatch.getSocketStatus());
                if (!dispatches.hasNext()) {
                    state = checkForPipelinedData(state, socketWrapper);
                }
            } else if (status == SocketEvent.DISCONNECT) {
                // Do nothing here, just wait for it to get recycled
            } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
                state = dispatch(status);
                state = checkForPipelinedData(state, socketWrapper);
            } else if (status == SocketEvent.OPEN_WRITE) {
                // Extra write event likely after async, ignore
                state = SocketState.LONG;
            } else if (status == SocketEvent.OPEN_READ) {
                
                // 我所测试的 Get 请求走的这里,这其实是个抽象方法,具体走的是子类的实现,默认就是 Http11Processor
                state = service(socketWrapper);
                
            } else if (status == SocketEvent.CONNECT_FAIL) {
                logAccess(socketWrapper);
            } else {
                state = SocketState.CLOSED;
            }


            // 如果是异步请求
            if (isAsync()) {
                state = asyncPostProcess();
                if (getLog().isDebugEnabled()) {
                    getLog().debug("Socket: [" + socketWrapper +
                            "], State after async post processing: [" + state + "]");
                }
            }

        } while (state == SocketState.ASYNC_END ||
                dispatches != null && state != SocketState.CLOSED);

        return state;
    }

service(SocketWrapperBase<?> socketWrapper) 其实是个抽象方法,具体走的是子类的实现,正常情况是 Http11Processor:

image-20241112163543478

Http11Processor 处理逻辑

    public SocketState service(SocketWrapperBase<?> socketWrapper) throws IOException {
        RequestInfo rp = request.getRequestProcessor();
        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

        // Setting up the I/O
        setSocketWrapper(socketWrapper);

        // Flags
        keepAlive = true;
        openSocket = false;
        readComplete = true;
        boolean keptAlive = false;
        SendfileState sendfileState = SendfileState.DONE;

        while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
                sendfileState == SendfileState.DONE && !protocol.isPaused()) {

            // 解析请求头,进行一些处理


            // 判断是否升级协议了


            // 设置一些参数
            int maxKeepAliveRequests = protocol.getMaxKeepAliveRequests();
            if (maxKeepAliveRequests == 1) {
                keepAlive = false;
            } else if (maxKeepAliveRequests > 0 &&
                    socketWrapper.decrementKeepAlive() <= 0) {
                keepAlive = false;
            }

            // 调用适配器进行处理
            if (getErrorState().isIoAllowed()) {
                try {
                    rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                    
                    // 获取适配器进行处理
                    getAdapter().service(request, response);
                    
                    // Handle when the response was committed before a serious
                    // error occurred.  Throwing a ServletException should both
                    // set the status to 500 and set the errorException.
                    // If we fail here, then the response is likely already
                    // committed, so we can't try and set headers.
                    if(keepAlive && !getErrorState().isError() && !isAsync() &&
                            statusDropsConnection(response.getStatus())) {
                        setErrorState(ErrorState.CLOSE_CLEAN, null);
                    }
                }
            }

            // 对异步、请求错误等进行处理

    }

Adapter 处理逻辑

Adapter 将传入的 Tomcat Request/Response 转化为 Servlet Request/Response 并转发给多层容器的顶级容器,即 Engine 处理:

    public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
            throws Exception {

        // 封装新对象
        Request request = (Request) req.getNote(ADAPTER_NOTES);
        Response response = (Response) res.getNote(ADAPTER_NOTES);

		// 为新对象赋一些值,如请求头等


        // 设置异步标记
        boolean async = false;
        boolean postParseSuccess = false;

        req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());
        req.setRequestThread();

        try {
            // Parse and set Catalina and configuration specific
            // request parameters
            postParseSuccess = postParseRequest(req, request, res, response);
            if (postParseSuccess) {
                //check valves if we support async
                request.setAsyncSupported(
                        connector.getService().getContainer().getPipeline().isAsyncSupported());
                
                // 调用 Engine 容器 第一个 Value 对象的 invoke 方法
                connector.getService().getContainer().getPipeline().getFirst().invoke(
                        request, response);
            }
            
            
            // 进行异步请求的处理,设置标记位等
            if (request.isAsync()) {
                async = true;
            } else {
                // 不是非异步,直接返回响应
                request.finishRequest();
                response.finishResponse();
            }

        } catch (IOException e) {
            // Ignore
        } finally {
            // 如果不是异步请求,回收请求和响应
            if (!async) {
                updateWrapperErrorCount(request, response);
                request.recycle();
                response.recycle();
            }
        }
    }

多层容器处理逻辑

img

直接来看 StandardWrapper 的 invoke 方法实现:

    public final void invoke(Request request, Response response)
        throws IOException, ServletException {

        // 进行一些前置判断和赋值
       

        // 分配一个 servlet 实例来处理这个请求
        try {
            if (!unavailable) {
                servlet = wrapper.allocate();
            }
        }


        // 创建 filter 链
        ApplicationFilterChain filterChain =
                ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);

        // 开始调用 filter 链,链的最后会调用到这个 servlet 的方法
        Container container = this.container;
        try {
            if ((servlet != null) && (filterChain != null)) {
                // Swallow output if needed
                if (context.getSwallowOutput()) {
                    try {
                        // 是否是异步
                        if (request.isAsyncDispatching()) {
                            request.getAsyncContextInternal().doInternalDispatch();
                        } else {
                            // doFilter
                            filterChain.doFilter(request.getRequest(), response.getResponse());
                        }
                    }
                } else {
                    // 是否是异步
                    if (request.isAsyncDispatching()) {
                        request.getAsyncContextInternal().doInternalDispatch();
                    } else {
                        // doFilter
                        filterChain.doFilter(request.getRequest(), response.getResponse());
                    }
                }

            }
        }

        // 异常处理

    }

doFilter 最后会调用到 internalDoFilter 方法,这里写了在调用完成后调用 servlet 的实现:

    private void internalDoFilter(ServletRequest request,
                                  ServletResponse response)
        throws IOException, ServletException {

        // 当前链所处位置 < 链的长度
        if (pos < n) {
            ApplicationFilterConfig filterConfig = filters[pos++];
            try {
                Filter filter = filterConfig.getFilter();

                if( Globals.IS_SECURITY_ENABLED ) {

                } else {
                    // 调用下一个链,把当前对象传进去
                    filter.doFilter(request, response, this);
                }
            }
            return;
        }

        // 已经到了链的尽头,可以调用 servlet 处理了
        try {

            // Use potentially wrapped request from this point
            if ((request instanceof HttpServletRequest) &&
                    (response instanceof HttpServletResponse) &&
                    Globals.IS_SECURITY_ENABLED ) {
      
            } else {
                // 调用 servlet 处理
                servlet.service(request, response);
            }
        }
    }

HttpServlet 处理逻辑

service(ServletRequest req, ServletResponse res)是一个接口,被 HttpServlet 实现:

    public void service(ServletRequest req, ServletResponse res)
        throws ServletException, IOException {

        HttpServletRequest  request;
        HttpServletResponse response;

        // 将参数转为 HttpServletRequest 和 HttpServletResponse
        try {
            request = (HttpServletRequest) req;
            response = (HttpServletResponse) res;
        } catch (ClassCastException e) {
            throw new ServletException(lStrings.getString("http.non_http"));
        }
        
        // 调用内部 service 方法
        service(request, response);
    }

内部 service 方法根据请求方法的不同走不同的处理逻辑

    protected void service(HttpServletRequest req, HttpServletResponse resp)
        throws ServletException, IOException {

        String method = req.getMethod();

        // 根据不同请求方法走不同的处理逻辑
        if (method.equals(METHOD_GET)) {
            long lastModified = getLastModified(req);
            if (lastModified == -1) {
                // 走 Get 请求
                doGet(req, resp);
            } else {
                long ifModifiedSince;
                try {
                    ifModifiedSince = req.getDateHeader(HEADER_IFMODSINCE);
                } catch (IllegalArgumentException iae) {
                    // Invalid date header - proceed as if none was set
                    ifModifiedSince = -1;
                }
                if (ifModifiedSince < (lastModified / 1000 * 1000)) {
                    // If the servlet mod time is later, call doGet()
                    // Round down to the nearest second for a proper compare
                    // A ifModifiedSince of -1 will always be less
                    maybeSetLastModified(resp, lastModified);
                    doGet(req, resp);
                } else {
                    resp.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
                }
            }

        } else if (method.equals(METHOD_HEAD)) {
            long lastModified = getLastModified(req);
            maybeSetLastModified(resp, lastModified);
            doHead(req, resp);

        } else if (method.equals(METHOD_POST)) {
            doPost(req, resp);

        } else if (method.equals(METHOD_PUT)) {
            doPut(req, resp);

        } else if (method.equals(METHOD_DELETE)) {
            doDelete(req, resp);

        } else if (method.equals(METHOD_OPTIONS)) {
            doOptions(req,resp);

        } else if (method.equals(METHOD_TRACE)) {
            doTrace(req,resp);

        } else {
            // 未实现的 HttpMethod
        }
    }

doGet方法又被 FrameworkServlet(org.springframework.web.servlet.FrameworkServlet)所重写,此时已进入 Spring 的实现

	protected final void doGet(HttpServletRequest request, HttpServletResponse response)
			throws ServletException, IOException {

		processRequest(request, response);
	}
	protected final void processRequest(HttpServletRequest request, HttpServletResponse response)
			throws ServletException, IOException {

		long startTime = System.currentTimeMillis();
		Throwable failureCause = null;

		LocaleContext previousLocaleContext = LocaleContextHolder.getLocaleContext();
		LocaleContext localeContext = buildLocaleContext(request);

		RequestAttributes previousAttributes = RequestContextHolder.getRequestAttributes();
		ServletRequestAttributes requestAttributes = buildRequestAttributes(request, response, previousAttributes);

		WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
		asyncManager.registerCallableInterceptor(FrameworkServlet.class.getName(), new RequestBindingInterceptor());

		initContextHolders(request, localeContext, requestAttributes);

		try {
            // 这里的 doService 大家应该都熟悉了
			doService(request, response);
		}
		catch (ServletException | IOException ex) {
			failureCause = ex;
			throw ex;
		}
		catch (Throwable ex) {
			failureCause = ex;
			throw new NestedServletException("Request processing failed", ex);
		}

		finally {
			resetContextHolders(request, previousLocaleContext, previousAttributes);
			if (requestAttributes != null) {
				requestAttributes.requestCompleted();
			}
			logResult(request, response, failureCause, asyncManager);
			publishRequestHandledEvent(request, response, startTime, failureCause);
		}
	}

其中 doService 的唯一实现,就是大名鼎鼎的 DispatcherServlet 了

后续就是寻找最匹配路径的 Bean,一系列操作调用到最后的方法上了

注:本文梳理的只是 Tomcat 相关的请求处理逻辑,后续的 DispatcherServlet 转发到对应的 Controller 就不梳理了。以上均以默认情况(HttpNio)下的某 Get 请求为例,且大部分代码段都省略了非关键代码。


http://www.kler.cn/a/391530.html

相关文章:

  • 【计算机网络】深入浅出计算机网络
  • 【Uniapp-Vue3】showLoading加载和showModal模态框示例
  • 手撕代码: C++实现按位序列化和反序列化
  • Hadoop•安装JDK
  • unity打包sdk热更新笔记
  • 73.矩阵置零 python
  • 汇川PLC EtherNET/IP无线通信,开启国产工控无线互联新时代
  • SASS 控制指令详解@for、@if、@each、@while
  • 面试问答:TCP协议中的三开四断,三次握手四次挥手
  • 关于CSS表达使中使用的 max() 函数
  • sqlite3数据库的相关API使用
  • 二叉树的前序遍历---一个简单高效的算法
  • 以数字产业园区规划为笔,绘智慧城市新篇章
  • 【ExcelWPS如何对工作表和文档进行加密保护】
  • 【大数据技术基础 | 实验十】Hive实验:部署Hive
  • Leetcode:645. 错误的集合——Java暴力解法哈希表法
  • 科目一汇总笔记2024
  • JAP+Hibernate持久化框架
  • 大模型学习笔记------BLIP模型详解与思考
  • Linux(CentOS)yum update -y 事故
  • 吴恩达深度学习笔记:卷积神经网络(Foundations of Convolutional Neural Networks)4.11
  • 【嵌入式开发——ARM】2ARM汇编指令
  • C/C++ 模板与so
  • elementUI input 禁止内容两端存在空格,或者是自动去除两端空格
  • springboot小型养猪场信息管理系统-计算机毕业设计源码48584
  • 【青牛科技】 GC6153——TMI8152 的不二之选,可应用于摇头机等产品中