当前位置: 首页 > article >正文

webflux响应式编程

webflux&webclient

尚硅谷SpringBoot响应式编程教程,最新springboot3入门到实战

响应式编程设计实战及SpringWebFlux源码剖析 - 拉勾

文章目录

  • 前置知识
    • 1、Lambda
    • 2、Function
    • 3、StreamAPI
      • 中间操作:Intermediate Operations
      • 终止操作:Terminal Operation
    • 4、Reactive-Stream
      • 为什么有Reactive-Stream规范
      • 消息传递是响应式核心
      • Reactive-Stream规范核心接口
        • API Components
        • 发布订阅写法
        • 响应式编程理解
  • Reactor
    • 1、快速上手
      • 介绍
      • 依赖
    • 2、响应式编程
      • 2.1. 阻塞是对资源的浪费
      • 2.2. 异步可以解决问题吗?
      • 2.3. 从命令式编程到响应式编程
        • 2.3.1. 可编排性与可读性
        • 2.3.2. 就像装配流水线
        • 2.3.3. 操作符(Operators)
        • 2.3.4. subscribe() 之前什么都不会发生
        • 2.3.5. 背压
        • 2.3.6. 热(Hot) vs 冷(Cold)
    • 3、核心特性
      • 1、Mono和Flux
        • ==响应式流:元素(内容) + 信号(完成/异常)==
        • 基本操作
      • 2、subscribe()
        • 自定义流的信号感知回调
        • 自定义消费者
      • 3、流的取消
        • **Disposable**
      • 4、BaseSubscriber与生命周期钩子
      • 5、背压和请求重塑
        • 1、buffer:缓冲
        • 2、limit:限流
      • 6、以编程方式创建序列-Sink
        • 1、同步环境-generate
        • 2、多线程-create
      • 7、 handle()
      • 8、自定义线程调度
      • 9、错误处理
        • 1. Catch and return a static default value.
        • 2. Catch and execute an alternative path with a fallback method.
        • 3. Catch and dynamically compute a fallback value.
        • 4. Catch, wrap to a BusinessException, and re-throw.
        • 5. Catch, log an error-specific message, and re-throw.
        • 6. Use the finally block to clean up resources or a Java 7 “try-with-resource” construct.
        • 7. 忽略当前异常,仅通知记录,继续推进
        • 8.其它
      • 10、常用操作
        • filter
        • filterMap
        • concatMap
        • concat
        • concatWith
        • transform
        • defaultIfEmpty
        • merge
        • zip
      • 11、超时与重试
      • 12、Sinks工具类
        • 单播/多播/重放/背压
        • 缓存
      • 13、阻塞式api
        • block
      • 14、Context api
  • WebFlux
    • 0、组件对比
    • 1、WebFlux
      • 1、引入
      • 2、Reactor Core
        • 1、HttpHandler、HttpServer
      • 3、DispatcherHandler
        • 1、请求处理流程
      • 4、注解开发
        • 1、目标方法传参
        • 2、返回值写法
      • 5、文件上传
      • 6、错误处理
      • 7、RequestContext
      • 8、自定义Flux配置
        • WebFluxConfigurer
      • 9、Filter

image-20250222161307934

image-20250222161501414

前置知识

1、Lambda

Java8语法糖

package com.atguiggu.lambda;

import java.util.*;
import java.util.function.*;
import java.util.stream.Collectors;

/**
 * @author lfy
 * @Description
 * @create 2023-11-16 20:07
 */

//函数式接口;只要是函数式接口就可以用Lambda表达式简化
//函数式接口: 接口中有且只有一个未实现的方法,这个接口就叫函数式接口


interface MyInterface {
    int sum(int i, int j);
}

interface MyHaha {
    int haha();

    default int heihei() {
        return 2;
    }

    ; //默认实现
}

interface My666 {
    void aaa(int i,int j,int k);
}


@FunctionalInterface //检查注解,帮我们快速检查我们写的接口是否函数式接口
interface MyHehe {
    int hehe(int i);


}

//1、自己写实现类
class MyInterfaceImpl implements MyInterface {
    @Override
    public int sum(int i, int j) {
        return i + j;
    }
}


public class Lambda {

    public static void main(String[] args) {
        //声明一个函数
        BiConsumer<String,String> consumer = (a,b)->{
            System.out.println("哈哈:"+a+";呵呵:"+b);
        };
        consumer.accept("1","2");



        //声明一个函数
        Function<String,Integer> function = (String x) -> Integer.parseInt(x);
        System.out.println(function.apply("2"));


        Supplier<String> supplier = ()-> UUID.randomUUID().toString();
        String s = supplier.get();
        System.out.println(s);


        BiFunction<String,Integer,Long> biFunction = (a,b)-> 888L;

        Predicate<Integer> even = (t)-> t%2 ==0;

//        even.test()//正向判断
//        even.negate().test(2) //反向判断
        System.out.println(even.negate().test(2));


    }


    public static void bbbbb(String[] args) {
        var names = new ArrayList<String>();

        names.add("Alice");
        names.add("Bob");
        names.add("Charlie");
        names.add("David");


        //比较器
//        Collections.sort(names, new Comparator<String>() {
//            @Override
//            public int compare(String o1, String o2) {
//                return o2.compareTo(o1);
//            }
//        });


        //直接写函数式接口就方便   (o1,o2)->o1.compareTo(o2)
//        Collections.sort(names,(o1,o2)->o1.compareTo(o2));
        System.out.println(names);


        // 类::方法; 引用类中的实例方法; 忽略lambda的完整写法
        Collections.sort(names,String::compareTo);
        System.out.println(names);




        new  Thread(
                new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("哈哈啊");
                    }
                }
        ).start();

        Runnable runnable = () -> System.out.println("aaa");

        new Thread(runnable).start();



        //最佳实战:
        //1、以后调用某个方法传入参数,这个参数实例是一个接口对象,且只定义了一个方法,就直接用lambda简化写法


    }


    /**
     * lambda简化函数式接口实例创建
     *
     * @param args
     */
    public static void aaaa(String[] args) {

        //1、自己创建实现类对象
        MyInterface myInterface = new MyInterfaceImpl();
        System.out.println(myInterface.sum(1, 2));

        //2、创建匿名实现类
        MyInterface myInterface1 = new MyInterface() {
            @Override
            public int sum(int i, int j) {
                return i * i + j * j;
            }
        };
//        System.out.println(myInterface1.sum(2, 3));
        //冗余写法

        //3、lambda表达式:语法糖  参数列表  + 箭头 + 方法体
        MyInterface myInterface2 = (x, y) -> {
            return x * x + y * y;
        };
        System.out.println(myInterface2.sum(2, 3));


        //参数位置最少情况
        MyHaha myHaha = () -> {
            return 1;
        };

        MyHehe myHehe = y -> {
            return y * y;
        };


        MyHehe hehe2 = y -> y - 1;

        //完整写法如上:
        //简化写法:
        //1)、参数类型可以不写,只写(参数名),参数变量名随意定义;
        //    参数表最少可以只有一个 (),或者只有一个参数名;
        //2、方法体如果只有一句话,{} 可以省略


        MyHehe hehe3 = y -> y + 1;
        System.out.println(hehe3.hehe(7));
        //以上Lambda表达式简化了实例的创建。


        //总结:
        // 1、Lambda表达式: (参数表) -> {方法体}
        // 2、分辨出你的接口是否函数式接口。 函数式接口就可以lambda简化


    }


}

2、Function

在Java中,函数式接口是只包含一个抽象方法的接口。它们是支持Lambda表达式的基础,因为Lambda表达式需要一个目标类型,这个目标类型必须是一个函数式接口。

函数式接口的出入参定义:

1、有入参,无出参【消费者】: function.accept

BiConsumer<String,String> function = (a,b)->{ //能接受两个入参
    System.out.println("哈哈:"+a+";呵呵:"+b);
};
function.accept("1","2");

2、有入参,有出参【多功能函数】: function.apply

Function<String,Integer> function = (String x) -> Integer.parseInt(x);
System.out.println(function.apply("2"));

3、无入参,无出参【普通函数】:

Runnable runnable = () -> System.out.println("aaa");

new Thread(runnable).start();

4、无入参 ,有出参【提供者】: supplier.get()

Supplier<String> supplier = ()-> UUID.randomUUID().toString();
String s = supplier.get();
System.out.println(s);

java.util.function包下的所有function定义:

  • Consumer: 消费者
  • Supplier: 提供者
  • Predicate: 断言

get/test/apply/accept调用的函数方法;

位于java.util.function包下

image-20250222163309270

3、StreamAPI

最佳实战:以后凡是你写for循环处理数据的统一全部用StreamAPI进行替换;

Stream所有数据和操作被组合成流管道流管道组成:

  • 一个数据源(可以是一个数组、集合、生成器函数、I/O管道)
  • 零或多个中间操作(将一个流变形成另一个流)
  • 一个终止操作(产生最终结果)

中间操作:Intermediate Operations

  • filter:过滤; 挑出我们用的元素

  • map: 映射: 一一映射,a 变成 b

    • mapToInt、mapToLong、mapToDouble
  • flatMap:打散、散列、展开、扩维:一对多映射

filter、
map、mapToInt、mapToLong、mapToDouble
flatMap、flatMapToInt、flatMapToLong、flatMapToDouble
mapMulti、mapMultiToInt、mapMultiToLong、mapMultiToDouble、
parallel、unordered、onClose、sequential
distinct、sorted、peek、limit、skip、takeWhile、dropWhile、

