当前位置: 首页 > article >正文

Coroutine 基础二 —— 结构化并发(一)

1、“一个协程”到底指什么

为了讲结构化并发,需要先讲父子协程;讲父子协程,就需要先讲什么是“一个协程”。

课程用线程作为对比来引入协程概念。使用线程时,通常会认为 Thread 对象就是线程,除了 Thread 这个单词本身就是线程的意思之外,更本质的原因是,通过 Thread 对象,可以实现对线程这个抽象概念的管理。比如控制线程的运行流程(start、interrupt、join 等),获取线程的状态(isAlive 等等)。

那协程是否有一个对应的对象,就像 Thread 之于线程那样呢?严格来讲,并没有。Thread 类提供了所有对线程的操作,但是协程没有一个一模一样的类。因为协程做了细致的职责划分,使得没有一个类能囊括所有职责。

通常我们认为,Job 与 CoroutineScope 可以视为一个协程,但是它们的职责不同:

  • launch() 返回的 Job 与 async() 返回的 Job 的子类 Deferred 用于控制协程的执行流程相关的,它可以控制协程运行流程(start、interrupt、join 等),也可以获取协程的状态(isActive、isCancelled、isCompleted),还能获取父子协程(通过 parent 与 children 属性)
  • launch() 与 async() 大括号内的 CoroutineScope 对象是一个顶级的协程管理器,它可以启动一个全新的协程(Job.start() 只有在以 LAZY 模式创建的协程才需要使用),还可以获取到 ContinuationInterceptor 和 Job,并且通过追溯源码和打印 log 能发现,该 CoroutineScope 对象,实际上与 launch() 与 async() 返回的 Job 或 Deferred 是同一个对象。之所以是同一个对象(AbstractCoroutine)还做责任拆分,目的是为了让 API 更加精准,或者说为了避免 API 污染,不让没用的 API 出现在不该出现的地方(返回值是 Job 说明该位置只需要进行流程控制,而不需要 CoroutineScope 进行全方位的控制)

CoroutineScope 担任的是每个协程中大总管的角色,而 Job 仅负责流程相关的这部分职责,二者是有从属关系的。

课程中还提到 launch() 与 async() 闭包的大括号的内容也可以视为一个协程,只不过从技术角度讲没什么价值,只是在思考和探讨流程时非常便捷。

2、父子协程,以及协程间的并行和等待

结构化并发,其实就是父子协程的生命周期的各种关联。本节两个大问题:

  • 父子关系是如何确立的
  • 协程结束时,对父子协程会发生怎样的自动化影响

最直接的父子协程关系,就是在一个协程内部启动另一个协程:

fun main() = runBlocking {
    val scope = CoroutineScope(EmptyCoroutineContext)
    var innerJob: Job? = null
    val job = scope.launch { // this:CoroutineScope
        innerJob = launch {
            // 这里要做延时,因为协程的父子关系会随着协程运行完毕而解绑,
            // 倘若不进行延时,在打印 log 时,由于协程已经运行完,那么
            // 二者就不再是父子协程,就会输出 false
            delay(500)
        }
    }

    val children = job.children
    println("children count: ${children.count()}") // 1
    println("innerJob === children.first(): ${innerJob === children.first()}") // true
    println("innerJob?.parent === job: ${innerJob?.parent === job}") // true
}

这样 innerJob 就是 job 的子协程,本质上是将 job.children 设置为 innerJob,将 innerJob.parent 设置为 job。

父子协程是如何拿到对方的 Job 实现上述的关系挂接的呢?

首先,innerJob 是在父协程 job 的 launch 内启动的,因此父协程能拿到子协程 Job 对象 —— innerJob。

其次,innerJob 在通过 launch 启动子协程时,本质上是 this.launch {…},this 是父协程 {} 内指示的 CoroutineScope,该 CoroutineScope 内包含该协程的 Job 对象 —— job。因此,子协程可以通过 this 拿到父协程的 Job 对象赋值给子协程的 parent 属性。

下面将启动子协程的代码进行修改:

fun main() = runBlocking {
    val scope = CoroutineScope(EmptyCoroutineContext)
    var innerJob: Job? = null
    val job = scope.launch {
        // 使用 scope 启动子协程
        innerJob = scope.launch {
            delay(500)
        }
    }
}

