当前位置: 首页 > article >正文

响应式流规范

原文详见官网:Reactive Stream

Reactive Streams

​ Reactive Streams的目的是为异步流处理提供一个无阻塞背压的标准。最新版本可在Maven Central上获得:

<dependency>
  <groupId>org.reactivestreams</groupId>
  <artifactId>reactive-streams</artifactId>
  <version>1.0.3</version>
</dependency>
<dependency>
  <groupId>org.reactivestreams</groupId>
  <artifactId>reactive-streams-tck</artifactId>
  <version>1.0.3</version>
  <scope>test</scope>
</dependency>

Goals, Design and Scope

​ 在异步系统中,处理数据流(尤其是其容量未预先确定的“实时”数据)需要特别小心。最突出的问题是需要仔细控制资源消耗,以便快速数据源不会压倒流目的地。为了能够在协作网络主机或单个机器内的多个CPU核心上并行使用计算资源,需要异步。

​ Reactive Streams的主要目标是管理跨异步边界的流数据交换-考虑将元素传递到另一个线程或线程池-同时确保接收端不会被迫缓冲任意数量的数据。换句话说,背压是这个模型的一个组成部分,以便允许在线程之间进行调解的队列被限制。如果背压信号是同步的,异步处理的好处将被否定(另请参阅Reactive Manifesto),因此已经注意要求Reactive Streams实现的所有方面都具有完全非阻塞和异步行为。

​ 本说明书的目的是允许创建许多符合的实现,这些实现凭借遵守规则将能够平滑地互操作,从而在流应用的整个处理图上保留上述益处和特性。

​ 应该注意的是,流操纵(变换、分裂、合并等)的精确性质是不可预测的。本规范未涵盖。反应流只关心在不同的API组件之间调解数据流。在它们的开发过程中,已经注意确保可以表达组合流的所有基本方式。总之,Reactive Streams是JVM的面向流的库的标准和规范:

  • 处理可能无限数量的元素
  • 按顺序
  • 在组件之间异步传递元素
  • 强制性非阻塞背压

Reactive Streams规范由以下部分组成:

  1. API指定实现反应流的类型,并实现不同实现之间的互操作性。
  2. 技术兼容性工具包(TCK)是用于实现一致性测试的标准测试套件。
  3. 实现可以自由地实现规范中未涵盖的其他功能,只要它们符合API要求并通过TCK中的测试

API 组件

​ API由以下组件组成,这些组件需要由Reactive Stream实现提供:

  1. Publisher
  2. Subscriber
  3. Subscription
  4. Processor

​ 发布服务器是可能无限数量的序列元素的提供者,根据从其订阅服务器接收到的需求发布这些元素。响应于对 Publisher.subscribe(Subscriber) 的调用,对 Subscriber 上的方法的可能调用序列由以下协议给出:

onSubscribe onNext* (onError | onComplete)?

​ 这意味着onSubscribe总是被发送信号, 随后是可能无限数量的onNext信号(如Subscriber所请求的),如果存在故障,则随后是onError信号,或者当没有更多的元件可用时,则随后是onComplete信号-只要Subscription不被取消,所有这些都是如此。

注意事项
  • 以下规范使用来自https://www.ietf.org/rfc/rfc2119.txt的大写字母绑定词

术语

