当前位置: 首页 > article >正文

Coroutine 基础四 —— CoroutineScope 与 CoroutineContext

1、定位

CoroutineContext,协程上下文。协程用到的所有信息都属于协程上下文,根据功能不同,划分成了不同的分类。管理流程用的是 Job,管理线程用的是 ContinuationInterceptor,等等。

CoroutineScope 的定位有两点:

  1. CoroutineScope 是 CoroutineContext 的容器,因此 CoroutineScope 可以提供对应协程的上下文信息
  2. CoroutineScope 内还定义了 launch 和 async 两个启动协程的函数,因此可以用来启动协程

2、GlobalScope

GlobalScope 是一个特殊的 CoroutineScope:

@DelicateCoroutinesApi
public object GlobalScope : CoroutineScope {
    /**
     * Returns [EmptyCoroutineContext].
     */
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}

特殊之处:

  • GlobalScope 是一个单例
  • GlobalScope 没有内置的 Job

其他的 CoroutineScope 都会有一个内置的 Job:

@Suppress("FunctionName")
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
    ContextScope(if (context[Job] != null) context else context + Job())

但是 GlobalScope 没有:

fun main() = runBlocking<Unit> {
    println(GlobalScope.coroutineContext[Job])
    val job = GlobalScope.launch { // this:CoroutineContext
        println(coroutineContext[Job])
    }
    println("job parent: ${job.parent}")
    delay(100)
}

运行结果:

null
StandaloneCoroutine{Active}@488e7dc8
job parent: null

GlobalScope 没有内置 Job,导致由其启动的协程,也就是 job 没有父协程。

GlobalScope 可以让开发者不用自己创建 CoroutineScope 也能启动协程。很多协程都是有绑定的生命周期的,比如某些协程与界面绑定生命周期,当界面关闭时,我们希望协程也自动取消。我们就可以用与界面生命周期绑定的 CoroutineScope(如 MainScope、LifecycleScope)来启动协程。但有时候,协程的生命周期是不跟任何应用组件绑定的,或者说它要一直存活到应用程序结束。此时就可以使用 GlobalScope 来启动协程。GlobalScope 启动的所有协程之间,与 GlobalScope 也没有任何的关联(取消、异常相互没有影响)。

3、从挂起函数里获取 CoroutineContext

我们在协程内很容易的就能通过 CoroutineScope 获取 CoroutineContext:

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)
    val job = scope.launch {
        println("Dispatcher: ${coroutineContext[ContinuationInterceptor]}")
    }
    job.join()
}

但是,如果把这个工作抽取到挂起函数中,由于挂起函数没有 CoroutineScope 类型的 this,因此它只能声明为 CoroutineScope 的扩展函数才能拿到 CoroutineContext:

private suspend fun CoroutineScope.showDispatcher() {
    println("Dispatcher: ${coroutineContext[ContinuationInterceptor]}")
}

但是,由于挂起函数最终都是要在协程中运行的,因此它是可以通过协程的 CoroutineScope 访问到 CoroutineContext 的,这是一个合理诉求。那也不能对所有要访问 CoroutineContext 的挂起函数都声明为 CoroutineScope 的扩展函数吧?这时候 Kotlin 出手了,为挂起函数加了一个属性 kotlin.coroutines.coroutineContext,需要手动导入:

private suspend fun showDispatcher() {
    delay(100)
    println("Dispatcher: ${coroutineContext[ContinuationInterceptor]}")
}

该属性的 get() 是一个挂起函数:

@SinceKotlin("1.3")
@Suppress("WRONG_MODIFIER_TARGET")
@InlineOnly
public suspend inline val coroutineContext: CoroutineContext
	// get 的实现在别的位置,不是一直都抛异常
    get() {
        throw NotImplementedError("Implemented as intrinsic")
    }

除此之外,还有一个函数 currentCoroutineContext() 可以返回这个 coroutineContext:

public suspend inline fun currentCoroutineContext(): CoroutineContext = coroutineContext

该函数用于在特殊情况下避免命名冲突。这个特殊情况是在协程中调用 Flow,并且 Flow 的参数中需要访问 coroutineContext 的时候。

flow() 的参数是一个挂起函数:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

在这个挂起函数中可以访问到 coroutineContext 属性:

private fun flowFun() {
    flow<String> {
        coroutineContext
    }
}