使用 scope 代替隐式的 this 启动子协程,再进行关系验证,发现此时 innerJob 不再是 job 的子协程了。因为通过 scope 启动协程,拿到的就不再是通过 this 的 CoroutineScope 内含有的 Job —— job 对象,而是 scope 内含有的 Job 对象了,也就是说,此时 innerJob 的父协程就是 scope 内含有的 Job。也就是说,此时 job 与 innerJob 是兄弟协程而不再是父子协程了。

或许你会有疑问,scope 就是一个 CoroutineScope,它没有启动协程,哪里来的 Job 呢?看 CoroutineScope 的构造函数:

@Suppress("FunctionName")
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
    ContextScope(if (context[Job] != null) context else context + Job()) // + 表示合并

如果参数传入的 CoroutineContext 没有 Job,会自动创建 Job 对象。所以,即便没启动协程,CoroutineScope 内也是含有 Job 对象的。

综上,我们固有观念中认为的,以代码结构的嵌套认定协程的父子关系的做法是不正确的,应该通过启动协程的 CoroutineScope 认定协程的父子关系。

原本判断时,通过代码结构认定父子协程能够成立的原因是,内部协程通过隐式 this 启动刚好使用了外部协程的 CoroutineScope:

fun main() = runBlocking {
    val scope = CoroutineScope(EmptyCoroutineContext)
    var innerJob: Job? = null
    val job = scope.launch {
        innerJob = /*this.*/launch {
            delay(500)
        }
    }
}

明确父子协程关系的意义在于结构化并发(取消、异常管理、结束),结构化结束是指父协程会等待所有子协程都运行完毕后再结束自己。所有协程之间都是并行关系,包括兄弟、父子协程,以及没有任何关系的协程。

通过一个例子来了解结构化结束,也就是父协程会等待子协程结束后再结束(父协程的代码哪怕已经运行完了,父协程也会等待所有子协程都完成后再完成):

fun main() = runBlocking {
    val scope = CoroutineScope(EmptyCoroutineContext)
    // 父子协程是并行运行的,但是因为子协程等待 100,父协程需要等待
    // 子协程运行完毕后再结束自己,所以在 job.join() 的位置,runBlocking
    // 协程会等待 job 大概 100ms 的时间
    val job = scope.launch {
        launch {
            delay(100)
        }
    }

    val startTime = System.currentTimeMillis()
    // 这里必须 join 一下,因为 runBlocking 开启的协程不是 job 的
    // 父协程,它不会等待 job 运行完再结束
    job.join()
    val duration = System.currentTimeMillis() - startTime
    println("Duration: $duration") // 输出在 100ms 左右
}

结构化结束的意义就在于这种表现,比如在做初始化工作时,一些后续工作需要等待初始化完成才能进行,此时可以:

fun main() = runBlocking {
    val scope = CoroutineScope(EmptyCoroutineContext)
    // 初始化放在协程中进行,这样应用可以正常启动而不必被初始化流程卡着
    val initJob = scope.launch {
        // 初始化 1
        launch {  }
        // 初始化 2
        launch {  }
        // 其他初始化...
    }
    
    // 某些工作依赖初始化流程,因此在执行工作之前,先 join 一下等待初始化工作完成,
    // 后续工作才能安全执行
    scope.launch { 
        initJob.join()
        // 后续工作...
    }
}

3、线程结束

结束线程有两种方式:协作式(也称交互式)结束与强行结束。

stop() 会导致不可预期的结果(不管线程运行到哪里都直接暴力停止线程),给程序带来极大的不稳定性,因此被废弃了。

interrupt() 是协作式的结束线程,调用后会将线程的中断状态标记 isInterrupted 置为 true,线程内部通过检查该标记决定如何结束线程:

fun main() = runBlocking {
    val thread = object : Thread() {
        override fun run() {
            var count = 0
            while (true) {
                if (isInterrupted) {
                    // 清理工作...做完后 return 结束线程
                    return
                }
                // 耗时任务...
                count++
                if (count % 100_000_000 == 0) {
                    println(count)
                }
                if (count % 1_000_000_000 == 0) {
                    break
                }
            }
            println("Thread: I'm done!")
        }
    }.apply { start() }
    Thread.sleep(500)
    thread.interrupt()
}

