当前位置: 首页 > article >正文

(六)Reactive-Stream 响应式流

一、Reactive Streams

 

基于异步、消息驱动的全事件回调系统:响应式系统

 Reactive Streams(响应式流是一种处理异步数据流的规范和模式。它具有以下特点和优势:

  • 异步非阻塞:能够在不阻塞线程的情况下处理数据,提高系统的吞吐量和响应能力。

  • 背压处理:当生产者产生数据的速度快于消费者处理速度时,能够有效地进行背压控制,避免数据丢失或系统过载。

  • 函数式风格:通常采用函数式编程的方式进行数据处理和转换,代码更易于理解和维护。

  • 事件驱动:基于事件驱动的方式,能够灵活地应对各种数据变化和事件触发。

        响应式流在现代的分布式系统、异步编程以及处理大量实时数据的场景中得到了广泛应用,例如在 Web 开发、物联网、大数据处理等领域,可以帮助开发者更高效地处理异步数据流,提升系统的性能和可靠性。

推荐阅读:

响应式宣言:https://www.reactivemanifesto.org/zh-CN

二、API Components

1. Publisher:发布者;产生数据流;

2. Subscriber:订阅者; 消费数据流;

3. Subscription:订阅关系;

         订阅关系是发布者和订阅者之间的关键接口。订阅者通过订阅来表示对发布者产生的数据的兴趣。订阅者可以请求一定数量的元素,也可以取消订阅。

4. Processor:处理器;

         处理器是同时实现了发布者和订阅者接口的组件。它可以接收来自一个发布者的数据,进行处理,并将结果发布给下一个订阅者。处理器在Reactor中充当中间环节,代表一个处理阶段,允许你在数据流中进行转换、过滤和其他操作。

注意:这种模型遵循Reactive Streams规范,确保了异步流的一致性和可靠性。

1.创建发布者        SubmissionPublisher(Publisher的一个测试发布者类BaseSubscriber)

2.创建订阅者        Flow.Processor(创建处理器)

3.绑定关系            发布者.subscirber(订阅者)

4.发布者发布数据  (发布者.submit(数据))

public static class myProcessor extends SubmissionPublisher<String> implements Flow.Processor<String,String>{

}

三、编程思维转换

        以前的编程模型在干什么?怎么编码?编码的时候注意什么问题?

function a(String arg[]){
   //业务处理. 抛出异常
  //数据返回
}

方法调用:

        1、给他什么样的数据; 传参

        2、怎么处理数据: 业务

        3、处理后返回结果: 结果

哲学角度:

        万物皆是数据,数据处理; 数据结构 + 算法 = 程序

        一个数据/一堆数据(流) === 流操作 === 新数据/新流

四、背压

        向上游传递信号这一点也被用于实现 背压 ,就像在装配线上,某个工位的处理速度如果慢于流水线速度,会对上游发送反馈信号一样。

        在响应式流规范中实际定义的机制同刚才的类比非常接近:订阅者可以无限接受数据并让它的源头 “满负荷”推送所有的数据,也可以通过使用 request 机制来告知源头它一次最多能够处理 n 个元素。

        中间环节的操作也可以影响 request。想象一个能够将每10个元素分批打包的缓存(buffer)操作。 如果订阅者请求一个元素,那么对于源头来说可以生成10个元素。此外预取策略也可以使用了, 比如在订阅前预先生成元素。

        这样能够将“推送”模式转换为“推送+拉取”混合的模式,如果下游准备好了,可以从上游拉取 n 个元素;但是如果上游元素还没有准备好,下游还是要等待上游的推送。

背压模式:引入一个缓冲区(消息队列),就能实现全系统、全异步、不阻塞、不等待、实时响应。数据是自流动的,不靠迭代被动流动;

推拉模型:

        推:流模式;上游有数据,自动推给下游

        拉:迭代器;自己遍历,自己拉取数据;for循环

什么问题都可以评论区留言,看见都会回复的

如果你觉得本篇文章对你有所帮助的,把“文章有帮助的”打在评论区

多多支持吧!!!

点赞加藏评论,是对小编莫大的肯定。抱拳了!


http://www.kler.cn/a/592149.html

相关文章:

  • 霍尔传感器与电流互感器的区别
  • 男女搭配(数学思维)
  • 如何实现一个bind函数?
  • electron桌面应用多种快速创建方法
  • PyTorch入门指南:环境配置与张量初探
  • 3.19学习总结 题+java面向对象
  • 程序化广告行业(28/89):基于用户旅程的广告策略解析
  • 第三:go 操作mysql
  • 前端iView面试题及参考答案
  • PMP项目管理—相关方管理篇—补充内容
  • 【系统架构设计师】操作系统 - 特殊操作系统 ③ ( 微内核操作系统 | 单体内核 操作系统 | 内核态 | 用户态 | 单体内核 与 微内核 对比 )
  • k8s学习记录(三):Pod基础-Node选择
  • python系列之元组(Tuple)
  • MySQL配置文件my.cnf详解
  • Java 代码优化技巧:场景与实践
  • 【HarmonyOS Next】鸿蒙中App、HAP、HAR、HSP概念详解
  • 2025年智能系统、自动化与控制国际学术会议(ISAC 2025)
  • 云原生边缘计算:分布式智能的时代黎明
  • 抖音碰一碰发视频系统源码搭建全攻略-碰一碰拓客系统oem搭建
  • RuoYi框架连接SQL Server时解决“SSL协议不支持”和“加密协议错误”