名词描述
Signal作为名词:onSubscribeonNextonCompleteonErrorrequest(n)cancel方法之一。作为一个动词:调用/调用信号。
Demand作为一个名词,订阅服务器请求的元素的聚合数量,但发布服务器尚未交付(满足)。作为一个动词,request-ing 更多元素的行为。
Synchronous(ly)在调用线程上执行。
Return normally只向调用方返回声明类型的值。向Subscriber发送失败信号的唯一法律的方法是通过onError方法。
Responsivity准备/响应能力。在本文件中用于表示不同的组件不应损害彼此的响应能力。
Non-obstructing描述在调用线程上尽可能快地执行的方法的质量。这意味着,例如,避免了繁重的计算和其他会使调用者的执行线程停止的事情。
Terminal state对于发布者:当发出onCompleteonError信号时。对于订阅者:当收到onCompleteonError时。
NOP对调用线程没有可检测到的影响的执行,因此可以安全地调用任意次数。
Serial(ly)在信号的上下文中,不重叠。在JVM的上下文中,对对象上的方法的调用是串行的,当且仅当这些调用之间存在happens-before关系(也意味着调用不重叠)。当异步执行调用时,将使用诸如但不限于原子、监视器或锁之类的技术来实现用于建立happens-before关系的协调。
Thread-safe可以安全地同步或异步调用,而不需要外部同步来确保程序的正确性。

规范