终止操作:Terminal Operation

forEach、forEachOrdered、toArray、reduce、collect、toList、min、
max、count、anyMatch、allMatch、noneMatch、findFirst、findAny、iterator

image-20250222165413603

Stream.of(1, 2, 3).filter(e -> {
    System.out.println("e1 = " + e);
    return true;
}).filter(e -> {
    System.out.println("e2 = " + e);
    return true;
}).collect(Collectors.toList());

输出是:
e1 = 1
e2 = 1
e1 = 2
e2 = 2
e1 = 3
e2 = 3

4、Reactive-Stream

Reactive Streams是JVM面向流的库的标准和规范(jdk9开始,有java.util.concurrent.Flow类)

1、处理可能无限数量的元素

2、有序

3、在组件之间异步传递元素

4、强制性非阻塞背压模式

推荐阅读:

  • jdk9 reactive响应式规范:https://www.reactive-streams.org/

  • 响应式宣言:https://www.reactivemanifesto.org/zh-CN

  • ReactiveStream: https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.4/README.md

2、响应式编程-Reactor核心.pptx

img

为什么有Reactive-Stream规范

目的:通过全异步的方式、加缓存区构建一个实时的数据流系统,

Kafka、MQ能构建出大型分布式的响应式系统。

缺本地化的消息系统解决方案:

  • 让所有的异步线程能互相监听消息,处理消息,构建实时消息处理流

消息传递是响应式核心

之前a调用b,必须b做完了事情,a才能接着做事情。现在响应式就是a先将b要做的事情放到缓冲区中,b监听这个缓冲区,从缓冲区中拿数据,去做事情,这样a就不用等待了。

引入一个缓存区,引入消息队列,就能实现全系统、全异步、不阻塞、不等待、实时响应

image-20250222203555264

Reactive-Stream规范核心接口

API Components

查看jdk9的java.util.concurrent.Flow类

image-20250222205430843

image-20250222205700217

image-20250222205718529

发布订阅写法
package com.atguigu.flow;

import lombok.SneakyThrows;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/**
 * @author lfy
 * @Description
 * @create 2023-11-17 20:59
 */
public class FlowDemo {

    //定义流中间操作处理器; 只用写订阅者的接口
    static class MyProcessor extends SubmissionPublisher<String>  implements Flow.Processor<String,String> {

        private Flow.Subscription subscription; //保存绑定关系
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            System.out.println("processor订阅绑定完成");
            this.subscription = subscription;
            subscription.request(1); //找上游要一个数据
        }

        @Override //数据到达,触发这个回调
        public void onNext(String item) {
            System.out.println("processor拿到数据:"+item);
            //再加工
            item += ":哈哈";
            submit(item);//把我加工后的数据发出去
            subscription.request(1); //再要新数据
        }

        @Override
        public void onError(Throwable throwable) {

        }

        @Override
        public void onComplete() {

        }
    }

    /**
     * 1、Publisher:发布者
     * 2、Subscriber:订阅者
     * 3、Subscription: 订阅关系
     * 4、Processor: 处理器
     * @param args
     */

    //发布订阅模型:观察者模式,
    public static void main(String[] args) throws InterruptedException {

        //1、定义一个发布者; 发布数据;
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        //2、定一个中间操作:  给每个元素加个 哈哈 前缀
        MyProcessor myProcessor1 = new MyProcessor();
        MyProcessor myProcessor2 = new MyProcessor();
        MyProcessor myProcessor3 = new MyProcessor();


        //3、定义一个订阅者; 订阅者感兴趣发布者的数据;
        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {

            private Flow.Subscription subscription;

            @Override //在订阅时  onXxxx:在xxx事件发生时,执行这个回调
            public void onSubscribe(Flow.Subscription subscription) {
                System.out.println(Thread.currentThread()+"订阅开始了:"+subscription);
                this.subscription = subscription;
                //从上游请求一个数据
                subscription.request(1);
            }

            @Override //在下一个元素到达时; 执行这个回调;   接受到新数据
            public void onNext(String item) {
                System.out.println(Thread.currentThread()+"订阅者,接受到数据:"+item);

                if(item.equals("p-7")){
                    subscription.cancel(); //取消订阅
                }else {
                    subscription.request(1);
                }
            }

            @Override //在错误发生时,
            public void onError(Throwable throwable) {
                System.out.println(Thread.currentThread()+"订阅者,接受到错误信号:"+throwable);
            }

            @Override //在完成时
            public void onComplete() {
                System.out.println(Thread.currentThread()+"订阅者,接受到完成信号:");
            }
        };

        //4、绑定发布者和订阅者
        publisher.subscribe(myProcessor1); //此时处理器相当于订阅者
        myProcessor1.subscribe(myProcessor2); //此时处理器相当于发布者
        myProcessor2.subscribe(myProcessor3);
        myProcessor3.subscribe(subscriber);  //链表关系绑定出责任链。
        //绑定操作;就是发布者,记住了所有订阅者都有谁,有数据后,给所有订阅者把数据推送过去。


//        publisher.subscribe(subscriber);

        for (int i = 0; i < 10; i++) {
            //发布10条数据
            if(i == 5){
//                publisher.closeExceptionally(new RuntimeException("5555"));
            }else {
                publisher.submit("p-"+i);
            }
            //publisher发布的所有数据在它的buffer区;
            //中断
//            publisher.closeExceptionally();
        }

        //ReactiveStream
        //jvm底层对于整个发布订阅关系做好了 异步+缓存区处理 = 响应式系统;
        // 我们只需要编排流处理环节,其它的交给jvm完成(比如:异步就又jvm自己干,我们不用关心)

        //发布者通道关闭
        publisher.close();

//        publisher.subscribe(subscriber2);

        //发布者有数据,订阅者就会拿到
        
        Thread.sleep(20000);

    }
}

响应式编程理解

image-20250222215133725

使用少量资源处理大量并发的一种解决方案。

Reactor

projectreactor官网

image-20250222220814257

image-20250222224054001

image-20250222223757272

1、快速上手

介绍

Reactor 是一个用于JVM的完全非阻塞的响应式编程框架,具备高效的需求管理(即对 “背压(backpressure)”的控制)能力。它与 Java 8 函数式 API 直接集成,比如 CompletableFuture, Stream, 以及 Duration。它提供了异步序列 API Flux(用于[N]个元素)Mono(用于 [0|1]个元素),并完全遵循和实现了“响应式扩展规范”(Reactive Extensions Specification)。

Reactor 的 reactor-ipc 组件还支持非阻塞的进程间通信(inter-process communication, IPC)。 Reactor IPC 为 HTTP(包括 Websockets)、TCP 和 UDP 提供了支持背压的网络引擎,从而适合 应用于微服务架构。并且完整支持响应式编解码(reactive encoding and decoding)。

依赖

<dependencyManagement> 
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2023.0.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId> 
        
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId> 
        <scope>test</scope>
    </dependency>
</dependencies>

2、响应式编程

响应式编程是一种关注于数据流(data streams)变化传递(propagation of change)异步编程方式。 这意味着它可以用既有的编程语言表达静态(如数组)或动态(如事件源)的数据流。

了解历史:

  • 在响应式编程方面,微软跨出了第一步,它在 .NET 生态中创建了响应式扩展库(Reactive Extensions library, Rx)。接着 RxJava 在JVM上实现了响应式编程。后来,在 JVM 平台出现了一套标准的响应式 编程规范,它定义了一系列标准接口和交互规范。并整合到 Java 9 中(使用 Flow 类)。
  • 响应式编程通常作为面向对象编程中的“观察者模式”(Observer design pattern)的一种扩展。 响应式流(reactive streams)与“迭代子模式”(Iterator design pattern)也有相通之处, 因为其中也有 Iterable-Iterator 这样的对应关系。主要的区别在于,Iterator 是基于 “拉取”(pull)方式的,而响应式流是基于“推送”(push)方式的。
  • 使用 iterator 是一种“命令式”(imperative)编程范式,即使访问元素的方法是 Iterable 的唯一职责。关键在于,什么时候执行 next() 获取元素取决于开发者。在响应式流中,相对应的 角色是 Publisher-Subscriber,但是 当有新的值到来的时候 ,却反过来由发布者(Publisher) 通知订阅者(Subscriber),这种“推送”模式是响应式的关键。此外,对推送来的数据的操作 是通过一种声明式(declaratively)而不是命令式(imperatively)的方式表达的:开发者通过 描述“控制流程”来定义对数据流的处理逻辑。
  • 除了数据推送,对错误处理(error handling)和完成(completion)信号的定义也很完善。 一个 Publisher 可以推送新的值到它的 Subscriber(调用 onNext 方法), 同样也可以推送错误(调用 onError 方法)和完成(调用 onComplete 方法)信号。 错误和完成信号都可以终止响应式流。可以用下边的表达式描述:
onNext x 0..N [onError | onComplete]

2.1. 阻塞是对资源的浪费

现代应用需要应对大量的并发用户,而且即使现代硬件的处理能力飞速发展,软件性能仍然是关键因素

广义来说我们有两种思路来提升程序性能:

  1. 并行化(parallelize) :使用更多的线程和硬件资源。[异步]
  2. 基于现有的资源来 提高执行效率

通常,Java开发者使用阻塞式(blocking)编写代码。这没有问题,在出现性能瓶颈后, 我们可以增加处理线程,线程中同样是阻塞的代码。但是这种使用资源的方式会迅速面临 资源竞争和并发问题。

