Coroutine 基础四 —— CoroutineScope 与 CoroutineContext
1、定位
CoroutineContext,协程上下文。协程用到的所有信息都属于协程上下文,根据功能不同,划分成了不同的分类。管理流程用的是 Job,管理线程用的是 ContinuationInterceptor,等等。
CoroutineScope 的定位有两点:
- CoroutineScope 是 CoroutineContext 的容器,因此 CoroutineScope 可以提供对应协程的上下文信息
- CoroutineScope 内还定义了 launch 和 async 两个启动协程的函数,因此可以用来启动协程
2、GlobalScope
GlobalScope 是一个特殊的 CoroutineScope:
@DelicateCoroutinesApi
public object GlobalScope : CoroutineScope {
/**
* Returns [EmptyCoroutineContext].
*/
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
特殊之处:
- GlobalScope 是一个单例
- GlobalScope 没有内置的 Job
其他的 CoroutineScope 都会有一个内置的 Job:
@Suppress("FunctionName")
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
ContextScope(if (context[Job] != null) context else context + Job())
但是 GlobalScope 没有:
fun main() = runBlocking<Unit> {
println(GlobalScope.coroutineContext[Job])
val job = GlobalScope.launch { // this:CoroutineContext
println(coroutineContext[Job])
}
println("job parent: ${job.parent}")
delay(100)
}
运行结果:
null
StandaloneCoroutine{Active}@488e7dc8
job parent: null
GlobalScope 没有内置 Job,导致由其启动的协程,也就是 job 没有父协程。
GlobalScope 可以让开发者不用自己创建 CoroutineScope 也能启动协程。很多协程都是有绑定的生命周期的,比如某些协程与界面绑定生命周期,当界面关闭时,我们希望协程也自动取消。我们就可以用与界面生命周期绑定的 CoroutineScope(如 MainScope、LifecycleScope)来启动协程。但有时候,协程的生命周期是不跟任何应用组件绑定的,或者说它要一直存活到应用程序结束。此时就可以使用 GlobalScope 来启动协程。GlobalScope 启动的所有协程之间,与 GlobalScope 也没有任何的关联(取消、异常相互没有影响)。
3、从挂起函数里获取 CoroutineContext
我们在协程内很容易的就能通过 CoroutineScope 获取 CoroutineContext:
fun main() = runBlocking<Unit> {
val scope = CoroutineScope(EmptyCoroutineContext)
val job = scope.launch {
println("Dispatcher: ${coroutineContext[ContinuationInterceptor]}")
}
job.join()
}
但是,如果把这个工作抽取到挂起函数中,由于挂起函数没有 CoroutineScope 类型的 this,因此它只能声明为 CoroutineScope 的扩展函数才能拿到 CoroutineContext:
private suspend fun CoroutineScope.showDispatcher() {
println("Dispatcher: ${coroutineContext[ContinuationInterceptor]}")
}
但是,由于挂起函数最终都是要在协程中运行的,因此它是可以通过协程的 CoroutineScope 访问到 CoroutineContext 的,这是一个合理诉求。那也不能对所有要访问 CoroutineContext 的挂起函数都声明为 CoroutineScope 的扩展函数吧?这时候 Kotlin 出手了,为挂起函数加了一个属性 kotlin.coroutines.coroutineContext
,需要手动导入:
private suspend fun showDispatcher() {
delay(100)
println("Dispatcher: ${coroutineContext[ContinuationInterceptor]}")
}
该属性的 get() 是一个挂起函数:
@SinceKotlin("1.3")
@Suppress("WRONG_MODIFIER_TARGET")
@InlineOnly
public suspend inline val coroutineContext: CoroutineContext
// get 的实现在别的位置,不是一直都抛异常
get() {
throw NotImplementedError("Implemented as intrinsic")
}
除此之外,还有一个函数 currentCoroutineContext() 可以返回这个 coroutineContext:
public suspend inline fun currentCoroutineContext(): CoroutineContext = coroutineContext
该函数用于在特殊情况下避免命名冲突。这个特殊情况是在协程中调用 Flow,并且 Flow 的参数中需要访问 coroutineContext 的时候。
flow() 的参数是一个挂起函数:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
在这个挂起函数中可以访问到 coroutineContext 属性:
private fun flowFun() {
flow<String> {
coroutineContext
}
}
但是假如在协程中使用 flow:
private fun flowFun() {
flow<String> {
// kotlin.coroutines.coroutineContext
coroutineContext
}
GlobalScope.launch {
flow<String> {
// CoroutineScope 内的 coroutineContext 属性
coroutineContext
}
}
}
协程内的 coroutineContext 指向 CoroutineScope 内的 coroutineContext:
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}
这是因为 flow 的大括号内部实际上是一个双重环境:
GlobalScope.launch { // this:CoroutineScope
flow<String> { // 挂起函数的环境
// CoroutineScope 内的 coroutineContext 属性
coroutineContext
}
}
双重环境是指协程提供的 CoroutineScope 以及 flow 参数的挂起函数的环境,两个环境都有 coroutineContext 这个变量,但两个变量并不是同一个对象,也就是名字相同但对象不同,发生了命名冲突。前者是 CoroutineScope 接口内的 coroutineContext 属性,后者是 kotlin.coroutines.coroutineContext
这个顶级属性。成员属性的优先级比顶级属性的优先级更高,因此最终拿到的就是 CoroutineScope 接口中的属性。
为了解决这个问题,Kotlin 才提供了 currentCoroutineContext() 帮助我们拿到那个顶级的 coroutineContext 属性。
4、coroutineScope() 与 supervisorScope()
coroutineScope() 的作用在其注释上已经明确写出:创建一个 CoroutineScope 并且通过这个 Scope 调用指定的挂起代码块。该 Scope 会从外部的 Scope 中继承 coroutineContext,使用该 coroutineContext 的 Job 作为新启动 Job 的父 Job。
这个功能描述跟 launch 非常像,都是启动一个子协程,并且协程内部的 coroutineContext 从外部继承,同时在协程内部创建一个新的 Job。
但是二者的区别很关键,也导致它俩的功能定位不同:
- coroutineScope() 没有参数,因此无法通过参数定制 CoroutineContext;但 launch() 有参数,可以指定 ContinuationInterceptor,也可以指定父 Job
- coroutineScope() 是挂起函数,会等内部代码执行完毕再返回,即与 coroutineScope() 所在协程是串行关系;而 launch() 只是协程的启动器,启动协程后 launch 也就执行完了,被启动的协程与 launch 所在的协程是并行关系
示例代码:
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
coroutineScope {
delay(1000)
println("Duration within coroutineScope: ${System.currentTimeMillis() - startTime}")
// 假如内部启动了多个子协程,coroutineScope 启动的这个父协程会等待它们执行完毕
}
println("Duration of coroutineScope: ${System.currentTimeMillis() - startTime}")
val startTime1 = System.currentTimeMillis()
val job = launch {
delay(1000)
println("Duration within launch: ${System.currentTimeMillis() - startTime1}")
}
println("Duration of launch: ${System.currentTimeMillis() - startTime1}")
job.join()
}
运行结果:
Duration within coroutineScope: 1013
Duration of coroutineScope: 1021
Duration of launch: 2
Duration within launch: 1007
结果表明 coroutineScope() 确实是串行的。
coroutineScope() 最常用的场景是在挂起函数里提供一个 CoroutineScope 的上下文:
private suspend fun someFun() {
coroutineScope {
launch {
}
}
}
假如想在挂起函数中用 launch/async 启动协程,由于挂起函数内没有 CoroutineScope 类型的 this,因此需要有一个东西能提供,这个东西就是 coroutineScope()。
不要想着为什么不用 CoroutineScope 的扩展函数:
private suspend fun CoroutineScope.someFun() {
launch {
}
}
因为上一节说过,这样做会造成 coroutineContext 的命名冲突。
coroutineScope() 与 launch() 还有一个不是很重要的区别:coroutineScope() 的返回值是代码块内的最后一行,而 launch() 的返回值是 Job。这会引发 coroutineScope() 的第二个应用场景,对串行模块的封装。
利用这一点,可以使用 coroutineScope() 直接返回 async 的运行结果:
fun main() = runBlocking<Unit> {
val scope = CoroutineScope(EmptyCoroutineContext)
val job = scope.launch {
val result = coroutineScope {
val deferred = async { "Coroutine" }
deferred.await()
}
println("result: $result")
}
job.join()
}
进一步延伸,可以使用 coroutineScope() 封装业务代码:
fun main() = runBlocking<Unit> {
val scope = CoroutineScope(EmptyCoroutineContext)
val job = scope.launch {
val result = coroutineScope {
// 业务代码,举个简单例子
val deferred1 = async { "Coroutine" }
val deferred2 = async { "Scope" }
deferred1.await() + deferred2.await()
}
println("result: $result")
}
job.join()
}
这样封装的好处是,在 coroutineScope() 外使用 try-catch 就可以捕获到 coroutineScope() 内协程抛出的异常,不至于影响 coroutineScope() 所在的整棵协程树:
fun main() = runBlocking<Unit> {
val scope = CoroutineScope(EmptyCoroutineContext)
val job = scope.launch {
val result = try {
coroutineScope {
val deferred1 = async { "Coroutine" }
// 假如这个协程抛异常了
val deferred2 = async { throw RuntimeException("Error!") }
deferred1.await() + deferred2.await()
}
} catch (e: Exception) {
e.message
}
println("result: $result")
}
job.join()
}
运行结果:
result: Error!
能对 coroutineScope() 进行 try-catch 的原因是它是一个挂起函数,当内部协程运行时,该函数处于挂起状态,如果内部协程发生异常,外部协程是可以感知到的。
因此可以使用 coroutineScope() 封装业务代码,这个在前面讲 async 的时候也介绍过,假如在一大段代码中,有一小部分代码是通过多个子协程去执行某一个子流程的,通常会使用 coroutineScope() 把它们括起来。这样即便这些子协程发生了异常,你也可以通过 try-catch 进行异常处理,而不至于让整个协程树都被毁掉。
supervisorScope() 里面用的是类似于 SupervisorJob 的 Job(实质上不是 SupervisorJob,但是功能完全一样)。
5、再谈 withContext()
withContext() 与 coroutineScope() 的唯一不同在于,withContext() 允许填参数:
// CoroutineScope.kt:
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
// 确保 block 只被调用一次
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn { uCont ->
val coroutine = ScopeCoroutine(uCont.context, uCont)
coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// compute new context
val oldContext = uCont.context
// Copy CopyableThreadContextElement if necessary
val newContext = oldContext.newCoroutineContext(context)
// always check for cancellation of new context
// 验证新的上下文是否已经被取消了,如果是就抛 CancellationException 进入取消流程
newContext.ensureActive()
// FAST PATH #1 -- new context is the same as the old one
// 如果新旧协程上下文相同,那么后续执行的 return 部分与 coroutineScope() 内的 return 相同
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
// FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont)
// There are changes in the context, so this thread needs to be updated
withCoroutineContext(coroutine.context, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// SLOW PATH -- use new dispatcher
val coroutine = DispatchedCoroutine(newContext, uCont)
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
}
看 FAST PATH #1 部分,如果 withContext() 填的 context 参数没有改变 oldContext,比如 EmptyCoroutineContext,那么 withContext() 与 coroutineScope() 的运行效果是几乎一样的(除了 ensureActive())。
再看 FAST PATH #2,如果新旧上下文的 ContinuationInterceptor 没变,就用 UndispatchedCoroutine 启动一个新协程。
最后,SLOW PATH 就是指,如果线程都改变了,就用 DispatchedCoroutine 启动一个新协程。
所以 withContext() 可以看作是一个可以定制 CoroutineContext 的 coroutineScope()。另一个角度,它还可以看作是串行的 launch() 或 async()。
具体开发时,withContext() 的使用场景就是临时切换 CoroutineContext 的。
withContext() 虽然在底层还是开了协程,但是在上层,我们通常会认为它就是串行的切换了一个线程环境。
6、CoroutineName
fun main() = runBlocking<Unit> {
val name = CoroutineName("MyCoroutine")
// 1.给单个协程指定 CoroutineName
launch(name) {
// 两种获取 CoroutineName 的方式
println("CoroutineName: ${coroutineContext[CoroutineName]}")
println("CoroutineName: ${coroutineContext[CoroutineName]?.name}")
}
// 2.给 CoroutineScope 指定 CoroutineScope,由该 CoroutineScope 启动的所有协程默认用这个名字
val scope = CoroutineScope(Dispatchers.IO + name)
scope.launch {
println("CoroutineName: ${coroutineContext[CoroutineName]}")
}
delay(1000)
}
运行结果:
CoroutineName: CoroutineName(MyCoroutine)
CoroutineName: CoroutineName(MyCoroutine)
CoroutineName: MyCoroutine
此功能多用于测试与调试时,为想要监测的协程设置一个易于查看的名称。
7、CoroutineContext 的加减和 get()
7.1 +
+
是运算符重载,调用的是 CoroutineContext 的 plus():
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
plus() 返回的 CombinedContext 会将两个 CoroutineContext 合并成一个:
internal class CombinedContext(
private val left: CoroutineContext,
private val element: Element
) : CoroutineContext, Serializable
Element 也是 CoroutineContext 的一个子接口:
public interface Element : CoroutineContext
比如进行如下操作时:
fun main() = runBlocking<Unit> {
val scope = CoroutineScope(Dispatchers.IO + Job())
println("$scope")
}
输出:
CoroutineScope(coroutineContext=[JobImpl{Active}@24273305, Dispatchers.IO])
然后再给 CoroutineScope 添加一个 CoroutineName:
fun main() = runBlocking<Unit> {
val scope = CoroutineScope(Dispatchers.IO + Job() + CoroutineName("MyCoroutine"))
println("$scope")
}
输出如下:
CoroutineScope(coroutineContext=[JobImpl{Active}@5a42bbf4, CoroutineName(MyCoroutine), Dispatchers.IO])
如果有多个相同类型的对象相加,那么新的会替换掉旧的:
fun main() = runBlocking<Unit> {
val job1 = Job()
val job2 = Job()
val scope = CoroutineScope(Dispatchers.IO + job1 + CoroutineName("MyCoroutine") + job2)
println("job1: $job1, job2: $job2")
println("$scope")
}
运行结果:
job1: JobImpl{Active}@5d3411d, job2: JobImpl{Active}@2471cca7
CoroutineScope(coroutineContext=[CoroutineName(MyCoroutine), JobImpl{Active}@2471cca7, Dispatchers.IO])
通过哈希值判断,job2 替换了 job1。
需要注意,相同类型的 CoroutineContext 元素不能直接相加,比如 job1 + job2
这样编译器就会报错,因为 Job 在对加号进行重载时添加了限制:
@Deprecated(message = "Operator '+' on two Job objects is meaningless. " +
"Job is a coroutine context element and `+` is a set-sum operator for coroutine contexts. " +
"The job to the right of `+` just replaces the job the left of `+`.",
level = DeprecationLevel.ERROR)
public operator fun plus(other: Job): Job = other
两个 CoroutineName 和 CoroutineExceptionHandler 可以直接相加。
7.2 get()
coroutineContext[Job]
的中括号实际上是通过 get() 实现的:
public interface CoroutineContext {
/**
* Returns the element with the given [key] from this context or `null`.
*/
public operator fun <E : Element> get(key: Key<E>): E? // 得到 Key 的泛型类型对象
/**
* Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
*/
public interface Key<E : Element>
}
get() 的参数传入的 Job 并不是指 Job 类型,而是 Job 接口的伴生对象 Key:
public interface Job : CoroutineContext.Element {
/**
* Key for [Job] instance in the coroutine context.
*/
public companion object Key : CoroutineContext.Key<Job>
}
由于 Kotlin 中访问一个类的伴生对象可以直接通过类名访问,比如 Job.Key,再进一步简写为只有类名 Job。因此 coroutineContext[Job]
实际上是调用 coroutineContext.get(Job)
,这个 Job 是 Job 类的伴生对象,也就是 Key,所以最终效果是 coroutineContext.get(Key)
,拿到的就是 Job 对象。
对于其他 CoroutineContext.Element 也是类似的,比如 CoroutineExceptionHandler:
public interface CoroutineExceptionHandler : CoroutineContext.Element {
/**
* Key for [CoroutineExceptionHandler] instance in the coroutine context.
*/
public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>
...
}
以及 CoroutineName:
public data class CoroutineName(
/**
* User-defined coroutine name.
*/
val name: String
) : AbstractCoroutineContextElement(CoroutineName) {
/**
* Key for [CoroutineName] instance in the coroutine context.
*/
public companion object Key : CoroutineContext.Key<CoroutineName>
...
}
还有 ContinuationInterceptor 等等。在获取 ContinuationInterceptor 时,可以直接用 coroutineContext[ContinuationInterceptor]
:
fun main() = runBlocking<Unit> {
val interceptor = coroutineContext[ContinuationInterceptor]
}
但是如果想获取 ContinuationInterceptor 的子类 CoroutineDispatcher 对象时,可以使用强转的方式:
fun main() = runBlocking<Unit> {
// 注意 get() 返回的是可空类型,因此不要遗忘了 ?
val dispatcher: CoroutineDispatcher? =
coroutineContext[ContinuationInterceptor] as CoroutineDispatcher?
}
或者直接通过 CoroutineDispatcher 的伴生对象:
@OptIn(ExperimentalStdlibApi::class)
fun main() = runBlocking<Unit> {
val dispatcher: CoroutineDispatcher? = coroutineContext[CoroutineDispatcher]
}
但是必须加上 @OptIn(ExperimentalStdlibApi::class)
这个注解,因为 CoroutineDispatcher 的伴生对象 Key 有点不一样,它现在是有 @ExperimentalStdlibApi 注解的:
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
/** @suppress */
@ExperimentalStdlibApi
public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
ContinuationInterceptor,
{ it as? CoroutineDispatcher })
}
8、自定义 CoroutineContext
协程提供了多种 CoroutineContext:ContinuationInterceptor、Job、CoroutineExceptionHandler、CoroutineName。
如果上述内容都不能满足要为 CoroutineContext 附加专属信息或专属功能的需求,就需要自定义 CoroutineContext。
自定义 CoroutineContext 一般无需直接实现 CoroutineContext,可以继承系统提供的半成品 AbstractCoroutineContextElement:
// public abstract class AbstractCoroutineContextElement(public override val key: Key<*>) : Element
class MyContext : AbstractCoroutineContextElement(MyContext) {
companion object Key : CoroutineContext.Key<MyContext>
// 自定义的功能
suspend fun log() {
println("Current coroutine: $coroutineContext")
}
}
使用:
fun main() = runBlocking<Unit> {
launch(MyContext()) {
coroutineContext[MyContext]?.log()
}
delay(100)
}
运行结果:
Current coroutine: [com.kotlin.coroutine._3_scope_context.MyContext@61a52fbd, StandaloneCoroutine{Active}@33a10788, BlockingEventLoop@7006c658]