Kotlin 协程基础九 —— SharedFlow 与 StateFlow
SharedFlow 与 StateFlow 是 Flow 的两个变种,它们把 Flow 的功能从数据流的收集改成了事件流和状态的订阅,这就把适用场景切换到了一个非常实用的范围。
1、SharedFlow 的效果与适用场景
SharedFlow 是一种特殊的 Flow,而 StateFlow 是一种特殊的 SharedFlow,因此我们要先来看 SharedFlow。
事件订阅就是一种特殊类型的数据收集,用数据收集功能是可以实现出数据订阅功能的。事件订阅的 API,Flow 已经有了,就是 SharedFlow。
前面讲过一个 launchIn 操作符,它可以立即用指定的 CoroutineScope 启动一个协程,然后在该协程内调用这个 Flow 的 collect() 启动收集流程。类似地,还有一个 shareIn 操作符也会启动协程,在协程里调用 Flow 的 collect()。只不过,它与 launchIn 有两点区别:
- shareIn 并不是一定在第一时间就启动 Flow 的收集,收集的启动时间是通过参数指定的
- shareIn 除了会在内部启动协程和收集 Flow 的数据之外,还会创建一个新的 Flow 并返回,返回值类型为 SharedFlow。这个 SharedFlow 会拿到上游被启动的 Flow 的数据,作为一个转发者将上游 Flow 的数据转发给收集它的 FlowCollector
示例代码:
private fun sharedFlowSample() = runBlocking<Unit> {
val scope = CoroutineScope(EmptyCoroutineContext)
val flow = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
}
val sharedFlow = flow.shareIn(scope, SharingStarted.Eagerly)
val job = scope.launch {
sharedFlow.collect {
println("SharedFlow: $it")
}
}
job.join()
}
sharedFlow 其实自己不会生产数据,只是将上游 Flow 里的每条数据都转发给它的 FlowCollector,也就是 collect() 后的大括号参数。并且,在它被多次调用 collect() 时,每个 collect() 各自的 FlowCollector 都会收到统一上游的 Flow,也就是 flow 里发送的数据。这意味着,发送流程与数据收集流程被分开了。从这一点看,SharedFlow 相比于传统 Flow,更像 Channel(Channel 就是数据发送与收集分开的)。但与 Channel 不同的是,SharedFlow 不是瓜分式的,每条数据都能发送到每一个进行中的 collect() 中:
private fun sharedFlowSample() = runBlocking<Unit> {
val scope = CoroutineScope(EmptyCoroutineContext)
val flow = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
}
val sharedFlow = flow.shareIn(scope, SharingStarted.Eagerly)
val job1 = scope.launch {
sharedFlow.collect {
println("SharedFlow in Coroutine 1: $it")
}
}
val job2 = scope.launch {
sharedFlow.collect {
println("SharedFlow in Coroutine 2: $it")
}
}
joinAll(job1, job2)
}
运行结果:
SharedFlow in Coroutine 2: 1
SharedFlow in Coroutine 1: 1
SharedFlow in Coroutine 2: 2
SharedFlow in Coroutine 1: 2
SharedFlow in Coroutine 1: 3
SharedFlow in Coroutine 2: 3
可以看到,两个 sharedFlow 在两个协程中都接收到 flow 的完整数据。
普通的 Flow 是每次调用 collect() 时都完整的跑一次流程,从这些独立的流程里分别发送数据;而 SharedFlow 只跑一次流程,每次 collect() 时都统一发送这个流里的数据。用代码演示这种区别:
private fun sharedFlowSample() = runBlocking<Unit> {
val scope = CoroutineScope(EmptyCoroutineContext)
val flow = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
}
val sharedFlow = flow.shareIn(scope, SharingStarted.Eagerly)
val job1 = scope.launch {
delay(500)
flow.collect {
println("Flow in Coroutine 1: $it")
}
}
val job2 = scope.launch {
delay(1000)
flow.collect {
println("Flow in Coroutine 2: $it")
}
}
joinAll(job1, job2)
}
运行结果:
Flow in Coroutine 1: 1
Flow in Coroutine 2: 1
Flow in Coroutine 1: 2
Flow in Coroutine 2: 2
Flow in Coroutine 1: 3
Flow in Coroutine 2: 3
可以看到,虽然在收集之前用 delay 增加了延时,但对于普通的 Flow 而言,只是延后了 Flow 收集数据的起始时间,所有数据还是能正常收到的。但倘若给 SharedFlow 增加收集的延时:
private fun sharedFlowSample() = runBlocking<Unit> {
val scope = CoroutineScope(EmptyCoroutineContext)
val flow = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
}
val sharedFlow = flow.shareIn(scope, SharingStarted.Eagerly)
val job1 = scope.launch {
// 延时 500ms 再开始收集
delay(500)
sharedFlow.collect {
println("SharedFlow in Coroutine 1: $it")
}
}
val job2 = scope.launch {
// 延时 1000ms 再开始收集
delay(1000)
sharedFlow.collect {
println("SharedFlow in Coroutine 2: $it")
}
}
joinAll(job1, job2)
}
运行结果:
SharedFlow in Coroutine 1: 2
SharedFlow in Coroutine 1: 3
SharedFlow in Coroutine 2: 3
可以看到,由于收集晚了,导致 Coroutine 1 内的 SharedFlow 没能拿到数据 1,Coroutine 2 内的 SharedFlow 没能拿到数据 2、3。根本原因是调用 shareIn() 时会启动上游开始发射数据,下游调用延时,使得收集过程在某些数据已经发射出去之后才开始,因此收集不到这些已经发出的数据。
在 Kotlin 官方说法里,Channel 是热的,Flow 是冷的(前面说过,Channel 数据的发送与读取是相互独立的,不读取也会发送数据,因此叫热流;Flow 只在调用 collect() 被调用时才会启动数据发送流程,因此叫冷流)。而 SharedFlow 是一种特殊的 Flow,它是热的。前面的演示代码已经说明了,SharedFlow 的活跃状态与它是否正在被调用 collect() 来收集数据是无关的,其活跃状态是独立的。
但抛开官方的说法,思考技术本质,SharedFlow 实际上也是冷的,它也是在被调用了 collect() 之后才会启动自己的数据流,只不过它的数据生产逻辑比较特别。它是把上游的 Flow 转发给下游,而上游 Flow 的流程与下面的 SharedFlow 的 collect() 是否调用是无关的。这样造成的现象就是看起来好像 SharedFlow 是在被数据收集之前就已经启动数据流了,但在技术上,它也是在调用了 collect() 之后才启动的。真正独立启动的是它依赖的上游的 Flow 的数据流,因此从技术本质的角度来讲,SharedFlow 还是冷的,只不过看起来和用起来像热的。
又举了一个代码示例说明、验证上面的话,看的费劲,先略过……
shareIn() 的适用场景:
- 数据流的共享:一个 Flow 如果想要被多次收集的时候都可以共享相同的数据生产流程,可以使用 shareIn() 将 Flow 转换成 SharedFlow
- 数据生产的提前启动:比如 Flow 的生产流程中包含耗时的初始化工作,你不希望在 collect() 的时候再开始初始化,而想让它提前启动,这样在 collect() 时就不用等待初始化过程的耗时了,此时也可以使用 shareIn() 将 Flow 转换成 SharedFlow 来达到这种目的
- 事件流订阅:最典型的应用场景,后续解释
使用场景的根本需求都是对流程的分拆。流程分拆了,你就可以在下游多次收集的时候都从上游的同一个生产流程里拿数据;流程分拆了,你就可以在下游还没开始收集的时候启动上游的生产。
伴随着流程分拆,SharedFlow 可能会漏数据。正常的 Flow 都是从第一条开始收集的,但是前面举得例子也能看到,因为生产流程是独立的,所以在生产之后才开始收集的话,就会漏掉之前的数据。因此 SharedFlow 还有一个限制,就是适用于对从头开始收集数据没有需求的场景,比如事件订阅。SharedFlow 最典型的应用场景就是事件流,Kotlin 官方将 SharedFlow 的活跃的 FlowCollector 称为订阅者。
总结,SharedFlow 的效果就是将数据流的生产与消费分拆了,这个效果可以让它满足各种特殊的需求。比如数据流的共享,比如提前启动生产等等。当然,由于流程的分拆,它也可能会漏掉早期发送的数据。一句话总结,SharedFlow 比较适用于事件流订阅的业务需求。
2、shareIn 操作符
public fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T> {
val config = configureSharing(replay)
val shared = MutableSharedFlow<T>(
replay = replay,
extraBufferCapacity = config.extraBufferCapacity,
onBufferOverflow = config.onBufferOverflow
)
@Suppress("UNCHECKED_CAST")
val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
return ReadonlySharedFlow(shared, job)
}
shareIn() 有三个参数:
- scope 用于指定从哪个协程上下文中启动协程,与 launchIn() 的参数含义相同
- started:控制共享何时开始和停止的策略
- replay:重新发送给新订阅者的值的数量(不能为负,默认为零)
由于第二个参数会用到第三个参数,因此我们先来说第三个参数 replay。
replay 参数的作用类似于 buffer 操作符,但又不完全一样。buffer() 的主要应用场景是缓存上游发射出的、不能被下游及时消费掉的数据。假如我们对两个 SharedFlow 做收集前的延时,让它们漏掉一些上游发送的数据:
private fun sharedFlowSample() = runBlocking<Unit> {
val scope = CoroutineScope(EmptyCoroutineContext)
val flow = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
}
val sharedFlow = flow.shareIn(scope, SharingStarted.Eagerly)
val job1 = scope.launch {
// 延时 500ms 错过 1
delay(500)
sharedFlow.collect {
println("SharedFlow in Coroutine 1: $it")
}
}
val job2 = scope.launch {
// 延时 1000ms 错过 1 和 2
delay(1000)
sharedFlow.collect {
println("SharedFlow in Coroutine 2: $it")
}
}
joinAll(job1, job2)
}
那么 job1 内的 sharedFlow 会漏掉数据 1,job2 内的 sharedFlow 会漏掉数据 1、2:
SharedFlow in Coroutine 1: 2
SharedFlow in Coroutine 1: 3
SharedFlow in Coroutine 2: 3
设置 replay 为 1,相当于为 flow 设置了一个容量为 1 的缓冲区,这样下游可以收集到更多数据了:
private fun sharedFlowSample() = runBlocking<Unit> {
val scope = CoroutineScope(EmptyCoroutineContext)
val flow = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
}.buffer(2)
val sharedFlow = flow.shareIn(scope, SharingStarted.Eagerly, 1)
val job1 = scope.launch {
delay(500)
sharedFlow.collect {
println("SharedFlow in Coroutine 1: $it")
}
}
val job2 = scope.launch {
delay(1000)
sharedFlow.collect {
println("SharedFlow in Coroutine 2: $it")
}
}
joinAll(job1, job2)
}
运行代码发现每个 sharedFlow 都能多接收 1 条数据:
SharedFlow in Coroutine 1: 1
SharedFlow in Coroutine 1: 2
SharedFlow in Coroutine 2: 2
SharedFlow in Coroutine 1: 3
SharedFlow in Coroutine 2: 3
这是 replay 与 buffer() 相似的部分。而不同的部分,我们给 job2 的延时增加到 5000ms,也就是上游数据生产完毕,job1 的 sharedFlow 也消费完毕后再开始收集数据:
private fun sharedFlowSample() = runBlocking<Unit> {
val scope = CoroutineScope(EmptyCoroutineContext)
val flow = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
}.buffer(2)
val sharedFlow = flow.shareIn(scope, SharingStarted.Eagerly, 1)
val job1 = scope.launch {
delay(500)
sharedFlow.collect {
println("SharedFlow in Coroutine 1: $it")
}
}
val job2 = scope.launch {
delay(5000)
sharedFlow.collect {
println("SharedFlow in Coroutine 2: $it")
}
}
joinAll(job1, job2)
}
运行结果:
SharedFlow in Coroutine 1: 1
SharedFlow in Coroutine 1: 2
SharedFlow in Coroutine 1: 3
SharedFlow in Coroutine 2: 3
可以看到 job2 内的 sharedFlow 仍然收到了一条数据,这就是 replay 的另一个作用:对于已经消费过的数据,也依然缓冲下来,用来给后面新订阅的 collect() 用。这里如果设置 replay 为 0,那么 job2 内的 sharedFlow 就收不到数据;如果设置为 2,就能收到 2、3 两条数据;如果设置为 3,就能收到 1、2、3 三条数据。
buffer() 这个缓冲的定义是来不及消费的、生产过快的数据,而像 replay 这种用过了还不扔,要重复利用的,应该叫 cache,也就是缓存。也就是说,replay 兼具缓冲与缓存功能。
第二个参数 SharingStarted 用来设置 shareIn() 所提供的 SharedFlow 背后的用于生产数据的 Flow 的启动时间,它有三个可选方案:
- Eagerly 表示立即启动,也就是在 shareIn() 被调用,创建 SharedFlow 对象的同时,启动上游生产数据的 Flow
- Lazily 表示第一个订阅者调用 collect() 的时候才启动上游 Flow
- WhileSubscribed() 设置上游数据流结束和重启的规则,是一种复杂化的 Lazily,不仅是在第一次订阅的时候启动上游数据流,还会在下游所有的订阅全部结束之后,把上游 Flow 的生产流程也结束掉。这之后如果再来新的订阅,它会重新启动上游的数据流
WhileSubscribed() 是一个函数,它可以通过参数控制具体的规则,在介绍这些参数之前,我们先通过一个例子介绍 SharedFlow 的结束:
private fun sharedFlowSample() = runBlocking<Unit> {
val scope = CoroutineScope(EmptyCoroutineContext)
val flow = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
}
val sharedFlow = flow.shareIn(scope, SharingStarted.WhileSubscribed(), 2)
val job1 = scope.launch {
delay(500)
sharedFlow.collect {
println("SharedFlow in Coroutine 1: $it")
}
}
val job2 = scope.launch {
delay(5000)
sharedFlow.collect {
println("SharedFlow in Coroutine 2: $it")
}
}
joinAll(job1, job2)
}
运行结果:
SharedFlow in Coroutine 1: 1
SharedFlow in Coroutine 1: 2
SharedFlow in Coroutine 1: 3
SharedFlow in Coroutine 2: 2
SharedFlow in Coroutine 2: 3
运行结果似乎并不像预期的那样,Coroutine 2 内的 sharedFlow 的收集工作是在上游 flow 发射结束且 Coroutine 1 内的 sharedFlow 结束收集之后才开始的。按照 WhileSubscribed() 的功能描述,此时 flow 应该在 Coroutine 1 内的 sharedFlow 结束收集之后停止,然后在 Coroutine 2 内的 sharedFlow 调用 collect() 时重启开启,收到的数据也应该是所有数据 1、2、3,但是现在只有 2、3。
这是因为 SharedFlow 的订阅流程是不会自动结束的,即它的 collect() 不会随着上游的 Flow 的生产过程的结束而一起结束。它会一直吊在 collect() 代码的结尾处,其返回值类型为 Nothing:
public interface SharedFlow<out T> : Flow<T> {
public val replayCache: List<T>
override suspend fun collect(collector: FlowCollector<T>): Nothing
}
返回值是 Nothing 表示它永远都不会返回了,要么抛异常,要么永远运行下去。虽然是这样,但是并不意味着它无法结束,当 collect() 这个挂起函数所在的协程被取消时,该函数也“有望”被结束。当然,我们要再强调一次,协程的取消是结构化取消,也是协作式的取消。挂起函数所在的协程被取消并不一定强制地结束挂起函数,而是需要挂起函数在内部配合外部的取消操作。SharedFlow 作为协程官方 API,自然是对这一点进行了适配的:
internal open class SharedFlowImpl<T>(
private val replay: Int,
private val bufferCapacity: Int,
private val onBufferOverflow: BufferOverflow
) : AbstractSharedFlow<SharedFlowSlot>(), MutableSharedFlow<T>, CancellableFlow<T>, FusibleFlow<T> {
@Suppress("UNCHECKED_CAST")
override suspend fun collect(collector: FlowCollector<T>): Nothing {
val slot = allocateSlot()
try {
if (collector is SubscribedFlowCollector) collector.onSubscription()
val collectorJob = currentCoroutineContext()[Job]
while (true) {
var newValue: Any?
while (true) {
newValue = tryTakeValue(slot) // attempt no-suspend fast path first
if (newValue !== NO_VALUE) break
awaitValue(slot) // await signal that the new value is available
}
// 如果 isActive 为 false 就抛出 CancellationException
collectorJob?.ensureActive()
collector.emit(newValue as T)
}
} finally {
freeSlot(slot)
}
}
}
所以,我们小结一下:SharedFlow 不会随着上游数据流的结束而返回,而是永远不返回。但它会随着外部协程的取消而抛出 CancellationException 结束。
好的,现在我们切回 WhileSubscribed() 的话题,顺便验证关于 SharedFlow 结束的结论是否正确:
private fun sharedFlowSample() = runBlocking<Unit> {
val scope = CoroutineScope(EmptyCoroutineContext)
val flow = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
}
val sharedFlow = flow.shareIn(scope, SharingStarted.WhileSubscribed(), 2)
val job1 = scope.launch {
val parent = this
launch {
// 在 4s 时结束父协程 job1 以触发 WhileSubscribed() 的机制
delay(4000)
parent.cancel()
}
delay(1500)
sharedFlow.collect {
println("SharedFlow in Coroutine 1: $it")
}
}
val job2 = scope.launch {
delay(5000)
sharedFlow.collect {
println("SharedFlow in Coroutine 2: $it")
}
}
joinAll(job1, job2)
}
运行结果:
SharedFlow in Coroutine 1: 1
SharedFlow in Coroutine 1: 2
SharedFlow in Coroutine 1: 3
SharedFlow in Coroutine 2: 2
SharedFlow in Coroutine 2: 3
SharedFlow in Coroutine 2: 1
SharedFlow in Coroutine 2: 2
SharedFlow in Coroutine 2: 3
可以看到,这一次由于在 Coroutine 1 内启动的子协程在第 4 秒时取消了 Coroutine 1,导致 collect() 被取消,这样由于 WhileSubscribed() 的作用,flow 也在 Coroutine 1 的 sharedFlow 结束后而结束。接着 Coroutine 2 内的 sharedFlow 调用 collect() 时才会重启 flow 收到数据 1、2、3,当然 flow 重启没有丢掉缓存的数据,因此 replay 配置的 2 仍然有效,在收到 flow 重启发出的 1、2、3 之前,会收到缓存的 2 和 3。
另外还有一点,在设置 WhileSubscribed() 之后,如果所有 collect() 都结束了,这个结束不仅会导致下一个新到来的 collect() 重启上游的生产,还会导致上游正在进行中的生产过程立即结束,因为它已经没用了,为了节省资源,就将其立即结束掉。
最后来说 WhileSubscribed() 的参数:
@Suppress("FunctionName")
public fun WhileSubscribed(
stopTimeoutMillis: Long = 0,
replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted =
StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
WhileSubscribed 的两个参数都起一个延时的作用:
- stopTimeoutMillis:对于结束的延时。在所有 collect() 都结束之后,延时 stopTimeoutMillis 指定的时长,才判定为真正的结束
- replayExpirationMillis:缓存失效时间(仅针对缓存,不针对缓冲)。在最后一个 collect() 结束,并且设置的 stopTimeoutMillis 也到时后,如果再过 replayExpirationMillis 这么长的时间还没有新的 collect() 被调用,刚才之前缓存的数据才会被丢弃掉,缓存会被清空
3、MutableSharedFlow
SharedFlow 把应用场景从数据流的收集收窄到事件流的订阅,所以它很自然地就有了一个额外的需求 —— 从外部发送数据。由此带来两个问题:
- 为什么需要外部数据源
- 如何实现外部数据源发射数据
内部外部指的是 flow {…} 的内部还是外部。在 flow {…} 内部由于含有 FlowCollector 这个 this,因此可以直接调用 emit() 来发送数据。但此前我们学过的知识中,是没办法在 flow {…} 外部调用 emit() 发射数据的。
事件流有一个典型场景是 UI 事件的订阅,比如用户订阅按钮点击事件。假如现在有一个 SharedFlow 对象,可以从它的点击监听回调中调用一下 emit(),而不是像现在这样只能从上游的 Flow 里把事件发送出来。这是一个很正常的需求,但是 Flow 没有,SharedFlow 本身也没有,只不过可以通过调用 MutableSharedFlow() 来创建一个 MutableSharedFlow 对象以实现该需求。
MutableSharedFlow 是 SharedFlow 和 FlowCollector 的子接口:
public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T>
因此可以使用 FlowCollector 的 emit() 发射数据。
为什么需要从外部发送数据?因为事件流与数据流不同,从外部发送数据本来就是个正常的需求,比如用户点击。
MutableSharedFlow() 就是除了 shareIn() 之外,另一种创建 SharedFlow 的方式。shareIn() 用于把普通 Flow 转换为 SharedFlow,而 MutableSharedFlow() 是直接创建一个 MutableSharedFlow 对象出来,可以用任何的外部数据源通过调用 emit() 发送数据。因此它不会限制你有一个数据源。
MutableSharedFlow() 创建了一个更灵活、更强大的 SharedFlow 对象。shareIn() 内部实际上也是使用了 MutableSharedFlow() 来产生 SharedFlow 对象:
public fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T> {
val config = configureSharing(replay)
val shared = MutableSharedFlow<T>(
replay = replay,
extraBufferCapacity = config.extraBufferCapacity,
onBufferOverflow = config.onBufferOverflow
)
@Suppress("UNCHECKED_CAST")
val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
return ReadonlySharedFlow(shared, job)
}
MutableSharedFlow 的优点是可以从外部发送数据,且可以有多个数据源,但似乎不能像 SharedFlow 那样从 flow() 内部发送数据了,这算是一个缺点吗?
啰嗦几把一大堆,最后说想表达的核心意思是 MutableSharedFlow 对 shareIn() 完全是功能范围增加,而不是功能范围收窄……语言表达能力真的差,绕来绕去说出这么个美味的屁话,傻逼
flow() 为什么只能从内部发送数据?技术角度上说,是因为只有它的 block 参数提供了 FlowCollector 这个 Receiver 类型,因此只有在 lambda 表达式内部才能调用 emit() 发射数据:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
从业务和框架设计角度考虑,为什么 Flow 要设计成在 flow() 内部才能调用 emit() 来发送数据?因为 Flow 是数据流,数据流本来就是提前设计好数据规则,然后执行规则时发送一条条数据,它本来就不需要从外部发送。在不需要从外部发送的情况下,如果提供了从外部发送数据的能力,容易让开发者不小心写出错误代码。
但 SharedFlow 是数据流,本来就需要从各个地方发送数据,因此 Flow 的限制就没有必要了。所以,MutableSharedFlow 就做成可以从任何协程里手动调用 emit() 来发送数据了。
再说,通过 shareIn() 转换的 SharedFlow 也不是在其内部发送数据,而是在上游的 flow() 内部,对于 SharedFlow 来说也是外部。而 MutableSharedFlow 虽然同样不能在自己的内部发送数据,也不能通过 flow() 提供上游数据,但是它可以在任意协程内调用 emit() 发送数据,这相比于 SharedFlow 是一种功能范围的增加,而不是收窄。
MutableSharedFlow() 作为 shareIn() 的底层支持,它与 transform() 支持 map() 一样,都是会扩展功能而不是收窄功能。所有的底层函数都是功能更强大、使用更灵活,而上层函数是使用更方便。上层函数能做到的事,底层函数更能做到。
在 MutableSharedFlow() 与 shareIn() 的选择问题上,它俩本质是一个东西。如果需要提供一个事件流,可以用 MutableSharedFlow() 创建一个事件流;如果已经有了一个生产数据流的 Flow,可以直接用 shareIn(),省去创建 MutableSharedFlow() 的麻烦。
MutableSharedFlow() 参数讲解:
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
...
}
三个参数的作用:
- replay:前面讲 shareIn() 参数时已经说过,不多赘述,只需注意它既有缓冲效果又有缓存效果。实际上 shareIn() 的参数就是用于调用 MutableSharedFlow() 时使用的
- extraBufferCapacity:额外缓冲(buffer)容量。假如你设置 replay 为 5,extraBufferCapacity 为 3,那么这个 MutableSharedFlow 的缓存尺寸是 5,缓冲尺寸是 8(即数据消费完后继续存下来给新订阅用的最多有 5 条,消费不及时最多可以缓冲上游的 8 条)
- onBufferOverflow:缓冲溢出策略,与前面说过的 buffer 操作符以及 Channel 的缓冲溢出策略是一样的。需要注意只针对缓冲,与缓存无关
MutableSharedFlow 还有一个简单函数 asSharedFlow() 可以将 MutableSharedFlow 转换成 SharedFlow。该函数基于一个典型需求,当你需要把 MutableSharedFlow 暴露出来给外部订阅,但并不希望让外部也来发送数据时,通过它返回一个只能读不能写的 SharedFlow。
4、StateFlow
前面说过,SharedFlow 是一种特殊的 Flow,把应用场景从数据流的收集收窄到事件流的订阅。而 StateFlow 是特殊的 SharedFlow,把应用场景进一步收窄,从事件流的订阅收窄到状态的订阅。
public interface StateFlow<out T> : SharedFlow<T> {
/**
* The current value of this state flow.
*/
public val value: T
}
StateFlow 是 SharedFlow 的子接口,新增的 value 属性只保存 SharedFlow 最新的值,即最新事件的数据值。因此 StateFlow 可以认为是只保存仅保存最新一个事件的事件流,也可以认为 StateFlow 是缓冲和缓存大小都是 1 的 SharedFlow。
一个只缓存一条数据的事件流,并且这个缓存的数据还能直接访问,那它就是一个带有监听功能的状态。
使用上也非常简单:
fun main() = runBlocking<Unit> {
val scope = CoroutineScope(EmptyCoroutineContext)
// 参数指定初始值,内部实际上是给 StateFlow 发送了第一条数据
val mutableStateFlow = MutableStateFlow("Kotlin Coroutine")
val job1 = scope.launch {
// 订阅状态
mutableStateFlow.collect {
println("State: $it")
}
}
val job2 = scope.launch {
delay(1000)
// 修改状态值
mutableStateFlow.emit("协程")
}
joinAll(job1, job2)
}
运行结果:
State: Kotlin Coroutine
State: 协程
StateFlow 有一个与 SharedFlow 的 shareIn() 类似的函数 —— stateIn(),它可以将 Flow 转换为 StateFlow:
public suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T> {
val config = configureSharing(1)
val result = CompletableDeferred<StateFlow<T>>()
scope.launchSharingDeferred(config.context, config.upstream, result)
return result.await()
}
由于 StateFlow 是一个只保存最新数据,缓冲和缓存都是 1,溢出策略自然是丢弃老数据。因此在 stateIn() 的参数中不需要缓冲与缓存的配置参数,只需要一个 CoroutineScope 就可以了。
StateFlow 中还有一个与 SharedFlow 的 asSharedFlow() 对应的函数 asStateFlow(),功能类似,不多赘述。