第十九节:学习WebFlux与前端响应式-非阻塞-流式通讯(自学Spring boot 3.x的第四天)
这节记录下如何使用Springboot中的WebFlux的Mono和Flux实现后台向前端的数据传输。
适用场景:需要向某个url请求数据,但是对方url返回的是流式数据,异步的。这个时候传统的restful响应已经不适用了,需要用到webclient来进行处理,而WebFlux刚好集成了webclient。
在webclient中,有2种重要的数据结构,一个是Mono,一个是Flux。
Mono适合一次性数据,而Flux适合多次数据。这样的话,其实就是说:Flux适用于流式数据传输。
为了节省篇幅,本篇主要讲Flux使用方法,Mono和Flux使用方法基本一致。
第一步:
先在pom中集成webflux所需要的依赖库。
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-webflux -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>3.3.3</version>
</dependency>
第二步:
Controller层
/**
* 通过返回Flux<String>>实现非阻塞异步请求,优先此方法
* 通过此方法向客户端发送字符串
* @param messageId
* @return
* @throws IOException
*/
@GetMapping("/chatsse")
public Flux<String> chatSSE(@RequestParam String messageId) throws IOException {
return articleService.sendMessage(content);
}
第三步:
Service层:
public Flux<String> sendMessage(String prompt) throws IOException {
return webClient.post()
.uri(xxx)
.body(BodyInserters.fromValue(jsonStr))
.retrieve()
.bodyToFlux(DataBuffer.class)
.map(response -> {
try {
return "测试";
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
上面代码可以实现向客户端发送流式数据。
代码分析:
上面的webClient.post返回的是一个Flux<String>对象,如果要实时处理数据的话,可以调用它的subscribe和doNext方法,不过这2个都不会影响真实的数据传输结果,只能作为调试使用。(个人理解,不一定正确)。也就是说不管你在这里面写的天花地坠,传输给前端的数据不会改变。传输的。
仅观察数据,不对数据本身起到影响怎么做?可以用doNext或者订阅方法subscribe。
例如:下面的doNext方法可以用于调试观察数据。
.map(response -> {
try {
return extractResponse(response);
} catch (IOException e) {
throw new RuntimeException(e);
}
})
.doOnNext(s->{
System.out.println(s);
});
怎么对传输的流数据进行处理?关键看map方法。
如果要进行数据处理,得先告知flux返回的对象,.需要用到bodyToFlux(DataBuffer.class)。
其次再使用map进行数据转换。把DataBuffer转换为了String。下面的response对象跟bodyToFlux(DataBuffer.class)有关系,这里声明的是DataBuffer,所以response在这里实际上是DataBuffer对象。可以在map方法里使用各种各样的方式处理数据流。
.map(response -> {
try {
return extractResponse(response);
} catch (IOException e) {
throw new RuntimeException(e);
}
});//调用这个函数会自动输出,而且仅执行一次