当前位置: 首页 > article >正文

reactor框架使用时,数据流请求流程

1. 我们在Flux打开时,可以看到

      public abstract class Flux<T> implements CorePublisher<T> {

2. 

public interface CorePublisher<T> extends Publisher<T> {
    void subscribe(CoreSubscriber<? super T> subscriber);
}

Publisher的关键时有个subscribe方法。这个方法就是在reactor的subscribe的时候会调用到这里。

3. 这里subscribe的参数有个CoreSubscriber。其实这个方法基本上最终其实会调用到CoreSubscriber的onSubscribe方法。

随便看对void subscribe(CoreSubscriber<? super T> subscriber);的方法的实现

选择Flux举例。看到最终调用结果:

4. 那么onSubscribe怎么调用下游呢?

注意,void onSubscribe(Subscription s);里面又传入了Subscription。

public interface Subscription {
        public void request(long n);
    public void cancel();
}

以Flux.just(1),为例,其实所有的onSubscribe方法会调用到Subscription的request方法。

5. request方法,最后调用到了onnext方法

所以数据流程如下:

subscribe()->subscribe(CoreSubscriber<? super T> subscriber)->CoreSubscriber.onSubscribe->Subscription.request(n)->CoreSubscriber.onNext()

6. 因此,onNext的调用前,数据的准备可以在Subscription.request的方法逻辑中内部进行准备,当准备好了再调用onNext方法。如果是分批request的,也就是说CoreSubscriber.onSubscribe逻辑中是分批次调用Subscription.request(n),则每个批次的Subscription.request(n)中都可以等待数据好了再调用onNext方法。

7. 也就是说,数据调用onNext之前,都可以准备好再调用。但是一旦onNext调用以后,就尽量不能阻塞住后续流程了。如果后续流程中有阻塞的情况,就要用publishon和subscribeon了,让阻塞的内容在单独的线程池中执行。

8. 对zipWith方法的理解。摘自chatgpt。不清楚是否正确

如果某个 Flux 中的数据项尚未准备好,zipWith 会挂起合并操作,直到另一个 Flux 中的数据也准备好为止。只要 zipWith 中的两个流的每一对数据项都准备好了,它才会触发 onNext()

因此这里其实Reactor框架其实netty线程还是在做其他的事情,当都准备好了,才会利用netty线程,进行onNext的处理


http://www.kler.cn/a/518347.html

相关文章:

  • 非根目录部署 nextjs 项目,资源文件 请求404 的问题
  • 《CPython Internals》阅读笔记:p360-p377
  • C# OpenCV机器视觉:利用CNN实现快速模板匹配
  • 延迟之争:LLM服务的制胜关键
  • 连接 OpenAI 模型:基础操作
  • 【C++】类与对象初级应用篇:打造自定义日期类与日期计算器(2w5k字长文附源码)
  • 前端性能优化 — 保姆级 Performance 工具使用指南
  • python生成图片和pdf,快速
  • 【Uniapp-Vue3】图片lazy-load懒加载
  • Alfresco Content Services docker自动化部署操作
  • flatten-maven-plugin 统一版本管理插件
  • 大厂案例——腾讯蓝鲸DevOps类应用的设计与实践
  • Unity URP 获取/设置 Light-Indirect Multiplier
  • 考研机试题:打印日期
  • 健康AI应用的逆袭:如何用“死亡时钟”撬动用户增长和媒体关注,实现应用榜快速排名第六
  • 【数据结构】_不带头非循环单向链表
  • 安全扫描Django项目解决存在敏感信息常见问题
  • redis主从集群中的哨兵机制
  • 探索 Web3 技术:如何推动数字身份的自主管理
  • 第4章 神经网络【1】——损失函数
  • css-设置元素的溢出行为为可见overflow: visible;
  • SpringBoot集成Flink-CDC,实现对数据库数据的监听
  • 解锁罗技键盘新技能:轻松锁定功能键(罗技K580)
  • NFT Insider #166:Nifty Island 推出 AI Agent Playground;Ronin 推出1000万美元资助计划
  • jQuery阶段总结(二维表+思维导图)
  • Vue 3 30天精进之旅:Day 03 - Vue实例