优化SpringBoot接口:异步处理提升系统吞吐量策略
作者介绍:✌️大厂全栈码农|毕设实战开发,专注于大学生项目实战开发、讲解和毕业答疑辅导。
推荐订阅精彩专栏 👇🏻 避免错过下次更新
Springboot项目精选实战案例
更多项目:CSDN主页YAML墨韵
学如逆水行舟,不进则退。学习如赶路,不能慢一步。
前言
一、Servlet 3.0之前
每一次Http请求都由一个线程从头到尾处理。
1.1、基本处理方式
每一次Http请求都由某一个特定的线程从头到尾负责处理。这种方式被称为“Thread-Per-Request”模式,即每个请求分配一个线程。
1.2、流程解析
- 请求接收:当Web服务器接收到一个Http请求时,它会从线程池中分配一个线程来处理该请求。
- 请求处理:该线程会负责解析请求、调用相应的Servlet方法(如doGet或doPost)、执行相关的业务逻辑、生成响应等。
- 资源占用:如果请求需要进行IO操作(如访问数据库、调用第三方服务接口等),线程会同步地等待IO操作完成。由于IO操作通常较慢,线程在等待期间无法被释放回线程池以供后续使用。
- 响应返回:一旦IO操作完成或业务逻辑执行完毕,线程会生成响应并将其发送回客户端。
1.3、性能问题
随着并发量的增加,这种“Thread-Per-Request”模式会带来严重的性能问题:
- 线程占用:由于IO操作的阻塞性,线程在等待期间无法被释放,导致线程池中的可用线程数量减少。
- 资源消耗:每个线程都占用一定的系统资源(如内存和CPU),当线程数量过多时,会导致系统资源耗尽。
- 响应延迟:在高并发场景下,如果线程池中的线程都被占用,新的请求可能需要等待线程释放才能被处理,从而导致响应延迟。
二、Servlet 3.0之后
提供了异步处理请求:可以先释放容器分配给请求的线程与相关资源,减轻系统负担,从而增加服务的吞吐量。
在springboot应用中,可以有4种方式实现异步接口(至于ResponseBodyEmitter
、SseEmitter
、StreamingResponseBody
,不在本文介绍内,之后新写文章介绍):
-
AsyncContext
-
Callable
-
WebAsyncTask
-
DeferredResult
第一中AsyncContext
是Servlet层级的,比较原生的方式,(一般都不使用它,太麻烦了)。本文着重介绍后面三种方式。
特别说明:服务端的异步或同步对于客户端而言是不可见的。不会因为服务端使用了异步,接口的结果就和同步不一样了。另外,对于单个请求而言,使用异步接口会导致响应时间比同步大,但不特别明显。具体后文分析。
2.1、基于AsyncContext实现
@GetMapping(value = "/email/send")
public void servletReq(HttpServletRequest request) {
AsyncContext asyncContext = request.startAsync(); // 启动异步处理
// 设置监听器,处理异步线程的开始、完成、异常、超时等事件回调
asyncContext.addListener(new AsyncListener() {
@Override
public void onTimeout(AsyncEvent event) {
System.out.println("处理超时了...");
}
@Override
public void onStartAsync(AsyncEvent event) {
System.out.println("线程开始执行");
}
@Override
public void onError(AsyncEvent event) {
System.out.println("执行过程中发生错误:" + event.getThrowable().getMessage());
}
@Override
public void onComplete(AsyncEvent event) {
System.out.println("执行完成,释放资源");
}
});
// 设置异步处理超时时间
asyncContext.setTimeout(6000);
// 启动异步线程处理业务
asyncContext.start(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000); // 模拟耗时操作
System.out.println("内部线程:" + Thread.currentThread().getName());
asyncContext.getResponse().getWriter().println("async processing");
} catch (Exception e) {
System.out.println("异步处理发生异常:" + e.getMessage());
}
// 异步请求完成通知
asyncContext.complete();
}
});
// 此时request的线程连接已经释放了
System.out.println("主线程:" + Thread.currentThread().getName());
}
在上述示例中,当Servlet接收到请求后,通过request.startAsync()
方法启动异步处理,并返回一个AsyncContext对象。然后,通过AsyncContext的addListener
方法设置监听器,用于处理异步线程的开始、完成、异常、超时等事件回调。接着,通过asyncContext.start(new Runnable())
方法启动一个异步线程来处理业务逻辑。最后,当异步线程处理完业务逻辑后,调用asyncContext.complete()
方法通知Servlet容器异步处理完成,并返回响应给客户端。
2.1、基于Callable实现
Controller中,返回一个java.util.concurrent.Callable
包装的任何值,都表示该接口是一个异步接口:
@GetMapping("/testCallAble")
public Callable<String> testCallAble() {
return () -> {
Thread.sleep(40000);
return "hello";
};
}
服务器端的异步处理对客户端来说是不可见的。例如,上述接口,最终返回的客户端的是一个String,和同步接口中,直接返回String的效果是一样的。
Callable
处理过程如下:
控制器返回一个 Callable
。
-
Spring MVC 调用
request.startAsync()
并将 Callable 提交给AsyncTaskExecutor
以在单独的线程中进行处理。 -
同时,
DispatcherServlet
和所有过滤器退出 Servlet 容器线程,但response保持打开状态。 -
最终 Callable 产生结果,Spring MVC将请求分派回Servlet容器以完成处理。
-
再次调用
DispatcherServlet
,并使用 Callable 异步生成的返回值继续处理。
Callable
默认使用SimpleAsyncTaskExecutor
类来执行,这个类非常简单而且没有重用线程。在实践中,需要使用AsyncTaskExecutor
类来对线程进行配置。
2.3、基于WebAsyncTask实现
Spring提供的WebAsyncTask
是对Callable的包装,提供了更强大的功能,比如:处理超时回调、错误回调、完成回调等。本质上,和Callable区别不大,但是由于它额外封装了一些事件的回调,所有,通常都使用WebAsyncTask
而不是Callable
:
@GetMapping("/webAsyncTask")
public WebAsyncTask<String> webAsyncTask() {
WebAsyncTask<String> result = new WebAsyncTask<>(30003, () -> {
return "success";
});
result.onTimeout(() -> {
log.info("timeout callback");
return "timeout callback";
});
result.onCompletion(() -> log.info("finish callback"));
return result;
}
这里额外提一下,WebAsyncTask
可以配置一个超时时间,这里配置的超时时间比全局配置的超时时间优先级都高(会覆盖全局配置的超时时间)。
2.4、基于DeferredResult实现
DeferredResult
使用方式与Callable
类似,但在返回结果时不一样,它返回的时实际结果可能没有生成,实际的结果可能会在另外的线程里面设置到DeferredResult
中去。
//定义一个全局的变量,用来存储DeferredResult对象
private Map<String, DeferredResult<String>> deferredResultMap = new ConcurrentHashMap<>();
@GetMapping("/testDeferredResult")
public DeferredResult<String> testDeferredResult(){
DeferredResult<String> deferredResult = new DeferredResult<>();
deferredResultMap.put("test", deferredResult);
return deferredResult;
}
如果调用以上接口,会发现客户端的请求一直是在pending状态——等待后端响应。这里,我简单的将该接口返回的DeferredResult
对象存放在了一个Map集合中,实际应用中可以设计一个对象管理器来统一管理这些个对象。
注意:要考虑定时轮询(或其他方式)这些对象,将已经处理过或无效的
DeferredResult
对象清理掉(DeferredResult.isSetOrExpired
方法可以判断是否还有效),避免内存泄露。
这里我又写了一个接口,模拟
@GetMapping("/testSetDeferredResult")
public String testSetDeferredResult() throws InterruptedException {
DeferredResult<String> deferredResult = deferredResultMap.get("test");
boolean flag = deferredResult.setResult("testSetDeferredResult");
if(!flag){
log.info("结果已经被处理,此次操作无效");
}
return "ok";
}
其他线程修改DeferredResult
的值:首先是从之前存放DeferredResult
的map中拿到DeferredResult
的值,然后设置它的返回值。当执行deferredResult.setResult
之后,可以看到之前pending状态的接口完成了响应,得到的结果,就是这里设置的值。
这里也额外说下:在返回DeferredResult
时也可以设置超时时间,这个时间的优先级也是大于全局设置的。另外,判断DeferredResult
是否有效,只是一个简单的判断,实际中判断有效的并不一定是有效的(比如:客户端取消了请求,服务端是不知道的),但是一般判断为无效的,那肯定是无效了。
DeferredResult
处理过程如下:
-
控制器返回一个
DeferredResult
并将其保存在可以访问的内存队列或列表中。 -
Spring MVC 调用
request.startAsync()
。 -
同时,
DispatcherServlet
和所有配置的过滤器退出请求处理线程,但响应保持打开状态。 -
应用程序从某个线程设置
DeferredResult
,Spring MVC 将请求分派回 Servlet 容器。 -
再次调用
DispatcherServlet
,并使用异步生成的返回值继续处理。
提供一个线程池
异步请求,不会一直占用请求的主线程(tomcat容器中处理请求的线程),而是通过一个其他的线程来处理异步任务。也正是如此,在相同的最大请求数配置下,异步请求由于迅速的释放了主线程,所以才能提高吞吐量。
这里提到一个其他线程,那么这个其他线程我们一般都不适用默认的,都是根据自身情况提供一个线程池供异步请求使用:(我给的参数都是测试用的,实际中不可照搬)
@Bean("mvcAsyncTaskExecutor")
public AsyncTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 线程池维护线程的最少数量
// asyncServiceExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors() + 1);
executor.setCorePoolSize(5);
// 线程池维护线程的最大数量
executor.setMaxPoolSize(10);
// 线程池所使用的缓冲队列
executor.setQueueCapacity(10);
// asyncServiceExecutor.prefersShortLivedTasks();
executor.setThreadNamePrefix("fyk-mvcAsyncTask-Thread-");
asyncServiceExecutor.setBeanName("TaskId" + taskId);
// asyncServiceExecutor.setKeepAliveSeconds(20);
//调用者执行
// asyncServiceExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 线程全部结束才关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 如果超过60s还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
executor.setAwaitTerminationSeconds(30);
executor.initialize();
return executor;
}
把这个线程池配置设置到异步请求配置中:
@Configuration
public class FykWebMvcConfigurer implements WebMvcConfigurer {
@Autowired
@Qualifier("mvcAsyncTaskExecutor")
private AsyncTaskExecutor asyncTaskExecutor;
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
//异步操作的超时时间,值为0或者更小,表示永不超时
configurer.setDefaultTimeout(60001);
configurer.setTaskExecutor(asyncTaskExecutor);
}
}
什么时候使用异步请求
异步请求能提高吞吐量,这个是建立在相同配置(这里的配置指的是:最大连接数、最大工作线程数)的情况下。因此并不是说任何接口都可以使用异步请求。比如:一个请求是进行大量的计算(总之就是在处理这个请求的业务方法时CPU是没有休息的),这种情况使用异步请求就没有多大意义了,因为这时的异步请求只是把一个任务从tomcat的工作线程搬到了另一个线程罢了。
直接调大最大工作线程数配置也能到达要求。所以,真正使用异步请求的场景应该是该请求的业务代码中,大量的时间CPU是休息的(比如:在业务代码中请求其他系统的接口,在其他系统响应之前,CPU是阻塞等待的),这个时候使用异步请求,就可以释放tomcat的工作线程,让释放的工作线程可以处理其他的请求,从而提高吞吐量。
由于异步请求增加了更多的线程切换(同步请求是同一个工作线程一直处理),所以理论上会增加接口的耗时。但,这个耗时很短很短。