但是假如在协程中使用 flow:

private fun flowFun() {
    flow<String> {
        // kotlin.coroutines.coroutineContext
        coroutineContext
    }
    
    GlobalScope.launch {
        flow<String> {
            // CoroutineScope 内的 coroutineContext 属性
            coroutineContext
        }
    }
}

协程内的 coroutineContext 指向 CoroutineScope 内的 coroutineContext:

public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}

这是因为 flow 的大括号内部实际上是一个双重环境:

GlobalScope.launch { // this:CoroutineScope
    flow<String> { // 挂起函数的环境
        // CoroutineScope 内的 coroutineContext 属性
        coroutineContext
    }
}

双重环境是指协程提供的 CoroutineScope 以及 flow 参数的挂起函数的环境,两个环境都有 coroutineContext 这个变量,但两个变量并不是同一个对象,也就是名字相同但对象不同,发生了命名冲突。前者是 CoroutineScope 接口内的 coroutineContext 属性,后者是 kotlin.coroutines.coroutineContext 这个顶级属性。成员属性的优先级比顶级属性的优先级更高,因此最终拿到的就是 CoroutineScope 接口中的属性。

为了解决这个问题,Kotlin 才提供了 currentCoroutineContext() 帮助我们拿到那个顶级的 coroutineContext 属性。

4、coroutineScope() 与 supervisorScope()

coroutineScope() 的作用在其注释上已经明确写出:创建一个 CoroutineScope 并且通过这个 Scope 调用指定的挂起代码块。该 Scope 会从外部的 Scope 中继承 coroutineContext,使用该 coroutineContext 的 Job 作为新启动 Job 的父 Job。

这个功能描述跟 launch 非常像,都是启动一个子协程,并且协程内部的 coroutineContext 从外部继承,同时在协程内部创建一个新的 Job。

但是二者的区别很关键,也导致它俩的功能定位不同:

  • coroutineScope() 没有参数,因此无法通过参数定制 CoroutineContext;但 launch() 有参数,可以指定 ContinuationInterceptor,也可以指定父 Job
  • coroutineScope() 是挂起函数,会等内部代码执行完毕再返回,即与 coroutineScope() 所在协程是串行关系;而 launch() 只是协程的启动器,启动协程后 launch 也就执行完了,被启动的协程与 launch 所在的协程是并行关系

示例代码:

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    coroutineScope {
        delay(1000)
        println("Duration within coroutineScope: ${System.currentTimeMillis() - startTime}")
        // 假如内部启动了多个子协程,coroutineScope 启动的这个父协程会等待它们执行完毕
    }
    println("Duration of coroutineScope: ${System.currentTimeMillis() - startTime}")

    val startTime1 = System.currentTimeMillis()
    val job = launch {
        delay(1000)
        println("Duration within launch: ${System.currentTimeMillis() - startTime1}")
    }
    println("Duration of launch: ${System.currentTimeMillis() - startTime1}")

    job.join()
}

运行结果:

Duration within coroutineScope: 1013
Duration of coroutineScope: 1021
Duration of launch: 2
Duration within launch: 1007

结果表明 coroutineScope() 确实是串行的。

coroutineScope() 最常用的场景是在挂起函数里提供一个 CoroutineScope 的上下文:

private suspend fun someFun() {
    coroutineScope { 
        launch { 
            
        }
    }
}

假如想在挂起函数中用 launch/async 启动协程,由于挂起函数内没有 CoroutineScope 类型的 this,因此需要有一个东西能提供,这个东西就是 coroutineScope()。

不要想着为什么不用 CoroutineScope 的扩展函数:

private suspend fun CoroutineScope.someFun() {
    launch { 

    }
}

因为上一节说过,这样做会造成 coroutineContext 的命名冲突。

coroutineScope() 与 launch() 还有一个不是很重要的区别:coroutineScope() 的返回值是代码块内的最后一行,而 launch() 的返回值是 Job。这会引发 coroutineScope() 的第二个应用场景,对串行模块的封装。

利用这一点,可以使用 coroutineScope() 直接返回 async 的运行结果:

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)
    val job = scope.launch {
        val result = coroutineScope {
            val deferred = async { "Coroutine" }
            deferred.await()
        }
        println("result: $result")
    }
    job.join()
}

