Reactive StreamsReactor Core
Reactive Streams&Reactor Core
- 一、概述
- 1、问题
- 2、优势
- 3、发展
- 二、Reactive Streams
- 1、依赖
- 2、API
- 三、Project Reactor
- 1、概述
- 2、并发模型
- 3、入门
- 1)依赖
- 2)Flux和Mono
- 3)空流&错误流
- 4、订阅响应式流
- 1)常见订阅
- 2)自定义订阅
- 5、API
- 1)index
- 2)timestamp
- 3)any
- 4)map
- 5)filter
- 6)collect
- 7)distinct
- 8)flatMap
- 9)scan
- 10)thenMany
- 11)组合响应流
- 12)流元素批处理
- 13)sample(采样)
- 14)blockLast(响应流转化为阻塞流)
- 15)materialize(物化和非物化)
- 16)onErrorResume(错误处理)
- 17)defer(冷热数据流)
- 18)transform(组合&转换响应流)
- 6、编程方式创建流
- 1)push&create
- 2)generate
- 3)using-disposable
- 4)usingWhen
一、概述
1、问题
传统的命令式编程在面对当前需求时会有一些限制,比如,在应用负载较高时,应用需要有更高的可用性,并提供低的延迟时间。
1)资源消耗大
使用Servlet开发的单体应用,是基于传统的Thread per Request模型。当服务部署到Tomcat后,Tomcat有线程池,每个请求交给线程池中的一个线程来执行,如果执行过程中包括访问数据库,或者包括读取文件,则在调用数据库时或读取文件时,请求线程是阻塞的,即使是阻塞线程也是占用资源的,典型的每个线程要使用1MB的内存。如果有并发请求,则会同时有多个线程处于阻塞状态,每个线程占据一份资源。
同时,Tomcat的线程池大小决定了可以同时处理多少个请求。如果应用基于微服务架构,我们可以横向扩展,但是也有内存高占用的问题。因此,当并发数很大的时候,Thread per Request模型很消耗资源。
2)压垮客户端
服务A请求服务的数据,如果数据量很大,超过了服务能处理的程度,则导致服务OOM
2、优势
使用响应式编程的优势:
- 不用Thread per request模型,使用少量线程即可处理大量的请求
- 在执行I/O操作时不让线程等待
- 简化并行调用
- 支持背压,让客户端告诉服务端它可以处理多少负载。
3、发展
Reactive Streams:
Reactive Streams是个规范,它规范了“有非阻塞背压机制的异步的流处理”。真正正确理解异步、非阻塞并不容易。实际上Reactive Streams规范或者说它的第三方代码实现包含的内容更加丰富:除了non-blocking,还有:Composable、Deferred、Flow Controll、Resilient、Interruptible。
其中Composable就是函数式编程思想的体现。 可体会下Java8里的Stream API各种算子的参数,所以Lamda表达式是进行Reactive Streams实现的基本前提,否则很难想象臃肿的面向对象的Composable。有了JDK8的铺垫,Reactive Streams接口被JDK9定义在Flow里才是可能的
ReactiveX Java(Rx Java):
2011年,微软发布了NET的响应式扩展(Reactive Extensions,即ReactiveX或Rx),以方便异步、事件驱动的程序。ReactiveX混合了迭代模式和观察者模式。不同之处在于一个是推模式,一个是基于迭代器的拉模式。除了对变化事件的观察,完成事件和异常事件也会发布给订阅者。
ReactiveX的基本思想是事件是数据,数据是事件。响应式扩展被移植到几种语言和平台上,当然包括JavaScript、Python、C++、Swift和Java。ReactiveX很快成为一种跨语言的标准,将反应式编程引入到行业中。
RxJava 1.0于2014年11月发布。RxJava是其他Reactivex JVM端口的主干,比如Rx Scala、Rx Kotin、RxGroovy。它已经成为Android开发的核心技术,并且已经进入Java后端开发。许多RxJavaAdapter库,例如RxAndroid、RxJava JDBC、Rx Netty和RxJavaF X调整了几个Java框架,使之成为响应式的,并且可以开箱即用地使用RxJava
Akka:
Akka是一个受欢迎的框架,具有大量功能和大型社区。然而,Akka最初是作为Scala生态系统的一部分构建的,在很长一段时间内,它仅在基于Scala编写的解决方案中展示了它的强大功能。尽管Scala是一种基于JVM的语言,但它与Java明显不同。几年前,Akka直接开始支持Java,但出于某些原因,它在Java世界中不像在Scala世界中那么受欢迎。
Vert.x:
Vert.x框架也是构建高效响应式系统的强大解决方案。Vert.x的设计初衷是作为Node.js在Java虚拟机上的替代方法,它支持非阻塞和事件驱动。然而,Vert.x仅在几年前才开始具备竞争力。
Project Reactor:
既然 Spring 都提供了对 Reactive Streams 的实现,感觉其实上面列出的几个库已经没有太多的意义。各家对Reactive Streams规范的实现在细节上都有很大不同,因为Spring 的生态太强大了,如果没有特殊的需求,比如JDK小于8,那么我们的项目使用Project Reactor应该是较好的选择。
Project Reactor 到目前为止经历了 1.0、2.0、 3.0。其中1.0这个阶段还没有Reactive Stream的规范。在2.0开始Follow 规范并基本定型。3.0 感觉是个重构版,形成 Reactive-Streams-commons库。
有了Project Reactor这样的基础库,整个Spring组件基本都有了Reactive Style的版本,在这个基础上用Netty(或 Servet 3.1 Containe)+ Reactive Streams 适配层 + Spring Security Reactive + WebFlux + Spring Data Reactive Repository,就可以构建出重头到尾的 Reactive 应用。
从Spring Cloud的组件角度讲,也衍生出Reactive Discovery Client、Reactive Load Balancer、Blockhound, Reactor Debug、Improved Reactor Micrometer Support、Reactor Netty Metric…
二、Reactive Streams
是一种规范
访问地址:https://www.reactive-streams.org/、https://github.com/reactive-streams/reactive-streams-jvm/tree/v1.0.4
Reactive Streams是一种基于异步流处理的标准化规范,旨在使流处理更加可靠、高效和响应式。反应式流的特点简单来说就是:基本特性(变化传递 + 数据流 + 声明式) + 高级特性(非阻塞回压 + 异步边界)
Reactive Streams的核心思想是让发布者(Publisher)和订阅者(Subscriber)之间进行异步流处理,以实现非阻塞的响应式应用程序。发布者可以产生任意数量的元素并将其发送到订阅者,而订阅者则可以以异步方式处理这些元素。Reactive Streams还定义了一些接口和协议,以确保流处理的正确性和可靠性。
1、依赖
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.4</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>1.0.4</version>
<scope>test</scope>
</dependency>
2、API
Publisher:定义了生产元素并将其发送给订阅者的方法。
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
Subscriber:定义了接收元素并进行处理的方法。
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
Subscription:定义了订阅者和发布者之间的协议,包括请求元素和取消订阅等。
public interface Subscription {
public void request(long n);
public void cancel();
}
Processor:定义了同时实现Publisher和Subscriber接口的中间件,它可以对元素进行转换或者过滤等操作。
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
三、Project Reactor
1、概述
Project Reactor由Reactor文档中列出的一组模块组成。主要组件是Reactor Core,其中包含响应式类型Flux和Mono,它们实现了Reactive Streams的Publisher接口以及一组可应用于这些类型的运算符。
其他模块:
- Reactor Test:提供一些实用程序来测试响应流
- Reactor Extra:提供一些额外的Flux运算符
- Reactor Netty:无阻塞且支持背压的TCP、HTTP和UDP的客户端和服务器
- Reactor Adapter:用于与其他响应式库,如RxJava和Akka Streams的适配
- Reactor Kafka:用于Kafka的响应式API,作为Kafka的生产者和消费者
2、并发模型
有两种在响应式链中切换执行某些的方式:publishOn和subscribeOn
区别如下:
- publishOn(Scheduler Scheduler):影响所有后续运算符的执行(只要未指定其他任何内容)
- subscribeOn(Scheduler Scheduler):根据链中最早的subscribeOn调用,更改整个操作符链所订阅的线程。它不影影响随后对publishOn的调用的行为
Schedulers类包含用于提供执行上下文的静态方法:
- parallel():为并行工作而调整的固定工作池,可创建与cpu内核数量一样多的工作线程池。
- single:单个可重用线程。此方法为所有调用方重用同一线程,直到调度程序被释放为止。如果您希望使用按调用专用线程,则可以为每个调用使用schedulers.newSingle()
- boundedElastic:动态创建一定数量的工作者,它限制了它可以创建的支持线程的数量,并且可以在线程可用时重新调度要排队的任务。这是包装同步阻塞调用的不错选择
- immediate():立即在执行线程上运行,而不切换执行上下文
- fromExecutorService(Executorservice):可用于从任何现有ExecutorService中创建调度程序
3、入门
1)依赖
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.3.5.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.3.5.RELEASE</version>
</dependency>
2)Flux和Mono
Flux和Mono提供了许多工厂方法,可以根据已有的数据创建响应流。
Flux:
public static void main(String[] args) {
Flux.just("d1", "d2").subscribe(System.out::print);
Flux.fromArray(new String[]{"d1", "d2"}).subscribe(System.out::println);
Flux.fromIterable(Arrays.asList("d1", "d2")).subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.defer(() -> Flux.range(1, 3)).subscribe(System.out::println);
}
Mono:
public static void main(String[] args) {
Mono.just(1).subscribe(System.out::println);
Mono.justOrEmpty(Optional.empty()).subscribe(System.out::println);
Mono.defer(() -> Mono.just("hello")).subscribe(System.out::println);
Mono.fromCallable(() -> "fromCallable").subscribe(System.out::println);
Mono.fromSupplier(() -> "fromSupplier").subscribe(System.out::println);
Mono.fromFuture(() -> CompletableFuture.completedFuture("fromFuture")).subscribe(System.out::println);
Mono.fromCompletionStage(() -> CompletableFuture.completedFuture("fromCompletionStage")).subscribe(System.out::println);
Mono.fromRunnable(() -> {
System.out.println("fromRunnable");
}).subscribe(System.out::println);
}
3)空流&错误流
- empty():工厂方法,它们分别生成Flux或Mono的空实例
- never():方法会创建一个永远不会发出完成、数据或错误等信号的流
- error(Throwable):工厂方法创建一个序列,该序列在订阅时始终通过每个订阅者的onError方法传播错误,由于错误是在Flux或Mono声明期间被创建的,因此,每个订阅者都会收到相同的Throwable实例
public static void main(String[] args) {
Mono.never().subscribe(System.out::println);
Flux.never().subscribe(System.out::println);
Mono.empty().subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
// subscribe的第二个参数代表错误的捕获
Mono.error(new RuntimeException("Mono error"))
.subscribe(System.out::println, System.out::println);
Flux.error(new RuntimeException("Flux error"))
.subscribe(System.out::println, System.out::println);
}
4、订阅响应式流
Flux和Mono提供了对subscribe()方法的基于Lambda的重载,简化了订阅的开发。subscribe方法的所有重载都返回Disposable接口的实例,可以用于取消基础的订阅过程。
1)常见订阅
重载相关方法
public static void main(String[] args) {
Flux.range(100, 10).subscribe(e -> {
System.out.println("接收到:" + e);
}, ex -> {
System.out.println("发生异常:" + ex);
}, () -> {
System.out.println("执行完成");
});
}
2)自定义订阅
自定义Subscriber类:
public static void main(String[] args) {
Flux.range(100, 10).subscribe(new Subscriber<Integer>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1);
}
@Override
public void onNext(Integer integer) {
System.out.println("接收到:" + integer);
subscription.request(1);
}
@Override
public void onError(Throwable t) {
System.out.println("发生异常:" + t);
}
@Override
public void onComplete() {
System.out.println("执行完成");
}
});
}
该定义订阅的方法存在一定问题。它打破了线性代码流,也容易出错。最困难的部分是需要自己管理背压并正确实现订阅者的所有TCK要求。在该示例中,就打破了有关订阅验证和取消这几个TCK要求。
建议扩展Project Reactor提供的BaseSubscriber类。在这种情况下订阅示例:
public static void main(String[] args) {
Flux.range(100, 10).subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnError(Throwable throwable) {
System.out.println("发生异常:" + throwable);
}
@Override
protected void hookOnComplete() {
System.out.println("执行完成");
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(1);
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("接收到:" + value);
request(1);
}
});
}
使用BaseSubscriber类,实现符合TCK的订阅者更为容易。订阅者在本身拥有生命周期管理的宝贵资源时,会需要这种方法,例如,订阅者可能包装文件处理程序或连接到第三方服务的WebSocket链接。
5、API
SDK内部还有具体的弹珠图示意
如:Flux.collect(Supplier containerSupplier, BiConsumer<E, ? super T> collector)
1)index
public static void main(String[] args) {
Flux.range(100, 10)
.index()
.subscribe(System.out::println);
}
结果:
2)timestamp
public static void main(String[] args) {
Flux.range(100, 10)
.timestamp()
.subscribe(System.out::println);
}
结果:
3)any
public static void main(String[] args) {
Flux.just("456", "789", "123")
.all(e -> e.equalsIgnoreCase("123")).subscribe(System.out::println);
Flux.just("456", "789", "123")
.any(e -> e.equalsIgnoreCase("123")).subscribe(System.out::println);
Flux.just("456", "789", "123")
.hasElement("123").subscribe(System.out::println);
Flux.just("456", "789", "123")
.hasElements().subscribe(System.out::println);
Flux.just("456")
.hasElements().subscribe(System.out::println);
Flux.empty()
.hasElements().subscribe(System.out::println);
try {
System.in.read();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
结果:
4)map
public static void main(String[] args) {
Flux.range(100, 10)
.map(w -> "map:" + w)
.subscribe(System.out::println);
}
结果:
5)filter
- take(n)操作符限制所获取的元素,该方法忽略除前n个元素之外的所有
- takeLast仅返回流的最后一个元素。
- takeUntil(Predicate)传递一个元素直到满足某个条件。
- elementAt(n)只可用于获取序列的第n个元素。
- single操作符从数据源发出单个数据项,也为空数据源发出NoSuchElementException错误信号,或者为具有多个元素的数据源发出IndexOutOfBoundsException信号。它不仅可以基于一定数量来获取或跳过元素,还可以通过带有Duration的skip(Duration)或take(Duration)操作符。
- takeUntilOther(Publisher)或skipUntilOther(Publisher)操作符,可以跳过或获取一个元素,直到某些消息从另一个流到达。
public static void main(String[] args) {
// filter
Flux.range(100, 10)
.filter(e -> e.equals(105))
.subscribe(e -> {
System.out.printf(e + "\t");
});
System.out.println();
// take
Flux.range(100, 10)
.take(5)
.subscribe(e -> {
System.out.printf(e + "\t");
});
System.out.println();
// takeLast
Flux.range(100, 10)
.takeLast(3)
.subscribe(e -> {
System.out.printf(e + "\t");
});
System.out.println();
// takeUntil
Flux.range(100, 10)
.takeUntil(e -> e.equals(108))
.subscribe(e -> {
System.out.printf(e + "\t");
});
System.out.println();
// elementAt
Flux.range(100, 10)
.elementAt(2)
.subscribe(e -> {
System.out.printf(e + "\t");
});
System.out.println();
// single
Flux.range(100, 1)
.single()
.subscribe(e -> {
System.out.printf(e + "\t");
}, ex -> {
System.out.printf("ex:" + ex + "\t");
});
System.out.println();
Flux.range(100, 10)
.single()
.subscribe(e -> {
System.out.printf(e + "\t");
}, ex -> {
System.out.printf("ex:" + ex + "\t");
});
System.out.println();
Flux.empty()
.single()
.subscribe(e -> {
System.out.printf(e + "\t");
}, ex -> {
System.out.printf("ex:" + ex + "\t");
});
System.out.println();
// skipUntilOther&takeUntilOther
Mono<String> start = Mono.just("start").delayElement(Duration.ofSeconds(1));
Mono<String> stop = Mono.just("stop").delayElement(Duration.ofSeconds(3));
Flux.range(100, 10)
.delayElements(Duration.ofMillis(400))
.map(item -> "map:" + item)
.skipUntilOther(start)
.takeUntilOther(stop)
.subscribe(e -> {
System.out.printf(e + "\t");
});
try {
// 阻塞线程,避免延时任务提前结束
System.in.read();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
结果:
6)collect
收集列表中的所有元素,并使用Flux.collectList()和Flux.collectSortedList()将结果集合处理为Mono流是可能的。Flux.collectSortedList()不仅会收集元素,还会对它们进行排序。
请注意,收集集合中的序列元素可能耗费资源,当序列具有许多元素时这种现象尤为突出。此外,尝试在无限流上收集数据可能消耗掉所有可用的内存。
public static void main(String[] args) {
Flux.just("123", "456", "789", "123")
.subscribe(System.out::println);
Flux.just("456", "789", "123")
.collectList().subscribe(System.out::println);
Flux.just("456", "789", "123")
.collectSortedList().subscribe(System.out::println);
Flux.just("456", "789", "123")
.collectMap(e -> "key-" + e, e -> "value-" + e).subscribe(System.out::println);
Flux.just("456", "789", "123")
.collectMap(e -> Integer.parseInt(e) < 500 ? "small" : "large").subscribe(System.out::println);
Flux.just("456", "789", "123")
.collectMultimap(e -> Integer.parseInt(e) < 500 ? "small" : "large").subscribe(System.out::println);
try {
System.in.read();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
结果:
7)distinct
public static void main(String[] args) {
AtomicInteger times = new AtomicInteger(0);
Flux.just("456", "789", "123")
.repeat(2)
.subscribe(e -> {
if (e.equalsIgnoreCase("456")) {
times.addAndGet(1);
System.out.print("\n" + "重复打印:" + times.get());
}
System.out.print("\t" + e);
});
times.set(0);
Flux.just("456", "789", "123")
.repeat(2)
.distinct()
.subscribe(e -> {
if (e.equalsIgnoreCase("456")) {
times.addAndGet(1);
System.out.print("\n" + "重复打印:" + times.get());
}
System.out.print("\t" + e);
});
System.out.print("\n" + "删除重复行:");
Flux.just("5456", "5789", "5123", "5456", "5789", "5123", "5456")
.distinct().subscribe(e -> {
System.out.printf(e + "\t");
}
);
System.out.print("\n" + "删除相邻行:");
Flux.just("5456", "5456", "5123", "5456", "5789", "5123", "5456")
.distinctUntilChanged().subscribe(e -> {
System.out.printf(e + "\t");
}
);
System.out.println();
Flux.empty()
.defaultIfEmpty("Flux#empty").subscribe(System.out::println);
try {
System.in.read();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
结果:
8)flatMap
- flatMap:将传入的元素转化为响应流后,再合并为一个新的响应流。会立即订阅新的流,不一定保证原始顺序,且允许来自不同子流的元素进行交错
- concatMap:整体与flatMap类似,但不会立即订阅新的流,会在生成下一个子流并订阅它之前等待每个内部完成,天生保留与源元素相同的顺序,不允许来自不同子流的元素交错
- flatMapSequential:整体与flatMap类似,但是会通过对所接收的元素进行排序来进行保留顺序,同样不允许来自不同子流的元素交错
@SneakyThrows
public static void main(String[] args) {
Random random = new Random();
Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
.flatMap(e -> Flux.fromIterable(e)
.delayElements(Duration.ofMillis(random.nextInt(200))))
.subscribe(System.out::println, null, () -> {
System.out.println("complete");
});
Thread.sleep(2000);
long l = System.currentTimeMillis();
Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
.concatMap(e -> Flux.fromIterable(e)
.delayElements(Duration.ofMillis(500)))
.subscribe(System.out::println, null, () -> {
System.out.println(System.currentTimeMillis() - l);
});
Thread.sleep(4000);
long l2 = System.currentTimeMillis();
Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
.flatMapSequential(e -> Flux.fromIterable(e)
.delayElements(Duration.ofMillis(500)))
.subscribe(System.out::println, null, () -> {
System.out.println(System.currentTimeMillis() - l2);
});
System.in.read();
}
结果:
9)scan
public static void main(String[] args) {
Flux.just(1, 2, 3, 4, 5)
.scan(0, (a, b) -> a + b).subscribe(System.out::println);
}
结果:
10)thenMany
public static void main(String[] args) {
Flux.just(1, 2, 3, 4, 5)
.thenMany(Flux.just(6, 7, 8, 9, 10)).subscribe(System.out::println);
}
结果:
11)组合响应流
- concat操作符通过向下游转发接收的元素来连接所有数据源。当操作符连接两个流时,它首先消费并重新发送第一个流的所有元素,然后第二个流执行相同的操作
- merge操作符将来自上游序列的数据合并到一个下游序列中。与concat操作符不同,上游数据源是立即(同时)被订阅的
- zip操作符号订阅所有上游,等待所有数据源发出一个元素,然后将接收到的元素组合到一个输出元素中
- combineLatest操作符与zip操作符的工作方式类似。但是,只要至少一个上游数据源发出一个值,它就会生成一个新值
@SneakyThrows
public static void main(String[] args) {
Flux.concat(
Flux.range(1, 3),
Flux.just(5, 6, 7))
.subscribe(e -> {
System.out.println("concat:" + e);
});
Flux.merge(
Flux.range(1, 3).delayElements(Duration.ofMillis(500)),
Flux.just(5, 6, 7).delayElements(Duration.ofMillis(1000)))
.subscribe(e -> {
System.out.println("merge:" + e);
});
Flux.zip(
Flux.range(1, 4).delayElements(Duration.ofMillis(500)),
Flux.just(5, 6, 7).delayElements(Duration.ofMillis(1000)))
.subscribe(e -> {
System.out.println("zip:" + e);
});
Flux.combineLatest(
Flux.range(1, 4).delayElements(Duration.ofMillis(500)),
Flux.just(5, 6, 7).delayElements(Duration.ofMillis(1000)),
(a, b) -> "combineLatest==>" + a + b)
.subscribe(System.out::println);
System.in.read();
}
结果:
12)流元素批处理
- 将元素缓冲(buffering)到容器(如list)中,结果流的类型为Flux<List>
- 通过开窗(windowing)方式将元素加入诸如Flux<Flux>等流中。请注意,现在的流信号不是值,而是可以处理的子流。
- 通过某些键将元素分组(grouping)到具有Flux<GroupedFlux<K,T>>类型的流中。每个新键都会触发一个新的GroupedFlux实例,并且具有该键的所有元素都将被推送到GroupedFlux类的该实例中。
可以基于以下场景进行缓冲和开窗操作:
- 处理元素的数量,比方说每10个元素;
- 一段时间,比方说每5分钟一次;
- 基于一些谓语,比方说在每个新的偶数之前切割;
- 基于来自其他Flux的一个事件,该事件控制着执行过程。
@SneakyThrows
public static void main(String[] args) {
Flux.range(1, 7)
.buffer(2)
.doOnNext(e -> {
System.out.println("buffer start");
})
.subscribe(System.out::println);
Flux.range(1, 7).window(2)
.doOnNext(e -> {
System.out.println("window start");
})
.subscribe(integerFlux -> {
// 由于是子流,所以需要再次订阅
integerFlux.subscribe(System.out::println);
});
Flux.range(1, 7)
.groupBy(e -> e < 4 ? "small" : "large")
.subscribe(e -> {
ArrayList<Integer> integers = new ArrayList<>();
e.scan(integers, (list, value) -> {
list.add(value);
return list;
})
.doOnComplete(() -> {
System.out.println(e.key() + " ==>: " + integers);
})
// 由于是子流,所以需要再次订阅
.subscribe();
});
System.in.read();
}
结果:
13)sample(采样)
可以让产生的流能够周期性的发出与时间窗口内最近看到的值相对应的数据项
@SneakyThrows
public static void main(String[] args) {
Flux.range(1, 5)
.delayElements(Duration.ofMillis(400))
.sample(Duration.ofMillis(1000))
.subscribe(System.out::println);
System.in.read();
}
结果:
14)blockLast(响应流转化为阻塞流)
- toIterable方法将响应式Flux转换为阻塞Iterable
- toStream方法将响应式Flux转换为阻塞Stream API。从Reactor3.2开始,在底层使用toIterable方法
- blockFirst方法阻塞了当前线程,直到上游发出第一个值或完成流为止。
- blockLast方法阻塞当前线程,直到上游发出最后一个值或完成流为止。在onError的情况下,它会在被阻塞的线程中抛出异常。
补充:
- blockFirst操作符和blockLast操作符具有方法重载,可用于设置线程阻塞的持续时间。这可以防止线程被无限阻塞。
- toIterable和toStream方法能够使用Queue来存储事件,这些事件可能比客户端代码阻塞Iterable或Stream更快到达。微批处理。
public static void main(String[] args) {
Flux.range(1, 5)
.delayElements(Duration.ofMillis(200)).toIterable().forEach(System.out::println);
List<String> collect = Flux.range(1, 5)
.delayElements(Duration.ofMillis(200)).toStream()
.map(e -> "stream:" + e)
.collect(Collectors.toList());
System.out.println(JSON.toJSONString(collect));
Flux.range(1, 5)
.delayElements(Duration.ofMillis(200))
.doOnEach(System.out::println).blockFirst();
Flux.range(10, 5)
.delayElements(Duration.ofMillis(200))
.doOnEach(System.out::println).blockLast();
}
结果:
15)materialize(物化和非物化)
使用materialize方法将流中的元素封装为Signa对象进行处理,使用dematerialize方法对Signa对象进行解封处理
@SneakyThrows
public static void main(String[] args) {
Flux.error(new IOException("error"))
.subscribe(e -> {
System.out.println("consumer==>" + e);
}, ex -> {
System.out.println("error==>" + ex);
});
Flux.error(new IOException("error2"))
.materialize()
.subscribe(e -> {
System.out.println("consumer==>" + e);
}, ex -> {
System.out.println("error==>" + ex);
});
Flux.error(new IOException("error2"))
.materialize()
.dematerialize()
.subscribe(e -> {
System.out.println("consumer==>" + e);
}, ex -> {
System.out.println("error==>" + ex);
});
System.in.read();
}
结果:
16)onErrorResume(错误处理)
onError信号是响应式流规范的一个组成部分,一种将异常传播给可以处理它的用户,但是,如果最终订阅者没有为onError信号定义处理程序,将会直接抛出异常。捕获异常主要有以下几种方式:
- subscribe操作符中的onError信号定义处理程序
- onErrorReturn操作符号捕获一个错误,并且用一个静态值或者根据异常类型选择静态值进行替换
- onErrorResume操作符捕获异常并执行额外的逻辑
- onErrorMap操作符捕获异常后,可以转化为一个新的异常
- retry可以在该响应流发生错误时,重新订阅该流,即再次执行相关的逻辑
@SneakyThrows
public static void main(String[] args) {
Flux.just(1, 2, 3, 5).flatMap(TestController::hand)
.subscribe(System.out::println, ex -> {
System.out.println("subscribe catch error:" + ex);
});
Thread.sleep(200);
System.out.println();
Flux.just(1, 2, 3, 5).flatMap(TestController::hand)
.onErrorReturn(0).subscribe(System.out::println);
Thread.sleep(200);
System.out.println();
Flux.just(1, 2, 3, 5).flatMap(TestController::hand)
.onErrorResume(e -> {
System.out.println("onErrorResume:" + e);
return Flux.just(0);
}).subscribe(System.out::println);
Thread.sleep(200);
System.out.println();
Flux.just(1, 2, 3, 5).flatMap(TestController::hand)
.onErrorMap(e -> {
System.out.println("onErrorMap:" + e);
return new RuntimeException("add onErrorMap" + e);
}).onErrorResume(ex -> {
System.out.println("onErrorResume:" + ex);
return Flux.just(0);
}).subscribe(System.out::println);
Thread.sleep(200);
System.out.println();
Flux.just(1, 2, 3, 5).flatMap(TestController::hand)
.retry(3)
.subscribe(System.out::println, ex -> {
System.out.println("retry catch error:" + ex);
});
System.in.read();
}
private static Flux<Integer> hand(Integer i) {
return Flux.defer(() -> {
if (i < 2) {
return Flux.just(i);
} else {
return Flux.error(new RuntimeException("error"));
}
});
}
结果:
17)defer(冷热数据流)
区别:
- 冷发布者行为方式:无论订阅者何时出现,都为该订阅者生成所有序列数据,没有订阅者就不会生成数据。每当订阅者出现时都会有一个新序列生成,而这些语义可以代表http请求。
- 热发布者行为方式:数据生成不依赖于订阅者而存在。因此,热发布者可能在第一个订阅者出现之前开始生成元素。
因此,当我们使用just工厂方法生成一个热发布者时,它的值只在构建发布者时计算一次,并且在新订阅者到达时不会重新计算。我们可以通过将just包装在defer中来将其转换为冷发行者。这样,即使just在初始化时生成值,这种初始化也只会在新订阅出现时发生。可以类比为一个lambda表达式,实例化时没有任何动作,只有当具体方法被调用时才会执行相关动作
@SneakyThrows
public static void main(String[] args) {
Flux<Integer> defer = Flux.defer(() -> {
System.out.println("defer hand");
return Flux.range(1, 5);
});
System.out.println("start");
defer.subscribe(System.out::println);
defer.subscribe(System.out::println);
System.in.read();
}
结果:
18)transform(组合&转换响应流)
当我们构建复杂的响应式工作流时,通常需要在几个不同的地方使用相同的操作符序列。transform操作符,可以将这些常见的部分提取到单独的对象中,并在需要时重用它们。transform操作符仅在流生命周期的组装阶段更新一次流行为,可以在响应式应用程序中实现代码重用。
区别于map,一个接收到的是T,一个是Flux
public static void main(String[] args) {
Function<Flux<Integer>, Flux<String>> function = f -> {
System.out.println("function");
return f.index()
.map(t -> t.getT1() + "=" + t.getT2());
};
Flux.range(1, 5)
.transform(function).subscribe(System.out::println);
System.in.read();
}
结果:
6、编程方式创建流
在实际的需求开发中,我们需要以一种更复杂的方法来在流中生成数据,或将对象的生命周期绑定到响应式流的生命周期中
1)push&create
push工厂方法能通过适配一个单线程生产者来编程创建Flux实例。create整体与push方法相同,都是起到桥接的作用,但是create能够支持不同线程发送的事件
@SneakyThrows
public static void main(String[] args) {
Flux.push(sink -> {
IntStream.range(10, 15)
.forEach(e -> {
if (e != 13) {
sink.next(e);
} else {
sink.onCancel(() -> System.out.println("cancel"));
}
});
})
.subscribe(e -> {
System.out.print(e + "\t");
});
Thread.sleep(1000);
ExecutorService executorService = Executors.newFixedThreadPool(3);
System.out.print("\n" + "push == > ");
Flux.push(sink -> {
Runnable task = () -> {
for (int i = 10; i < 15; i++) {
sink.next(i);
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
sink.error(e);
}
}
};
for (int i = 0; i < 3; i++) {
executorService.submit(task);
}
}).subscribe(e -> {
System.out.print(e + "\t");
});
Thread.sleep(2000);
System.out.print("\n" + "create == > ");
Flux.create(sink -> {
Runnable task = () -> {
for (int i = 10; i < 15; i++) {
sink.next(i);
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
sink.error(e);
}
}
};
for (int i = 0; i < 3; i++) {
executorService.submit(task);
}
}).subscribe(e -> {
System.out.print(e + "\t");
});
System.in.read();
}
结果:
2)generate
generate工厂方法旨在基于生成器的内部处理状态创建复杂序列。
它可以使用一个初始值和一个函数,该函数可借助初始值的内部状态计算下一个状态,并将onNext信号发送给下游订订阅者
@SneakyThrows
public static void main(String[] args) {
AtomicInteger num = new AtomicInteger(1);
Flux.generate(sink -> {
sink.next("Hello");
if (num.get() == 3) {
sink.complete();
} else {
num.addAndGet(1);
}
}).subscribe(System.out::println);
// 创建一个斐波那契数列
Flux.generate(
() -> Tuples.of(1, 2),
(state, sink) -> {
sink.next(state.getT1() + state.getT2());
if (state.getT1() + state.getT2() > 50) {
sink.complete();
}
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
})
.subscribe(System.out::println);
System.in.read();
}
结果:
3)using-disposable
@SneakyThrows
public static void main(String[] args) {
Flux.using(
() -> {
System.out.println("create");
return Executors.newFixedThreadPool(3);
},
es -> Flux.range(1, 3),
(executorService -> {
System.out.println("shutdown");
executorService.shutdown();
})
).subscribe(System.out::println);
System.in.read();
}
结果:
4)usingWhen
基于usingWhen工厂包装响应式事务与using操作符类似,usingwhen操作符使我们能以响应式方式管理资源。区别在于using操作符会同步获取。usingWhen操作符响应式地获取受托管资源(通过订阅Pubisher的实例)。此外,usingWhen操作符接受不同的处理程序,以便应对主处理流终止的成功和失败。这些处理程序由发布者实现。
可以仅使用usingWhen一个操作符实现完全无阻塞的响应式事务。
@SneakyThrows
public static void main(String[] args) {
Flux.usingWhen(
Mono.fromSupplier(() -> {
System.out.println("create");
return Executors.newFixedThreadPool(1);
}),
es -> Flux.range(1, 3),
(executorService -> {
System.out.println("shutdown");
executorService.shutdown();
return Flux.empty();
})
).subscribe(System.out::println);
Flux.usingWhen(Mono.fromSupplier(() -> {
System.out.println("create");
return Executors.newFixedThreadPool(1);
}),
resource -> {
return Flux.concat(Flux.just(1, 2, 3),
Flux.error(new RuntimeException("Error")));
},
resource -> {
System.out.println("Completed successfully");
return Mono.fromRunnable(resource::shutdown);
},
resource -> {
System.out.println("Error occurred");
return Mono.fromRunnable(resource::shutdown);
},
resource -> {
System.out.println("Cancelled");
return Mono.fromRunnable(resource::shutdown);
})
.subscribe(System.out::println, System.out::println);
System.in.read();
}
结果: