webflux响应式编程
webflux&webclient
尚硅谷SpringBoot响应式编程教程,最新springboot3入门到实战
响应式编程设计实战及SpringWebFlux源码剖析 - 拉勾
文章目录
- 前置知识
- 1、Lambda
- 2、Function
- 3、StreamAPI
- 中间操作:Intermediate Operations
- 终止操作:Terminal Operation
- 4、Reactive-Stream
- 为什么有Reactive-Stream规范
- 消息传递是响应式核心
- Reactive-Stream规范核心接口
- API Components
- 发布订阅写法
- 响应式编程理解
- Reactor
- 1、快速上手
- 介绍
- 依赖
- 2、响应式编程
- 2.1. 阻塞是对资源的浪费
- 2.2. 异步可以解决问题吗?
- 2.3. 从命令式编程到响应式编程
- 2.3.1. 可编排性与可读性
- 2.3.2. 就像装配流水线
- 2.3.3. 操作符(Operators)
- 2.3.4. subscribe() 之前什么都不会发生
- 2.3.5. 背压
- 2.3.6. 热(Hot) vs 冷(Cold)
- 3、核心特性
- 1、Mono和Flux
- ==响应式流:元素(内容) + 信号(完成/异常)==
- 基本操作
- 2、subscribe()
- 自定义流的信号感知回调
- 自定义消费者
- 3、流的取消
- **Disposable**
- 4、BaseSubscriber与生命周期钩子
- 5、背压和请求重塑
- 1、buffer:缓冲
- 2、limit:限流
- 6、以编程方式创建序列-Sink
- 1、同步环境-generate
- 2、多线程-create
- 7、 handle()
- 8、自定义线程调度
- 9、错误处理
- 1. Catch and return a static default value.
- 2. Catch and execute an alternative path with a fallback method.
- 3. Catch and dynamically compute a fallback value.
- 4. Catch, wrap to a BusinessException, and re-throw.
- 5. Catch, log an error-specific message, and re-throw.
- 6. Use the finally block to clean up resources or a Java 7 “try-with-resource” construct.
- 7. 忽略当前异常,仅通知记录,继续推进
- 8.其它
- 10、常用操作
- filter
- filterMap
- concatMap
- concat
- concatWith
- transform
- defaultIfEmpty
- merge
- zip
- 11、超时与重试
- 12、Sinks工具类
- 单播/多播/重放/背压
- 缓存
- 13、阻塞式api
- block
- 14、Context api
- WebFlux
- 0、组件对比
- 1、WebFlux
- 1、引入
- 2、Reactor Core
- 1、HttpHandler、HttpServer
- 3、DispatcherHandler
- 1、请求处理流程
- 4、注解开发
- 1、目标方法传参
- 2、返回值写法
- 5、文件上传
- 6、错误处理
- 7、RequestContext
- 8、自定义Flux配置
- WebFluxConfigurer
- 9、Filter