更糟糕的是,阻塞会浪费资源。具体来说,比如当一个程序面临延迟(通常是I/O方面, 比如数据库读写请求或网络调用),所在线程需要进入 idle 状态等待数据,从而浪费资源。

所以,并行化方式并非银弹。这是挖掘硬件潜力的方式,但是却带来了复杂性,而且容易造成浪费。

2.2. 异步可以解决问题吗?

第二种思路——提高执行效率——可以解决资源浪费问题。通过编写 异步非阻塞 的代码, (任务发起异步调用后)执行过程会切换到另一个 使用同样底层资源 的活跃任务,然后等 异步调用返回结果再去处理。

但是在 JVM 上如何编写异步代码呢?Java 提供了两种异步编程方式:

  • 回调(Callbacks) :异步方法没有返回值,而是采用一个 callback 作为参数(lambda 或匿名类),当结果出来后回调这个 callback。常见的例子比如 Swings 的 EventListener。
  • Futures :异步方法 立即 返回一个 Future<T>,该异步方法要返回结果的是 T 类型,通过 Future封装。这个结果并不是 立刻 可以拿到,而是等实际处理结束才可用。比如, ExecutorService 执行 Callable 任务时会返回 Future 对象。

这些技术够用吗?并非对于每个用例都是如此,两种方式都有局限性。

回调很难组合起来,因为很快就会导致代码难以理解和维护(即所谓的“回调地狱(callback hell)”)。

考虑这样一种情景:

  • 在用户界面上显示用户的5个收藏,或者如果没有任何收藏提供5个建议。
  • 这需要3个 服务(一个提供收藏的ID列表,第二个服务获取收藏内容,第三个提供建议内容):

回调地狱(Callback Hell)的例子:

userService.getFavorites(userId, new Callback<List<String>>() { 
  public void onSuccess(List<String> list) { 
    if (list.isEmpty()) { 
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { 
          UiUtils.submitOnUiThread(() -> { 
            list.stream()
                .limit(5)
                .forEach(uiList::show); 
            });
        }

        public void onError(Throwable error) { 
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() 
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, 
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});

Reactor改造后为:

userService.getFavorites(userId) 
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions()) 
           .take(5) 
           .publishOn(UiUtils.uiThreadScheduler()) 
           .subscribe(uiList::show, UiUtils::errorPopup); 

如果你想确保“收藏的ID”的数据在800ms内获得(如果超时,从缓存中获取)呢?在基于回调的代码中, 会比较复杂。但 Reactor 中就很简单,在处理链中增加一个 timeout 的操作符即可。

userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800)) 
           .onErrorResume(cacheService.cachedFavoritesFor(userId)) 
           .flatMap(favoriteService::getDetails) 
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);

额外扩展:

Futures 比回调要好一点,但即使在 Java 8 引入了 CompletableFuture,它对于多个处理的组合仍不够好用。 编排多个 Futures 是可行的,但却不易。此外,Future 还有一个问题:当对 Future 对象最终调用 get() 方法时,仍然会导致阻塞,并且缺乏对多个值以及更进一步对错误的处理。

考虑另外一个例子,我们首先得到 ID 的列表,然后通过它进一步获取到“对应的 name 和 statistics” 为元素的列表,整个过程用异步方式来实现。

CompletableFuture 处理组合的例子

CompletableFuture<List<String>> ids = ifhIds(); 

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { 
        Stream<CompletableFuture<String>> zip =
                        l.stream().map(i -> { 
                                                 CompletableFuture<String> nameTask = ifhName(i); 
                                                 CompletableFuture<Integer> statTask = ifhStat(i); 

                                                 return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); 
                                         });
        List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); 
        CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);

        CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); 
        return allDone.thenApply(v -> combinationList.stream()
                                                                                                 .map(CompletableFuture::join) 
                                                                                                 .collect(Collectors.toList()));
});

List<String> results = result.join(); 
assertThat(results).contains(
                                "Name NameJoe has stats 103",
                                "Name NameBart has stats 104",
                                "Name NameHenry has stats 105",
                                "Name NameNicole has stats 106",
                                "Name NameABSLAJNFOAJNFOANFANSF has stats 121");

2.3. 从命令式编程到响应式编程

类似 Reactor 这样的响应式库的目标就是要弥补上述“经典”的 JVM 异步方式所带来的不足, 此外还会关注一下几个方面:

  • 可编排性(Composability) 以及 可读性(Readability)
  • 使用丰富的 操作符 来处理形如 的数据
  • 订阅(subscribe) 之前什么都不会发生
  • 背压(backpressure) 具体来说即 消费者能够反向告知生产者生产内容的速度的能力
  • 高层次 (同时也是有高价值的)的抽象,从而达到 并发无关 的效果
2.3.1. 可编排性与可读性

可编排性,指的是编排多个异步任务的能力。比如我们将前一个任务的结果传递给后一个任务作为输入, 或者将多个任务以分解再汇总(fork-join)的形式执行,或者将异步的任务作为离散的组件在系统中 进行重用。

这种编排任务的能力与代码的可读性和可维护性是紧密相关的。随着异步处理任务数量和复杂度 的提高,编写和阅读代码都变得越来越困难。就像我们刚才看到的,回调模式是简单的,但是缺点 是在复杂的处理逻辑中,回调中会层层嵌入回调,导致 回调地狱(Callback Hell) 。你能猜到 (或有过这种痛苦经历),这样的代码是难以阅读和分析的。

Reactor 提供了丰富的编排操作,从而代码直观反映了处理流程,并且所有的操作保持在同一层次 (尽量避免了嵌套)。

2.3.2. 就像装配流水线

你可以想象数据在响应式应用中的处理,就像流过一条装配流水线。Reactor 既是传送带, 又是一个个的装配工或机器人。原材料从源头(最初的 Publisher)流出,最终被加工为成品, 等待被推送到消费者(或者说 Subscriber)。

原材料会经过不同的中间处理过程,或者作为半成品与其他半成品进行组装。如果某处有齿轮卡住, 或者某件产品的包装过程花费了太久时间,相应的工位就可以向上游发出信号来限制或停止发出原材料。

2.3.3. 操作符(Operators)

在 Reactor 中,操作符(operator)就像装配线中的工位(操作员或装配机器人)。**每一个操作符 对 Publisher 进行相应的处理,然后将 Publisher 包装为一个新的 Publisher。**就像一个链条, 数据源自第一个 Publisher,然后顺链条而下,在每个环节进行相应的处理。最终,一个订阅者 (Subscriber)终结这个过程。请记住,在订阅者(Subscriber)订阅(subscribe)到一个 发布者(Publisher)之前,什么都不会发生。

理解了操作符会创建新的 Publisher 实例这一点,能够帮助你避免一个常见的问题, 这种问题会让你觉得处理链上的某个操作符没有起作用。

虽然响应式流规范(Reactive Streams specification)没有规定任何操作符, 类似 Reactor 这样的响应式库所带来的最大附加价值之一就是提供丰富的操作符。包括基础的转换操作, 到过滤操作,甚至复杂的编排和错误处理操作。

2.3.4. subscribe() 之前什么都不会发生

在 Reactor 中,当你创建了一条 Publisher 处理链,数据还不会开始生成。事实上,你是创建了 一种抽象的对于异步处理流程的描述(从而方便重用和组装)。

当真正“订阅(subscrib)”的时候,你需要将 Publisher 关联到一个 Subscriber 上,然后 才会触发整个链的流动。这时候,Subscriber 会向上游发送一个 request 信号,一直到达源头 的 Publisher。

2.3.5. 背压

向上游传递信号这一点也被用于实现 背压 ,就像在装配线上,某个工位的处理速度如果慢于流水线 速度,会对上游发送反馈信号一样。

在响应式流规范中实际定义的机制同刚才的类比非常接近:订阅者可以无限接受数据并让它的源头 “满负荷”推送所有的数据,也可以通过使用 request 机制来告知源头它一次最多能够处理 n 个元素。

中间环节的操作也可以影响 request。想象一个能够将每10个元素分批打包的缓存(buffer)操作。 如果订阅者请求一个元素,那么对于源头来说可以生成10个元素。此外预取策略也可以使用了, 比如在订阅前预先生成元素。

这样能够将“推送”模式转换为“推送+拉取”混合的模式,如果下游准备好了,可以从上游拉取 n 个元素;但是如果上游元素还没有准备好,下游还是要等待上游的推送。

2.3.6. 热(Hot) vs 冷(Cold)

在 Rx 家族的响应式库中,响应式流分为“热”和“冷”两种类型,区别主要在于响应式流如何 对订阅者进行响应:

  • 一个“冷”的序列,指对于每一个 Subscriber,都会收到从头开始所有的数据。如果源头 生成了一个 HTTP 请求,对于每一个订阅都会创建一个新的 HTTP 请求。
  • 一个“热”的序列,指对于一个 Subscriber,只能获取从它开始 订阅 之后 发出的数据。不过注意,有些“热”的响应式流可以缓存部分或全部历史数据。 通常意义上来说,一个“热”的响应式流,甚至在即使没有订阅者接收数据的情况下,也可以 发出数据(这一点同 “Subscribe() 之前什么都不会发生”的规则有冲突)。

3、核心特性

1、Mono和Flux

Mono: 0|1 数据流

Flux: N数据流

响应式流:元素(内容) + 信号(完成/异常)

image-20250222225143286

基本操作

