步入响应式编程篇(三)之spring webFlux与R2DBC
spring webFlux与R2DBC
- 前言
- Spring webFlux
- 与spring mvc的对比
- spring mvc
- spring webFlux
- SSE的demo
- R2DBC
前言
前面介绍响应式编程主要用到基于Stream API的Reactor API的方式,但如今业务操作需结合springboot,所以spring webFlux使用的更多,它是类似于spring mvc的web框架,不像springmvc的阻塞操作,一个请求通常对应一个线程,而spring webFlux响应式编程,天然支持异步+非阻塞+信号驱动;而且即使web框架用到响应式编程,但瓶颈仍然可能会出现在与关系型数据库的交互,遂引入了R2DBC等响应式方式与DB交互;
Spring webFlux
与spring mvc的对比
spring webFlux兼容spring mvc的,所以原来采用spring mvc开发,后面直接新引入依赖,使用spring webFlux即可;
对于响应式编程,前面两篇其实也介绍的听详细了,所以往下简单介绍它们:
spring mvc
spring mvc通常用于命令式编程,且它是阻塞的,线程接收一个请求执行的流程:浏览器 --> Controller --> Service --> Dao;
即使Tomcat中使用APR连接器接收请求,但spring mvc在处理业务时遇到阻塞业务的IO操作,该业务线程仍然需要等待;
APR利用JNI调用本地API,在NIO线程模型基础上优化的一种连接器,所以比NIO连接器性能更高,可以大幅提升Tomcat性能;
spring webFlux
而spring webFlux则默认基于Netty接收请求,是Dao(根据请求入参去数据源查询对象【数据发布者】) --> Service --> Controller --> 浏览器(相当于订阅者),而且dao查完数据推送给service时,service还可根据背压模式决定消费能力,起到异步、非阻塞发布订阅的作用;
值得一提是,spring cloud gateway的过滤器底层用的就是webflux,所以学习spring
webflux响应式编程是很有必要的;
众所周知,spring mvc处理请求时是先交由DispatcherServlet前端控制器,而spring webFlux则是交由的前端控制器为DispatcherHandler,至于后面也是交由处理器适配器、映射器等处理,只不过DispatcherHandler使用lambda更清晰:
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
// 跨域问题处理
if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
return handlePreFlight(exchange);
}
//拿到所有的 handlerMappings
return Flux.fromIterable(this.handlerMappings)
// 找每一个mapping看谁能处理请求
.concatMap(mapping -> mapping.getHandler(exchange))
// 直接触发获取元素; 拿到流的第一个元素; 找到第一个能处理这个请求的handlerAdapter
.next()
// 如果没拿到这个元素,则响应404错误;
.switchIfEmpty(createNotFoundError())
// 异常处理,一旦前面发生异常,调用处理异常
.onErrorResume(ex -> handleDispatchError(exchange, ex))
// 调用方法处理请求,得到响应结果
.flatMap(handler -> handleRequestWith(exchange, handler));
}
spring webFlux基于Reactor api的,对于api的操作翻看Reactor api篇即可;
SSE的demo
作为W3C的推荐规范,SSE在浏览器端的支持也比较广泛,而结合响应式编程,用spring webFlux实现SSE是非常契合的;
SSE 全称是 Server-Sent Events,它的作用和 WebSocket
的作用相似,都是建立浏览器与服务器之间的通信渠道,然后服务器向浏览器推送信息,不同的是,WebSocket 是一种全双工通信协议,而 SSE
则是一种单工通信协议,即使用 SSE 只能服务器向浏览器推送信息流,浏览器如果向服务器发送信息,就是一个普通的 HTTP 请求。
@RequestMapping(value = "/sse",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> sse(){
System.out.println("sse入口……");
return Flux.range(1,100).map(i->
ServerSentEvent.builder("ha-"+i).id(i+"").comment("hello,"+i).event("haha").build()
).delayElements(Duration.ofSeconds(1));
}
如上例,每隔1s推送ServerSentEvent事件给浏览器;
效果如图,每隔1s,controller返回数据给浏览器;
R2DBC
要说spring webFlux是在业务处理层面异步化处理,而R2DBC则是与关系型数据库层面的异步化处理, R2DBC是 Spring 官方在 Spring5 发布了响应式 Web 框架 Spring WebFlux 之后急需能够满足异步响应的数据库交互 API,不过由于缺乏标准和驱动,Pivotal 团队开始自己研究响应式关系型数据库连接 Reactive Relational Database Connectivity,并提出了 R2DBC 规范 API 用来评估可行性并讨论数据库厂商是否有兴趣支持响应式的异步非阻塞驱动程序。最早只有 PostgreSQL 、H2、MSSQL 三家数据库厂商,不过现在 MySQL 也加入进来了,这是一个极大的利好。目前 R2DBC 的最新版本是 0.9.0.RELEASE。
官网介绍:
Spring Data R2 DBC是更大的Spring Data家族的一部分,可以轻松实现基于R2 DBC的存储库。R2DBC是Reactive Relational Database Connectivity(反应式关系数据库连接)的缩写,是一种使用反应式驱动程序集成SQL数据库的规范。Spring Data R2DBC应用了熟悉的Spring抽象和对R2DBC的存储库支持。它使得在反应式应用程序堆栈中构建使用关系数据访问技术的Spring驱动的应用程序变得更加容易。
其实spring的R2DBC也是基于Reactor api,官网使用R2DBC操作DB的例子:
ConnectionFactory connectionFactory = ConnectionFactories
.get("r2dbc:h2:mem:///testdb");
Mono.from(connectionFactory.create())
.flatMapMany(connection -> connection
.createStatement("SELECT firstname FROM PERSON WHERE age > $1")
.bind("$1", 42)
.execute())
.flatMap(result -> result
.map((row, rowMetadata) -> row.get("firstname", String.class)))
.doOnNext(System.out::println)
.subscribe();
而业务上使用的例子:
接口继承R2db2Repository,也是类似于mybatis plus的方式,userRepository作为dao,与关系型数据库交互:
同时R2DBC还像mybatis plus默认提供了一些api,如图笔者在接口上根本没有定义这些方法;
因spring webFlux和R2DBC都是基于reactor api,故这里只是简单列举使用例子,更多细节可以查看官网!
如有需要收藏的看官,顺便也用发财的小手点点赞哈,如有错漏,也欢迎各位在评论区评论!