需要注意的是,在检测到 isInterrupted 为 true,结束线程之前,需要做清理/收尾工作。比如线程原本是执行为图片添加滤镜工作的,中断时图片滤镜尚未添加完毕,就需要回退到原始图片状态。如果不进行收尾工作,那么 interrupt() 就与 stop() 没有多大区别了。

检查中断标记还可以使用 interrupted(),与 isInterrupted 标记不同的是,该方法会在调用时将 isInterrupted 置为 false。

此外,还应注意等待相关代码,比如在 Thread.sleep() 的过程中调用了 interrupt(),sleep() 会直接抛出 InterruptedException:

fun main() = runBlocking {
    val thread = object : Thread() {
        override fun run() {
            println("Thread: I'm running!")
            sleep(200)
            println("Thread: I'm done!")
        }
    }.apply { start() }
    thread.interrupt()
}
Thread: I'm running!
Exception in thread "Thread-0" java.lang.InterruptedException: sleep interrupted
	at java.base/java.lang.Thread.sleep(Native Method)
	at com.kotlin.coroutine._2_structured_concurrency._4_thread_interruptKt$main$1$thread$1.run(4_thread_interrupt.kt:9)

为什么会出现这种情况,看源码:

	public static void sleep(long millis, int nanos)
    throws InterruptedException {
        ...
        // The JLS 3rd edition, section 17.9 says: "...sleep for zero
        // time...need not have observable effects."
        if (millis == 0 && nanos == 0) {
            // ...but we still have to handle being interrupted.
            if (Thread.interrupted()) {
              throw new InterruptedException();
            }
            return;
        }

        ...
    }

sleep() 内检测到中断标记位置位就会抛 InterruptedException,这实际上是一种被动接收中断的处理方式。因为外部已经调用 interrupt() 要结束线程了,那么你也没必要再继续执行 sleep() 的等待过程了,通过抛出异常的方式立即结束线程,可以在 catch 中进行线程结束时的收尾工作。这也是 Java 中为什么强制要求对 Thread.sleep() 添加 try-catch:

fun main() = runBlocking {
    val thread = object : Thread() {
        override fun run() {
            println("Thread: I'm running!")
            try {
                sleep(200)
            } catch (e: InterruptedException) {
                // 由于 sleep() 内通过 interrupted() 检查标记位,此时 isInterrupted 被重置为 false
                println("isInterrupted: $isInterrupted")
                // 清理收尾工作,比如恢复对象状态、回收资源、关闭 IO 流、关闭数据库和网络连接等
                println("Clearing...")
                return
            }
            println("Thread: I'm done!")
        }
    }.apply { start() }
    thread.interrupt()
}

运行结果:

Thread: I'm running!
isInterrupted: false
Clearing...

与 Thread.sleep() 类似的,所有涉及等待操作的方法,都会抛出 InterruptedException,比如 Object.wait()、Thread.join()、CountDownLatch.await(),目的也都是方便我们可以及时的以交互式的形式结束线程。

4、协程的取消

4.1 协作式取消

协程取消与线程的停止类似,都是交互式的,通过协程的 isActive 标记来判断(类似于线程的 isInterrupted):

fun main() = runBlocking<Unit> {
    val job = launch(Dispatchers.Default) {
        var count = 0
        while (true) {
            if (!isActive) {
                // 清理工作...做完后 return 结束线程
                return@launch
            }
            // 耗时任务...
            count++
            if (count % 100_000_000 == 0) {
                println(count)
            }
            if (count % 1_000_000_000 == 0) {
                break
            }
        }
    }
    delay(1000)
    job.cancel()
}

isActive 是 CoroutineScope 的扩展属性:

@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
public val CoroutineScope.isActive: Boolean
    get() = coroutineContext[Job]?.isActive ?: true

coroutineContext[Job] 可以获取 CoroutineContext 中的 Job:

