(六)Reactive-Stream 响应式流
一、Reactive Streams
基于异步、消息驱动的全事件回调系统:响应式系统
Reactive Streams(响应式流)是一种处理异步数据流的规范和模式。它具有以下特点和优势:
-
异步非阻塞:能够在不阻塞线程的情况下处理数据,提高系统的吞吐量和响应能力。
-
背压处理:当生产者产生数据的速度快于消费者处理速度时,能够有效地进行背压控制,避免数据丢失或系统过载。
-
函数式风格:通常采用函数式编程的方式进行数据处理和转换,代码更易于理解和维护。
-
事件驱动:基于事件驱动的方式,能够灵活地应对各种数据变化和事件触发。
响应式流在现代的分布式系统、异步编程以及处理大量实时数据的场景中得到了广泛应用,例如在 Web 开发、物联网、大数据处理等领域,可以帮助开发者更高效地处理异步数据流,提升系统的性能和可靠性。
推荐阅读:
响应式宣言:https://www.reactivemanifesto.org/zh-CN
二、API Components
1. Publisher:发布者;产生数据流;
2. Subscriber:订阅者; 消费数据流;
3. Subscription:订阅关系;
订阅关系是发布者和订阅者之间的关键接口。订阅者通过订阅来表示对发布者产生的数据的兴趣。订阅者可以请求一定数量的元素,也可以取消订阅。
4. Processor:处理器;
处理器是同时实现了发布者和订阅者接口的组件。它可以接收来自一个发布者的数据,进行处理,并将结果发布给下一个订阅者。处理器在Reactor中充当中间环节,代表一个处理阶段,允许你在数据流中进行转换、过滤和其他操作。
注意:这种模型遵循Reactive Streams规范,确保了异步流的一致性和可靠性。
1.创建发布者 SubmissionPublisher(Publisher的一个测试发布者类BaseSubscriber)
2.创建订阅者 Flow.Processor(创建处理器)
3.绑定关系 发布者.subscirber(订阅者)
4.发布者发布数据 (发布者.submit(数据))
public static class myProcessor extends SubmissionPublisher<String> implements Flow.Processor<String,String>{
}
三、编程思维转换
以前的编程模型在干什么?怎么编码?编码的时候注意什么问题?
function a(String arg[]){
//业务处理. 抛出异常
//数据返回
}
方法调用:
1、给他什么样的数据; 传参
2、怎么处理数据: 业务
3、处理后返回结果: 结果
哲学角度:
万物皆是数据,数据处理; 数据结构 + 算法 = 程序
一个数据/一堆数据(流) === 流操作 === 新数据/新流
四、背压
向上游传递信号这一点也被用于实现 背压 ,就像在装配线上,某个工位的处理速度如果慢于流水线速度,会对上游发送反馈信号一样。
在响应式流规范中实际定义的机制同刚才的类比非常接近:订阅者可以无限接受数据并让它的源头 “满负荷”推送所有的数据,也可以通过使用 request 机制来告知源头它一次最多能够处理 n 个元素。
中间环节的操作也可以影响 request。想象一个能够将每10个元素分批打包的缓存(buffer)操作。 如果订阅者请求一个元素,那么对于源头来说可以生成10个元素。此外预取策略也可以使用了, 比如在订阅前预先生成元素。
这样能够将“推送”模式转换为“推送+拉取”混合的模式,如果下游准备好了,可以从上游拉取 n 个元素;但是如果上游元素还没有准备好,下游还是要等待上游的推送。
背压模式:引入一个缓冲区(消息队列),就能实现全系统、全异步、不阻塞、不等待、实时响应。数据是自流动的,不靠迭代被动流动;
推拉模型:
推:流模式;上游有数据,自动推给下游
拉:迭代器;自己遍历,自己拉取数据;for循环
什么问题都可以评论区留言,看见都会回复的
如果你觉得本篇文章对你有所帮助的,把“文章有帮助的”打在评论区
多多支持吧!!!
点赞加藏评论,是对小编莫大的肯定。抱拳了!