进一步延伸,可以使用 coroutineScope() 封装业务代码:

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)
    val job = scope.launch {
        val result = coroutineScope {
            // 业务代码,举个简单例子
            val deferred1 = async { "Coroutine" }
            val deferred2 = async { "Scope" }
            deferred1.await() + deferred2.await()
        }
        println("result: $result")
    }
    job.join()
}

这样封装的好处是,在 coroutineScope() 外使用 try-catch 就可以捕获到 coroutineScope() 内协程抛出的异常,不至于影响 coroutineScope() 所在的整棵协程树:

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)
    val job = scope.launch {
        val result = try {
            coroutineScope {
                val deferred1 = async { "Coroutine" }
                // 假如这个协程抛异常了
                val deferred2 = async { throw RuntimeException("Error!") }
                deferred1.await() + deferred2.await()
            }
        } catch (e: Exception) {
            e.message
        }
        println("result: $result")
    }
    job.join()
}

运行结果:

result: Error!

能对 coroutineScope() 进行 try-catch 的原因是它是一个挂起函数,当内部协程运行时,该函数处于挂起状态,如果内部协程发生异常,外部协程是可以感知到的。

因此可以使用 coroutineScope() 封装业务代码,这个在前面讲 async 的时候也介绍过,假如在一大段代码中,有一小部分代码是通过多个子协程去执行某一个子流程的,通常会使用 coroutineScope() 把它们括起来。这样即便这些子协程发生了异常,你也可以通过 try-catch 进行异常处理,而不至于让整个协程树都被毁掉。

supervisorScope() 里面用的是类似于 SupervisorJob 的 Job(实质上不是 SupervisorJob,但是功能完全一样)。

5、再谈 withContext()

withContext() 与 coroutineScope() 的唯一不同在于,withContext() 允许填参数:

// CoroutineScope.kt:
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
    // 确保 block 只被调用一次
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return suspendCoroutineUninterceptedOrReturn { uCont ->
        val coroutine = ScopeCoroutine(uCont.context, uCont)
        coroutine.startUndispatchedOrReturn(coroutine, block)
    }
}
public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        // compute new context
        val oldContext = uCont.context
        // Copy CopyableThreadContextElement if necessary
        val newContext = oldContext.newCoroutineContext(context)
        // always check for cancellation of new context
        // 验证新的上下文是否已经被取消了,如果是就抛 CancellationException 进入取消流程 
        newContext.ensureActive()
        // FAST PATH #1 -- new context is the same as the old one
        // 如果新旧协程上下文相同,那么后续执行的 return 部分与 coroutineScope() 内的 return 相同
        if (newContext === oldContext) {
            val coroutine = ScopeCoroutine(newContext, uCont)
            return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
        }
        // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
        // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
        if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
            val coroutine = UndispatchedCoroutine(newContext, uCont)
            // There are changes in the context, so this thread needs to be updated
            withCoroutineContext(coroutine.context, null) {
                return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
            }
        }
        // SLOW PATH -- use new dispatcher
        val coroutine = DispatchedCoroutine(newContext, uCont)
        block.startCoroutineCancellable(coroutine, coroutine)
        coroutine.getResult()
    }
}

看 FAST PATH #1 部分,如果 withContext() 填的 context 参数没有改变 oldContext,比如 EmptyCoroutineContext,那么 withContext() 与 coroutineScope() 的运行效果是几乎一样的(除了 ensureActive())。

再看 FAST PATH #2,如果新旧上下文的 ContinuationInterceptor 没变,就用 UndispatchedCoroutine 启动一个新协程。

最后,SLOW PATH 就是指,如果线程都改变了,就用 DispatchedCoroutine 启动一个新协程。

所以 withContext() 可以看作是一个可以定制 CoroutineContext 的 coroutineScope()。另一个角度,它还可以看作是串行的 launch() 或 async()。

具体开发时,withContext() 的使用场景就是临时切换 CoroutineContext 的。

withContext() 虽然在底层还是开了协程,但是在上层,我们通常会认为它就是串行的切换了一个线程环境。

6、CoroutineName