类比Stream流操作,中间操作对应流转为另1个新流,终止操作对应订阅。只有开始订阅了,流才会开始,流中的 每个元素 和 信号 都是按顺序进入流处理,然后交给订阅者处理。

package com.atguigu.reactor;

import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.*;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * @author lfy
 * @Description
 * @create 2023-11-23 20:58
 */
public class FluxDemo {

    public static void main(String[] args) {
        //        Flux.concat(Flux.just(1,2,3),Flux.just(7,8,9))
        //                .subscribe(System.out::println);


        Flux.range(1, 7)
            //                .log() //日志   onNext(1~7)
            .filter(i -> i > 3) //挑出>3的元素
            //                .log() //onNext(4~7)
            .map(i -> "haha-" + i)
            .log()  // onNext(haha-4 ~ 7)
            .subscribe(System.out::println);


        //今天: Flux、Mono、弹珠图、事件感知API、每个操作都是操作的上个流的东西
    }


    /**
     * 响应式编程核心:看懂文档弹珠图;
     * 信号: 正常/异常(取消)
     * SignalType:
     *      SUBSCRIBE: 被订阅
     *      REQUEST:  请求了N个元素
     *      CANCEL: 流被取消
     *      ON_SUBSCRIBE:在订阅时候
     *      ON_NEXT: 在元素到达
     *      ON_ERROR: 在流错误
     *      ON_COMPLETE:在流正常完成时
     *      AFTER_TERMINATE:中断以后
     *      CURRENT_CONTEXT:当前上下文
     *      ON_CONTEXT:感知上下文
     * <p>
     * doOnXxx API触发时机
     *      1、doOnNext:每个数据(流的数据)到达的时候触发
     *      2、doOnEach:每个元素(流的数据和信号)到达的时候触发
     *      3、doOnRequest: 消费者请求流元素的时候
     *      4、doOnError:流发生错误
     *      5、doOnSubscribe: 流被订阅的时候
     *      6、doOnTerminate: 发送取消/异常信号中断了流
     *      7、doOnCancle: 流被取消
     *      8、doOnDiscard:流中元素被忽略的时候
     *
     * @param args
     */
    public void doOnXxxx(String[] args) {

        // 关键:doOnNext:表示流中某个元素到达以后触发我一个回调
        // doOnXxx要感知某个流的事件,写在这个流的后面,新流的前面
        Flux.just(1, 2, 3, 4, 5, 6, 7, 0, 5, 6)
            .doOnNext(integer -> System.out.println("元素到达:" + integer)) //元素到达得到时候触发
            .doOnEach(integerSignal -> { //each封装的详细
                System.out.println("doOnEach.." + integerSignal);
            })//1,2,3,4,5,6,7,0
            .map(integer -> 10 / integer) //10,5,3,
            .doOnError(throwable -> {
                System.out.println("数据库已经保存了异常:" + throwable.getMessage());
            })
            .map(integer -> 100 / integer)
            .doOnNext(integer -> System.out.println("元素到哈:" + integer))

            .subscribe(System.out::println);
    }


    //Mono<Integer>: 只有一个Integer
    //Flux<Integer>: 有很多Integer
    public void fluxDoOn(String[] args) throws IOException, InterruptedException {
        //        Mono<Integer> just = Mono.just(1);
        //
        //        just.subscribe(System.out::println);

        //空流:  链式API中,下面的操作符,操作的是上面的流。
        // 事件感知API:当流发生什么事的时候,触发一个回调,系统调用提前定义好的钩子函数(Hook【钩子函数】);doOnXxx;
        Flux<Integer> flux = Flux.range(1, 7)
            .delayElements(Duration.ofSeconds(1))
            .doOnComplete(() -> {
                System.out.println("流正常结束...");
            })
            .doOnCancel(() -> {
                System.out.println("流已被取消...");
            })
            .doOnError(throwable -> {
                System.out.println("流出错..." + throwable);
            })
            .doOnNext(integer -> {
                System.out.println("doOnNext..." + integer);
            }); //有一个信号:此时代表完成信号

        flux.subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                System.out.println("订阅者和发布者绑定好了:" + subscription);
                request(1); //背压
            }

            @Override
            protected void hookOnNext(Integer value) {
                System.out.println("元素到达:" + value);
                if (value < 5) {
                    request(1);
                    if (value == 3) {
                        int i = 10 / 0;
                    }
                } else {
                    cancel();//取消订阅
                }
                ; //继续要元素
            }

            @Override
            protected void hookOnComplete() {
                System.out.println("数据流结束");
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                System.out.println("数据流异常");
            }

            @Override
            protected void hookOnCancel() {
                System.out.println("数据流被取消");
            }

            @Override
            protected void hookFinally(SignalType type) {
                System.out.println("结束信号:" + type);
                // 正常、异常
                //                try {
                //                    //业务
                //                }catch (Exception e){
                //
                //                }finally {
                //                    //结束
                //                }
            }
        });

        Thread.sleep(2000);


        //        Flux<Integer> range = Flux.range(1, 7);


        System.in.read();
    }


    //测试Flux
    public void flux() throws IOException {
        //        Mono: 0|1个元素的流
        //        Flux: N个元素的流;  N>1
        //发布者发布数据流:源头


        //1、多元素的流
        Flux<Integer> just = Flux.just(1, 2, 3, 4, 5); //


        //流不消费就没用; 消费:订阅
        just.subscribe(e -> System.out.println("e1 = " + e));
        //一个数据流可以有很多消费者
        just.subscribe(e -> System.out.println("e2 = " + e));

        //对于每个消费者来说流都是一样的;  广播模式;

        System.out.println("==========");
        Flux<Long> flux = Flux.interval(Duration.ofSeconds(1));//每秒产生一个从0开始的递增数字

        flux.subscribe(System.out::println);


        System.in.read();
    }
}

2、subscribe()

flxu.subscribe() // 流被订阅,默认订阅
flux.subscribe(e->System.out.println(e)) // 指定订阅规则:正常消费者:只消费正常元素
自定义流的信号感知回调
flux.subscribe(
        v-> System.out.println("v = " + v), //流元素消费
        throwable -> System.out.println("throwable = " + throwable), //感知异常结束
        ()-> System.out.println("流结束了...") //感知正常结束
);
自定义消费者
flux.subscribe(new BaseSubscriber<String>() {

    // 生命周期钩子1: 订阅关系绑定的时候触发
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        // 流被订阅的时候触发
        System.out.println("绑定了..."+subscription);

        //找发布者要数据
        request(1); //要1个数据
        // requestUnbounded(); //要无限数据
    }

    @Override
    protected void hookOnNext(String value) {
        System.out.println("数据到达,正在处理:"+value);
        request(1); //要1个数据
    }


    //  hookOnComplete、hookOnError 二选一执行
    @Override
    protected void hookOnComplete() {
        System.out.println("流正常结束...");
    }

    @Override
    protected void hookOnError(Throwable throwable) {
        System.out.println("流异常..."+throwable);
    }

    @Override
    protected void hookOnCancel() {
        System.out.println("流被取消...");
    }

    @Override
    protected void hookFinally(SignalType type) {
        System.out.println("最终回调...一定会被执行");
    }
});

3、流的取消

消费者调用 cancle() 取消流的订阅;

Disposable
Flux<String> flux = Flux.range(1, 10)
    .map(i -> {
        System.out.println("map..."+i);
        if(i==9) {
            i = 10/(9-i); //数学运算异常;  doOnXxx
        }
        return "哈哈:" + i;
    }); //流错误的时候,把错误吃掉,转为正常信号


//        flux.subscribe(); //流被订阅; 默认订阅;
//        flux.subscribe(v-> System.out.println("v = " + v));//指定订阅规则: 正常消费者:只消费正常元素


//        flux.subscribe(
//                v-> System.out.println("v = " + v), //流元素消费
//                throwable -> System.out.println("throwable = " + throwable), //感知异常结束
//                ()-> System.out.println("流结束了...") //感知正常结束
//        );


// 流的生命周期钩子可以传播给订阅者。
//  a() {
//      data = b();
//  }
flux.subscribe(new BaseSubscriber<String>() {

    // 生命周期钩子1: 订阅关系绑定的时候触发
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        // 流被订阅的时候触发
        System.out.println("绑定了..."+subscription);

        //找发布者要数据
        request(1); //要1个数据
        // requestUnbounded(); //要无限数据
    }

    @Override
    protected void hookOnNext(String value) {
        System.out.println("数据到达,正在处理:"+value);
        if(value.equals("哈哈:5")){
            cancel(); //取消流
        }
        request(1); //要1个数据
    }


    //  hookOnComplete、hookOnError 二选一执行
    @Override
    protected void hookOnComplete() {
        System.out.println("流正常结束...");
    }

    @Override
    protected void hookOnError(Throwable throwable) {
        System.out.println("流异常..."+throwable);
    }

    @Override
    protected void hookOnCancel() {
        System.out.println("流被取消...");
    }

    @Override
    protected void hookFinally(SignalType type) {
        System.out.println("最终回调...一定会被执行");
    }
});

4、BaseSubscriber与生命周期钩子

自定义消费者,推荐直接编写 BaseSubscriber 的逻辑;