1. Publisher (Code)
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
IDRule
1onNext发送到PublisherSubscriber的总数必须始终小于或等于由SubscriberSubscription请求的元素的总数。
💡此规则的目的是明确发布者不能发送比订阅者请求的元素更多的元素。这条规则有一个隐含但重要的后果:由于请求只能在接收到之后才能被满足,因此请求元素和接收元素之间存在happens-before关系。
2一个Publisher MAY信号少于请求的onNext,并通过调用SubscriptiononComplete终止onError
💡该规则的目的是明确表示发布者不能保证它能够生成所请求的元素数量;它可能无法生成所有元素;它可能处于失败状态;它可能是空的或已经完成。
3发送给onSubscribeonNextonErroronCompleteSubscriber必须串行发送。
💡此规则的目的是允许信号的信令(包括来自多个线程的信号),前提是且仅当在每个信号之间的关系建立之前发生。
4如果Publisher失败,则必须发出onError信号。
💡该规则的目的是明确表示,如果发布者检测到无法继续,则有责任通知其订阅者-订阅者必须有机会清理资源或以其他方式处理发布者的故障。
5如果Publisher成功终止(有限流),则必须发送onComplete信号。
💡此规则的目的是明确发布者负责通知其订阅者它已到达终端状态-订阅者然后可以根据此信息采取行动;清理资源等。
6如果PublisheronError上发出onCompleteSubscriber信号,则必须考虑取消SubscriberSubscription
💡此规则的目的是确保无论订阅被取消、发布者发出onError或onComplete信号,订阅都得到相同的处理。
7一旦终端状态已被发信号(onErroronComplete),则要求不再出现其他信号。
💡此规则的目的是确保onError和onComplete是发布服务器和订阅服务器对之间交互的最终状态。
8如果一个Subscription被取消,它的Subscriber必须最终停止被发送信号。
💡该规则的目的是确保发布者在调用Subscription.cancel()时尊重订阅者取消订阅的请求。最终的原因是因为信号可能由于异步而具有传播延迟。
9Publisher.subscribe必须在提供的onSubscribe上调用Subscriber,然后再向Subscriber发出任何其他信号,并且必须正常返回,除非提供的Subscribernull,在这种情况下,它必须向调用者抛出java.lang.NullPointerException,对于所有其他情况,唯一法律的方式来发出失败信号(或拒绝Subscriber)是通过调用onError(在调用onSubscribe之后)。
💡此规则的目的是确保onSubscribe总是在任何其他信号之前发出信号,以便订阅服务器在收到信号时可以执行初始化逻辑。onSubscribe最多只能被调用一次,[见2.12]。如果提供的Subscribernull,那么除了向调用者发出信号之外,没有其他地方可以发出信号,这意味着必须抛出java.lang.NullPointerException。可能出现的情况示例:有状态发布服务器可能不堪重负、受限于有限数量的基础资源、资源耗尽或处于终止状态。
10Publisher.subscribe可以根据需要多次调用,但每次必须使用不同的Subscriber[见2.12]。
💡此规则的目的是让subscribe的调用方注意,不能假定通用发布服务器和通用订阅服务器支持多次附加。此外,它还要求无论调用多少次,都必须维护subscribe的语义。
11Publisher可以支持多个Subscriber,并决定每个Subscription是单播还是组播。
💡此规则的目的是使发布者实现能够灵活地决定它们将支持多少个订阅者(如果有的话),以及如何分发元素。
2. Subscriber (Code)
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
IDRule
1Subscriber MUST信号请求通过Subscription.request(long n)接收onNext信号。
💡该规则的目的是确定订阅者有责任决定何时以及能够并愿意接收多少元素。为了避免可重入的订阅方法导致的信号重新排序,强烈建议同步订阅服务器实现在任何信号处理的最后调用订阅方法。建议订阅者请求他们能够处理的内容的上限,因为一次只请求一个元素会导致固有的低效“停止并等待”协议。
2如果Subscriber怀疑它对信号的处理会对其Publisher的响应性产生负面影响,则建议它异步发送信号。
💡该规则的目的是,从执行的角度来看,订阅服务器不应阻碍发布服务器的进程。换句话说,订阅服务器不应使发布服务器接收不到CPU周期。
3Subscriber.onComplete()Subscriber.onError(Throwable t)不得调用SubscriptionPublisher上的任何方法。
💡此规则的目的是在处理完成信号期间防止发布者、订阅者和订阅者之间的循环和竞争条件。
4Subscriber.onComplete()Subscriber.onError(Throwable t)在收到信号后必须考虑取消订阅。
💡此规则的目的是确保订阅服务器遵守发布服务器的终端状态信号。订阅在收到onComplete或onError信号后不再有效。
5如果一个Subscriber已经有一个活动的Subscription.cancel(),那么在一个Subscription信号之后,它必须在给定的onSubscribe上调用Subscription
💡此规则的目的是防止两个或多个单独的发布服务器尝试与同一订阅服务器交互。强制执行此规则意味着可以防止资源泄漏,因为额外的订阅将被取消。不遵守此规则可能会导致违反发布者规则1等。这种违规行为可能导致难以诊断的错误。
6如果不再需要Subscriber,则必须调用Subscription.cancel()
💡此规则的目的是确定订阅者不能在不再需要订阅时将其丢弃,他们必须调用cancel,以便该订阅所持有的资源可以安全,及时地回收。例如,订阅服务器只对特定的元素感兴趣,然后订阅服务器会取消其订阅,以向发布服务器发出完成订阅的信号。
7订阅服务器必须确保对订阅的请求和取消方法的所有调用都是串行执行的。
💡此规则的目的是当且仅当每个调用之间建立了happens-before关系时,才允许调用request和cancel方法(包括从多个线程)。
8在调用了Subscriber之后,如果仍有请求的元素挂起,则必须准备好接收一个或多个onNext信号[见3.12]。Subscription.cancel()不保证立即执行底层清洁操作。
💡此规则的目的是强调在调用cancel和发布者观察到取消之间可能存在延迟。
9一个Subscriber必须准备好接收一个带有或不带有前一个onComplete呼叫的Subscription.request(long n)信号。
💡此规则的目的是确定完成与需求流无关-这允许流提前完成,并避免轮询完成的需要*。*
10一个Subscriber必须准备好接收一个带有或不带有前一个onComplete呼叫的Subscription.request(long n)信号。
💡此规则的目的是确定发布服务器故障可能与信号请求完全无关。这意味着订阅服务器不需要轮询以确定发布服务器是否无法满足其请求。
11一个Subscriber必须确保所有对它的信号方法的调用都发生在处理相应的信号之前。也就是说,订阅服务器必须负责将信号正确发布到其处理逻辑。
💡此规则的目的是确定订阅服务器实现有责任确保其信号的异步处理是线程安全的。参见第17.4.5节中Happens-Before的JMM定义。
12对于给定的Subscriber.onSubscribeSubscriber必须最多调用一次(基于对象相等性)。
💡此规则的目的是确定必须假设同一订阅者最多只能订阅一次。注意object equalitya.equals(b)
13调用onSubscribeonNextonErroronComplete必须正常返回,除非任何提供的参数是null,在这种情况下,它必须向调用者抛出java.lang.NullPointerException,对于所有其他情况,Subscriber发出失败信号的唯一法律的方法是取消其Subscription。如果违反了这个规则,任何与SubscriptionSubscriber相关的都必须被认为是取消的,并且调用者必须以适合运行时环境的方式提出这个错误条件。
💡此规则的目的是为订阅服务器的方法建立语义,以及在违反此规则的情况下允许发布服务器执行的操作。«以适合运行时环境的方式引发此错误条件»可能意味着记录错误-或以其他方式使某人或某物意识到情况-因为错误无法向出错的订阅服务器发出信号。
3. Subscription (Code)
public interface Subscription {
    public void request(long n);
    public void cancel();
}
IDRule
1Subscription.requestSubscription.cancel只能在其Subscriber上下文中调用。
💡此规则的目的是建立订阅代表订阅服务器和发布服务器之间的唯一关系[参见2.12]。订阅服务器控制何时请求元素以及何时不再需要更多元素。
2Subscription必须允许SubscriberSubscription.requestonNext中同步调用onSubscribe
💡这条规则的目的是明确request的实现必须是可重入的,以避免在requestonNext(以及最终的onComplete / onError)之间相互递归的情况下出现堆栈溢出。这意味着发布者可以是synchronous,即在调用onNext的线程上发送request的信号。
3Subscription.request必须在PublisherSubscriber之间的可能同步递归上设置一个上限。
💡这条规则的目的是通过对requestonNext(最终是onComplete / onError)之间的相互递归设置上限来补充[见3.2]。建议实现将这种相互递归限制在1(ONE)的深度-为了节省堆栈空间。一个不受欢迎的同步开放递归的例子是Subscriber.onNext -Subscriber. request-Subscriber.onNext -…,因为它会导致调用线程的堆栈溢出。
4Subscription.request应尊重来电者的回应,及时回复。
💡这个规则的目的是建立request是一个非阻塞的方法,并且应该在调用线程上尽可能快地执行,因此避免繁重的计算和其他会使调用者的执行线程停止的事情。
5Subscription.cancel必须通过及时返回来尊重其调用者的响应性,必须是幂等的,并且必须是线程安全的。
💡这个规则的目的是建立cancel是一个非阻塞的方法,并且应该在调用线程上尽可能快地执行,因此避免繁重的计算和其他会使调用者的执行线程停止的事情。此外,也很重要的是,可以多次调用它而不会产生任何不良影响。
6Subscription被取消后,额外的Subscription.request(long n)必须是NOP。
💡这一规则的目的是在取消订阅和随后请求更多元素的不操作之间建立因果关系。
7Subscription被取消后,额外的Subscription.request(long n)必须是NOP。
💡本规则的意图由3.5取代。
8Subscription没有被取消时,Subscription.request(long n)必须向相应的订阅者注册要产生的给定数量的附加元素。
💡此规则的目的是确保request-ing是一个加法操作,并确保将对元素的请求传递给发布服务器。
9当Subscription未被取消时,如果参数<=0,Subscription.request(long n)必须用java.lang.IllegalArgumentException发出onError信号。原因消息应该解释非正请求信号是非法的。
💡此规则的目的是防止错误的实现在没有引发任何异常的情况下继续操作。请求负数或0个元素,因为请求是累加的,很可能是代表订阅服务器的错误计算的结果
10Subscription没有被取消时,Subscription.request(long n)可以同步调用这个(或其他)用户上的onNext
💡此规则的目的是建立允许创建同步发布者,即在调用线程上执行其逻辑的发布者。
11Subscription未被取消时,Subscription.request(long n)可以同步调用此(或其他)订户上的onCompleteonError
💡此规则的目的是建立允许创建同步发布者,即在调用线程上执行其逻辑的发布者。
12Subscription没有被取消时,Subscription.cancel()必须请求Publisher最终停止发送其Subscriber。该操作不需要立即影响Subscription
💡此规则的目的是建立取消订阅的愿望最终得到发布者的尊重,承认在收到信号之前可能需要一些时间。
13Subscription没有被取消时,Subscription.cancel()必须请求Publisher最终删除对相应订阅者的任何引用。
💡此规则的目的是确保订阅服务器在其订阅不再有效后可以正确地进行垃圾收集。不鼓励重新订阅同一个Subscriber对象[参见2.12],但本规范并没有强制禁止它,因为这意味着必须无限期地存储先前取消的订阅。
14Subscription没有被取消时,调用Subscription.cancel可能会导致Publisher(如果有状态)转换到shut-down状态,如果此时不存在其他Subscription[见1.9]。
💡此规则的目的是允许发布者响应于来自现有订阅者的取消信号,在新订阅者的onComplete之后发送onErroronSubscribe信号。
15调用Subscription.request必须正常返回。
💡该规则的目的是不允许实现在响应cancel被调用时抛出异常。
16调用Subscription.request必须正常返回。
💡该规则的目的是不允许实现在响应cancel被调用时抛出异常。
17Subscription必须支持对request的无限数量的调用,并且必须支持高达263-1(`java.lang.Long.MAX_VALUE`)的需求。等于或大于263-1(java.lang.Long.MAX_VALUE)的需求可能被Publisher视为“有效无界”。
💡此规则的目的是建立订阅服务器可以在任何数量的request调用中请求无限数量的元素,以大于0的任何增量[参见3.9]。由于在合理的时间内(每纳秒1个元素将需要292年),使用当前或可预见的硬件无法实现满足2^63-1的需求,因此允许发布者停止跟踪超过此点的需求。