public interface Job : CoroutineContext.Element {
    /**
     * Key for [Job] instance in the coroutine context.
     */
    public companion object Key : CoroutineContext.Key<Job>
}

也可以使用 CoroutineContext 的扩展属性 job 获得 Job 对象:

public val CoroutineContext.job: Job get() = get(Job) ?: error("Current context doesn't contain Job in it: $this")

二者的区别是 coroutineContext[Job] 返回的是 Job?,而扩展属性 CoroutineContext.job 返回的是 Job。拿到这个 Job 就可以访问它的 isActive 属性用于判断协程是否处于活动状态。当然,除了通过 Job,也可以像前面那样直接用 CoroutineScope.isActive 判断,还可以使用 CoroutineContext.isActive 判断:

public val CoroutineContext.isActive: Boolean
    get() = get(Job)?.isActive ?: true

此外,与结束线程不同的点是,结束协程不用 return,而是抛出 CancellationException,协程会接住这个异常并把自己取消:

fun main() = runBlocking<Unit> {
    val job = launch(Dispatchers.Default) {
        var count = 0
        while (true) {
            if (!coroutineContext.isActive) {
                // 抛出 CancellationException 结束协程
                throw CancellationException()
            }
            count++
            if (count % 100_000_000 == 0) {
                println(count)
            }
            if (count % 1_000_000_000 == 0) {
                break
            }
        }
    }
    delay(2000)
    job.cancel()
}

虽然从上述例子的运行结果来看,结束协程使用 return 或抛出 CancellationException 都可以,但是唯一正确的做法只有抛出 CancellationException。因为 return 只是结束当前协程的代码块,但是要结构化取消协程(结束相关的父子协程),只能通过抛 CancellationException 实现。

假如在结束协程时,不需要做清理工作,官方提供了 ensureActive() 便捷方法:

fun main() = runBlocking<Unit> {
    val job = launch(Dispatchers.Default) {
        var count = 0
        while (true) {
            // 一共三种写法
//            coroutineContext.ensureActive()
//            coroutineContext.job.ensureActive()
            ensureActive()
            /*if (!coroutineContext.isActive) {
                // 抛出 CancellationException 结束协程
                throw CancellationException()
            }*/
            count++
            if (count % 100_000_000 == 0) {
                println(count)
            }
            if (count % 1_000_000_000 == 0) {
                break
            }
        }
    }
    delay(2000)
    job.cancel()
}

三种写法最终调用的都是 Job 的扩展函数:

public fun Job.ensureActive(): Unit {
    if (!isActive) throw getCancellationException()
}

4.2 delay 不用进行协作

下面再看一段代码:

fun main() = runBlocking<Unit> {
    // 由于 runBlocking 开启的是主线程的协程,为了不影响主线程的运行,所以启动
    // 子协程时手动调整到 Default 上,但不论调与不调,对 Demo 的运行结果无影响,
    // 只是提醒一下正确的启动方式,不要过多占用主线程
    val job = launch(Dispatchers.Default) {
        var count = 0
        while (true) {
            println("count: ${count++}")
            delay(500)
        }
    }
    delay(3000)
    job.cancel()
}

这样输出几个数字后协程就会被取消:

count: 0
count: 1
count: 2
count: 3
count: 4
count: 5

虽然可以取消,但是似乎协程并没有像线程那样体现出“协作”取消的样子,调用一个 cancel() 就直接取消掉了,是因为不用协作式也可以取消协程吗?

其实不是的,本质上是因为协程内部的 delay(500) 与 Thread.sleep() 类似,在遇到取消的情况时,会抛出 CancellationException 异常结束协程。

因此你可以看到,协程取消与线程停止的方式是类似的,都是两种情况:

  1. 正常的协作式取消,在协程内部检查 isActive 标记位,如为 false 需要做收尾工作并抛出 CancellationException 结束协程(线程是检查 isInterrupted)
  2. 如果在执行挂起函数时,如协程的 delay() 时(注意是除了 suspendCoroutine 之外所有的挂起函数都是以这种方式),协程已经被取消,那么会通过抛出 CancellationException 的方式结束协程(线程是 sleep()、join() 等,抛出 InterruptedException)

