当前位置: 首页 > article >正文

(八)Reactor响应式框架之核心特性

一、Mono和Flux

Mono: 0|1 数据流

Flux: N 数据流

响应式流:元素(内容) + 信号(完成/异常);

二、subscribe()

1.自定义流的信号感知回调

flux.subscribe(
        v-> System.out.println("v = " + v), //流元素消费
        throwable -> System.out.println("throwable = " + throwable), //感知异常结束
        ()-> System.out.println("流结束了...") //感知正常结束
);

2.自定义消费者

flux.subscribe(new BaseSubscriber<String>() {

            // 生命周期钩子1: 订阅关系绑定的时候触发
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // 流被订阅的时候触发
                System.out.println("绑定了..."+subscription);

                //找发布者要数据
                request(1); //要1个数据
//                requestUnbounded(); //要无限数据
            }

            @Override
            protected void hookOnNext(String value) {
                System.out.println("数据到达,正在处理:"+value);
                request(1); //要1个数据
            }


            //  hookOnComplete、hookOnError 二选一执行
            @Override
            protected void hookOnComplete() {
                System.out.println("流正常结束...");
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                System.out.println("流异常..."+throwable);
            }

            @Override
            protected void hookOnCancel() {
                System.out.println("流被取消...");
            }

            @Override
            protected void hookFinally(SignalType type) {
                System.out.println("最终回调...一定会被执行");
            }
        });

三、流的取消

消费者调用 cancle() 取消流的订阅;

Disposable

        Flux<String> flux = Flux.range(1, 10)
                .map(i -> {
                    System.out.println("map..."+i);
                    if(i==9) {
                        i = 10/(9-i); //数学运算异常;  doOnXxx
                    }
                    return "哈哈:" + i;
                }); //流错误的时候,把错误吃掉,转为正常信号


//        flux.subscribe(); //流被订阅; 默认订阅;
//        flux.subscribe(v-> System.out.println("v = " + v));//指定订阅规则: 正常消费者:只消费正常元素


//        flux.subscribe(
//                v-> System.out.println("v = " + v), //流元素消费
//                throwable -> System.out.println("throwable = " + throwable), //感知异常结束
//                ()-> System.out.println("流结束了...") //感知正常结束
//        );


        // 流的生命周期钩子可以传播给订阅者。
        //  a() {
        //      data = b();
        //  }
        flux.subscribe(new BaseSubscriber<String>() {

            // 生命周期钩子1: 订阅关系绑定的时候触发
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                // 流被订阅的时候触发
                System.out.println("绑定了..."+subscription);

                //找发布者要数据
                request(1); //要1个数据
//                requestUnbounded(); //要无限数据
            }

            @Override
            protected void hookOnNext(String value) {
                System.out.println("数据到达,正在处理:"+value);
                if(value.equals("哈哈:5")){
                    cancel(); //取消流
                }
                request(1); //要1个数据
            }


            //  hookOnComplete、hookOnError 二选一执行
            @Override
            protected void hookOnComplete() {
                System.out.println("流正常结束...");
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                System.out.println("流异常..."+throwable);
            }

            @Override
            protected void hookOnCancel() {
                System.out.println("流被取消...");
            }

            @Override
            protected void hookFinally(SignalType type) {
                System.out.println("最终回调...一定会被执行");
            }
        });

四、自定义消费者

自定义消费者推荐直接编写 BaseSubscriber 的逻辑;

五、背压(Backpressure )

1、buffer:缓冲

Flux<List<Integer>> flux = Flux.range(1, 10)  //原始流10个
        .buffer(3)
        .log();//缓冲区:缓冲3个元素: 消费一次最多可以拿到三个元素; 凑满数批量发给消费者
//
//        //一次发一个,一个一个发;
// 10元素,buffer(3);消费者请求4次,数据消费完成

2、limit:限流

Flux.range(1, 1000)
    .log()
    //限流触发,看上游是怎么限流获取数据的
    .limitRate(100) //一次预取30个元素; 第一次 request(100),以后request(75)
    .subscribe();

六、以编程方式创建序列-Sink

Sink.next

Sink.complete

1、同步环境-generate

import reactor.core.publisher.Flux;

