(八)Reactor响应式框架之核心特性
一、Mono和Flux
Mono: 0|1 数据流
Flux: N 数据流
响应式流:元素(内容) + 信号(完成/异常);
二、subscribe()
1.自定义流的信号感知回调
flux.subscribe(
v-> System.out.println("v = " + v), //流元素消费
throwable -> System.out.println("throwable = " + throwable), //感知异常结束
()-> System.out.println("流结束了...") //感知正常结束
);
2.自定义消费者
flux.subscribe(new BaseSubscriber<String>() {
// 生命周期钩子1: 订阅关系绑定的时候触发
@Override
protected void hookOnSubscribe(Subscription subscription) {
// 流被订阅的时候触发
System.out.println("绑定了..."+subscription);
//找发布者要数据
request(1); //要1个数据
// requestUnbounded(); //要无限数据
}
@Override
protected void hookOnNext(String value) {
System.out.println("数据到达,正在处理:"+value);
request(1); //要1个数据
}
// hookOnComplete、hookOnError 二选一执行
@Override
protected void hookOnComplete() {
System.out.println("流正常结束...");
}
@Override
protected void hookOnError(Throwable throwable) {
System.out.println("流异常..."+throwable);
}
@Override
protected void hookOnCancel() {
System.out.println("流被取消...");
}
@Override
protected void hookFinally(SignalType type) {
System.out.println("最终回调...一定会被执行");
}
});
三、流的取消
消费者调用 cancle() 取消流的订阅;
Disposable
Flux<String> flux = Flux.range(1, 10)
.map(i -> {
System.out.println("map..."+i);
if(i==9) {
i = 10/(9-i); //数学运算异常; doOnXxx
}
return "哈哈:" + i;
}); //流错误的时候,把错误吃掉,转为正常信号
// flux.subscribe(); //流被订阅; 默认订阅;
// flux.subscribe(v-> System.out.println("v = " + v));//指定订阅规则: 正常消费者:只消费正常元素
// flux.subscribe(
// v-> System.out.println("v = " + v), //流元素消费
// throwable -> System.out.println("throwable = " + throwable), //感知异常结束
// ()-> System.out.println("流结束了...") //感知正常结束
// );
// 流的生命周期钩子可以传播给订阅者。
// a() {
// data = b();
// }
flux.subscribe(new BaseSubscriber<String>() {
// 生命周期钩子1: 订阅关系绑定的时候触发
@Override
protected void hookOnSubscribe(Subscription subscription) {
// 流被订阅的时候触发
System.out.println("绑定了..."+subscription);
//找发布者要数据
request(1); //要1个数据
// requestUnbounded(); //要无限数据
}
@Override
protected void hookOnNext(String value) {
System.out.println("数据到达,正在处理:"+value);
if(value.equals("哈哈:5")){
cancel(); //取消流
}
request(1); //要1个数据
}
// hookOnComplete、hookOnError 二选一执行
@Override
protected void hookOnComplete() {
System.out.println("流正常结束...");
}
@Override
protected void hookOnError(Throwable throwable) {
System.out.println("流异常..."+throwable);
}
@Override
protected void hookOnCancel() {
System.out.println("流被取消...");
}
@Override
protected void hookFinally(SignalType type) {
System.out.println("最终回调...一定会被执行");
}
});
四、自定义消费者
自定义消费者推荐直接编写 BaseSubscriber 的逻辑;
五、背压(Backpressure )
1、buffer:缓冲
Flux<List<Integer>> flux = Flux.range(1, 10) //原始流10个
.buffer(3)
.log();//缓冲区:缓冲3个元素: 消费一次最多可以拿到三个元素; 凑满数批量发给消费者
//
// //一次发一个,一个一个发;
// 10元素,buffer(3);消费者请求4次,数据消费完成
2、limit:限流
Flux.range(1, 1000)
.log()
//限流触发,看上游是怎么限流获取数据的
.limitRate(100) //一次预取30个元素; 第一次 request(100),以后request(75)
.subscribe();
六、以编程方式创建序列-Sink
Sink.next
Sink.complete
1、同步环境-generate
import reactor.core.publisher.Flux;
public class FluxGenerateExample {
public static void main(String[] args) {
Flux<Integer> flux = Flux.generate(() -> 0,
(state, sink) -> {
// 0-10
if (state <= 10) {
sink.next(state); // 把元素传出去
} else {
sink.complete(); // 完成,给通道传递一个完成信号
}
return state + 1; // 更新状态
});
// 订阅 Flux 并处理元素和完成信号
flux.subscribe(
value -> System.out.println("Received value: " + value),
error -> System.err.println("Error occurred: " + error),
() -> System.out.println("Sequence completed")
);
}
}
2、多线程-create
import reactor.core.publisher.Flux;
public class FluxCreateExample {
public static void main(String[] args) {
Flux<Integer> flux = Flux.create(sink -> {
for (int i = 0; i <= 10; i++) {
sink.next(i); // 把元素传出去
}
sink.complete(); // 完成,给通道传递一个完成信号
});
// 订阅 Flux 并处理元素和完成信号
flux.subscribe(
value -> System.out.println("Received value: " + value),
error -> System.err.println("Error occurred: " + error),
() -> System.out.println("Sequence completed")
);
}
}
七、handle()
自定义流中元素处理规则
//
Flux.range(1,10)
.handle((value,sink)->{
System.out.println("拿到的值:"+value);
sink.next("张三:"+value); //可以向下发送数据的通道
})
.log() //日志
.subscribe();
八、自定义线程调度
响应式:响应式编程: 全异步、消息、事件回调
默认还是用当前线程,生成整个流、发布流、流操作
public void thread1(){
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i)
.log()
.publishOn(s)
.map(i -> "value " + i)
;
//只要不指定线程池,默认发布者用的线程就是订阅者的线程;
new Thread(() -> flux.subscribe(System.out::println)).start();
}
九、错误处理
命令式编程:常见的错误处理方式
1. Catch and return a static default value. 捕获异常返回一个静态默认值
try {
return doSomethingDangerous(10);
}
catch (Throwable error) {
return "RECOVERED";
}
onErrorReturn: 实现上面效果,错误的时候返回一个值
- 1、吃掉异常,消费者无异常感知
- 2、返回一个兜底默认值
- 3、流正常完成;
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
.onErrorReturn(NullPointerException.class,"哈哈-6666")
.subscribe(v-> System.out.println("v = " + v),
err -> System.out.println("err = " + err),
()-> System.out.println("流结束")); // error handling example
2. Catch and execute an alternative path with a fallback method.
吃掉异常,执行一个兜底方法;
try {
return doSomethingDangerous(10);
}
catch (Throwable error) {
return doOtherthing(10);
}
onErrorResume
- 1、吃掉异常,消费者无异常感知
- 2、调用一个兜底方法
- 3、流正常完成
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i)).onErrorResume(err -> Mono.just("哈哈-777"))
.subscribe(v -> System.out.println("v = " + v),
err -> System.out.println("err = " + err),
() -> System.out.println("流结束"));
3. Catch and dynamically compute a fallback value. 捕获并动态计算一个返回值
根据错误返回一个新值
try {
Value v = erroringMethod();
return MyWrapper.fromValue(v);
}
catch (Throwable error) {
return MyWrapper.fromError(error);
}
- 1、吃掉异常,消费者有感知
- 2、调用一个自定义方法
- 3、流异常完成
.onErrorResume(err -> Flux.error(new BusinessException(err.getMessage()+":炸了")))
4. Catch, wrap to a BusinessException, and re-throw.
捕获并包装成一个业务异常,并重新抛出
try {
return callExternalService(k);
}
catch (Throwable error) {
throw new BusinessException("oops, SLA exceeded", error);
}
包装重新抛出异常: 推荐用 .onErrorMap
- 1、吃掉异常,消费者有感知
- 2、抛新异常
- 3、流异常完成
.onErrorResume(err -> Flux.error(new BusinessException(err.getMessage()+":炸了")))
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
.onErrorMap(err-> new BusinessException(err.getMessage()+": 又炸了..."))
.subscribe(v -> System.out.println("v = " + v),
err -> System.out.println("err = " + err),
() -> System.out.println("流结束"));
5. Catch, log an error-specific message, and re-throw.
捕获异常,记录特殊的错误日志,重新抛出
try {
return callExternalService(k);
}
catch (RuntimeException error) {
//make a record of the error
log("uh oh, falling back, service failed for key " + k);
throw error;
}
- 异常被捕获、做自己的事情
- 不影响异常继续顺着流水线传播
- 不吃掉异常,只在异常发生的时候做一件事,消费者有感知
Flux.just(1, 2, 0, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
.doOnError(err -> {
System.out.println("err已被记录 = " + err);
})
.subscribe(v -> System.out.println("v = " + v),
err -> System.out.println("err = " + err),
() -> System.out.println("流结束"));
6. Use the finally block to clean up resources or a Java 7 “try-with-resource” construct.
Flux.just(1, 2, 3, 4)
.map(i -> "100 / " + i + " = " + (100 / i))
.doOnError(err -> {
System.out.println("err已被记录 = " + err);
})
.doFinally(signalType -> {
System.out.println("流信号:"+signalType);
})
7. 忽略当前异常,仅通知记录,继续推进
Flux.just(1,2,3,0,5)
.map(i->10/i)
.onErrorContinue((err,val)->{
System.out.println("err = " + err);
System.out.println("val = " + val);
System.out.println("发现"+val+"有问题了,继续执行其他的,我会记录这个问题");
}) //发生
.subscribe(v-> System.out.println("v = " + v),
err-> System.out.println("err = " + err));
十、常用操作
filter、flatMap、concatMap、flatMapMany、transform、defaultIfEmpty、switchIfEmpty、concat、concatWith、merge、mergeWith、mergeSequential、zip、zipWith...
十一、阻塞式API
在响应式编程中,在任何时候执行业务代码时都不要使用block()
方法,为了避免使用block(),我们应该尽可能地使用响应式操作符(如map、flatMap、filter等)对数据流进行转换和处理,并使用其他响应式方法(如subscribe())来订阅数据流并触发异步处理。
什么问题都可以评论区留言,看见都会回复的
如果你觉得本篇文章对你有所帮助的,把“文章有帮助的”打在评论区
多多支持吧!!!
点赞加藏评论,是对小编莫大的肯定。抱拳了!