Coroutine 基础二 —— 结构化并发(一)
1、“一个协程”到底指什么
为了讲结构化并发,需要先讲父子协程;讲父子协程,就需要先讲什么是“一个协程”。
课程用线程作为对比来引入协程概念。使用线程时,通常会认为 Thread 对象就是线程,除了 Thread 这个单词本身就是线程的意思之外,更本质的原因是,通过 Thread 对象,可以实现对线程这个抽象概念的管理。比如控制线程的运行流程(start、interrupt、join 等),获取线程的状态(isAlive 等等)。
那协程是否有一个对应的对象,就像 Thread 之于线程那样呢?严格来讲,并没有。Thread 类提供了所有对线程的操作,但是协程没有一个一模一样的类。因为协程做了细致的职责划分,使得没有一个类能囊括所有职责。
通常我们认为,Job 与 CoroutineScope 可以视为一个协程,但是它们的职责不同:
- launch() 返回的 Job 与 async() 返回的 Job 的子类 Deferred 用于控制协程的执行流程相关的,它可以控制协程运行流程(start、interrupt、join 等),也可以获取协程的状态(isActive、isCancelled、isCompleted),还能获取父子协程(通过 parent 与 children 属性)
- launch() 与 async() 大括号内的 CoroutineScope 对象是一个顶级的协程管理器,它可以启动一个全新的协程(Job.start() 只有在以 LAZY 模式创建的协程才需要使用),还可以获取到 ContinuationInterceptor 和 Job,并且通过追溯源码和打印 log 能发现,该 CoroutineScope 对象,实际上与 launch() 与 async() 返回的 Job 或 Deferred 是同一个对象。之所以是同一个对象(AbstractCoroutine)还做责任拆分,目的是为了让 API 更加精准,或者说为了避免 API 污染,不让没用的 API 出现在不该出现的地方(返回值是 Job 说明该位置只需要进行流程控制,而不需要 CoroutineScope 进行全方位的控制)
CoroutineScope 担任的是每个协程中大总管的角色,而 Job 仅负责流程相关的这部分职责,二者是有从属关系的。
课程中还提到 launch() 与 async() 闭包的大括号的内容也可以视为一个协程,只不过从技术角度讲没什么价值,只是在思考和探讨流程时非常便捷。
2、父子协程,以及协程间的并行和等待
结构化并发,其实就是父子协程的生命周期的各种关联。本节两个大问题:
- 父子关系是如何确立的
- 协程结束时,对父子协程会发生怎样的自动化影响
最直接的父子协程关系,就是在一个协程内部启动另一个协程:
fun main() = runBlocking {
val scope = CoroutineScope(EmptyCoroutineContext)
var innerJob: Job? = null
val job = scope.launch { // this:CoroutineScope
innerJob = launch {
// 这里要做延时,因为协程的父子关系会随着协程运行完毕而解绑,
// 倘若不进行延时,在打印 log 时,由于协程已经运行完,那么
// 二者就不再是父子协程,就会输出 false
delay(500)
}
}
val children = job.children
println("children count: ${children.count()}") // 1
println("innerJob === children.first(): ${innerJob === children.first()}") // true
println("innerJob?.parent === job: ${innerJob?.parent === job}") // true
}
这样 innerJob 就是 job 的子协程,本质上是将 job.children 设置为 innerJob,将 innerJob.parent 设置为 job。
父子协程是如何拿到对方的 Job 实现上述的关系挂接的呢?
首先,innerJob 是在父协程 job 的 launch 内启动的,因此父协程能拿到子协程 Job 对象 —— innerJob。
其次,innerJob 在通过 launch 启动子协程时,本质上是 this.launch {…},this 是父协程 {} 内指示的 CoroutineScope,该 CoroutineScope 内包含该协程的 Job 对象 —— job。因此,子协程可以通过 this 拿到父协程的 Job 对象赋值给子协程的 parent 属性。
下面将启动子协程的代码进行修改:
fun main() = runBlocking {
val scope = CoroutineScope(EmptyCoroutineContext)
var innerJob: Job? = null
val job = scope.launch {
// 使用 scope 启动子协程
innerJob = scope.launch {
delay(500)
}
}
}
使用 scope 代替隐式的 this 启动子协程,再进行关系验证,发现此时 innerJob 不再是 job 的子协程了。因为通过 scope 启动协程,拿到的就不再是通过 this 的 CoroutineScope 内含有的 Job —— job 对象,而是 scope 内含有的 Job 对象了,也就是说,此时 innerJob 的父协程就是 scope 内含有的 Job。也就是说,此时 job 与 innerJob 是兄弟协程而不再是父子协程了。
或许你会有疑问,scope 就是一个 CoroutineScope,它没有启动协程,哪里来的 Job 呢?看 CoroutineScope 的构造函数:
@Suppress("FunctionName")
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
ContextScope(if (context[Job] != null) context else context + Job()) // + 表示合并
如果参数传入的 CoroutineContext 没有 Job,会自动创建 Job 对象。所以,即便没启动协程,CoroutineScope 内也是含有 Job 对象的。
综上,我们固有观念中认为的,以代码结构的嵌套认定协程的父子关系的做法是不正确的,应该通过启动协程的 CoroutineScope 认定协程的父子关系。
原本判断时,通过代码结构认定父子协程能够成立的原因是,内部协程通过隐式 this 启动刚好使用了外部协程的 CoroutineScope:
fun main() = runBlocking {
val scope = CoroutineScope(EmptyCoroutineContext)
var innerJob: Job? = null
val job = scope.launch {
innerJob = /*this.*/launch {
delay(500)
}
}
}
明确父子协程关系的意义在于结构化并发(取消、异常管理、结束),结构化结束是指父协程会等待所有子协程都运行完毕后再结束自己。所有协程之间都是并行关系,包括兄弟、父子协程,以及没有任何关系的协程。
通过一个例子来了解结构化结束,也就是父协程会等待子协程结束后再结束(父协程的代码哪怕已经运行完了,父协程也会等待所有子协程都完成后再完成):
fun main() = runBlocking {
val scope = CoroutineScope(EmptyCoroutineContext)
// 父子协程是并行运行的,但是因为子协程等待 100,父协程需要等待
// 子协程运行完毕后再结束自己,所以在 job.join() 的位置,runBlocking
// 协程会等待 job 大概 100ms 的时间
val job = scope.launch {
launch {
delay(100)
}
}
val startTime = System.currentTimeMillis()
// 这里必须 join 一下,因为 runBlocking 开启的协程不是 job 的
// 父协程,它不会等待 job 运行完再结束
job.join()
val duration = System.currentTimeMillis() - startTime
println("Duration: $duration") // 输出在 100ms 左右
}
结构化结束的意义就在于这种表现,比如在做初始化工作时,一些后续工作需要等待初始化完成才能进行,此时可以:
fun main() = runBlocking {
val scope = CoroutineScope(EmptyCoroutineContext)
// 初始化放在协程中进行,这样应用可以正常启动而不必被初始化流程卡着
val initJob = scope.launch {
// 初始化 1
launch { }
// 初始化 2
launch { }
// 其他初始化...
}
// 某些工作依赖初始化流程,因此在执行工作之前,先 join 一下等待初始化工作完成,
// 后续工作才能安全执行
scope.launch {
initJob.join()
// 后续工作...
}
}
3、线程结束
结束线程有两种方式:协作式(也称交互式)结束与强行结束。
stop() 会导致不可预期的结果(不管线程运行到哪里都直接暴力停止线程),给程序带来极大的不稳定性,因此被废弃了。
interrupt() 是协作式的结束线程,调用后会将线程的中断状态标记 isInterrupted 置为 true,线程内部通过检查该标记决定如何结束线程:
fun main() = runBlocking {
val thread = object : Thread() {
override fun run() {
var count = 0
while (true) {
if (isInterrupted) {
// 清理工作...做完后 return 结束线程
return
}
// 耗时任务...
count++
if (count % 100_000_000 == 0) {
println(count)
}
if (count % 1_000_000_000 == 0) {
break
}
}
println("Thread: I'm done!")
}
}.apply { start() }
Thread.sleep(500)
thread.interrupt()
}
需要注意的是,在检测到 isInterrupted 为 true,结束线程之前,需要做清理/收尾工作。比如线程原本是执行为图片添加滤镜工作的,中断时图片滤镜尚未添加完毕,就需要回退到原始图片状态。如果不进行收尾工作,那么 interrupt() 就与 stop() 没有多大区别了。
检查中断标记还可以使用 interrupted(),与 isInterrupted 标记不同的是,该方法会在调用时将 isInterrupted 置为 false。
此外,还应注意等待相关代码,比如在 Thread.sleep() 的过程中调用了 interrupt(),sleep() 会直接抛出 InterruptedException:
fun main() = runBlocking {
val thread = object : Thread() {
override fun run() {
println("Thread: I'm running!")
sleep(200)
println("Thread: I'm done!")
}
}.apply { start() }
thread.interrupt()
}
Thread: I'm running!
Exception in thread "Thread-0" java.lang.InterruptedException: sleep interrupted
at java.base/java.lang.Thread.sleep(Native Method)
at com.kotlin.coroutine._2_structured_concurrency._4_thread_interruptKt$main$1$thread$1.run(4_thread_interrupt.kt:9)
为什么会出现这种情况,看源码:
public static void sleep(long millis, int nanos)
throws InterruptedException {
...
// The JLS 3rd edition, section 17.9 says: "...sleep for zero
// time...need not have observable effects."
if (millis == 0 && nanos == 0) {
// ...but we still have to handle being interrupted.
if (Thread.interrupted()) {
throw new InterruptedException();
}
return;
}
...
}
sleep() 内检测到中断标记位置位就会抛 InterruptedException,这实际上是一种被动接收中断的处理方式。因为外部已经调用 interrupt() 要结束线程了,那么你也没必要再继续执行 sleep() 的等待过程了,通过抛出异常的方式立即结束线程,可以在 catch 中进行线程结束时的收尾工作。这也是 Java 中为什么强制要求对 Thread.sleep() 添加 try-catch:
fun main() = runBlocking {
val thread = object : Thread() {
override fun run() {
println("Thread: I'm running!")
try {
sleep(200)
} catch (e: InterruptedException) {
// 由于 sleep() 内通过 interrupted() 检查标记位,此时 isInterrupted 被重置为 false
println("isInterrupted: $isInterrupted")
// 清理收尾工作,比如恢复对象状态、回收资源、关闭 IO 流、关闭数据库和网络连接等
println("Clearing...")
return
}
println("Thread: I'm done!")
}
}.apply { start() }
thread.interrupt()
}
运行结果:
Thread: I'm running!
isInterrupted: false
Clearing...
与 Thread.sleep() 类似的,所有涉及等待操作的方法,都会抛出 InterruptedException,比如 Object.wait()、Thread.join()、CountDownLatch.await(),目的也都是方便我们可以及时的以交互式的形式结束线程。
4、协程的取消
4.1 协作式取消
协程取消与线程的停止类似,都是交互式的,通过协程的 isActive 标记来判断(类似于线程的 isInterrupted):
fun main() = runBlocking<Unit> {
val job = launch(Dispatchers.Default) {
var count = 0
while (true) {
if (!isActive) {
// 清理工作...做完后 return 结束线程
return@launch
}
// 耗时任务...
count++
if (count % 100_000_000 == 0) {
println(count)
}
if (count % 1_000_000_000 == 0) {
break
}
}
}
delay(1000)
job.cancel()
}
isActive 是 CoroutineScope 的扩展属性:
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
public val CoroutineScope.isActive: Boolean
get() = coroutineContext[Job]?.isActive ?: true
coroutineContext[Job] 可以获取 CoroutineContext 中的 Job:
public interface Job : CoroutineContext.Element {
/**
* Key for [Job] instance in the coroutine context.
*/
public companion object Key : CoroutineContext.Key<Job>
}
也可以使用 CoroutineContext 的扩展属性 job 获得 Job 对象:
public val CoroutineContext.job: Job get() = get(Job) ?: error("Current context doesn't contain Job in it: $this")
二者的区别是 coroutineContext[Job] 返回的是 Job?,而扩展属性 CoroutineContext.job 返回的是 Job。拿到这个 Job 就可以访问它的 isActive 属性用于判断协程是否处于活动状态。当然,除了通过 Job,也可以像前面那样直接用 CoroutineScope.isActive 判断,还可以使用 CoroutineContext.isActive 判断:
public val CoroutineContext.isActive: Boolean
get() = get(Job)?.isActive ?: true
此外,与结束线程不同的点是,结束协程不用 return,而是抛出 CancellationException,协程会接住这个异常并把自己取消:
fun main() = runBlocking<Unit> {
val job = launch(Dispatchers.Default) {
var count = 0
while (true) {
if (!coroutineContext.isActive) {
// 抛出 CancellationException 结束协程
throw CancellationException()
}
count++
if (count % 100_000_000 == 0) {
println(count)
}
if (count % 1_000_000_000 == 0) {
break
}
}
}
delay(2000)
job.cancel()
}
虽然从上述例子的运行结果来看,结束协程使用 return 或抛出 CancellationException 都可以,但是唯一正确的做法只有抛出 CancellationException。因为 return 只是结束当前协程的代码块,但是要结构化取消协程(结束相关的父子协程),只能通过抛 CancellationException 实现。
假如在结束协程时,不需要做清理工作,官方提供了 ensureActive() 便捷方法:
fun main() = runBlocking<Unit> {
val job = launch(Dispatchers.Default) {
var count = 0
while (true) {
// 一共三种写法
// coroutineContext.ensureActive()
// coroutineContext.job.ensureActive()
ensureActive()
/*if (!coroutineContext.isActive) {
// 抛出 CancellationException 结束协程
throw CancellationException()
}*/
count++
if (count % 100_000_000 == 0) {
println(count)
}
if (count % 1_000_000_000 == 0) {
break
}
}
}
delay(2000)
job.cancel()
}
三种写法最终调用的都是 Job 的扩展函数:
public fun Job.ensureActive(): Unit {
if (!isActive) throw getCancellationException()
}
4.2 delay 不用进行协作
下面再看一段代码:
fun main() = runBlocking<Unit> {
// 由于 runBlocking 开启的是主线程的协程,为了不影响主线程的运行,所以启动
// 子协程时手动调整到 Default 上,但不论调与不调,对 Demo 的运行结果无影响,
// 只是提醒一下正确的启动方式,不要过多占用主线程
val job = launch(Dispatchers.Default) {
var count = 0
while (true) {
println("count: ${count++}")
delay(500)
}
}
delay(3000)
job.cancel()
}
这样输出几个数字后协程就会被取消:
count: 0
count: 1
count: 2
count: 3
count: 4
count: 5
虽然可以取消,但是似乎协程并没有像线程那样体现出“协作”取消的样子,调用一个 cancel() 就直接取消掉了,是因为不用协作式也可以取消协程吗?
其实不是的,本质上是因为协程内部的 delay(500) 与 Thread.sleep() 类似,在遇到取消的情况时,会抛出 CancellationException 异常结束协程。
因此你可以看到,协程取消与线程停止的方式是类似的,都是两种情况:
- 正常的协作式取消,在协程内部检查 isActive 标记位,如为 false 需要做收尾工作并抛出 CancellationException 结束协程(线程是检查 isInterrupted)
- 如果在执行挂起函数时,如协程的 delay() 时(注意是除了 suspendCoroutine 之外所有的挂起函数都是以这种方式),协程已经被取消,那么会通过抛出 CancellationException 的方式结束协程(线程是 sleep()、join() 等,抛出 InterruptedException)
第 2 点需要注意,在处理线程时,通常是通过 try-catch 捕获 InterruptedException 时,在 catch 内做收尾工作并通过 return 结束线程。但是在协程中,不要照搬通过 try-catch 捕获 CancellationException,因为这样会导致协程无法结构化取消:
fun main() = runBlocking<Unit> {
val job = launch(Dispatchers.Default) {
var count = 0
while (true) {
println("count: ${count++}")
try {
delay(500)
} catch (e: CancellationException) {
println("Cancelled!")
}
}
}
job.cancel()
}
虽然能打印 catch 中的 log,但是协程并为真的结束:
...
count: 580069
Cancelled!
count: 580070
Cancelled!
count: 580071
Cancelled!
count: 580072
Cancelled!
count: 580073
...
所以在协程里,不光是不用像线程里一样 try-catch,写了可能反而会导致问题,因此要小心谨慎。如果是除了 CancellationException 以外的异常,可以通过 try-catch 处理。如果是 CancellationException,在 catch 时切记要在处理完收尾清理工作之后再将其抛出:
fun main() = runBlocking<Unit> {
val job = launch(Dispatchers.Default) {
var count = 0
while (true) {
println("count: ${count++}")
try {
delay(500)
} catch (e: CancellationException) {
// 收尾清理工作...
println("Cancelled!")
// 为了保证协程能正常取消,最后需要再将 CancellationException
// 抛出。虽然看着有点奇怪,但这就是正常的,甚至常用的套路
throw e
}
}
}
job.cancel()
}
还有另一种常用方式就是不管是否是正常的结束(即不用 catch 判断),统统用 finally 处理:
fun main() = runBlocking<Unit> {
val job = launch(Dispatchers.Default) {
var count = 0
while (true) {
println("count: ${count++}")
try {
delay(500)
} finally {
// 清理收尾工作...
println("Clearing...")
}
}
}
job.cancel()
}
协程取消抛出异常这种方式的作用范围更广。线程是只对 Thread.sleep()、Thread.join()、Object.wait() 这种等待式的方法抛 InterruptedException,而协程是对几乎所有挂起函数都抛 CancellationException,除了 suspendCoroutine 这个函数,它是不支持协作式取消的。也就是说,你在调用 cancel() 取消协程时,正在运行 suspendCoroutine,那么这个函数是没有反应的,不会取消协程。
4.3 结构化取消
协程的取消是结构化的,父协程的取消会带着所有子协程一起取消。
fun main() = runBlocking<Unit> {
val scope = CoroutineScope(EmptyCoroutineContext)
val parentJob = scope.launch {
val childJob = launch {
println("Child job started")
delay(3000)
println("Child job finished")
}
}
delay(1000)
// 取消父协程,会导致其子协程 childJob 也被取消
parentJob.cancel()
delay(5000)
}
子协程因为被取消,只输出了第一句 log:
Child job started
Process finished with exit code 0
以上是最基本的结构化取消示例,现在有个问题,子协程能拒绝父协程的取消吗?
回顾取消的本质,取消协程时,调用 cancel() 会将当前协程内的 isActive 变为 false,然后在执行到对 isActive 的检查点时(在运行到对 isActive 的检查点之前,协程内的代码还是正常运行的),如果检测到 isActive 为 false 了就会抛出 CancellationException 结束当前协程。
那么在父协程取消时,会触发所有子协程的 cancel(),大致的流程如下:
- 外部对父协程调用 cancel() 进行取消,此时父协程将自己的 isActive 设置为 false,并且对所有子协程也调用 cancel() 让它们的 isActive 也变为 false
- 父协程与所有子协程在运行到 isActive 的代码检查点后,抛出 CancellationException。这个抛异常的过程,各个协程都是自己抛自己的,互相不影响
需要注意,每个协程抛异常的时间点是不确定的,因为每个协程运行的情况不同,虽然大家的 isActive 都被设置为 false 了,但是如果没运行到 isActive 的检查点之前是不会抛异常的。比如说,当前协程如果在执行 delay() 这种会检查 isActive 的函数,那可能该协程马上就抛异常;但如果协程在运行一般的业务代码,那么它就会运行一段时间,直到遇到 isActive 检查点再抛异常;甚至,协程后续没有检查点,那么它就会一直运行完,不抛异常。因此父协程早于或晚于子协程抛异常都是正常的。
现在再回到前面的问题,子协程可以拒绝父协程的取消要求吗?理论上可以,但是没有实际意义。协程取消时会做的三件事:
- 调用 cancel()
- cancel() 内把 isActive 置为 false
- 协程内设置 isActive 检查点进行协作式取消
前两点子协程是无法控制的,因为在父协程的 cancel() 中执行了,就剩下最后一点可以做文章:
fun main() = runBlocking<Unit> {
val scope = CoroutineScope(EmptyCoroutineContext)
val parentJob = scope.launch {
val childJob = launch {
println("Child job started")
try {
delay(3000)
} catch (e: CancellationException) {
// 这里不抛 CancellationException 即可,但是后续的 delay 也得这么干
}
println("Child job finished")
}
}
delay(1000)
// 取消父协程,会导致其子协程 childJob 也被取消
parentJob.cancel()
delay(5000)
}
极为不推荐这种行为。因为强行不让子协程取消,不仅打乱了协程正常的执行流程,还会拖着其父协程也无法停止(父协程会等待所有子协程完成才停止)。
4.4 不配合取消 NonCancellable
直接看下面的代码:
fun main() = runBlocking {
val scope = CoroutineScope(EmptyCoroutineContext)
val parentJob = scope.launch {
val childJob1 = launch {
println("Child job1 started")
delay(3000)
println("Child job1 finished")
}
val childJob2 = launch(NonCancellable) {
println("Child job2 started")
delay(3000)
println("Child job2 finished")
}
val childJob3 = launch(Job()) {
println("Child job3 started")
delay(3000)
println("Child job3 finished")
}
}
delay(1500)
parentJob.cancel()
delay(6000)
}
直接取消父协程,三个子协程的取消情况如何呢?输出结果:
Child job1 started
Child job2 started
Child job3 started
Child job3 finished
Child job2 finished
Process finished with exit code 0
只有 childJob1 被取消了,其余两个子协程没有因为父协程的取消而被取消掉。
先解释 childJob3 没被取消掉的原因,这个前面讲过,使用 launch 启动协程时,如果在参数中指定了 Job,那么启动的协程的父协程就是这个 Job。因此 childJob3 的父协程是 Job() 而不是 parentJob。所以 parentJob.cancel() 不会取消掉不是它子协程的 childJob3。
再看 childJob2,实际上原理是一样的,NonCancellable 是一个 Job 的单例:
public object NonCancellable : AbstractCoroutineContextElement(Job), Job {...}
NonCancellable 就是专门用于阻断父子协程的取消链条的(实际上与 childJob2 一样,都是阻断了父子协程关系),不能像普通 Job 那样使用。像 parent 和 children 属性都被赋值为空,也就是说 NonCancellable 不能作为内部协程(上例中的 childJob2)的父协程,连用于取消的 cancel() 都被废弃了:
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
override fun cancel(cause: Throwable?): Boolean = false // neve
NonCancellable 就是用于开启一些不希望被取消的任务的。那再思考,什么样的任务不希望被取消?一般有三类:
-
收尾工作:协程被调用 cancel() 之后,真正退出协程之前需要做的清理、收尾工作。由于收尾工作中可能会调用挂起函数(比如 Jetpack 的 Room 在进行相关数据库操作时用的是挂起函数的方式),而在协程已经要取消,isActive 为 false 的情况下,运行挂起函数会抛出 CancellationException 导致收尾工作被中断。为了让调用了挂起函数的收尾工作免于这种中断,需要使用 NonCancellable:
fun main() = runBlocking<Unit> { launch { if (!isActive) { // 结束协程前的收尾工作不希望被打断,因此通常会使用 withContext(NonCancellable) 包起来 withContext(NonCancellable) { // 收尾工作……用 delay 表示收尾工作中可能会调用到的挂起函数。比如这里如果使用 Jetpack // 的 Room 进行数据库操作。收尾时需要向数据库写入标记,这个写入操作就是一个挂起函数 delay(1000) } } }
-
如果取消将会很难收尾的业务代码。既然取消不好收尾,那干脆就不要取消了。
-
与协程的流程无关的操作,比如日志工作。日志与协程执行的业务代码无关,那么也就不需要随着协程的取消而取消。只不过这种情况下通常是用 launch(NonCancellable) 与协程并行,而不是 withContext(NonCancellable) 与协程串行挡着协程
注意 NonCancellable 通常就是搭配 withContext() 使用的,它不是给 launch、async 或其他协程构建器设计的。
在收尾工作的外面加上 withContext(NonCancellable) 防止收尾工作被打断。这里注意,只有收尾工作包含挂起函数时才需要。
第二点,比如我们在协程中调用写文件的挂起函数:
suspend fun writeInfo() = withContext(Dispatchers.IO) {
// 1
// write to file
// 2
}
由于 writeInfo() 被 withContext() 包着且内部没有挂起函数,因此当调用 writeInfo() 的协程要被取消时,要么就是在代码 1 处,还没开始写文件,或者是在代码 2 处,文件已经写完。这样满足我们写文件不写一半,要么全写完,要么全没写的诉求。因此这种情况下,不用再做其他收尾工作。
但假如 writeInfo() 内部多加一个逻辑,比如要在写一段文件之后,从数据库中读取一段数据,将数据整合后继续写入文件中:
suspend fun writeInfo() = withContext(Dispatchers.IO) {
// 1.write to file
// 2.read from database(Room suspend function)
// 3.continue to write to file
}
假如在做第 1 步时,协程被取消了,那么运行到第 2 步的挂起函数时,就会抛异常使得文件只写了一部分。
为了避免这种情况,有两种解决方案。一是将挂起函数用 try-catch 包起来,在 catch 中做收尾工作,把第 1 步写的内容撤销掉:
suspend fun writeInfo() = withContext(Dispatchers.IO) {
// 1.write to file
try {
// 2.read from database(Room suspend function)
} catch (e: CancellationException) {
// rollback step1
throw e
}
// 3.continue to write to file
}
但是撤销已经写入文件的内容这个操作不太好做,因此有第二种方案,给 withContext() 加上 NonCancellable 不支持取消:
suspend fun writeInfo() = withContext(Dispatchers.IO + NonCancellable) {
// 1.write to file
// 2.read from database(Room suspend function)
// 3.continue to write to file
}