public class FluxGenerateExample {
    public static void main(String[] args) {
        Flux<Integer> flux = Flux.generate(() -> 0,
                (state, sink) -> {
                    // 0-10
                    if (state <= 10) {
                        sink.next(state); // 把元素传出去
                    } else {
                        sink.complete(); // 完成,给通道传递一个完成信号
                    }
                    return state + 1; // 更新状态
                });

        // 订阅 Flux 并处理元素和完成信号
        flux.subscribe(
                value -> System.out.println("Received value: " + value),
                error -> System.err.println("Error occurred: " + error),
                () -> System.out.println("Sequence completed")
        );
    }
}

2、多线程-create

import reactor.core.publisher.Flux;

public class FluxCreateExample {
    public static void main(String[] args) {
        Flux<Integer> flux = Flux.create(sink -> {
            for (int i = 0; i <= 10; i++) {
                sink.next(i); // 把元素传出去
            }
            sink.complete(); // 完成,给通道传递一个完成信号
        });

        // 订阅 Flux 并处理元素和完成信号
        flux.subscribe(
                value -> System.out.println("Received value: " + value),
                error -> System.err.println("Error occurred: " + error),
                () -> System.out.println("Sequence completed")
        );
    }
}

七、handle()

自定义流中元素处理规则

   //
        Flux.range(1,10)
                .handle((value,sink)->{
                    System.out.println("拿到的值:"+value);
                    sink.next("张三:"+value); //可以向下发送数据的通道
                })
                .log() //日志
                .subscribe();

八、自定义线程调度

响应式:响应式编程: 全异步、消息、事件回调

默认还是用当前线程,生成整个流、发布流、流操作

public void thread1(){
    Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);

    final Flux<String> flux = Flux
            .range(1, 2)
            .map(i -> 10 + i)
            .log()
            .publishOn(s)
            .map(i -> "value " + i)
            ;

    //只要不指定线程池,默认发布者用的线程就是订阅者的线程;
    new Thread(() -> flux.subscribe(System.out::println)).start();
}

九、错误处理

命令式编程:常见的错误处理方式

1. Catch and return a static default value. 捕获异常返回一个静态默认值

try {
  return doSomethingDangerous(10);
}
catch (Throwable error) {
  return "RECOVERED";
}

onErrorReturn: 实现上面效果,错误的时候返回一个值

  • 1、吃掉异常,消费者无异常感知
  • 2、返回一个兜底默认值
  • 3、流正常完成;
Flux.just(1, 2, 0, 4)
      .map(i -> "100 / " + i + " = " + (100 / i))
      .onErrorReturn(NullPointerException.class,"哈哈-6666")
      .subscribe(v-> System.out.println("v = " + v),
                err -> System.out.println("err = " + err),
                ()-> System.out.println("流结束")); // error handling example

2. Catch and execute an alternative path with a fallback method.

吃掉异常,执行一个兜底方法;

try {
  return doSomethingDangerous(10);
}
catch (Throwable error) {
  return doOtherthing(10);
}

onErrorResume

  • 1、吃掉异常,消费者无异常感知
  • 2、调用一个兜底方法
  • 3、流正常完成
Flux.just(1, 2, 0, 4)
    .map(i -> "100 / " + i + " = " + (100 / i)).onErrorResume(err -> Mono.just("哈哈-777"))
    .subscribe(v -> System.out.println("v = " + v),
               err -> System.out.println("err = " + err),
               () -> System.out.println("流结束"));

3. Catch and dynamically compute a fallback value. 捕获并动态计算一个返回值

根据错误返回一个新值

try {
  Value v = erroringMethod();
  return MyWrapper.fromValue(v);
}
catch (Throwable error) {
  return MyWrapper.fromError(error);
}
  • 1、吃掉异常,消费者有感知
  • 2、调用一个自定义方法
  • 3、流异常完成
.onErrorResume(err -> Flux.error(new BusinessException(err.getMessage()+":炸了")))

4. Catch, wrap to a BusinessException, and re-throw.

捕获并包装成一个业务异常,并重新抛出

try {
  return callExternalService(k);
}
catch (Throwable error) {
  throw new BusinessException("oops, SLA exceeded", error);
}

包装重新抛出异常: 推荐用 .onErrorMap

  • 1、吃掉异常,消费者有感知
  • 2、抛新异常
  • 3、流异常完成