Flux.range(1, 10)
    .map(e -> String.valueOf(e))
    .map(e -> e + "-zzhua")
    .doOnCancel(()->{
        System.out.println("doOnCancel");
    })
    .subscribe(new BaseSubscriber<String>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            System.out.println("hookOnSubscribe");
            subscription.request(1);
        }

        @Override
        protected void hookOnNext(String value) {
            System.out.println("hookOnNext:" + value);
            request(1);
            if (value.equals("5-zzhua")) {
                cancel();
            }
        }

        @Override
        protected void hookOnComplete() {
            System.out.println("hookOnComplete");
            super.hookOnComplete(); // no-op
        }

        @Override
        protected void hookOnError(Throwable throwable) {
            System.out.println("hookOnError" + throwable);
            super.hookOnError(throwable); // throw ...
        }

        @Override
        protected void hookOnCancel() {
            System.out.println("hookOnCancel");
            super.hookOnCancel(); // no-op
        }

        @Override
        protected void hookFinally(SignalType type) {
            System.out.println("hookFinally");
            super.hookFinally(type);// no-op
        }
    })
    ;

5、背压和请求重塑

背压(Backpressure ):由订阅者请求发布者传送指定数量请求,而不是由发布者任意发布

请求重塑(Reshape Requests)

1、buffer:缓冲
Flux<List<Integer>> flux = Flux.range(1, 10)  //原始流10个
        .buffer(3)
        .log();//缓冲区:缓冲3个元素: 消费一次最多可以拿到三个元素; 凑满数批量发给消费者
//
//        //一次发一个,一个一个发;
// 10元素,buffer(3);消费者请求4次,数据消费完成


flux.subscribe(v-> system.out.println("类型:"+v.getclass()+" 值为:"+V));

// 消费者每次 request(1)拿到的是几个真正的数据:buffer数据
Flux<List<Integer>> flux = Flux.range(0, 10)
    .buffer(3)
    .log()
    ;
flux.subscribe(e -> {
    System.out.println("subscribe" + e);
});
2、limit:限流
Flux.range(1, 1000)
    .log()
    //限流触发,看上游是怎么限流获取数据的
    .limitRate(100) 
    .subscribe();
// 75 % 预取策路:limitRate(100)
// 第一次抓取100个数据,如果 75 % 的元素已经处理了,继续抓取 新的 75 % 元素;

6、以编程方式创建序列-Sink

Sink.next

Sink.complete

1、同步环境-generate
// 创建1个1~10的序列
Flux<Object> flux = Flux.generate(
    () -> 0,                 // 初始值
    (state, sink) -> {
        if (state <= 10) {
            sink.next(state);// 发送数据(每次执行,只能调用1次next)
        } else {
            sink.complete(); // 完成信号
        }

        if (state == 7) {
            sink.error(new RuntimeException("i dislike 7"));
        }
        return state + 1;
    }
);

flux
    .log()
    .doOnError(e -> System.out.println("doOnError:" + e))
    .subscribe();
System.out.println("main start...");

Disposable disposable = Flux.range(1, 10)
    .delayElements(Duration.ofSeconds(1))
    .log()
    .map(t -> t * 10)
    .doOnCancel(()-> System.out.println("doOnCancel"))
    .subscribe(t -> System.out.println("subscribe:" + t));

System.out.println("start a thread...");

// 5s之后,取消这个流
new Thread(() -> {
    try {
        TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    disposable.dispose();
}).start();

System.out.println("main end...");
/*
main start...
[ INFO] (main) onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber)
[ INFO] (main) request(unbounded)
start a thread...
main end...
[ INFO] (parallel-1) onNext(1)
subscribe:10
[ INFO] (parallel-2) onNext(2)
subscribe:20
[ INFO] (parallel-3) onNext(3)
subscribe:30
[ INFO] (parallel-4) onNext(4)
subscribe:40
doOnCancel
[ INFO] (Thread-0) cancel()
*/
2、多线程-create
static class MyListener {

    private FluxSink<Object> fluxSink;


    public MyListener(FluxSink<Object> fluxSink) {
        this.fluxSink = fluxSink;
    }

    public void online(String name) {
        System.out.println("用户登录了: " + name);
        fluxSink.next(name); // 传入用户
    }

}

public static void main(String[] args) {
    Flux<Object> flux = Flux.create(fluxSink -> {
        MyListener myListener = new MyListener(fluxSink);
        for (int i = 1; i <= 100; i++) {
            myListener.online("张" + i);
        }
    });

    flux.subscribe();
}

7、 handle()

自定义流中元素处理规则

Flux.range(1, 10)
    .log()
    // 自定义元素处理规则, 比如替代map(..)等操作,由于handle可以自定义,所以更加强大
    .handle((value, sink)->{
        sink.next("张~" + value);
    })
    .log()
    .subscribe()

8、自定义线程调度

响应式:响应式编程: 全异步、消息、事件回调

默认还是用当前线程,生成整个流、流的发布、流的中间操作

// 流的发布、中间操作,默认使用当前线程
Flux.range(1, 10)
    .publishOn(Schedulers.single()) // 在哪个线程池把这个流的数据和操作执行
    .log()
    .map(t -> t * 10)
    .log()
    .subscribeOn(Schedulers.single()) // 指定订阅者订阅操作线程
    .subscribe((e) -> System.out.println(Thread.currentThread().getName() + ":" + e));

// 调度器: 线程池
// Schedulers.immediate(); // 无执行上下文,当前线程运行所有操作
// Schedulers.single();    // 使用固定的1个单线程
// 线程池中有 10 * cpu核心个线程, 队列默认10w, keepAliveTime 60s
// Schedulers.boundedElastic(); // 有界、弹性调度; 不是无限扩充的的线程池;
Schedulers.parallel();
// 自定义线程池
Schedulers.fromExecutor(new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000)));

LockSupport.park();
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i)
    .log()
    .publishOn(s)
    .map(i -> "value " + i)
    .log()
    ;

// 只要不指定线程池,默认发布者用的线程就是订阅者的线程;
new Thread(() -> flux.subscribe(System.out::println)).start();

/*
[ INFO] (Thread-0) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[ INFO] (Thread-0) | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
[ INFO] (Thread-0) | request(unbounded)
[ INFO] (Thread-0) | request(256)
[ INFO] (Thread-0) | onNext(11)
[ INFO] (Thread-0) | onNext(12)
[ INFO] (Thread-0) | onComplete()
[ INFO] (parallel-scheduler-1) | onNext(value 11)
value 11
[ INFO] (parallel-scheduler-1) | onNext(value 12)
value 12
[ INFO] (parallel-scheduler-1) | onComplete()
*/

9、错误处理

命令式编程:常见的错误处理方式

1. Catch and return a static default value.

捕获异常返回一个静态默认值

try {
  return doSomethingDangerous(10);
}
catch (Throwable error) {
  return "RECOVERED";
}

onErrorReturn: 实现上面效果,错误的时候返回一个值

  • 1、吃掉异常,消费者无异常感知
  • 2、返回一个兜底默认值
  • 3、流正常完成(不再处理后续元素);
Flux.just(1, 2, 0, 4)
    .map(i -> "100 / " + i + " = " + (100 / i))
    .subscribe(
        v-> System.out.println("v = " + v),
        err -> System.out.println("err = " + err),
        ()-> System.out.println("流结束")  // error handling example
    );

/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err = java.lang.ArithmeticException: / by zero
*/
Flux.just(1, 2, 0, 4)
    .map(i -> "100 / " + i + " = " + (100 / i))
    // 只要发生异常,就使用默认值
    .onErrorReturn("哈哈-6666")
    .subscribe(
        v-> System.out.println("v = " + v),
        err -> System.out.println("err = " + err),
        ()-> System.out.println("流结束")  // error handling example
    );

/*
v = 100 / 1 = 100
v = 100 / 2 = 50
v = 哈哈-6666
流结束
*/
Flux.just(1, 2, 0, 4)
    .map(i -> "100 / " + i + " = " + (100 / i))
    // 是指定的异常,才使用默认值
    // .onErrorReturn(ArithmeticException.class,"哈哈-6666")
    .onErrorReturn(NullPointerException.class,"哈哈-6666")
    .subscribe(
        v -> System.out.println("v = " + v),
        err -> System.out.println("err = " + err),
        () -> System.out.println("流结束")  // error handling example
    );
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err = java.lang.ArithmeticException: / by zero
*/
2. Catch and execute an alternative path with a fallback method.

吃掉异常,执行一个兜底方法;

try {
  return doSomethingDangerous(10);
}
catch (Throwable error) {
  return doOtherthing(10);
}

onErrorResume

  • 1、吃掉异常,消费者无异常感知
  • 2、调用一个兜底方法
  • 3、流正常完成
Flux.just(1, 2, 0, 4)
    .map(i -> "100 / " + i + " = " + (100 / i))
    .onErrorResume(err -> Mono.just("哈哈-777"))
    .subscribe(
        v -> System.out.println("v = " + v),
        err -> System.out.println("err = " + err),
        () -> System.out.println("流结束")
    );
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
v = 哈哈-777
流结束
*/
3. Catch and dynamically compute a fallback value.

捕获并动态计算一个返回值,即根据错误返回一个新值

try {
  Value v = erroringMethod();
  return MyWrapper.fromValue(v);
}
catch (Throwable error) {
  return MyWrapper.fromError(error);
}
.onErrorResume(err -> Flux.error(new BusinessException(err.getMessage()+":炸了")))
  • 1、吃掉异常,消费者有感知
  • 2、调用一个自定义方法
  • 3、流异常完成