Subscription由一个Publisher和一个Subscriber共享,目的是调解这对之间的数据交换。这就是为什么subscribe()方法不返回创建的Subscription,而是返回void;Subscription只通过Subscriber回调传递给onSubscribe

4.Processor (Code)
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
IDRule
1Processor代表一个处理阶段,它既是Subscriber也是Publisher,必须遵守两者的契约。
💡此规则的目的是确定处理器的行为符合发布服务器规范和订阅服务器规范,并受其约束。
2Processor可以选择恢复onError信号。如果它选择这样做,它必须认为Subscription被取消,否则它必须立即将onError信号传播到它的订户。
💡这条规则的目的是告知实现可能不仅仅是简单的转换

​ 虽然不是强制性的,但当/如果它的最后一个Processor取消了它们的Subscription时,取消Subscriber的上游Subscription可能是一个好主意, 以使抵消信号向上游传播。

异步与同步处理

​ Reactive Streams API规定所有元素(onNext)或终止信号(onErroronComplete)的处理不得阻塞Publisher。但是,每个on*处理程序都可以同步或异步处理事件。示例:

nioSelectorThreadOrigin map(f) filter(p) consumeTo(toNioSelectorOutput)

​ 它有一个双循环的起点和一个双循环的终点。让我们假设起点和终点都是选择器事件循环。Subscription.request(n)必须从目的地链接到原点。现在,每个实现都可以选择如何做到这一点。下面的代码使用管道字符|来表示线程边界(队列和调度),使用R#来表示资源(可能是线程)。