.onErrorResume(err -> Flux.error(new BusinessException(err.getMessage()+":炸了")))
        Flux.just(1, 2, 0, 4)
                .map(i -> "100 / " + i + " = " + (100 / i))
                .onErrorMap(err-> new BusinessException(err.getMessage()+": 又炸了..."))
                .subscribe(v -> System.out.println("v = " + v),
                        err -> System.out.println("err = " + err),
                        () -> System.out.println("流结束"));

5. Catch, log an error-specific message, and re-throw.

捕获异常,记录特殊的错误日志,重新抛出

try {
  return callExternalService(k);
}
catch (RuntimeException error) {
  //make a record of the error
  log("uh oh, falling back, service failed for key " + k);
  throw error;
}
  • 异常被捕获、做自己的事情
  • 不影响异常继续顺着流水线传播
  • 不吃掉异常,只在异常发生的时候做一件事,消费者有感知
 Flux.just(1, 2, 0, 4)
          .map(i -> "100 / " + i + " = " + (100 / i))
          .doOnError(err -> {
               System.out.println("err已被记录 = " + err);
            })
          .subscribe(v -> System.out.println("v = " + v),
               err -> System.out.println("err = " + err),
                () -> System.out.println("流结束"));

6. Use the finally block to clean up resources or a Java 7 “try-with-resource” construct.

Flux.just(1, 2, 3, 4)
      .map(i -> "100 / " + i + " = " + (100 / i))
      .doOnError(err -> {
            System.out.println("err已被记录 = " + err);
       })
      .doFinally(signalType -> {
            System.out.println("流信号:"+signalType);
       })

7. 忽略当前异常,仅通知记录,继续推进

Flux.just(1,2,3,0,5)
        .map(i->10/i)
        .onErrorContinue((err,val)->{
            System.out.println("err = " + err);
            System.out.println("val = " + val);
            System.out.println("发现"+val+"有问题了,继续执行其他的,我会记录这个问题");
        }) //发生
        .subscribe(v-> System.out.println("v = " + v),
                err-> System.out.println("err = " + err));

十、常用操作

filter、flatMap、concatMap、flatMapMany、transform、defaultIfEmpty、switchIfEmpty、concat、concatWith、merge、mergeWith、mergeSequential、zip、zipWith...

十一、阻塞式API

        在响应式编程中,在任何时候执行业务代码时都不要使用block()方法,为了避免使用block(),我们应该尽可能地使用响应式操作符(如map、flatMap、filter等)对数据流进行转换和处理,并使用其他响应式方法(如subscribe())来订阅数据流并触发异步处理。

什么问题都可以评论区留言,看见都会回复的

如果你觉得本篇文章对你有所帮助的,把“文章有帮助的”打在评论区

多多支持吧!!!

点赞加藏评论,是对小编莫大的肯定。抱拳了!


http://www.kler.cn/a/597600.html

相关文章:

  • 5、MySQL为什么使用 B+树 来作索引【高频】
  • 矩阵可相似对角化
  • G-Star 校园开发者计划·黑科大|开源第一课之 Git 入门
  • 强化学习中循环神经网络在序列决策中的应用研究
  • 2025新版懒人精灵零基础安装调试+lua基础+UI设计交互+常用方法封装+项目实战+项目打包安装板块-视频教程(初学者必修课)
  • 基于javaweb的SpringBoot医院管理系统设计与实现(源码+文档+部署讲解)
  • maven在windows系统上的详细安装和配置
  • 无阻塞UI:通过Web Worker提升用户体验的新途径
  • 基于LabVIEW的Windows平台高速闭环控制
  • windows+ragflow+deepseek实战之一excel表查询
  • 第19章:StatefulSet:有状态应用部署最佳实践
  • dify案例分享-儿童故事绘本语音播报视频工作流
  • Lustre 语言的 Rust 生成相关的工作
  • 高考志愿填报管理系统基于Spring Boot SSM
  • 流程图软件推荐,好用的流程图工具分享
  • 西门子200smart之modbus_TCP(做主站与第三方设备)通讯
  • 常考计算机操作系统面试习题(四)
  • 蓝桥杯备考-----》前缀和+哈希表之连续自然数和
  • kotlin 内联函数 inline
  • Java集合框架深度剖析:从数据结构到实战应用