Flux.just(1, 2, 0, 4)
    .map(i -> "100 / " + i + " = " + (100 / i))
    .onErrorResume(err -> {
        if (err instanceof NullPointerException) {
            return Mono.just("哈哈-777");
        }
        return Mono.just("其它");
    })
    .subscribe(
        v -> System.out.println("v = " + v),
        err -> System.out.println("err = " + err),
        () -> System.out.println("流结束")
    );
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
v = 哈哈-777
流结束
*/
4. Catch, wrap to a BusinessException, and re-throw.

捕获并包装成一个业务异常,并重新抛出

try {
  return callExternalService(k);
}
catch (Throwable error) {
  throw new BusinessException("oops, SLA exceeded", error);
}

包装重新抛出异常: 推荐用 .onErrorMap

  • 1、吃掉异常,消费者有感知
  • 2、抛新异常
  • 3、流异常完成
Flux.just(1, 2, 0, 4)
    .map(i -> "100 / " + i + " = " + (100 / i))
    .onErrorResume(err -> Flux.error(new BusinessException(err.getMessage())))
    .subscribe(
        v -> System.out.println("v = " + v),
        err -> System.out.println("err = " + err),
        () -> System.out.println("流结束")
    );
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err = com.zzhua.test02.BusinessException
*/
Flux.just(1, 2, 0, 4)
    .map(i -> "100 / " + i + " = " + (100 / i))
    .onErrorMap(err -> {
        return new BusinessException("除数不能为0" + err.getMessage());
    })
    .subscribe(
        v -> System.out.println("v = " + v),
        err -> System.out.println("err = " + err),
        () -> System.out.println("流结束")
    );
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err = com.zzhua.test02.BusinessException
*/
5. Catch, log an error-specific message, and re-throw.

捕获异常,记录特殊的错误日志,重新抛出

try {
  return callExternalService(k);
}
catch (RuntimeException error) {
  //make a record of the error
  log("uh oh, falling back, service failed for key " + k);
  throw error;
}
  • 异常被捕获、做自己的事情
  • 不影响异常继续顺着流水线传播
  • 不吃掉异常,只在异常发生的时候做一件事,订阅者有感知
Flux.just(1, 2, 0, 4)
    .map(i -> "100 / " + i + " = " + (100 / i))
    .doOnError(err -> {
        System.out.println("err已被记录 = " + err);
    })
    .subscribe(
        v -> System.out.println("v = " + v),
        err -> System.out.println("err = " + err),
        () -> System.out.println("流结束")
    );

/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err已被记录 = java.lang.ArithmeticException: / by zero
err = java.lang.ArithmeticException: / by zero
*/
6. Use the finally block to clean up resources or a Java 7 “try-with-resource” construct.
Flux.just(1, 2, 0, 4)
    .map(i -> "100 / " + i + " = " + (100 / i))
    .doOnError(err -> {
        System.out.println("err已被记录 = " + err);
    })
    .doFinally(signalType -> {
        System.out.println("流信号:" + signalType);
    })
    .subscribe(
        v -> System.out.println("v = " + v),
        err -> System.out.println("err = " + err),
        () -> System.out.println("流结束")
    );
/*
v = 100 / 1 = 100
v = 100 / 2 = 50
err已被记录 = java.lang.ArithmeticException: / by zero
err = java.lang.ArithmeticException: / by zero
流信号:onError
*/
7. 忽略当前异常,仅通知记录,继续推进
Flux.just(1, 2, 3, 0, 5)
    .map(i -> 10 / i)
    .onErrorContinue((err, val) -> {
        System.out.println("err = " + err);
        System.out.println("val = " + val);
        System.out.println("发现" + val + "有问题了,继续执行其他的,我会记录这个问题");
    }) //发生
    .subscribe(
        v -> System.out.println("v = " + v),
        err -> System.out.println("err = " + err),
        () -> System.out.println("流结束")
    );
/*
v = 10
v = 5
v = 3
err = java.lang.ArithmeticException: / by zero
val = 0
发现0有问题了,继续执行其他的,我会记录这个问题
v = 2
流结束
*/
8.其它
Flux.just(1, 2, 3, 0, 5)
    .map(i -> 10 / i)
    .onErrorStop() // 错误后,停止流,源头中断,所有订阅者全部结束,错误结束
    .subscribe(
        v -> System.out.println("v = " + v),
        err -> System.out.println("err = " + err),
        () -> System.out.println("流结束")
    );
/*
v = 10
v = 5
v = 3
err = java.lang.ArithmeticException: / by zero
*/
Flux.just(1, 2, 3, 0, 5)
    .map(i -> 10 / i)
    .onErrorComplete() //发生错误后,停止流
    .subscribe(
        v -> System.out.println("v = " + v),
        err -> System.out.println("err = " + err),
        () -> System.out.println("流结束")
    );
/*
v = 10
v = 5
v = 3
流结束
*/

10、常用操作

filter、flatMap、concatMap、flatMapMany、transform、defaultIfEmpty、switchIfEmpty、concat、concatWith、merge、mergeWith、mergeSequential、zip、zipWith…

filter
Flux.just(1, 2, 3, 4)
    .log()
    .filter(e -> e % 2 == 0)
    .subscribe();

/*
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArrayConditionalSubscription)
[ INFO] (main) | request(unbounded)
[ INFO] (main) | onNext(1)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(2)
[ INFO] (main) | onNext(3)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(4)
[ INFO] (main) | onComplete()
*/
filterMap
Flux.just("zhang san", "li si")
    .log()
    .flatMap((t)->{
        String[] arr = t.split("\\s");
        return Flux.fromArray(arr);
    })
    .log()
    .subscribe();

/*
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[ INFO] (main) onSubscribe(FluxFlatMap.FlatMapMain)
[ INFO] (main) request(unbounded)
[ INFO] (main) | request(256)
[ INFO] (main) | onNext(zhang san)
[ INFO] (main) onNext(zhang)
[ INFO] (main) onNext(san)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(li si)
[ INFO] (main) onNext(li)
[ INFO] (main) onNext(si)
[ INFO] (main) | request(1)
[ INFO] (main) | onComplete()
[ INFO] (main) onComplete()
*/
concatMap
Flux.just(1, 2)
    .concatMap(s -> Flux.just(s * 10, s * 100))
    .log()
    .subscribe((e)-> System.out.println("e = " + e))
    ;

/*
[ INFO] (main) onSubscribe(FluxConcatMapNoPrefetch.FluxConcatMapNoPrefetchSubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(10)
e = 10
[ INFO] (main) onNext(100)
e = 100
[ INFO] (main) onNext(20)
e = 20
[ INFO] (main) onNext(200)
e = 200
[ INFO] (main) onComplete()
*/
concat
Flux.concat(Flux.just(1, 2), Flux.just("a", "b"))
    .log()
    .subscribe();

/*
[ INFO] (main) onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(1)
[ INFO] (main) onNext(2)
[ INFO] (main) onNext(a)
[ INFO] (main) onNext(b)
[ INFO] (main) onComplete()
*/
concatWith
Flux.just(1, 2).concatWith(Flux.just(3, 4))
    .log()
    .subscribe
    
/*
[ INFO] (main) onSubscribe(FluxConcatArray.ConcatArraySubscriber)
[ INFO] (main) request(unbounded)
[ INFO] (main) onNext(1)
[ INFO] (main) onNext(2)
[ INFO] (main) onNext(3)
[ INFO] (main) onNext(4)
[ INFO] (main) onComplete()
*/
transform
AtomicInteger atomic = new AtomicInteger(0);

Flux<String> flux = Flux.just("a", "b", "c")
    .transform(values -> {
        if (atomic.incrementAndGet() == 1) {
            //如果是:第一次调用,老流中的所有元素转成大写
            return values.map(String::toUpperCase);
        } else {
            //如果不是第一次调用,原封不动返回
            return values;
        }
    });

//transform 无defer,不会共享外部变量的值。 无状态转换; 原理,无论多少个订阅者,transform只执行一次
//transform 有defer,会共享外部变量的值。   有状态转换; 原理,无论多少个订阅者,每个订阅者transform都只执行一次
flux.subscribe(v -> System.out.println("订阅者1:v = " + v));
flux.subscribe(v -> System.out.println("订阅者2:v = " + v));

/*
订阅者1:v = A
订阅者1:v = B
订阅者1:v = C
订阅者2:v = A
订阅者2:v = B
订阅者2:v = C
*/
改成transformDeferred
 /*
订阅者1:v = A
订阅者1:v = B
订阅者1:v = C
订阅者2:v = a
订阅者2:v = b
订阅者2:v = c
*/   
defaultIfEmpty
/**
  * defaultIfEmpty:  静态兜底数据
  * switchIfEmpty:  空转换; 调用动态兜底方法;  返回新流数据
  */
void empty() {
    //Mono.just(null);//流里面有一个null值元素
    //Mono.empty();//流里面没有元素,只有完成信号/结束信号
    
    haha()
        .defaultIfEmpty(hehe())//如果发布者元素为null,指定默认值,否则用发布者的值;
        .subscribe(v -> System.out.println("v = " + v));

    haha()
        .switchIfEmpty(hehe())//如果发布者元素为null,指定默认值,否则用发布者的值;
        .subscribe(v -> System.out.println("v = " + v));

}

Mono<String> hehe() {
    return Mono.just("兜底数据...");
}

Mono<String> haha() {
    return Mono.empty();
}
merge
/**
     * concat: 连接; A流 所有元素和 B流所有元素拼接
     * merge:合并; A流 所有元素和 B流所有元素 按照时间序列合并
     * mergeWith:
     * mergeSequential: 按照哪个流先发元素排队
     */