第 2 点需要注意,在处理线程时,通常是通过 try-catch 捕获 InterruptedException 时,在 catch 内做收尾工作并通过 return 结束线程。但是在协程中,不要照搬通过 try-catch 捕获 CancellationException,因为这样会导致协程无法结构化取消:

fun main() = runBlocking<Unit> {
    val job = launch(Dispatchers.Default) {
        var count = 0
        while (true) {
            println("count: ${count++}")
            try {
                delay(500)
            } catch (e: CancellationException) {
                println("Cancelled!")
            }
        }
    }
    job.cancel()
}

虽然能打印 catch 中的 log,但是协程并为真的结束:

...
count: 580069
Cancelled!
count: 580070
Cancelled!
count: 580071
Cancelled!
count: 580072
Cancelled!
count: 580073
...

所以在协程里,不光是不用像线程里一样 try-catch,写了可能反而会导致问题,因此要小心谨慎。如果是除了 CancellationException 以外的异常,可以通过 try-catch 处理。如果是 CancellationException,在 catch 时切记要在处理完收尾清理工作之后再将其抛出:

fun main() = runBlocking<Unit> {
    val job = launch(Dispatchers.Default) {
        var count = 0
        while (true) {
            println("count: ${count++}")
            try {
                delay(500)
            } catch (e: CancellationException) {
                // 收尾清理工作...
                println("Cancelled!")
                // 为了保证协程能正常取消,最后需要再将 CancellationException
                // 抛出。虽然看着有点奇怪,但这就是正常的,甚至常用的套路
                throw e
            }
        }
    }
    job.cancel()
}

还有另一种常用方式就是不管是否是正常的结束(即不用 catch 判断),统统用 finally 处理:

fun main() = runBlocking<Unit> {
    val job = launch(Dispatchers.Default) {
        var count = 0
        while (true) {
            println("count: ${count++}")
            try {
                delay(500)
            } finally {
                // 清理收尾工作...
                println("Clearing...")
            }
        }
    }
    job.cancel()
}

协程取消抛出异常这种方式的作用范围更广。线程是只对 Thread.sleep()、Thread.join()、Object.wait() 这种等待式的方法抛 InterruptedException,而协程是对几乎所有挂起函数都抛 CancellationException,除了 suspendCoroutine 这个函数,它是不支持协作式取消的。也就是说,你在调用 cancel() 取消协程时,正在运行 suspendCoroutine,那么这个函数是没有反应的,不会取消协程。

4.3 结构化取消

协程的取消是结构化的,父协程的取消会带着所有子协程一起取消。

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)
    val parentJob = scope.launch {
        val childJob = launch {
            println("Child job started")
            delay(3000)
            println("Child job finished")
        }
    }
    delay(1000)
    // 取消父协程,会导致其子协程 childJob 也被取消
    parentJob.cancel()
    delay(5000)
}

子协程因为被取消,只输出了第一句 log:

Child job started

Process finished with exit code 0

以上是最基本的结构化取消示例,现在有个问题,子协程能拒绝父协程的取消吗?

回顾取消的本质,取消协程时,调用 cancel() 会将当前协程内的 isActive 变为 false,然后在执行到对 isActive 的检查点时(在运行到对 isActive 的检查点之前,协程内的代码还是正常运行的),如果检测到 isActive 为 false 了就会抛出 CancellationException 结束当前协程。

那么在父协程取消时,会触发所有子协程的 cancel(),大致的流程如下:

  • 外部对父协程调用 cancel() 进行取消,此时父协程将自己的 isActive 设置为 false,并且对所有子协程也调用 cancel() 让它们的 isActive 也变为 false
  • 父协程与所有子协程在运行到 isActive 的代码检查点后,抛出 CancellationException。这个抛异常的过程,各个协程都是自己抛自己的,互相不影响

需要注意,每个协程抛异常的时间点是不确定的,因为每个协程运行的情况不同,虽然大家的 isActive 都被设置为 false 了,但是如果没运行到 isActive 的检查点之前是不会抛异常的。比如说,当前协程如果在执行 delay() 这种会检查 isActive 的函数,那可能该协程马上就抛异常;但如果协程在运行一般的业务代码,那么它就会运行一段时间,直到遇到 isActive 检查点再抛异常;甚至,协程后续没有检查点,那么它就会一直运行完,不抛异常。因此父协程早于或晚于子协程抛异常都是正常的。