nioSelectorThreadOrigin | map(f) | filter(p) | consumeTo(toNioSelectorOutput)
-------------- R1 ----  | - R2 - | -- R3 --- | ---------- R4 ----------------

​ 在这个例子中,3个消费者中的每一个,mapfilterconsumeTo异步地调度工作。它可以在同一个事件循环(蹦床),单独的线程,无论什么。

nioSelectorThreadOrigin map(f) filter(p) | consumeTo(toNioSelectorOutput)
------------------- R1 ----------------- | ---------- R2 ----------------

​ 这里,通过向NioSelectorOutput事件循环添加工作,异步调度的只是最后一步。mapfilter步骤在源线程上同步执行。或者另一种实现可以将操作融合到最终消费者:

nioSelectorThreadOrigin | map(f) filter(p) consumeTo(toNioSelectorOutput)
--------- R1 ---------- | ------------------ R2 -------------------------

​ 所有这些变体都是“异步流”。它们都有自己的位置,每个都有不同的权衡,包括性能和实现复杂性。

​ Reactive Streams契约允许实现灵活地管理资源和调度,并在非阻塞,异步,动态推拉流的范围内混合异步和同步处理。为了允许所有参与API元素的完全异步实现-Publisher/Subscription/Subscriber/Processor-这些接口定义的所有方法都返回void