@Test
void merge() throws IOException {

    Flux.mergeSequential();

    Flux.merge(
        Flux.just(1, 2, 3).delayElements(Duration.ofSeconds(1)),
        Flux.just("a", "b").delayElements(Duration.ofMillis(1500)),
        Flux.just("haha", "hehe", "heihei", "xixi").delayElements(Duration.ofMillis(500)))
        .log()
        .subscribe();

    Flux.just(1, 2, 3).mergeWith(Flux.just(4, 5, 6));

    System.in.read();
}
zip
/**
  * zip: 无法结对的元素会被忽略;
  * 最多支持8流压缩;
  */
void zip (){
    //Tuple:元组;
    // Flux< Tuple2:<Integer,String> >

    Flux.just(1,2,3)
        .zipWith(Flux.just("a","b","c","d"))
        .map(tuple -> {
            Integer t1 = tuple.getT1(); // 元组中的第一个元素
            String t2 = tuple.getT2();  // 元组中的第二个元素
            return t1 + "==>" + t2;
        })
        .log()
        .subscribe(v-> System.out.println("v = " + v));


}

11、超时与重试

Flux.just(1)
    .delayElements(Duration.ofSeconds(3))
    .log()
    .timeout(Duration.ofSeconds(2))
    .retry(3)  // 把流从头到尾重新请求1次
    .map(i -> "haha-" + i)
    .subscribe(e-> System.out.println("e = " + e))
    ;

LockSupport.park();

/*
[ INFO] (main) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) cancel()
[ INFO] (parallel-1) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-1) request(unbounded)
[ INFO] (parallel-3) cancel()
[ INFO] (parallel-3) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-3) request(unbounded)
[ INFO] (parallel-5) cancel()
[ INFO] (parallel-5) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-5) request(unbounded)
[ INFO] (parallel-7) cancel()
[ERROR] (parallel-7) Operator called default onErrorDropped - reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 2000ms in 'log' (and no fallback has been configured)
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 2000ms in 'log' (and no fallback has been configured)
Caused by: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 2000ms in 'log' (and no fallback has been configured)
	at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:296)
...
*/
Flux.just(1)
    .delayElements(Duration.ofSeconds(3))
    .log()
    .timeout(Duration.ofSeconds(2))
    .retry(3)  // 把流从头到尾重新请求1次
    .onErrorReturn(2) // 上面重试失败后, 会抛出异常, 这里在抛出异常的情况下返回2
    .map(i -> "haha-" + i)
    .subscribe(e-> System.out.println("e = " + e))
    ;

LockSupport.park();

/*
[ INFO] (main) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (main) request(unbounded)
[ INFO] (parallel-1) cancel()
[ INFO] (parallel-1) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-1) request(unbounded)
[ INFO] (parallel-3) cancel()
[ INFO] (parallel-3) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-3) request(unbounded)
[ INFO] (parallel-5) cancel()
[ INFO] (parallel-5) onSubscribe(MonoDelayUntil.DelayUntilCoordinator)
[ INFO] (parallel-5) request(unbounded)
[ INFO] (parallel-7) cancel()
e = haha-2
*/

12、Sinks工具类

// Sinks.many(); // 发送Flux数据
// Sinks.one();  // 发送Mono数据

// Sinks: 接受器,数据管道,所有数据顺着这个管道往下走的

Sinks.many().unicast();   // 单播    这个管道只能绑定单个订阅者(消费者)
Sinks.many().multicast(); // 多播    这个管道能绑定多个订阅者
Sinks.many().replay();    // 重放    这个管道能重放元素。是否给后来的订阅者把之前的元素依然发给它;
单播/多播/重放/背压
// Sinks.Many<Object> many = Sinks.many()
//         .multicast() //多播
//         .onBackpressureBuffer(); //背压队列

//默认订阅者,从订阅的那一刻开始接元素

//发布者数据重放; 底层利用队列进行缓存之前数据
Sinks.Many<Object> many = Sinks.many().replay().limit(3);

new Thread(() -> {
    for (int i = 0; i < 10; i++) {
        many.tryEmitNext("a-" + i);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}).start();


//订阅
many.asFlux().subscribe(v -> System.out.println("v1 = " + v));

new Thread(() -> {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    many.asFlux().subscribe(v -> System.out.println("v2 = " + v));
}).start();
缓存
Flux<Integer> cache = Flux.range(1, 10)
    .delayElements(Duration.ofSeconds(1)) // 不调缓存默认就是缓存所有
    .cache(2);                            // 缓存两个元素; 默认全部缓存

// 立即订阅
cache.subscribe();

new Thread(()->{
    // 5s后再去订阅
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    cache.subscribe(v-> System.out.println("v = " + v));
}).start();

LockSupport.park();

/*
v = 3
v = 4
v = 5
v = 6
v = 7
v = 8
v = 9
v = 10
*/

13、阻塞式api

block
Integer integer = Flux.just(1, 2, 4)
    .map(i -> i + 10)
    .blockLast();
System.out.println(integer);

List<Integer> integers = Flux.just(1, 2, 4)
    .map(i -> i + 10)
    .collectList()
    .block(); // 也是一种订阅者; BlockingMonoSubscriber
System.out.println("integers = " + integers);

/*
14
integers = [11, 12, 14]
*/
// 百万数据,8个线程,每个线程处理100,进行分批处理一直处理结束
Flux.range(1,1000000)
    .buffer(100)
    .parallel(8)
    .runOn(Schedulers.newParallel("yy"))
    .log()
    .flatMap(list->Flux.fromIterable(list))
    .collectSortedList(Integer::compareTo)
    .subscribe(v-> System.out.println("v = " + v));

LockSupport.park();

14、Context api

//Context-API: https://projectreactor.io/docs/core/release/reference/#context
//ThreadLocal在响应式编程中无法使用。
//响应式中,数据流期间共享数据,Context API: Context:读写 ContextView:只读;
static void threadlocal() {

    //支持Context的中间操作
    Flux.just(1, 2, 3)
        .transformDeferredContextual((flux, context) -> {
            System.out.println("flux = " + flux);
            System.out.println("context = " + context);
            return flux.map(i -> i + "==>" + context.get("prefix"));
        })
        //上游能拿到下游的最近一次数据
        .contextWrite(Context.of("prefix", "哈哈"))
        //ThreadLocal共享了数据,上游的所有人能看到; Context由下游传播给上游
        .subscribe(v -> System.out.println("v = " + v));


    // 以前 命令式编程

    // controller -- service -- dao
    // 响应式编程 dao(10:数据源) --> service(10) --> controller(10); 从下游反向传播


}

WebFlux

  • Reactor核心HttpHandler 原生API;

  • DispatcherHandler 原理;

    • DispatcherHandler 组件分析
    • DispatcherHandler 请求处理流程
    • 返回结果处理
    • 异常处理
    • 视图解析
      • 重定向
      • Rendering
  • 注解式 - Controller

    • 兼容老版本方式
    • 新版本变化
      • SSE
      • 文件上传
  • 错误响应

    • @ExceptionHandler
      • ErrorResponse: 自定义 错误响应
      • ProblemDetail:自定义PD返回
  • WebFlux配置

    • @EnableWebFlux
    • WebFluxConfigurer

WebFlux:底层完全基于netty+reactor+springweb 完成一个全异步非阻塞的web响应式框架

底层:异步 + 消息队列(内存) + 事件回调机制 = 整套系统

优点:能使用少量资源处理大量请求;

0、组件对比

API功能Servlet-阻塞式WebWebFlux-响应式Web
前端控制器DispatcherServletDispatcherHandler
处理器ControllerWebHandler/Controller
请求、响应ServletRequestServletResponseServerWebExchange: ServerHttpRequest、ServerHttpResponse
过滤器Filter(HttpFilter)WebFilter
异常处理器HandlerExceptionResolverDispatchExceptionHandler
Web配置@EnableWebMvc@EnableWebFlux
自定义配置WebMvcConfigurerWebFluxConfigurer
返回结果任意Mono、Flux、任意
发送REST请求RestTemplateWebClient

Mono: 返回[ 0 |1 ]数据流

Flux:返回[ N ]数据流

1、WebFlux

底层基于Netty实现的Web容器与请求/响应处理机制

参照:https://docs.spring.io/spring-framework/reference/6.0/web/webflux.html

1、引入

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.6</version>
</parent>


<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
</dependencies>

Context 响应式上下文数据传递; 由下游传播给上游;

以前: 浏览器 --> Controller --> Service --> Dao: 阻塞式编程

现在: Dao(数据源查询对象【数据发布者】) --> Service --> Controller --> 浏览器: 响应式

大数据流程: 从一个数据源拿到大量数据进行分析计算;

ProductVistorDao.loadData()

.distinct()

.map()

.filter()

.handle()

.subscribe();

;//加载最新的商品浏览数据

img

2、Reactor Core

1、HttpHandler、HttpServer
public static void main(String[] args) throws IOException {
    //快速自己编写一个能处理请求的服务器

    //1、创建一个能处理Http请求的处理器。 参数:请求、响应; 返回值:Mono<Void>:代表处理完成的信号
    HttpHandler handler = (ServerHttpRequest request,
                           ServerHttpResponse response)->{
        URI uri = request.getURI();
        System.out.println(Thread.currentThread()+"请求进来:"+uri);
        //编写请求处理的业务,给浏览器写一个内容 URL + "Hello~!"
        //            response.getHeaders();    // 获取响应头
        //            response.getCookies();    // 获取Cookie
        //            response.getStatusCode(); // 获取响应状态码;
        //            response.bufferFactory(); // buffer工厂
        //            response.writeWith()      // 把xxx写出去
        //            response.setComplete();   // 响应结束, 该方法返回Mono<Void>

        //创建 响应数据的 DataBuffer
        DataBufferFactory factory = response.bufferFactory();

        //数据Buffer
        DataBuffer buffer = factory.wrap(new String(uri + " => Hello!").getBytes());

		// 数据的发布者:Mono<DataBuffer>、Flux<DataBuffer>
        // 需要一个 DataBuffer 的发布者
        return response.writeWith(Mono.just(buffer));
    };

    //2、启动一个服务器,监听8080端口,接受数据,拿到数据交给 HttpHandler 进行请求处理
    ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);


    //3、启动Netty服务器
    HttpServer.create()
        .host("localhost")
        .port(8080)
        .handle(adapter) //用指定的处理器处理请求
        .bindNow(); //现在就绑定

    System.out.println("服务器启动完成....监听8080,接受请求");
    System.in.read();
    System.out.println("服务器停止....");


}