现在再回到前面的问题,子协程可以拒绝父协程的取消要求吗?理论上可以,但是没有实际意义。协程取消时会做的三件事:

  1. 调用 cancel()
  2. cancel() 内把 isActive 置为 false
  3. 协程内设置 isActive 检查点进行协作式取消

前两点子协程是无法控制的,因为在父协程的 cancel() 中执行了,就剩下最后一点可以做文章:

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)
    val parentJob = scope.launch {
        val childJob = launch {
            println("Child job started")
            try {
                delay(3000)
            } catch (e: CancellationException) {
                // 这里不抛 CancellationException 即可,但是后续的 delay 也得这么干
            }
            println("Child job finished")
        }
    }
    delay(1000)
    // 取消父协程,会导致其子协程 childJob 也被取消
    parentJob.cancel()
    delay(5000)
}

极为不推荐这种行为。因为强行不让子协程取消,不仅打乱了协程正常的执行流程,还会拖着其父协程也无法停止(父协程会等待所有子协程完成才停止)。

4.4 不配合取消 NonCancellable

直接看下面的代码:

fun main() = runBlocking {
    val scope = CoroutineScope(EmptyCoroutineContext)
    val parentJob = scope.launch {
        val childJob1 = launch {
            println("Child job1 started")
            delay(3000)
            println("Child job1 finished")
        }

        val childJob2 = launch(NonCancellable) {
            println("Child job2 started")
            delay(3000)
            println("Child job2 finished")
        }

        val childJob3 = launch(Job()) {
            println("Child job3 started")
            delay(3000)
            println("Child job3 finished")
        }
    }
    delay(1500)
    parentJob.cancel()
    delay(6000)
}

直接取消父协程,三个子协程的取消情况如何呢?输出结果:

Child job1 started
Child job2 started
Child job3 started
Child job3 finished
Child job2 finished

Process finished with exit code 0

只有 childJob1 被取消了,其余两个子协程没有因为父协程的取消而被取消掉。

先解释 childJob3 没被取消掉的原因,这个前面讲过,使用 launch 启动协程时,如果在参数中指定了 Job,那么启动的协程的父协程就是这个 Job。因此 childJob3 的父协程是 Job() 而不是 parentJob。所以 parentJob.cancel() 不会取消掉不是它子协程的 childJob3。

再看 childJob2,实际上原理是一样的,NonCancellable 是一个 Job 的单例:

public object NonCancellable : AbstractCoroutineContextElement(Job), Job {...}

NonCancellable 就是专门用于阻断父子协程的取消链条的(实际上与 childJob2 一样,都是阻断了父子协程关系),不能像普通 Job 那样使用。像 parent 和 children 属性都被赋值为空,也就是说 NonCancellable 不能作为内部协程(上例中的 childJob2)的父协程,连用于取消的 cancel() 都被废弃了:

    @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
    override fun cancel(cause: Throwable?): Boolean = false // neve