fun main() = runBlocking<Unit> {
    val name = CoroutineName("MyCoroutine")
    // 1.给单个协程指定 CoroutineName
    launch(name) {
        // 两种获取 CoroutineName 的方式
        println("CoroutineName: ${coroutineContext[CoroutineName]}")
        println("CoroutineName: ${coroutineContext[CoroutineName]?.name}")
    }

    // 2.给 CoroutineScope 指定 CoroutineScope,由该 CoroutineScope 启动的所有协程默认用这个名字
    val scope = CoroutineScope(Dispatchers.IO + name)
    scope.launch {
        println("CoroutineName: ${coroutineContext[CoroutineName]}")
    }

    delay(1000)
}

运行结果:

CoroutineName: CoroutineName(MyCoroutine)
CoroutineName: CoroutineName(MyCoroutine)
CoroutineName: MyCoroutine

此功能多用于测试与调试时,为想要监测的协程设置一个易于查看的名称。

7、CoroutineContext 的加减和 get()

7.1 +

+ 是运算符重载,调用的是 CoroutineContext 的 plus():

	public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }

plus() 返回的 CombinedContext 会将两个 CoroutineContext 合并成一个:

internal class CombinedContext(
    private val left: CoroutineContext,
    private val element: Element
) : CoroutineContext, Serializable

Element 也是 CoroutineContext 的一个子接口:

	public interface Element : CoroutineContext

比如进行如下操作时:

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(Dispatchers.IO + Job())
    println("$scope")
}

输出:

CoroutineScope(coroutineContext=[JobImpl{Active}@24273305, Dispatchers.IO])

然后再给 CoroutineScope 添加一个 CoroutineName:

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(Dispatchers.IO + Job() + CoroutineName("MyCoroutine"))
    println("$scope")
}

输出如下:

CoroutineScope(coroutineContext=[JobImpl{Active}@5a42bbf4, CoroutineName(MyCoroutine), Dispatchers.IO])

如果有多个相同类型的对象相加,那么新的会替换掉旧的:

fun main() = runBlocking<Unit> {
    val job1 = Job()
    val job2 = Job()
    val scope = CoroutineScope(Dispatchers.IO + job1 + CoroutineName("MyCoroutine") + job2)
    println("job1: $job1, job2: $job2")
    println("$scope")
}

运行结果:

job1: JobImpl{Active}@5d3411d, job2: JobImpl{Active}@2471cca7
CoroutineScope(coroutineContext=[CoroutineName(MyCoroutine), JobImpl{Active}@2471cca7, Dispatchers.IO])

通过哈希值判断,job2 替换了 job1。

需要注意,相同类型的 CoroutineContext 元素不能直接相加,比如 job1 + job2 这样编译器就会报错,因为 Job 在对加号进行重载时添加了限制:

	@Deprecated(message = "Operator '+' on two Job objects is meaningless. " +
        "Job is a coroutine context element and `+` is a set-sum operator for coroutine contexts. " +
        "The job to the right of `+` just replaces the job the left of `+`.",
        level = DeprecationLevel.ERROR)
    public operator fun plus(other: Job): Job = other

两个 CoroutineName 和 CoroutineExceptionHandler 可以直接相加。

7.2 get()

coroutineContext[Job] 的中括号实际上是通过 get() 实现的:

public interface CoroutineContext {
    /**
     * Returns the element with the given [key] from this context or `null`.
     */
    public operator fun <E : Element> get(key: Key<E>): E? // 得到 Key 的泛型类型对象
    
    /**
     * Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
     */
    public interface Key<E : Element>
}

get() 的参数传入的 Job 并不是指 Job 类型,而是 Job 接口的伴生对象 Key:

public interface Job : CoroutineContext.Element {
    /**
     * Key for [Job] instance in the coroutine context.
     */
    public companion object Key : CoroutineContext.Key<Job>
}

由于 Kotlin 中访问一个类的伴生对象可以直接通过类名访问,比如 Job.Key,再进一步简写为只有类名 Job。因此 coroutineContext[Job] 实际上是调用 coroutineContext.get(Job),这个 Job 是 Job 类的伴生对象,也就是 Key,所以最终效果是 coroutineContext.get(Key),拿到的就是 Job 对象。

对于其他 CoroutineContext.Element 也是类似的,比如 CoroutineExceptionHandler:

public interface CoroutineExceptionHandler : CoroutineContext.Element {
    /**
     * Key for [CoroutineExceptionHandler] instance in the coroutine context.
     */
    public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>

    ...
}

以及 CoroutineName:

public data class CoroutineName(
    /**
     * User-defined coroutine name.
     */
    val name: String
) : AbstractCoroutineContextElement(CoroutineName) {
    /**
     * Key for [CoroutineName] instance in the coroutine context.
     */
    public companion object Key : CoroutineContext.Key<CoroutineName>

    ...
}

还有 ContinuationInterceptor 等等。在获取 ContinuationInterceptor 时,可以直接用 coroutineContext[ContinuationInterceptor]

fun main() = runBlocking<Unit> {
    val interceptor = coroutineContext[ContinuationInterceptor]
}

但是如果想获取 ContinuationInterceptor 的子类 CoroutineDispatcher 对象时,可以使用强转的方式:

fun main() = runBlocking<Unit> {
    // 注意 get() 返回的是可空类型,因此不要遗忘了 ?
    val dispatcher: CoroutineDispatcher? =
        coroutineContext[ContinuationInterceptor] as CoroutineDispatcher?
}

或者直接通过 CoroutineDispatcher 的伴生对象:

@OptIn(ExperimentalStdlibApi::class)
fun main() = runBlocking<Unit> {
    val dispatcher: CoroutineDispatcher? = coroutineContext[CoroutineDispatcher]
}

但是必须加上 @OptIn(ExperimentalStdlibApi::class) 这个注解,因为 CoroutineDispatcher 的伴生对象 Key 有点不一样,它现在是有 @ExperimentalStdlibApi 注解的:

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    /** @suppress */
    @ExperimentalStdlibApi
    public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
        ContinuationInterceptor,
        { it as? CoroutineDispatcher })
    }

8、自定义 CoroutineContext

协程提供了多种 CoroutineContext:ContinuationInterceptor、Job、CoroutineExceptionHandler、CoroutineName。

如果上述内容都不能满足要为 CoroutineContext 附加专属信息或专属功能的需求,就需要自定义 CoroutineContext。

自定义 CoroutineContext 一般无需直接实现 CoroutineContext,可以继承系统提供的半成品 AbstractCoroutineContextElement:

// public abstract class AbstractCoroutineContextElement(public override val key: Key<*>) : Element
class MyContext : AbstractCoroutineContextElement(MyContext) {
    companion object Key : CoroutineContext.Key<MyContext>

    // 自定义的功能
    suspend fun log() {
        println("Current coroutine: $coroutineContext")
    }
}

使用:

fun main() = runBlocking<Unit> {
    launch(MyContext()) {
        coroutineContext[MyContext]?.log()
    }
    delay(100)
}

运行结果:

Current coroutine: [com.kotlin.coroutine._3_scope_context.MyContext@61a52fbd, StandaloneCoroutine{Active}@33a10788, BlockingEventLoop@7006c658]

http://www.kler.cn/a/469106.html

相关文章:

  • 大数据架构演变
  • C# 服务生命周期:Singleton、Scoped、Transient
  • 鸿蒙操作系统(HarmonyOS)
  • 指针 const 的组合
  • 【数据可视化-11】全国大学数据可视化分析
  • iOS - 线程与AutoreleasePoolPage
  • Java `computeIfAbsent` 方法
  • Flink源码解析之:Flink on k8s 客户端提交任务源码分析
  • 7. C语言 运算符详解
  • 【计算机网络安全】CA和安全电子邮件
  • 【前端面试题】前端中的两个外边距bug以及什么是BFC
  • Linux驱动开发:深入理解I2C时序(二)
  • 深入学习 Spring `@PostMapping` 处理表单参数与 JSON 参数
  • PyQt开发界面环境搭建
  • 【FlutterDart】页面切换 PageView PageController(9 /100)
  • 常用的数据结构API概览
  • LeetCode -Hot100 - 73. 矩阵置零
  • 瑞吉外卖项目学习笔记(十)修改套餐、删除套餐、起售和停售套餐
  • 云原生监控与日志管理:确保云原生应用的可靠性与性能
  • Spring MVC和servlet
  • 【2025最新计算机毕业设计】基于SSM的医院挂号住院系统(高质量源码,提供文档,免费部署到本地)【提供源码+答辩PPT+文档+项目部署】
  • 西安电子科技大学初/复试笔试、面试、机试成绩占比
  • 初学stm32 --- RTC实时时钟
  • Pytest钩子函数,测试框架动态切换测试环境
  • 《Rust权威指南》学习笔记(二)
  • Node.js中使用Joi 和 express-joi-validation进行数据验证和校验