3、DispatcherHandler

SpringMVC: DispatcherServlet;

SpringWebFlux: DispatcherHandler

1、请求处理流程
  • HandlerMapping:请求映射处理器; 保存每个请求由哪个方法进行处理
  • HandlerAdapter:处理器适配器;反射执行目标方法
  • HandlerResultHandler:处理器结果处理器;

SpringMVC: DispatcherServlet 有一个 doDispatch() 方法,来处理所有请求;

WebFlux: DispatcherHandler 有一个 handle() 方法,来处理所有请求;

public Mono<Void> handle(ServerWebExchange exchange) { 
    if (this.handlerMappings == null) {
        return createNotFoundError();
    }
    if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
        return handlePreFlight(exchange);
    }
    return Flux.fromIterable(this.handlerMappings) //拿到所有的 handlerMappings
        // 找每一个mapping看谁能处理请求
        .concatMap(mapping -> mapping.getHandler(exchange)) 
        .next() // 直接触发获取元素; 拿到流的第一个元素; 找到第一个能处理这个请求的handlerAdapter
        .switchIfEmpty(createNotFoundError()) // 如果没拿到这个元素,则响应404错误;
        // 异常处理,一旦前面发生异常,调用处理异常
        .onErrorResume(ex -> handleDispatchError(exchange, ex)) 
        // 调用方法处理请求,得到响应结果
        .flatMap(handler -> handleRequestWith(exchange, handler)); 
}
  • 1、请求和响应都封装在 ServerWebExchange 对象中,由handle方法进行处理
  • 2、如果没有任何的请求映射器; 直接返回一个: 创建一个未找到的错误; 404; 返回Mono.error;终结流
  • 3、跨域工具,是否跨域请求,跨域请求检查是否复杂跨域,需要预检请求;
  • 4、Flux流式操作,先找到HandlerMapping,再获取handlerAdapter,再用Adapter处理请求,期间的错误由onErrorResume触发回调进行处理;

源码中的核心两个:

  • handleRequestWith: 编写了handlerAdapter怎么处理请求
  • handleResult: String、User、ServerSendEvent、Mono、Flux …

concatMap: 先挨个元素变,然后把变的结果按照之前元素的顺序拼接成一个完整流

private <R> Mono<R> createNotFoundError() {
    Exception ex = new ResponseStatusException(HttpStatus.NOT_FOUND);
    return Mono.error(ex);
}
Mono.defer(() -> {
    Exception ex = new ResponseStatusException(HttpStatus.NOT_FOUND);
    return Mono.error(ex);
}); //有订阅者,且流被激活后就动态调用这个方法; 延迟加载;

4、注解开发

1、目标方法传参

https://docs.spring.io/spring-framework/reference/6.0/web/webflux/controller/ann-methods/arguments.html

Controller method argumentDescription
ServerWebExchange封装了请求和响应对象的对象; 自定义获取数据、自定义响应
ServerHttpRequest, ServerHttpResponse请求、响应
WebSession访问Session对象
java.security.Principal
org.springframework.http.HttpMethod请求方式
java.util.Locale国际化
java.util.TimeZone + java.time.ZoneId时区
@PathVariable路径变量
@MatrixVariable矩阵变量
@RequestParam请求参数
@RequestHeader请求头;
@CookieValue获取Cookie
@RequestBody获取请求体,Post、文件上传
HttpEntity封装后的请求对象
@RequestPart获取文件上传的数据 multipart/form-data.
java.util.Map, org.springframework.ui.Model, and org.springframework.ui.ModelMap.Map、Model、ModelMap
@ModelAttribute
Errors, BindingResult数据校验,封装错误
SessionStatus + class-level @SessionAttributes
UriComponentsBuilderFor preparing a URL relative to the current request’s host, port, scheme, and context path. See URI Links.
@SessionAttribute
@RequestAttribute转发请求的请求域数据
Any other argument所有对象都能作为参数:1、基本类型 ,等于标注@RequestParam 2、对象类型,等于标注 @ModelAttribute
2、返回值写法

sse和websocket区别:

  • SSE:单工;请求过去以后,等待服务端源源不断的数据
  • websocket:双工: 连接建立后,可以任何交互;
Controller method return valueDescription
@ResponseBody把响应数据写出去,如果是对象,可以自动转为json
HttpEntity, ResponseEntityResponseEntity:支持快捷自定义响应内容
HttpHeaders没有响应内容,只有响应头
ErrorResponse快速构建错误响应
ProblemDetailSpringBoot3;
String就是和以前的使用规则一样;forward: 转发到一个地址redirect: 重定向到一个地址配合模板引擎
View直接返回视图对象
java.util.Map, org.springframework.ui.Model以前一样
@ModelAttribute以前一样
Rendering新版的页面跳转API; 不能标注 @ResponseBody 注解
void仅代表响应完成信号
Flux, Observable, or other reactive type使用 text/event-stream 完成SSE效果
Other return values未在上述列表的其他返回值,都会当成给页面的数据;

5、文件上传

https://docs.spring.io/spring-framework/reference/6.0/web/webflux/controller/ann-methods/multipart-forms.html

class MyForm {

	private String name;

	private MultipartFile file;

	// ...

}

@Controller
public class FileUploadController {

	@PostMapping("/form")
	public String handleFormUpload(MyForm form, BindingResult errors) {
		// ...
	}

}

现在

@PostMapping("/")
public String handle(@RequestPart("meta-data") Part metadata, 
		@RequestPart("file-data") FilePart file) { 
	// ...
}

6、错误处理

@ExceptionHandler(ArithmeticException.class)
public String error(ArithmeticException exception){
    System.out.println("发生了数学运算异常"+exception);

    //返回这些进行错误处理;
    //        ProblemDetail:  建造者:声明式编程、链式调用
    //        ErrorResponse : 

    return "炸了,哈哈...";
}

7、RequestContext

8、自定义Flux配置

WebFluxConfigurer

容器中注入这个类型的组件,重写底层逻辑

@Configuration
public class MyWebConfiguration {

    //配置底层
    @Bean
    public WebFluxConfigurer webFluxConfigurer(){

        return new WebFluxConfigurer() {
            @Override
            public void addCorsMappings(CorsRegistry registry) {
                registry.addMapping("/**")
                        .allowedHeaders("*")
                        .allowedMethods("*")
                        .allowedOrigins("localhost");
            }
        };
    }
}

9、Filter

@Component
public class MyWebFilter implements WebFilter {
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpResponse response = exchange.getResponse();

        System.out.println("请求处理放行到目标方法之前...");
        Mono<Void> filter = chain.filter(exchange); //放行


        //流一旦经过某个操作就会变成新流

        Mono<Void> voidMono = filter.doOnError(err -> {
                    System.out.println("目标方法异常以后...");
                }) // 目标方法发生异常后做事
                .doFinally(signalType -> {
                    System.out.println("目标方法执行以后...");
                });// 目标方法执行之后

        //上面执行不花时间。
        return voidMono; //看清楚返回的是谁!!!
    }
}

http://www.kler.cn/a/576846.html

相关文章:

  • Windows、macOS和Linux系统的统计文件夹下的文件数量的方法
  • 笔记:代码随想录算法训练营day38: LeetCode322. 零钱兑换、279.完全平方数、139.单词拆分;多重背包
  • 数学建模:MATLAB强化学习
  • MacBook上API调⽤⼯具推荐
  • 线性代数笔记28--奇异值分解(SVD)
  • 【从零开始学习计算机科学】数字逻辑(五) Verilog HDL语言
  • Lab17_ Blind SQL injection with out-of-band data exfiltration
  • MTK Android12 桌面上显示文件管理器图标
  • 深入剖析 ConcurrentHashMap:高并发场景下的高效哈希表
  • 查看k8s集群的资源使用情况
  • Azure云生态系统详解:核心服务、混合架构与云原生概念
  • 一文了解基于AUTOSAR的ECU传感器信号处理全流程
  • [PWNME 2025] PWN 复现
  • 数据结构(回顾)
  • 安装CUDA12.1和torch2.2.1下的DKG
  • 基于cross-attention算法关联文本和图像、图像和动作
  • Logstash同步MySQL到ES
  • 从0到1入门Linux
  • MongoDB(一) - MongoDB安装教程(Windows + Linux)
  • STM32使用无源蜂鸣器