前置知识
1、Lambda
Java8语法糖:
package com.atguiggu.lambda;
import java.util.*;
import java.util.function.*;
import java.util.stream.Collectors;
/**
* @author lfy
* @Description
* @create 2023-11-16 20:07
*/
//函数式接口;只要是函数式接口就可以用Lambda表达式简化
//函数式接口: 接口中有且只有一个未实现的方法,这个接口就叫函数式接口
interface MyInterface {
int sum(int i, int j);
}
interface MyHaha {
int haha();
default int heihei() {
return 2;
}
; //默认实现
}
interface My666 {
void aaa(int i,int j,int k);
}
@FunctionalInterface //检查注解,帮我们快速检查我们写的接口是否函数式接口
interface MyHehe {
int hehe(int i);
}
//1、自己写实现类
class MyInterfaceImpl implements MyInterface {
@Override
public int sum(int i, int j) {
return i + j;
}
}
public class Lambda {
public static void main(String[] args) {
//声明一个函数
BiConsumer<String,String> consumer = (a,b)->{
System.out.println("哈哈:"+a+";呵呵:"+b);
};
consumer.accept("1","2");
//声明一个函数
Function<String,Integer> function = (String x) -> Integer.parseInt(x);
System.out.println(function.apply("2"));
Supplier<String> supplier = ()-> UUID.randomUUID().toString();
String s = supplier.get();
System.out.println(s);
BiFunction<String,Integer,Long> biFunction = (a,b)-> 888L;
Predicate<Integer> even = (t)-> t%2 ==0;
// even.test()//正向判断
// even.negate().test(2) //反向判断
System.out.println(even.negate().test(2));
}
public static void bbbbb(String[] args) {
var names = new ArrayList<String>();
names.add("Alice");
names.add("Bob");
names.add("Charlie");
names.add("David");
//比较器
// Collections.sort(names, new Comparator<String>() {
// @Override
// public int compare(String o1, String o2) {
// return o2.compareTo(o1);
// }
// });
//直接写函数式接口就方便 (o1,o2)->o1.compareTo(o2)
// Collections.sort(names,(o1,o2)->o1.compareTo(o2));
System.out.println(names);
// 类::方法; 引用类中的实例方法; 忽略lambda的完整写法
Collections.sort(names,String::compareTo);
System.out.println(names);
new Thread(
new Runnable() {
@Override
public void run() {
System.out.println("哈哈啊");
}
}
).start();
Runnable runnable = () -> System.out.println("aaa");
new Thread(runnable).start();
//最佳实战:
//1、以后调用某个方法传入参数,这个参数实例是一个接口对象,且只定义了一个方法,就直接用lambda简化写法
}
/**
* lambda简化函数式接口实例创建
*
* @param args
*/
public static void aaaa(String[] args) {
//1、自己创建实现类对象
MyInterface myInterface = new MyInterfaceImpl();
System.out.println(myInterface.sum(1, 2));
//2、创建匿名实现类
MyInterface myInterface1 = new MyInterface() {
@Override
public int sum(int i, int j) {
return i * i + j * j;
}
};
// System.out.println(myInterface1.sum(2, 3));
//冗余写法
//3、lambda表达式:语法糖 参数列表 + 箭头 + 方法体
MyInterface myInterface2 = (x, y) -> {
return x * x + y * y;
};
System.out.println(myInterface2.sum(2, 3));
//参数位置最少情况
MyHaha myHaha = () -> {
return 1;
};
MyHehe myHehe = y -> {
return y * y;
};
MyHehe hehe2 = y -> y - 1;
//完整写法如上:
//简化写法:
//1)、参数类型可以不写,只写(参数名),参数变量名随意定义;
// 参数表最少可以只有一个 (),或者只有一个参数名;
//2、方法体如果只有一句话,{} 可以省略
MyHehe hehe3 = y -> y + 1;
System.out.println(hehe3.hehe(7));
//以上Lambda表达式简化了实例的创建。
//总结:
// 1、Lambda表达式: (参数表) -> {方法体}
// 2、分辨出你的接口是否函数式接口。 函数式接口就可以lambda简化
}
}
2、Function
在Java中,函数式接口是只包含一个抽象方法的接口。它们是支持Lambda表达式的基础,因为Lambda表达式需要一个目标类型,这个目标类型必须是一个函数式接口。
函数式接口的出入参定义:
1、有入参,无出参【消费者】: function.accept
BiConsumer<String,String> function = (a,b)->{ //能接受两个入参
System.out.println("哈哈:"+a+";呵呵:"+b);
};
function.accept("1","2");
2、有入参,有出参【多功能函数】: function.apply
Function<String,Integer> function = (String x) -> Integer.parseInt(x);
System.out.println(function.apply("2"));
3、无入参,无出参【普通函数】:
Runnable runnable = () -> System.out.println("aaa");
new Thread(runnable).start();
4、无入参 ,有出参【提供者】: supplier.get()
Supplier<String> supplier = ()-> UUID.randomUUID().toString();
String s = supplier.get();
System.out.println(s);
java.util.function包下的所有function定义:
- Consumer: 消费者
- Supplier: 提供者
- Predicate: 断言
get/test/apply/accept调用的函数方法;
位于java.util.function包下
3、StreamAPI
最佳实战:以后凡是你写for循环处理数据的统一全部用StreamAPI进行替换;
Stream所有数据和操作被组合成流管道流管道组成:
- 一个数据源(可以是一个数组、集合、生成器函数、I/O管道)
- 零或多个中间操作(将一个流变形成另一个流)
- 一个终止操作(产生最终结果)
中间操作:Intermediate Operations
-
filter:过滤; 挑出我们用的元素
-
map: 映射: 一一映射,a 变成 b
-
- mapToInt、mapToLong、mapToDouble
-
flatMap:打散、散列、展开、扩维:一对多映射
filter、
map、mapToInt、mapToLong、mapToDouble
flatMap、flatMapToInt、flatMapToLong、flatMapToDouble
mapMulti、mapMultiToInt、mapMultiToLong、mapMultiToDouble、
parallel、unordered、onClose、sequential
distinct、sorted、peek、limit、skip、takeWhile、dropWhile、
终止操作:Terminal Operation
forEach、forEachOrdered、toArray、reduce、collect、toList、min、
max、count、anyMatch、allMatch、noneMatch、findFirst、findAny、iterator
Stream.of(1, 2, 3).filter(e -> {
System.out.println("e1 = " + e);
return true;
}).filter(e -> {
System.out.println("e2 = " + e);
return true;
}).collect(Collectors.toList());
输出是:
e1 = 1
e2 = 1
e1 = 2
e2 = 2
e1 = 3
e2 = 3
4、Reactive-Stream
Reactive Streams是JVM面向流的库的标准和规范(jdk9开始,有java.util.concurrent.Flow类)
1、处理可能无限数量的元素
2、有序
3、在组件之间异步传递元素
4、强制性非阻塞,背压模式
推荐阅读:
-
jdk9 reactive响应式规范:https://www.reactive-streams.org/
-
响应式宣言:https://www.reactivemanifesto.org/zh-CN
-
ReactiveStream: https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.4/README.md
2、响应式编程-Reactor核心.pptx
为什么有Reactive-Stream规范
目的:通过全异步的方式、加缓存区构建一个实时的数据流系统,
Kafka、MQ能构建出大型分布式的响应式系统。
缺本地化的消息系统解决方案:
- 让所有的异步线程能互相监听消息,处理消息,构建实时消息处理流
消息传递是响应式核心
之前a调用b,必须b做完了事情,a才能接着做事情。现在响应式就是a先将b要做的事情放到缓冲区中,b监听这个缓冲区,从缓冲区中拿数据,去做事情,这样a就不用等待了。
引入一个缓存区,引入消息队列,就能实现全系统、全异步、不阻塞、不等待、实时响应
Reactive-Stream规范核心接口
API Components
查看jdk9的java.util.concurrent.Flow类
发布订阅写法
package com.atguigu.flow;
import lombok.SneakyThrows;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
/**
* @author lfy
* @Description
* @create 2023-11-17 20:59
*/
public class FlowDemo {
//定义流中间操作处理器; 只用写订阅者的接口
static class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<String,String> {
private Flow.Subscription subscription; //保存绑定关系
@Override
public void onSubscribe(Flow.Subscription subscription) {
System.out.println("processor订阅绑定完成");
this.subscription = subscription;
subscription.request(1); //找上游要一个数据
}
@Override //数据到达,触发这个回调
public void onNext(String item) {
System.out.println("processor拿到数据:"+item);
//再加工
item += ":哈哈";
submit(item);//把我加工后的数据发出去
subscription.request(1); //再要新数据
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
}
/**
* 1、Publisher:发布者
* 2、Subscriber:订阅者
* 3、Subscription: 订阅关系
* 4、Processor: 处理器
* @param args
*/
//发布订阅模型:观察者模式,
public static void main(String[] args) throws InterruptedException {
//1、定义一个发布者; 发布数据;
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
//2、定一个中间操作: 给每个元素加个 哈哈 前缀
MyProcessor myProcessor1 = new MyProcessor();
MyProcessor myProcessor2 = new MyProcessor();
MyProcessor myProcessor3 = new MyProcessor();
//3、定义一个订阅者; 订阅者感兴趣发布者的数据;
Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
private Flow.Subscription subscription;
@Override //在订阅时 onXxxx:在xxx事件发生时,执行这个回调
public void onSubscribe(Flow.Subscription subscription) {
System.out.println(Thread.currentThread()+"订阅开始了:"+subscription);
this.subscription = subscription;
//从上游请求一个数据
subscription.request(1);
}
@Override //在下一个元素到达时; 执行这个回调; 接受到新数据
public void onNext(String item) {
System.out.println(Thread.currentThread()+"订阅者,接受到数据:"+item);
if(item.equals("p-7")){
subscription.cancel(); //取消订阅
}else {
subscription.request(1);
}
}
@Override //在错误发生时,
public void onError(Throwable throwable) {
System.out.println(Thread.currentThread()+"订阅者,接受到错误信号:"+throwable);
}
@Override //在完成时
public void onComplete() {
System.out.println(Thread.currentThread()+"订阅者,接受到完成信号:");
}
};
//4、绑定发布者和订阅者
publisher.subscribe(myProcessor1); //此时处理器相当于订阅者
myProcessor1.subscribe(myProcessor2); //此时处理器相当于发布者
myProcessor2.subscribe(myProcessor3);
myProcessor3.subscribe(subscriber); //链表关系绑定出责任链。
//绑定操作;就是发布者,记住了所有订阅者都有谁,有数据后,给所有订阅者把数据推送过去。
// publisher.subscribe(subscriber);
for (int i = 0; i < 10; i++) {
//发布10条数据
if(i == 5){
// publisher.closeExceptionally(new RuntimeException("5555"));
}else {
publisher.submit("p-"+i);
}
//publisher发布的所有数据在它的buffer区;
//中断
// publisher.closeExceptionally();
}
//ReactiveStream
//jvm底层对于整个发布订阅关系做好了 异步+缓存区处理 = 响应式系统;
// 我们只需要编排流处理环节,其它的交给jvm完成(比如:异步就又jvm自己干,我们不用关心)
//发布者通道关闭
publisher.close();
// publisher.subscribe(subscriber2);
//发布者有数据,订阅者就会拿到
Thread.sleep(20000);
}
}
响应式编程理解
使用少量资源处理大量并发的一种解决方案。
Reactor
projectreactor官网
1、快速上手
介绍
Reactor 是一个用于JVM的完全非阻塞的响应式编程框架,具备高效的需求管理(即对 “背压(backpressure)”的控制)能力。它与 Java 8 函数式 API 直接集成,比如 CompletableFuture, Stream, 以及 Duration。它提供了异步序列 API Flux(用于[N]个元素)
和 Mono(用于 [0|1]个元素)
,并完全遵循和实现了“响应式扩展规范”(Reactive Extensions Specification)。
Reactor 的 reactor-ipc 组件还支持非阻塞的进程间通信(inter-process communication, IPC)。 Reactor IPC 为 HTTP(包括 Websockets)、TCP 和 UDP 提供了支持背压的网络引擎,从而适合 应用于微服务架构。并且完整支持响应式编解码(reactive encoding and decoding)。
依赖
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2023.0.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2、响应式编程
响应式编程是一种关注于数据流(data streams)和变化传递(propagation of change)的异步编程方式。 这意味着它可以用既有的编程语言表达静态(如数组)或动态(如事件源)的数据流。
了解历史:
- 在响应式编程方面,微软跨出了第一步,它在 .NET 生态中创建了响应式扩展库(Reactive Extensions library, Rx)。接着 RxJava 在JVM上实现了响应式编程。后来,在 JVM 平台出现了一套标准的响应式 编程规范,它定义了一系列标准接口和交互规范。并整合到 Java 9 中(使用 Flow 类)。
- 响应式编程通常作为面向对象编程中的“观察者模式”(Observer design pattern)的一种扩展。 响应式流(reactive streams)与“迭代子模式”(Iterator design pattern)也有相通之处, 因为其中也有 Iterable-Iterator 这样的对应关系。主要的区别在于,Iterator 是基于 “拉取”(pull)方式的,而响应式流是基于“推送”(push)方式的。
- 使用 iterator 是一种“命令式”(imperative)编程范式,即使访问元素的方法是 Iterable 的唯一职责。关键在于,什么时候执行 next() 获取元素取决于开发者。在响应式流中,相对应的 角色是 Publisher-Subscriber,但是 当有新的值到来的时候 ,却反过来由发布者(Publisher) 通知订阅者(Subscriber),这种“推送”模式是响应式的关键。此外,对推送来的数据的操作 是通过一种声明式(declaratively)而不是命令式(imperatively)的方式表达的:开发者通过 描述“控制流程”来定义对数据流的处理逻辑。
- 除了数据推送,对错误处理(error handling)和完成(completion)信号的定义也很完善。 一个 Publisher 可以推送新的值到它的 Subscriber(调用 onNext 方法), 同样也可以推送错误(调用 onError 方法)和完成(调用 onComplete 方法)信号。 错误和完成信号都可以终止响应式流。可以用下边的表达式描述:
onNext x 0..N [onError | onComplete]
2.1. 阻塞是对资源的浪费
现代应用需要应对大量的并发用户,而且即使现代硬件的处理能力飞速发展,软件性能仍然是关键因素。
广义来说我们有两种思路来提升程序性能:
- 并行化(parallelize) :使用更多的线程和硬件资源。[异步]
- 基于现有的资源来 提高执行效率 。
通常,Java开发者使用阻塞式(blocking)编写代码。这没有问题,在出现性能瓶颈后, 我们可以增加处理线程,线程中同样是阻塞的代码。但是这种使用资源的方式会迅速面临 资源竞争和并发问题。
更糟糕的是,阻塞会浪费资源。具体来说,比如当一个程序面临延迟(通常是I/O方面, 比如数据库读写请求或网络调用),所在线程需要进入 idle 状态等待数据,从而浪费资源。
所以,并行化方式并非银弹。这是挖掘硬件潜力的方式,但是却带来了复杂性,而且容易造成浪费。
2.2. 异步可以解决问题吗?
第二种思路——提高执行效率——可以解决资源浪费问题。通过编写 异步非阻塞 的代码, (任务发起异步调用后)执行过程会切换到另一个 使用同样底层资源 的活跃任务,然后等 异步调用返回结果再去处理。
但是在 JVM 上如何编写异步代码呢?Java 提供了两种异步编程方式:
- 回调(Callbacks) :异步方法没有返回值,而是采用一个 callback 作为参数(lambda 或匿名类),当结果出来后回调这个 callback。常见的例子比如 Swings 的 EventListener。
- Futures :异步方法 立即 返回一个 Future<T>,该异步方法要返回结果的是 T 类型,通过 Future封装。这个结果并不是 立刻 可以拿到,而是等实际处理结束才可用。比如, ExecutorService 执行 Callable 任务时会返回 Future 对象。
这些技术够用吗?并非对于每个用例都是如此,两种方式都有局限性。
回调很难组合起来,因为很快就会导致代码难以理解和维护(即所谓的“回调地狱(callback hell)”)。
考虑这样一种情景:
- 在用户界面上显示用户的5个收藏,或者如果没有任何收藏提供5个建议。
- 这需要3个 服务(一个提供收藏的ID列表,第二个服务获取收藏内容,第三个提供建议内容):
回调地狱(Callback Hell)的例子:
userService.getFavorites(userId, new Callback<List<String>>() {
public void onSuccess(List<String> list) {
if (list.isEmpty()) {
suggestionService.getSuggestions(new Callback<List<Favorite>>() {
public void onSuccess(List<Favorite> list) {
UiUtils.submitOnUiThread(() -> {
list.stream()
.limit(5)
.forEach(uiList::show);
});
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
} else {
list.stream()
.limit(5)
.forEach(favId -> favoriteService.getDetails(favId,
new Callback<Favorite>() {
public void onSuccess(Favorite details) {
UiUtils.submitOnUiThread(() -> uiList.show(details));
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
}
));
}
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
Reactor改造后为:
userService.getFavorites(userId)
.flatMap(favoriteService::getDetails)
.switchIfEmpty(suggestionService.getSuggestions())
.take(5)
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);
如果你想确保“收藏的ID”的数据在800ms内获得(如果超时,从缓存中获取)呢?在基于回调的代码中, 会比较复杂。但 Reactor 中就很简单,在处理链中增加一个 timeout 的操作符即可。
userService.getFavorites(userId)
.timeout(Duration.ofMillis(800))
.onErrorResume(cacheService.cachedFavoritesFor(userId))
.flatMap(favoriteService::getDetails)
.switchIfEmpty(suggestionService.getSuggestions())
.take(5)
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);
额外扩展:
Futures 比回调要好一点,但即使在 Java 8 引入了 CompletableFuture,它对于多个处理的组合仍不够好用。 编排多个 Futures 是可行的,但却不易。此外,Future 还有一个问题:当对 Future 对象最终调用 get() 方法时,仍然会导致阻塞,并且缺乏对多个值以及更进一步对错误的处理。
考虑另外一个例子,我们首先得到 ID 的列表,然后通过它进一步获取到“对应的 name 和 statistics” 为元素的列表,整个过程用异步方式来实现。
CompletableFuture 处理组合的例子
CompletableFuture<List<String>> ids = ifhIds();
CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> {
Stream<CompletableFuture<String>> zip =
l.stream().map(i -> {
CompletableFuture<String> nameTask = ifhName(i);
CompletableFuture<Integer> statTask = ifhStat(i);
return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat);
});
List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList());
CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);
CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray);
return allDone.thenApply(v -> combinationList.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
});
List<String> results = result.join();
assertThat(results).contains(
"Name NameJoe has stats 103",
"Name NameBart has stats 104",
"Name NameHenry has stats 105",
"Name NameNicole has stats 106",
"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
2.3. 从命令式编程到响应式编程
类似 Reactor 这样的响应式库的目标就是要弥补上述“经典”的 JVM 异步方式所带来的不足, 此外还会关注一下几个方面:
- 可编排性(Composability) 以及 可读性(Readability)
- 使用丰富的 操作符 来处理形如 流 的数据
- 在 订阅(subscribe) 之前什么都不会发生
- 背压(backpressure) 具体来说即 消费者能够反向告知生产者生产内容的速度的能力
- 高层次 (同时也是有高价值的)的抽象,从而达到 并发无关 的效果
2.3.1. 可编排性与可读性
可编排性,指的是编排多个异步任务的能力。比如我们将前一个任务的结果传递给后一个任务作为输入, 或者将多个任务以分解再汇总(fork-join)的形式执行,或者将异步的任务作为离散的组件在系统中 进行重用。
这种编排任务的能力与代码的可读性和可维护性是紧密相关的。随着异步处理任务数量和复杂度 的提高,编写和阅读代码都变得越来越困难。就像我们刚才看到的,回调模式是简单的,但是缺点 是在复杂的处理逻辑中,回调中会层层嵌入回调,导致 回调地狱(Callback Hell) 。你能猜到 (或有过这种痛苦经历),这样的代码是难以阅读和分析的。
Reactor 提供了丰富的编排操作,从而代码直观反映了处理流程,并且所有的操作保持在同一层次 (尽量避免了嵌套)。
2.3.2. 就像装配流水线
你可以想象数据在响应式应用中的处理,就像流过一条装配流水线。Reactor 既是传送带, 又是一个个的装配工或机器人。原材料从源头(最初的 Publisher)流出,最终被加工为成品, 等待被推送到消费者(或者说 Subscriber)。
原材料会经过不同的中间处理过程,或者作为半成品与其他半成品进行组装。如果某处有齿轮卡住, 或者某件产品的包装过程花费了太久时间,相应的工位就可以向上游发出信号来限制或停止发出原材料。
2.3.3. 操作符(Operators)
在 Reactor 中,操作符(operator)就像装配线中的工位(操作员或装配机器人)。**每一个操作符 对 Publisher 进行相应的处理,然后将 Publisher 包装为一个新的 Publisher。**就像一个链条, 数据源自第一个 Publisher,然后顺链条而下,在每个环节进行相应的处理。最终,一个订阅者 (Subscriber)终结这个过程。请记住,在订阅者(Subscriber)订阅(subscribe)到一个 发布者(Publisher)之前,什么都不会发生。
理解了操作符会创建新的 Publisher 实例这一点,能够帮助你避免一个常见的问题, 这种问题会让你觉得处理链上的某个操作符没有起作用。
虽然响应式流规范(Reactive Streams specification)没有规定任何操作符, 类似 Reactor 这样的响应式库所带来的最大附加价值之一就是提供丰富的操作符。包括基础的转换操作, 到过滤操作,甚至复杂的编排和错误处理操作。
2.3.4. subscribe() 之前什么都不会发生
在 Reactor 中,当你创建了一条 Publisher 处理链,数据还不会开始生成。事实上,你是创建了 一种抽象的对于异步处理流程的描述(从而方便重用和组装)。
当真正“订阅(subscrib)”的时候,你需要将 Publisher 关联到一个 Subscriber 上,然后 才会触发整个链的流动。这时候,Subscriber 会向上游发送一个 request 信号,一直到达源头 的 Publisher。
2.3.5. 背压
向上游传递信号这一点也被用于实现 背压 ,就像在装配线上,某个工位的处理速度如果慢于流水线 速度,会对上游发送反馈信号一样。
在响应式流规范中实际定义的机制同刚才的类比非常接近:订阅者可以无限接受数据并让它的源头 “满负荷”推送所有的数据,也可以通过使用 request 机制来告知源头它一次最多能够处理 n 个元素。
中间环节的操作也可以影响 request。想象一个能够将每10个元素分批打包的缓存(buffer)操作。 如果订阅者请求一个元素,那么对于源头来说可以生成10个元素。此外预取策略也可以使用了, 比如在订阅前预先生成元素。
这样能够将“推送”模式转换为“推送+拉取”混合的模式,如果下游准备好了,可以从上游拉取 n 个元素;但是如果上游元素还没有准备好,下游还是要等待上游的推送。
2.3.6. 热(Hot) vs 冷(Cold)
在 Rx 家族的响应式库中,响应式流分为“热”和“冷”两种类型,区别主要在于响应式流如何 对订阅者进行响应:
- 一个“冷”的序列,指对于每一个 Subscriber,都会收到从头开始所有的数据。如果源头 生成了一个 HTTP 请求,对于每一个订阅都会创建一个新的 HTTP 请求。
- 一个“热”的序列,指对于一个 Subscriber,只能获取从它开始 订阅 之后 发出的数据。不过注意,有些“热”的响应式流可以缓存部分或全部历史数据。 通常意义上来说,一个“热”的响应式流,甚至在即使没有订阅者接收数据的情况下,也可以 发出数据(这一点同 “Subscribe() 之前什么都不会发生”的规则有冲突)。
3、核心特性
1、Mono和Flux
Mono: 0|1 数据流
Flux: N数据流
响应式流:元素(内容) + 信号(完成/异常)
基本操作
类比Stream流操作,中间操作对应流转为另1个新流,终止操作对应订阅。只有开始订阅了,流才会开始,流中的 每个元素 和 信号 都是按顺序进入流处理,然后交给订阅者处理。
package com.atguigu.reactor;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.*;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* @author lfy
* @Description
* @create 2023-11-23 20:58
*/
public class FluxDemo {
public static void main(String[] args) {
// Flux.concat(Flux.just(1,2,3),Flux.just(7,8,9))
// .subscribe(System.out::println);
Flux.range(1, 7)
// .log() //日志 onNext(1~7)
.filter(i -> i > 3) //挑出>3的元素
// .log() //onNext(4~7)
.map(i -> "haha-" + i)
.log() // onNext(haha-4 ~ 7)
.subscribe(System.out::println);
//今天: Flux、Mono、弹珠图、事件感知API、每个操作都是操作的上个流的东西
}
/**
* 响应式编程核心:看懂文档弹珠图;
* 信号: 正常/异常(取消)
* SignalType:
* SUBSCRIBE: 被订阅
* REQUEST: 请求了N个元素
* CANCEL: 流被取消
* ON_SUBSCRIBE:在订阅时候
* ON_NEXT: 在元素到达
* ON_ERROR: 在流错误
* ON_COMPLETE:在流正常完成时
* AFTER_TERMINATE:中断以后
* CURRENT_CONTEXT:当前上下文
* ON_CONTEXT:感知上下文
* <p>
* doOnXxx API触发时机
* 1、doOnNext:每个数据(流的数据)到达的时候触发
* 2、doOnEach:每个元素(流的数据和信号)到达的时候触发
* 3、doOnRequest: 消费者请求流元素的时候
* 4、doOnError:流发生错误
* 5、doOnSubscribe: 流被订阅的时候
* 6、doOnTerminate: 发送取消/异常信号中断了流
* 7、doOnCancle: 流被取消
* 8、doOnDiscard:流中元素被忽略的时候
*
* @param args
*/
public void doOnXxxx(String[] args) {
// 关键:doOnNext:表示流中某个元素到达以后触发我一个回调
// doOnXxx要感知某个流的事件,写在这个流的后面,新流的前面
Flux.just(1, 2, 3, 4, 5, 6, 7, 0, 5, 6)
.doOnNext(integer -> System.out.println("元素到达:" + integer)) //元素到达得到时候触发
.doOnEach(integerSignal -> { //each封装的详细
System.out.println("doOnEach.." + integerSignal);
})//1,2,3,4,5,6,7,0
.map(integer -> 10 / integer) //10,5,3,
.doOnError(throwable -> {
System.out.println("数据库已经保存了异常:" + throwable.getMessage());
})
.map(integer -> 100 / integer)
.doOnNext(integer -> System.out.println("元素到哈:" + integer))
.subscribe(System.out::println);
}
//Mono<Integer>: 只有一个Integer
//Flux<Integer>: 有很多Integer
public void fluxDoOn(String[] args) throws IOException, InterruptedException {
// Mono<Integer> just = Mono.just(1);
//
// just.subscribe(System.out::println);
//空流: 链式API中,下面的操作符,操作的是上面的流。
// 事件感知API:当流发生什么事的时候,触发一个回调,系统调用提前定义好的钩子函数(Hook【钩子函数】);doOnXxx;
Flux<Integer> flux = Flux.range(1, 7)
.delayElements(Duration.ofSeconds(1))
.doOnComplete(() -> {
System.out.println("流正常结束...");
})
.doOnCancel(() -> {
System.out.println("流已被取消...");
})
.doOnError(throwable -> {
System.out.println("流出错..." + throwable);
})
.doOnNext(integer -> {
System.out.println("doOnNext..." + integer);
}); //有一个信号:此时代表完成信号
flux.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("订阅者和发布者绑定好了:" + subscription);
request(1); //背压
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("元素到达:" + value);
if (value < 5) {
request(1);
if (value == 3) {
int i = 10 / 0;
}
} else {
cancel();//取消订阅
}
; //继续要元素
}
@Override
protected void hookOnComplete() {
System.out.println("数据流结束");
}
@Override
protected void hookOnError(Throwable throwable) {
System.out.println("数据流异常");
}
@Override
protected void hookOnCancel() {
System.out.println("数据流被取消");
}
@Override
protected void hookFinally(SignalType type) {
System.out.println("结束信号:" + type);
// 正常、异常
// try {
// //业务
// }catch (Exception e){
//
// }finally {
// //结束
// }
}
});
Thread.sleep(2000);
// Flux<Integer> range = Flux.range(1, 7);
System.in.read();
}
//测试Flux
public void flux() throws IOException {
// Mono: 0|1个元素的流
// Flux: N个元素的流; N>1
//发布者发布数据流:源头
//1、多元素的流
Flux<Integer> just = Flux.just(1, 2, 3, 4, 5); //
//流不消费就没用; 消费:订阅
just.subscribe(e -> System.out.println("e1 = " + e));
//一个数据流可以有很多消费者
just.subscribe(e -> System.out.println("e2 = " + e));
//对于每个消费者来说流都是一样的; 广播模式;
System.out.println("==========");
Flux<Long> flux = Flux.interval(Duration.ofSeconds(1));//每秒产生一个从0开始的递增数字
flux.subscribe(System.out::println);
System.in.read();
}
}
2、subscribe()
flxu.subscribe() // 流被订阅,默认订阅
flux.subscribe(e->System.out.println(e)) // 指定订阅规则:正常消费者:只消费正常元素
自定义流的信号感知回调
flux.subscribe(
v-> System.out.println("v = " + v), //流元素消费
throwable -> System.out.println("throwable = " + throwable), //感知异常结束
()-> System.out.println("流结束了...") //感知正常结束
);
自定义消费者
flux.subscribe(new BaseSubscriber<String>() {
// 生命周期钩子1: 订阅关系绑定的时候触发
@Override
protected void hookOnSubscribe(Subscription subscription) {
// 流被订阅的时候触发
System.out.println("绑定了..."+subscription);
//找发布者要数据
request(1); //要1个数据
// requestUnbounded(); //要无限数据
}
@Override
protected void hookOnNext(String value) {
System.out.println("数据到达,正在处理:"+value);
request(1); //要1个数据
}
// hookOnComplete、hookOnError 二选一执行
@Override
protected void hookOnComplete() {
System.out.println("流正常结束...");
}
@Override
protected void hookOnError(Throwable throwable) {
System.out.println("流异常..."+throwable);
}
@Override
protected void hookOnCancel() {
System.out.println("流被取消...");
}
@Override
protected void hookFinally(SignalType type) {
System.out.println("最终回调...一定会被执行");
}
});
3、流的取消
消费者调用 cancle() 取消流的订阅;
Disposable
Flux<String> flux = Flux.range(1, 10)
.map(i -> {
System.out.println("map..."+i);
if(i==9) {
i = 10/(9-i); //数学运算异常; doOnXxx
}
return "哈哈:" + i;
}); //流错误的时候,把错误吃掉,转为正常信号
// flux.subscribe(); //流被订阅; 默认订阅;
// flux.subscribe(v-> System.out.println("v = " + v));//指定订阅规则: 正常消费者:只消费正常元素
// flux.subscribe(
// v-> System.out.println("v = " + v), //流元素消费
// throwable -> System.out.println("throwable = " + throwable), //感知异常结束
// ()-> System.out.println("流结束了...") //感知正常结束
// );
// 流的生命周期钩子可以传播给订阅者。
// a() {
// data = b();
// }
flux.subscribe(new BaseSubscriber<String>() {
// 生命周期钩子1: 订阅关系绑定的时候触发
@Override
protected void hookOnSubscribe(Subscription subscription) {
// 流被订阅的时候触发
System.out.println("绑定了..."+subscription);
//找发布者要数据
request(1); //要1个数据
// requestUnbounded(); //要无限数据
}
@Override
protected void hookOnNext(String value) {
System.out.println("数据到达,正在处理:"+value);
if(value.equals("哈哈:5")){
cancel(); //取消流
}
request(1); //要1个数据
}
// hookOnComplete、hookOnError 二选一执行
@Override
protected void hookOnComplete() {
System.out.println("流正常结束...");
}
@Override
protected void hookOnError(Throwable throwable) {
System.out.println("流异常..."+throwable);
}
@Override
protected void hookOnCancel() {
System.out.println("流被取消...");
}
@Override
protected void hookFinally(SignalType type) {
System.out.println("最终回调...一定会被执行");
}
});
4、BaseSubscriber与生命周期钩子
自定义消费者,推荐直接编写 BaseSubscriber 的逻辑;
Flux.range(1, 10)
.map(e -> String.valueOf(e))
.map(e -> e + "-zzhua")
.doOnCancel(()->{
System.out.println("doOnCancel");
})
.subscribe(new BaseSubscriber<String>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("hookOnSubscribe");
subscription.request(1);
}
@Override
protected void hookOnNext(String value) {
System.out.println("hookOnNext:" + value);
request(1);
if (value.equals("5-zzhua")) {
cancel();
}
}
@Override
protected void hookOnComplete() {
System.out.println("hookOnComplete");
super.hookOnComplete(); // no-op
}
@Override
protected void hookOnError(Throwable throwable) {
System.out.println("hookOnError" + throwable);
super.hookOnError(throwable); // throw ...
}
@Override
protected void hookOnCancel() {
System.out.println("hookOnCancel");
super.hookOnCancel(); // no-op
}
@Override
protected void hookFinally(SignalType type) {
System.out.println("hookFinally");
super.hookFinally(type);// no-op
}
})
;
5、背压和请求重塑
背压(Backpressure ):由订阅者请求发布者传送指定数量请求,而不是由发布者任意发布
请求重塑(Reshape Requests)
1、buffer:缓冲
Flux<List<Integer>> flux = Flux.range(1, 10) //原始流10个
.buffer(3)
.log();//缓冲区:缓冲3个元素: 消费一次最多可以拿到三个元素; 凑满数批量发给消费者
//
// //一次发一个,一个一个发;
// 10元素,buffer(3);消费者请求4次,数据消费完成
flux.subscribe(v-> system.out.println("类型:"+v.getclass()+" 值为:"+V));
// 消费者每次 request(1)拿到的是几个真正的数据:buffer数据
Flux<List<Integer>> flux = Flux.range(0, 10)
.buffer(3)
.log()
;
flux.subscribe(e -> {
System.out.println("subscribe" + e);
});
2、limit:限流
Flux.range(1, 1000)
.log()
//限流触发,看上游是怎么限流获取数据的
.limitRate(100)
.subscribe();
// 75 % 预取策路:limitRate(100)
// 第一次抓取100个数据,如果 75 % 的元素已经处理了,继续抓取 新的 75 % 元素;
6、以编程方式创建序列-Sink
Sink.next
Sink.complete
1、同步环境-generate
// 创建1个1~10的序列
Flux<Object> flux = Flux.generate(
() -> 0, // 初始值
(state, sink) -> {
if (state <= 10) {
sink.next(state);// 发送数据(每次执行,只能调用1次next)
} else {
sink.complete(); // 完成信号
}
if (state == 7) {
sink.error(new RuntimeException("i dislike 7"));
}
return state + 1;
}
);
flux
.log()
.doOnError(e -> System.out.println("doOnError:" + e))
.subscribe();
System.out.println("main start...");
Disposable disposable = Flux.range(1, 10)
.delayElements(Duration.ofSeconds(1))
.log()
.map(t -> t * 10)
.doOnCancel(()-> System.out.println("doOnCancel"))
.subscribe(t -> System.out.println("subscribe:" + t));
System.out.println("start a thread...");
// 5s之后,取消这个流
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
disposable.dispose();
}).start();
System.out.println("main end...");
/*
main start...
[ INFO] (main) onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber)
[ INFO] (main) request(unbounded)
start a thread...
main end...
[ INFO] (parallel-1) onNext(1)
subscribe:10
[ INFO] (parallel-2) onNext(2)
subscribe:20
[ INFO] (parallel-3) onNext(3)
subscribe:30
[ INFO] (parallel-4) onNext(4)
subscribe:40
doOnCancel
[ INFO] (Thread-0) cancel()
*/
2、多线程-create
static class MyListener {
private FluxSink<Object> fluxSink;
public MyListener(FluxSink<Object> fluxSink) {
this.fluxSink = fluxSink;
}
public void online(String name) {
System.out.println("用户登录了: " + name);
fluxSink.next(name); // 传入用户
}
}
public static void main(String[] args) {
Flux<Object> flux = Flux.create(fluxSink -> {
MyListener myListener = new MyListener(fluxSink);
for (int i = 1; i <= 100; i++) {
myListener.online("张" + i);
}
});
flux.subscribe();
}
7、 handle()
自定义流中元素处理规则
Flux.range(1, 10)
.log()
// 自定义元素处理规则, 比如替代map(..)等操作,由于handle可以自定义,所以更加强大
.handle((value, sink)->{
sink.next("张~" + value);
})
.log()
.subscribe()
8、自定义线程调度
响应式:响应式编程: 全异步、消息、事件回调
默认还是用当前线程,生成整个流、流的发布、流的中间操作
// 流的发布、中间操作,默认使用当前线程
Flux.range(1, 10)
.publishOn(Schedulers.single()) // 在哪个线程池把这个流的数据和操作执行
.log()
.map(t -> t * 10)
.log()
.subscribeOn(Schedulers.single()) // 指定订阅者订阅操作线程
.subscribe((e) -> System.out.println(Thread.currentThread().getName() + ":" + e));
// 调度器: 线程池
// Schedulers.immediate(); // 无执行上下文,当前线程运行所有操作
// Schedulers.single(); // 使用固定的1个单线程
// 线程池中有 10 * cpu核心个线程, 队列默认10w, keepAliveTime 60s
// Schedulers.boundedElastic(); // 有界、弹性调度; 不是无限扩充的的线程池;
Schedulers.parallel();
// 自定义线程池
Schedulers.fromExecutor(new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000)));
LockSupport.park();
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i)
.log()
.publishOn(s)
.map(i -> "value " + i)
.log()
;
// 只要不指定线程池,默认发布者用的线程就是订阅者的线程;
new Thread(() -> flux.subscribe(System.out::println)).start();
/*
[ INFO] (Thread-0) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[ INFO] (Thread-0) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[ INFO] (Thread-0) | request(unbounded)
[ INFO] (Thread-0) | request(256)
[ INFO] (Thread-0) | onNext(11)
[ INFO] (Thread-0) | onNext(12)
[ INFO] (Thread-0) | onComplete()
[ INFO] (parallel-scheduler-1) | onNext(value 11)
value 11
[ INFO] (parallel-scheduler-1) | onNext(value 12)
value 12
[ INFO] (parallel-scheduler-1) | onComplete()
*/
9、错误处理
命令式编程:常见的错误处理方式
1. Catch and return a static default value.
捕获异常返回一个静态默认值
try {
return doSomethingDangerous(10);
}
catch (Throwable error) {
return "RECOVERED";
}
onErrorReturn: 实现上面效果,错误的时候返回一个值
- 1、吃掉异常,消费者无异常感知
- 2、返回一个兜底默认值
- 3、流正常完成(不再处理后续元素);
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
.subscribe(
v-> System.out.println("v = " + v),
err -> System.out.println("err = " + err),
()-> System.out.println("流结束") // error handling example
);
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err = java.lang.ArithmeticException: / by zero
*/
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
// 只要发生异常,就使用默认值
.onErrorReturn("哈哈-6666")
.subscribe(
v-> System.out.println("v = " + v),
err -> System.out.println("err = " + err),
()-> System.out.println("流结束") // error handling example
);
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
v = 哈哈-6666
流结束
*/
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
// 是指定的异常,才使用默认值
// .onErrorReturn(ArithmeticException.class,"哈哈-6666")
.onErrorReturn(NullPointerException.class,"哈哈-6666")
.subscribe(
v -> System.out.println("v = " + v),
err -> System.out.println("err = " + err),
() -> System.out.println("流结束") // error handling example
);
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err = java.lang.ArithmeticException: / by zero
*/
2. Catch and execute an alternative path with a fallback method.
吃掉异常,执行一个兜底方法;
try {
return doSomethingDangerous(10);
}
catch (Throwable error) {
return doOtherthing(10);
}
onErrorResume
- 1、吃掉异常,消费者无异常感知
- 2、调用一个兜底方法
- 3、流正常完成
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
.onErrorResume(err -> Mono.just("哈哈-777"))
.subscribe(
v -> System.out.println("v = " + v),
err -> System.out.println("err = " + err),
() -> System.out.println("流结束")
);
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
v = 哈哈-777
流结束
*/
3. Catch and dynamically compute a fallback value.
捕获并动态计算一个返回值,即根据错误返回一个新值
try {
Value v = erroringMethod();
return MyWrapper.fromValue(v);
}
catch (Throwable error) {
return MyWrapper.fromError(error);
}
.onErrorResume(err -> Flux.error(new BusinessException(err.getMessage()+":炸了")))
- 1、吃掉异常,消费者有感知
- 2、调用一个自定义方法
- 3、流异常完成
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
.onErrorResume(err -> {
if (err instanceof NullPointerException) {
return Mono.just("哈哈-777");
}
return Mono.just("其它");
})
.subscribe(
v -> System.out.println("v = " + v),
err -> System.out.println("err = " + err),
() -> System.out.println("流结束")
);
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
v = 哈哈-777
流结束
*/
4. Catch, wrap to a BusinessException, and re-throw.
捕获并包装成一个业务异常,并重新抛出
try {
return callExternalService(k);
}
catch (Throwable error) {
throw new BusinessException("oops, SLA exceeded", error);
}
包装重新抛出异常: 推荐用 .onErrorMap
- 1、吃掉异常,消费者有感知
- 2、抛新异常
- 3、流异常完成
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
.onErrorResume(err -> Flux.error(new BusinessException(err.getMessage())))
.subscribe(
v -> System.out.println("v = " + v),
err -> System.out.println("err = " + err),
() -> System.out.println("流结束")
);
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err = com.zzhua.test02.BusinessException
*/
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
.onErrorMap(err -> {
return new BusinessException("除数不能为0" + err.getMessage());
})
.subscribe(
v -> System.out.println("v = " + v),
err -> System.out.println("err = " + err),
() -> System.out.println("流结束")
);
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err = com.zzhua.test02.BusinessException
*/
5. Catch, log an error-specific message, and re-throw.
捕获异常,记录特殊的错误日志,重新抛出
try {
return callExternalService(k);
}
catch (RuntimeException error) {
//make a record of the error
log("uh oh, falling back, service failed for key " + k);
throw error;
}
- 异常被捕获、做自己的事情
- 不影响异常继续顺着流水线传播
- 不吃掉异常,只在异常发生的时候做一件事,订阅者有感知
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
.doOnError(err -> {
System.out.println("err已被记录 = " + err);
})
.subscribe(
v -> System.out.println("v = " + v),
err -> System.out.println("err = " + err),
() -> System.out.println("流结束")
);
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err已被记录 = java.lang.ArithmeticException: / by zero
err = java.lang.ArithmeticException: / by zero
*/
6. Use the finally block to clean up resources or a Java 7 “try-with-resource” construct.
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
.doOnError(err -> {
System.out.println("err已被记录 = " + err);
})
.doFinally(signalType -> {
System.out.println("流信号:" + signalType);
})
.subscribe(
v -> System.out.println("v = " + v),
err -> System.out.println("err = " + err),
() -> System.out.println("流结束")
);
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err已被记录 = java.lang.ArithmeticException: / by zero
err = java.lang.ArithmeticException: / by zero
流信号:onError
*/
7. 忽略当前异常,仅通知记录,继续推进
Flux.just(1, 2, 3, 0, 5)
.map(i -> 10 / i)
.onErrorContinue((err, val) -> {
System.out.println("err = " + err);
System.out.println("val = " + val);
System.out.println("发现" + val + "有问题了,继续执行其他的,我会记录这个问题");
}) //发生
.subscribe(
v -> System.out.println("v = " + v),
err -> System.out.println("err = " + err),
() -> System.out.println("流结束")
);
/*
v = 10
v = 5
v = 3
err = java.lang.ArithmeticException: / by zero
val = 0
发现0有问题了,继续执行其他的,我会记录这个问题
v = 2
流结束
*/
8.其它
Flux.just(1, 2, 3, 0, 5)
.map(i -> 10 / i)
.onErrorStop() // 错误后,停止流,源头中断,所有订阅者全部结束,错误结束
.subscribe(
v -> System.out.println("v = " + v),
err -> System.out.println("err = " + err),
() -> System.out.println("流结束")
);
/*
v = 10
v = 5
v = 3
err = java.lang.ArithmeticException: / by zero
*/
Flux.just(1, 2, 3, 0, 5)
.map(i -> 10 / i)
.onErrorComplete() //发生错误后,停止流
.subscribe(
v -> System.out.println("v = " + v),
err -> System.out.println("err = " + err),
() -> System.out.println("流结束")
);
/*
v = 10
v = 5
v = 3
流结束
*/
10、常用操作
filter、flatMap、concatMap、flatMapMany、transform、defaultIfEmpty、switchIfEmpty、concat、concatWith、merge、mergeWith、mergeSequential、zip、zipWith…
filter
Flux.just(1, 2, 3, 4)
.log()
.filter(e -> e % 2 == 0)
.subscribe();
/*
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArrayConditionalSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(2)
[ INFO] (main) | onNext(3)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(4)
[ INFO] (main) | onComplete()
*/
filterMap
Flux.just("zhang san", "li si")
.log()
.flatMap((t)->{
String[] arr = t.split("\\s");
return Flux.fromArray(arr);
})
.log()
.subscribe();
/*
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) onSubscribe(FluxFlatMap.FlatMapMain)
[ INFO] (main) request(unbounded)
[ INFO] (main) | request(256)
[ INFO] (main) | onNext(zhang san)
[ INFO] (main) onNext(zhang)
[ INFO] (main) onNext(san)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(li si)
[ INFO] (main) onNext(li)
[ INFO] (main) onNext(si)
[ INFO] (main) | request(1)
[ INFO] (main) | onComplete()
[ INFO] (main) onComplete()
*/
concatMap
Flux.just(1, 2)
.concatMap(s -> Flux.just(s * 10, s * 100))
.log()
.subscribe((e)-> System.out.println("e = " + e))
;
/*
[ INFO] (main) onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(10)
e = 10
[ INFO] (main) onNext(100)
e = 100
[ INFO] (main) onNext(20)
e = 20
[ INFO] (main) onNext(200)
e = 200
[ INFO] (main) onComplete()
*/
concat
Flux.concat(Flux.just(1, 2), Flux.just("a", "b"))
.log()
.subscribe();
/*
[ INFO] (main) onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(1)
[ INFO] (main) onNext(2)
[ INFO] (main) onNext(a)
[ INFO] (main) onNext(b)
[ INFO] (main) onComplete()
*/
concatWith
Flux.just(1, 2).concatWith(Flux.just(3, 4))
.log()
.subscribe
/*
[ INFO] (main) onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(1)
[ INFO] (main) onNext(2)
[ INFO] (main) onNext(3)
[ INFO] (main) onNext(4)
[ INFO] (main) onComplete()
*/
transform
AtomicInteger atomic = new AtomicInteger(0);
Flux<String> flux = Flux.just("a", "b", "c")
.transform(values -> {
if (atomic.incrementAndGet() == 1) {
//如果是:第一次调用,老流中的所有元素转成大写
return values.map(String::toUpperCase);
} else {
//如果不是第一次调用,原封不动返回
return values;
}
});
//transform 无defer,不会共享外部变量的值。 无状态转换; 原理,无论多少个订阅者,transform只执行一次
//transform 有defer,会共享外部变量的值。 有状态转换; 原理,无论多少个订阅者,每个订阅者transform都只执行一次
flux.subscribe(v -> System.out.println("订阅者1:v = " + v));
flux.subscribe(v -> System.out.println("订阅者2:v = " + v));
/*
订阅者1:v = A
订阅者1:v = B
订阅者1:v = C
订阅者2:v = A
订阅者2:v = B
订阅者2:v = C
*/
改成transformDeferred
/*
订阅者1:v = A
订阅者1:v = B
订阅者1:v = C
订阅者2:v = a
订阅者2:v = b
订阅者2:v = c
*/
defaultIfEmpty
/**
* defaultIfEmpty: 静态兜底数据
* switchIfEmpty: 空转换; 调用动态兜底方法; 返回新流数据
*/
void empty() {
//Mono.just(null);//流里面有一个null值元素
//Mono.empty();//流里面没有元素,只有完成信号/结束信号
haha()
.defaultIfEmpty(hehe())//如果发布者元素为null,指定默认值,否则用发布者的值;
.subscribe(v -> System.out.println("v = " + v));
haha()
.switchIfEmpty(hehe())//如果发布者元素为null,指定默认值,否则用发布者的值;
.subscribe(v -> System.out.println("v = " + v));
}
Mono<String> hehe() {
return Mono.just("兜底数据...");
}
Mono<String> haha() {
return Mono.empty();
}
merge
/**
* concat: 连接; A流 所有元素和 B流所有元素拼接
* merge:合并; A流 所有元素和 B流所有元素 按照时间序列合并
* mergeWith:
* mergeSequential: 按照哪个流先发元素排队
*/
@Test
void merge() throws IOException {
Flux.mergeSequential();
Flux.merge(
Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(1)),
Flux.just("a", "b").delayElements(Duration.ofMillis(1500)),
Flux.just("haha", "hehe", "heihei", "xixi").delayElements(Duration.ofMillis(500)))
.log()
.subscribe();
Flux.just(1, 2, 3).mergeWith(Flux.just(4, 5, 6));
System.in.read();
}
zip
/**
* zip: 无法结对的元素会被忽略;
* 最多支持8流压缩;
*/
void zip (){
//Tuple:元组;
// Flux< Tuple2:<Integer,String> >
Flux.just(1,2,3)
.zipWith(Flux.just("a","b","c","d"))
.map(tuple -> {
Integer t1 = tuple.getT1(); // 元组中的第一个元素
String t2 = tuple.getT2(); // 元组中的第二个元素
return t1 + "==>" + t2;
})
.log()
.subscribe(v-> System.out.println("v = " + v));
}
11、超时与重试
Flux.just(1)
.delayElements(Duration.ofSeconds(3))
.log()
.timeout(Duration.ofSeconds(2))
.retry(3) // 把流从头到尾重新请求1次
.map(i -> "haha-" + i)
.subscribe(e-> System.out.println("e = " + e))
;
LockSupport.park();
/*
[ INFO] (main) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) cancel()
[ INFO] (parallel-1) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-1) request(unbounded)
[ INFO] (parallel-3) cancel()
[ INFO] (parallel-3) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-3) request(unbounded)
[ INFO] (parallel-5) cancel()
[ INFO] (parallel-5) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-5) request(unbounded)
[ INFO] (parallel-7) cancel()
[ERROR] (parallel-7) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 2000ms in 'log' (and no fallback has been configured)
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 2000ms in 'log' (and no fallback has been configured)
Caused by: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 2000ms in 'log' (and no fallback has been configured)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:296)
...
*/
Flux.just(1)
.delayElements(Duration.ofSeconds(3))
.log()
.timeout(Duration.ofSeconds(2))
.retry(3) // 把流从头到尾重新请求1次
.onErrorReturn(2) // 上面重试失败后, 会抛出异常, 这里在抛出异常的情况下返回2
.map(i -> "haha-" + i)
.subscribe(e-> System.out.println("e = " + e))
;
LockSupport.park();
/*
[ INFO] (main) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) cancel()
[ INFO] (parallel-1) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-1) request(unbounded)
[ INFO] (parallel-3) cancel()
[ INFO] (parallel-3) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-3) request(unbounded)
[ INFO] (parallel-5) cancel()
[ INFO] (parallel-5) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-5) request(unbounded)
[ INFO] (parallel-7) cancel()
e = haha-2
*/
12、Sinks工具类
// Sinks.many(); // 发送Flux数据
// Sinks.one(); // 发送Mono数据
// Sinks: 接受器,数据管道,所有数据顺着这个管道往下走的
Sinks.many().unicast(); // 单播 这个管道只能绑定单个订阅者(消费者)
Sinks.many().multicast(); // 多播 这个管道能绑定多个订阅者
Sinks.many().replay(); // 重放 这个管道能重放元素。是否给后来的订阅者把之前的元素依然发给它;
单播/多播/重放/背压
// Sinks.Many<Object> many = Sinks.many()
// .multicast() //多播
// .onBackpressureBuffer(); //背压队列
//默认订阅者,从订阅的那一刻开始接元素
//发布者数据重放; 底层利用队列进行缓存之前数据
Sinks.Many<Object> many = Sinks.many().replay().limit(3);
new Thread(() -> {
for (int i = 0; i < 10; i++) {
many.tryEmitNext("a-" + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}).start();
//订阅
many.asFlux().subscribe(v -> System.out.println("v1 = " + v));
new Thread(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
many.asFlux().subscribe(v -> System.out.println("v2 = " + v));
}).start();
缓存
Flux<Integer> cache = Flux.range(1, 10)
.delayElements(Duration.ofSeconds(1)) // 不调缓存默认就是缓存所有
.cache(2); // 缓存两个元素; 默认全部缓存
// 立即订阅
cache.subscribe();
new Thread(()->{
// 5s后再去订阅
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
cache.subscribe(v-> System.out.println("v = " + v));
}).start();
LockSupport.park();
/*
v = 3
v = 4
v = 5
v = 6
v = 7
v = 8
v = 9
v = 10
*/
13、阻塞式api
block
Integer integer = Flux.just(1, 2, 4)
.map(i -> i + 10)
.blockLast();
System.out.println(integer);
List<Integer> integers = Flux.just(1, 2, 4)
.map(i -> i + 10)
.collectList()
.block(); // 也是一种订阅者; BlockingMonoSubscriber
System.out.println("integers = " + integers);
/*
14
integers = [11, 12, 14]
*/
// 百万数据,8个线程,每个线程处理100,进行分批处理一直处理结束
Flux.range(1,1000000)
.buffer(100)
.parallel(8)
.runOn(Schedulers.newParallel("yy"))
.log()
.flatMap(list->Flux.fromIterable(list))
.collectSortedList(Integer::compareTo)
.subscribe(v-> System.out.println("v = " + v));
LockSupport.park();
14、Context api
//Context-API: https://projectreactor.io/docs/core/release/reference/#context
//ThreadLocal在响应式编程中无法使用。
//响应式中,数据流期间共享数据,Context API: Context:读写 ContextView:只读;
static void threadlocal() {
//支持Context的中间操作
Flux.just(1, 2, 3)
.transformDeferredContextual((flux, context) -> {
System.out.println("flux = " + flux);
System.out.println("context = " + context);
return flux.map(i -> i + "==>" + context.get("prefix"));
})
//上游能拿到下游的最近一次数据
.contextWrite(Context.of("prefix", "哈哈"))
//ThreadLocal共享了数据,上游的所有人能看到; Context由下游传播给上游
.subscribe(v -> System.out.println("v = " + v));
// 以前 命令式编程
// controller -- service -- dao
// 响应式编程 dao(10:数据源) --> service(10) --> controller(10); 从下游反向传播
}
WebFlux
-
Reactor核心:HttpHandler 原生API;
-
DispatcherHandler 原理;
-
- DispatcherHandler 组件分析
- DispatcherHandler 请求处理流程
- 返回结果处理
- 异常处理
- 视图解析
-
-
- 重定向
- Rendering
-
-
注解式 - Controller
-
- 兼容老版本方式
- 新版本变化
-
-
- SSE
- 文件上传
-
-
错误响应
-
- @ExceptionHandler
-
-
- ErrorResponse: 自定义 错误响应
- ProblemDetail:自定义PD返回
-
-
WebFlux配置
-
- @EnableWebFlux
- WebFluxConfigurer
WebFlux:底层完全基于netty+reactor+springweb 完成一个全异步
、非阻塞
的web响应式框架
底层:异步 + 消息队列(内存) + 事件回调机制 = 整套系统
优点:能使用少量资源处理大量请求;
0、组件对比
API功能 | Servlet-阻塞式Web | WebFlux-响应式Web |
---|---|---|
前端控制器 | DispatcherServlet | DispatcherHandler |
处理器 | Controller | WebHandler/Controller |
请求、响应 | ServletRequest、ServletResponse | ServerWebExchange: ServerHttpRequest、ServerHttpResponse |
过滤器 | Filter(HttpFilter) | WebFilter |
异常处理器 | HandlerExceptionResolver | DispatchExceptionHandler |
Web配置 | @EnableWebMvc | @EnableWebFlux |
自定义配置 | WebMvcConfigurer | WebFluxConfigurer |
返回结果 | 任意 | Mono、Flux、任意 |
发送REST请求 | RestTemplate | WebClient |
Mono: 返回[ 0 |1 ]数据流
Flux:返回[ N ]数据流
1、WebFlux
底层基于Netty实现的Web容器与请求/响应处理机制
参照:https://docs.spring.io/spring-framework/reference/6.0/web/webflux.html
1、引入
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.6</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
</dependencies>
Context 响应式上下文数据传递; 由下游传播给上游;
以前: 浏览器 --> Controller --> Service --> Dao: 阻塞式编程
现在: Dao(数据源查询对象【数据发布者】) --> Service --> Controller --> 浏览器: 响应式
大数据流程: 从一个数据源拿到大量数据进行分析计算;
ProductVistorDao.loadData()
.distinct()
.map()
.filter()
.handle()
.subscribe();
;//加载最新的商品浏览数据
2、Reactor Core
1、HttpHandler、HttpServer
public static void main(String[] args) throws IOException {
//快速自己编写一个能处理请求的服务器
//1、创建一个能处理Http请求的处理器。 参数:请求、响应; 返回值:Mono<Void>:代表处理完成的信号
HttpHandler handler = (ServerHttpRequest request,
ServerHttpResponse response)->{
URI uri = request.getURI();
System.out.println(Thread.currentThread()+"请求进来:"+uri);
//编写请求处理的业务,给浏览器写一个内容 URL + "Hello~!"
// response.getHeaders(); // 获取响应头
// response.getCookies(); // 获取Cookie
// response.getStatusCode(); // 获取响应状态码;
// response.bufferFactory(); // buffer工厂
// response.writeWith() // 把xxx写出去
// response.setComplete(); // 响应结束, 该方法返回Mono<Void>
//创建 响应数据的 DataBuffer
DataBufferFactory factory = response.bufferFactory();
//数据Buffer
DataBuffer buffer = factory.wrap(new String(uri + " => Hello!").getBytes());
// 数据的发布者:Mono<DataBuffer>、Flux<DataBuffer>
// 需要一个 DataBuffer 的发布者
return response.writeWith(Mono.just(buffer));
};
//2、启动一个服务器,监听8080端口,接受数据,拿到数据交给 HttpHandler 进行请求处理
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
//3、启动Netty服务器
HttpServer.create()
.host("localhost")
.port(8080)
.handle(adapter) //用指定的处理器处理请求
.bindNow(); //现在就绑定
System.out.println("服务器启动完成....监听8080,接受请求");
System.in.read();
System.out.println("服务器停止....");
}
3、DispatcherHandler
SpringMVC: DispatcherServlet;
SpringWebFlux: DispatcherHandler
1、请求处理流程
- HandlerMapping:请求映射处理器; 保存每个请求由哪个方法进行处理
- HandlerAdapter:处理器适配器;反射执行目标方法
- HandlerResultHandler:处理器结果处理器;
SpringMVC: DispatcherServlet 有一个 doDispatch() 方法,来处理所有请求;
WebFlux: DispatcherHandler 有一个 handle() 方法,来处理所有请求;
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
return handlePreFlight(exchange);
}
return Flux.fromIterable(this.handlerMappings) //拿到所有的 handlerMappings
// 找每一个mapping看谁能处理请求
.concatMap(mapping -> mapping.getHandler(exchange))
.next() // 直接触发获取元素; 拿到流的第一个元素; 找到第一个能处理这个请求的handlerAdapter
.switchIfEmpty(createNotFoundError()) // 如果没拿到这个元素,则响应404错误;
// 异常处理,一旦前面发生异常,调用处理异常
.onErrorResume(ex -> handleDispatchError(exchange, ex))
// 调用方法处理请求,得到响应结果
.flatMap(handler -> handleRequestWith(exchange, handler));
}
- 1、请求和响应都封装在 ServerWebExchange 对象中,由handle方法进行处理
- 2、如果没有任何的请求映射器; 直接返回一个: 创建一个未找到的错误; 404; 返回Mono.error;终结流
- 3、跨域工具,是否跨域请求,跨域请求检查是否复杂跨域,需要预检请求;
- 4、Flux流式操作,先找到HandlerMapping,再获取handlerAdapter,再用Adapter处理请求,期间的错误由onErrorResume触发回调进行处理;
源码中的核心两个:
- handleRequestWith: 编写了handlerAdapter怎么处理请求
- handleResult: String、User、ServerSendEvent、Mono、Flux …
concatMap: 先挨个元素变,然后把变的结果按照之前元素的顺序拼接成一个完整流
private <R> Mono<R> createNotFoundError() {
Exception ex = new ResponseStatusException(HttpStatus.NOT_FOUND);
return Mono.error(ex);
}
Mono.defer(() -> {
Exception ex = new ResponseStatusException(HttpStatus.NOT_FOUND);
return Mono.error(ex);
}); //有订阅者,且流被激活后就动态调用这个方法; 延迟加载;
4、注解开发
1、目标方法传参
https://docs.spring.io/spring-framework/reference/6.0/web/webflux/controller/ann-methods/arguments.html
Controller method argument | Description |
---|---|
ServerWebExchange | 封装了请求和响应对象的对象; 自定义获取数据、自定义响应 |
ServerHttpRequest, ServerHttpResponse | 请求、响应 |
WebSession | 访问Session对象 |
java.security.Principal | |
org.springframework.http.HttpMethod | 请求方式 |
java.util.Locale | 国际化 |
java.util.TimeZone + java.time.ZoneId | 时区 |
@PathVariable | 路径变量 |
@MatrixVariable | 矩阵变量 |
@RequestParam | 请求参数 |
@RequestHeader | 请求头; |
@CookieValue | 获取Cookie |
@RequestBody | 获取请求体,Post、文件上传 |
HttpEntity | 封装后的请求对象 |
@RequestPart | 获取文件上传的数据 multipart/form-data. |
java.util.Map, org.springframework.ui.Model, and org.springframework.ui.ModelMap. | Map、Model、ModelMap |
@ModelAttribute | |
Errors, BindingResult | 数据校验,封装错误 |
SessionStatus + class-level @SessionAttributes | |
UriComponentsBuilder | For preparing a URL relative to the current request’s host, port, scheme, and context path. See URI Links. |
@SessionAttribute | |
@RequestAttribute | 转发请求的请求域数据 |
Any other argument | 所有对象都能作为参数:1、基本类型 ,等于标注@RequestParam 2、对象类型,等于标注 @ModelAttribute |
2、返回值写法
sse和websocket区别:
- SSE:单工;请求过去以后,等待服务端源源不断的数据
- websocket:双工: 连接建立后,可以任何交互;
Controller method return value | Description |
---|---|
@ResponseBody | 把响应数据写出去,如果是对象,可以自动转为json |
HttpEntity, ResponseEntity | ResponseEntity:支持快捷自定义响应内容 |
HttpHeaders | 没有响应内容,只有响应头 |
ErrorResponse | 快速构建错误响应 |
ProblemDetail | SpringBoot3; |
String | 就是和以前的使用规则一样;forward: 转发到一个地址redirect: 重定向到一个地址配合模板引擎 |
View | 直接返回视图对象 |
java.util.Map, org.springframework.ui.Model | 以前一样 |
@ModelAttribute | 以前一样 |
Rendering | 新版的页面跳转API; 不能标注 @ResponseBody 注解 |
void | 仅代表响应完成信号 |
Flux, Observable, or other reactive type | 使用 text/event-stream 完成SSE效果 |
Other return values | 未在上述列表的其他返回值,都会当成给页面的数据; |
5、文件上传
https://docs.spring.io/spring-framework/reference/6.0/web/webflux/controller/ann-methods/multipart-forms.html
class MyForm {
private String name;
private MultipartFile file;
// ...
}
@Controller
public class FileUploadController {
@PostMapping("/form")
public String handleFormUpload(MyForm form, BindingResult errors) {
// ...
}
}
现在
@PostMapping("/")
public String handle(@RequestPart("meta-data") Part metadata,
@RequestPart("file-data") FilePart file) {
// ...
}
6、错误处理
@ExceptionHandler(ArithmeticException.class)
public String error(ArithmeticException exception){
System.out.println("发生了数学运算异常"+exception);
//返回这些进行错误处理;
// ProblemDetail: 建造者:声明式编程、链式调用
// ErrorResponse :
return "炸了,哈哈...";
}
7、RequestContext
8、自定义Flux配置
WebFluxConfigurer
容器中注入这个类型的组件,重写底层逻辑
@Configuration
public class MyWebConfiguration {
//配置底层
@Bean
public WebFluxConfigurer webFluxConfigurer(){
return new WebFluxConfigurer() {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowedHeaders("*")
.allowedMethods("*")
.allowedOrigins("localhost");
}
};
}
}
9、Filter
@Component
public class MyWebFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
System.out.println("请求处理放行到目标方法之前...");
Mono<Void> filter = chain.filter(exchange); //放行
//流一旦经过某个操作就会变成新流
Mono<Void> voidMono = filter.doOnError(err -> {
System.out.println("目标方法异常以后...");
}) // 目标方法发生异常后做事
.doFinally(signalType -> {
System.out.println("目标方法执行以后...");
});// 目标方法执行之后
//上面执行不花时间。
return voidMono; //看清楚返回的是谁!!!
}
}