队列边界

​ 一个基本的设计原则是,所有的缓冲区大小是有界的,这些界限必须是已知的,并由用户控制。这些界限用元素计数来表示(这反过来又转换为onNext的调用计数)。任何旨在支持无限流(特别是高输出速率流)的实现都需要始终沿着边界,以避免内存不足错误并限制资源使用。

​ 由于背压是强制性的,因此可以避免使用无界缓冲器。一般来说,队列可能无限制增长的唯一情况是发布方在很长一段时间内保持比订阅方更高的速率,但这种情况是由反压处理的。队列边界可以由订阅者用信号通知对适当数量的元素的需求来控制。在任何时间点,订阅者都知道:

  • 请求的元素总数:P
  • 已处理的元素数:N

​ 那么,可以到达的元素的最大数量是P - N,直到向发布者发出更多的需求信号。在订户也知道其输入缓冲器中的元素B的数量的情况下,则该界限可以被细化为P - B - N

​ 发布者必须尊重这些界限,而不管它所代表的源是否可以被反压。如果源的生产率不受影响(例如时钟滴答声或鼠标移动),发布者必须选择缓冲或删除元素以遵守强加的边界。

​ 在接收到一个元素之后,用信号通知对一个元素的需求的订户有效地实现了停止和等待协议,其中需求信号等同于确认。通过提供多个要素的需求,确认成本被摊销。值得注意的是,允许订阅者在任何时间点发出需求信号,从而避免发布者和订阅者之间不必要的延迟(即保持其输入缓冲区充满,而不必等待完整的往返)。


http://www.kler.cn/a/316810.html

相关文章:

  • 小面馆叫号取餐流程 佳易王面馆米线店点餐叫号管理系统操作教程
  • 生成 Django 中文文档 PDF 版
  • 43.第二阶段x86游戏实战2-提取游戏里面的lua
  • 软件测试——认识测试
  • SSE (Server-Sent Events) 服务器实时推送详解
  • 速通LoRA:《LoRA: Low-Rank Adaptation of Large Language Models》全文解读
  • 胤娲科技:谷歌DeepMind祭出蛋白质设计新AI——癌症治疗迎来曙光
  • DoppelGanger++:面向数据库重放的快速依赖关系图生成
  • JavaScript语法特点
  • linux 使用mdadm 创建raid0 nvme 磁盘
  • 深入理解SpringBoot(一)----SpringBoot的启动流程分析
  • 邮储银行:面向金融行业的移动应用安全风险监测案例
  • 【docker】命令之容器操作
  • C++:布尔类型,引用,堆区空间
  • 力扣232:用栈实现队列
  • 【Proteus仿真】基于51单片机的宠物喂食系统设计
  • JSON合并工具
  • JVM-类加载器的双亲委派模型详解
  • 前后端数据交互 笔记03(get和post方法)
  • 使用 Azure Functions 开发 Serverless 应用:详解与实战
  • LeetCode 1014. 最佳观光组合 一次遍历数组,时间复杂度O(n)
  • 【matlab】将程序打包为exe文件(matlab r2023a为例)
  • Linux文件IO(三)-Linux系统如何管理文件
  • 【基础知识】网络套接字编程
  • QT-MOC元对象系统详解
  • 【小程序】微信小程序课程 -1 安装与配置