NonCancellable 就是用于开启一些不希望被取消的任务的。那再思考,什么样的任务不希望被取消?一般有三类:

  1. 收尾工作:协程被调用 cancel() 之后,真正退出协程之前需要做的清理、收尾工作。由于收尾工作中可能会调用挂起函数(比如 Jetpack 的 Room 在进行相关数据库操作时用的是挂起函数的方式),而在协程已经要取消,isActive 为 false 的情况下,运行挂起函数会抛出 CancellationException 导致收尾工作被中断。为了让调用了挂起函数的收尾工作免于这种中断,需要使用 NonCancellable:

    fun main() = runBlocking<Unit> {
        launch {
            if (!isActive) {
            // 结束协程前的收尾工作不希望被打断,因此通常会使用 withContext(NonCancellable) 包起来
            withContext(NonCancellable) {
                // 收尾工作……用 delay 表示收尾工作中可能会调用到的挂起函数。比如这里如果使用 Jetpack 
                // 的 Room 进行数据库操作。收尾时需要向数据库写入标记,这个写入操作就是一个挂起函数
                delay(1000)
            }
        }
    }
    
  2. 如果取消将会很难收尾的业务代码。既然取消不好收尾,那干脆就不要取消了。

  3. 与协程的流程无关的操作,比如日志工作。日志与协程执行的业务代码无关,那么也就不需要随着协程的取消而取消。只不过这种情况下通常是用 launch(NonCancellable) 与协程并行,而不是 withContext(NonCancellable) 与协程串行挡着协程

注意 NonCancellable 通常就是搭配 withContext() 使用的,它不是给 launch、async 或其他协程构建器设计的。

在收尾工作的外面加上 withContext(NonCancellable) 防止收尾工作被打断。这里注意,只有收尾工作包含挂起函数时才需要。

第二点,比如我们在协程中调用写文件的挂起函数:

suspend fun writeInfo() = withContext(Dispatchers.IO) { 
    // 1
    // write to file
    // 2
} 

由于 writeInfo() 被 withContext() 包着且内部没有挂起函数,因此当调用 writeInfo() 的协程要被取消时,要么就是在代码 1 处,还没开始写文件,或者是在代码 2 处,文件已经写完。这样满足我们写文件不写一半,要么全写完,要么全没写的诉求。因此这种情况下,不用再做其他收尾工作。

但假如 writeInfo() 内部多加一个逻辑,比如要在写一段文件之后,从数据库中读取一段数据,将数据整合后继续写入文件中:

suspend fun writeInfo() = withContext(Dispatchers.IO) { 
    // 1.write to file
    // 2.read from database(Room suspend function)
    // 3.continue to write to file
} 

假如在做第 1 步时,协程被取消了,那么运行到第 2 步的挂起函数时,就会抛异常使得文件只写了一部分。

为了避免这种情况,有两种解决方案。一是将挂起函数用 try-catch 包起来,在 catch 中做收尾工作,把第 1 步写的内容撤销掉:

suspend fun writeInfo() = withContext(Dispatchers.IO) { 
    // 1.write to file
    try {
        // 2.read from database(Room suspend function)
    } catch (e: CancellationException) {
        // rollback step1
        throw e
    }
    // 3.continue to write to file
} 

但是撤销已经写入文件的内容这个操作不太好做,因此有第二种方案,给 withContext() 加上 NonCancellable 不支持取消:

suspend fun writeInfo() = withContext(Dispatchers.IO + NonCancellable) { 
    // 1.write to file
    // 2.read from database(Room suspend function)
    // 3.continue to write to file
} 

http://www.kler.cn/a/453485.html

相关文章:

  • 精选9个自动化任务的Python脚本精选
  • Go快速开发框架2.6.0版本更新内容快速了解
  • Leecode刷题C语言之字符串及其反转中是否存在同一子字符串
  • Python爬虫(入门+进阶)
  • STM32-笔记11-手写带操作系统的延时函数
  • 前端工作中问题点拆分
  • 鸿蒙Next状态管理V2 - @Param装饰器总结
  • Linux系统升级OpenSSH 9.8流程
  • CUDA与Microsoft Visual Studio不兼容问题
  • 深入解析 Pytest 钩子函数及二次开发过程
  • 【MySQL】索引 面试题
  • PostgreSQL自带的一个命令行工具pg_waldump
  • 免杀0到1之ShellCode与加载器
  • adb无法连接到安卓设备【解决方案】报错:adb server version (40) doesn‘t match this client (41);
  • 【形式化验证latency】2.AADL项目结构及语法(一)
  • 计算机组成原理的学习笔记(9)-- CPU·其一 CPU的基本概念/流水线技术/数据通路
  • docker compose deploy fate cluster
  • 免费证件照大师 3.3 | 界面极简的免费证件照制作软件,支持无水印导出
  • 大型3d模型应用内容的云推流之国产信创系统方案
  • python常见数组操作
  • Linux-----进程处理(子进程创建)
  • Java项目中Oracle数据库开发过程中相关内容
  • AI绘图丨中国风 古典 产品展台电商场景第三弹(附关键词)
  • 路由策略
  • 学习C++:关键字
  • 每天40分玩转Django:Django静态文件