当前位置: 首页 > article >正文

Kotlin 协程基础三 —— 结构化并发(二)

本篇将继续结构化并发的话题,来介绍结构化异常管理的相关内容。

1、协程的结构化异常管理

如果一个协程抛异常,它所在的整个协程树上的其他协程(向上到根协程,向下到子孙协程)都会被取消,因此协程发生异常的后果是十分严重的。

1.1 协程的取消与异常

本质上,协程异常与取消采用的是同一套处理逻辑。只不过取消协程抛出的是一个特殊异常 CancellationException,并进行特殊处理以取消协程;而协程异常是抛出除 CancellationException 以外其他的异常。

先查看抛出异常前后父子协程的状态:

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)
    var childJob: Job? = null
    val parentJob = scope.launch {
        childJob = launch {
            println("Child started")
            delay(3000)
            println("Child finished")
        }
        delay(1000)
        throw IllegalStateException("Wrong User!")
    }
    // 抛异常之前打印两个协程的状态
    delay(500)
    println("isActive: parent - ${parentJob.isActive}, child - ${childJob?.isActive}")
    println("isCancelled: parent - ${parentJob.isCancelled}, child - ${childJob?.isCancelled}")
    // 抛异常之后打印两个协程的状态
    delay(1500)
    println("isActive: parent - ${parentJob.isActive}, child - ${childJob?.isActive}")
    println("isCancelled: parent - ${parentJob.isCancelled}, child - ${childJob?.isCancelled}")
    delay(10000)
}

运行结果:

Child started
isActive: parent - true, child - true
isCancelled: parent - false, child - false
Exception in thread "DefaultDispatcher-worker-1" java.lang.RuntimeException: Exception while trying to handle coroutine exception
	at kotlinx.coroutines.CoroutineExceptionHandlerKt.handlerException(CoroutineExceptionHandler.kt:33)
	at 
...
isActive: parent - false, child - false
isCancelled: parent - true, child - true

显而易见,在父协程抛出异常后,父协程取消连带子协程一起被取消了。

注意,应用程序发生未被捕获的异常导致程序崩溃是 Android 的规则,而不是普通 JVM 的规则。JVM 中发生异常只会导致线程崩溃,而不会导致整个应用崩溃。因此上面的运行结果在崩溃后输出的父子协程状态数据也是正确的、可参考的。

假如让父协程抛出 CancellationException 代替 IllegalStateException,你会发现运行结果除了没有异常信息外,其余结果是一样的:

Child started
isActive: parent - true, child - true
isCancelled: parent - false, child - false
isActive: parent - false, child - false
isCancelled: parent - true, child - true

Process finished with exit code 0

这是因为,协程的取消与异常处理,在底层走的是同一套流程,只不过取消处理相对于较为完整的异常处理,过程有所简化。

1.2 取消与异常的不同点

虽然协程的取消与异常走的是同一套逻辑,但它们又有两点显著的不同:

  1. 取消的作用方向是单向的,只会取消自己与所有的下层协程;而异常的作用方向是双向的,除了取消自己,也会向上取消所有上层协程,向下取消所有下层协程,即取消整个协程树
  2. 取消有两种方式,可以在协程内部抛 CancellationException,也可以在协程外部调用 cancel();而异常流程的触发只有一种方式,就是在协程内抛异常

我们结合源码来解释以上两点。

当取消子协程时,取消流程会调用到 JobSupport 的 childCancelled():

	// JobSupport 实现了 Job 接口,作为所有 Job 的父类被继承
	public open fun childCancelled(cause: Throwable): Boolean {
        if (cause is CancellationException) return true
        return cancelImpl(cause) && handlesException
    }

如果取消的原因是 CancellationException 就返回 true 不继续执行后续的 cancelImpl(),该函数内部正是处理取消父协程的代码。

为什么会有这种区别?这实际上体现了协程的设计思想。

两个协程之所以被写成父子协程的关系,是因为二者在逻辑上有相互包含的关系(当然,从运行角度,二者是并行关系),子协程执行的内容通常是父协程任务的子流程。

从正常的逻辑上讲,当一个大流程被取消时,它内部的子流程也就没用了。因此,才会给协程设计这种取消时连带的性质。类似的,父协程等待所有子协程完成后再结束,也是因为所有子流程完成后整个大流程才算完成,所以才有父协程等待子协程的性质。

子协程取消,对外部大流程而言可能只是一个正常的事件而已,因此子协程的取消不会导致父协程的取消。

但如果子协程抛异常了,通常意味着父协程被损坏,影响整个大流程。因此子协程抛异常,会导致父协程也以该异常为原因而取消。

当然,协程源码实际上也存在通过 cancel() 触发异常流程的函数:

    @Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.2.0, binary compatibility with versions <= 1.1.x")
    public fun cancel(cause: Throwable? = null): Boolean

但是明显能看到该函数是 HIDDEN 的,不提供给开发者使用。实际上也很好理解,取消函数就触发取消流程,不要触发异常流程搞得逻辑混乱。

关于异常还需注意一点,用于取消的 CancellationException 除了用来处理取消协程之外就别无用处;而异常流程里抛出的异常对象除了用来取消协程,还会把该异常对象暴露给线程。直接体现就是本节举例的,协程抛出 IllegalStateException 导致线程崩溃输出 log 信息。

2、CoroutineExceptionHandler

先从一个例子看起:

// 示例代码 1
fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)
    scope.launch {
        try {
            throw RuntimeException()
        } catch (e: Exception) {
            
        }
    }
    delay(10000)
}

在协程内部将有可能抛出异常的代码用 try-catch 包起来,可以捕获异常。但倘若将 try-catch 挪到协程 launch 之外:

// 示例代码 2
fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)

    try {
        scope.launch {
            throw RuntimeException()
        }
    } catch (e: Exception) {

    }

    delay(10000)
}

这个异常就无法被捕获。这种形式类似于:

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)

    try {
        thread {
            throw RuntimeException()
        }
    } catch (e: Exception) {

    }

    delay(10000)
}

子线程内抛出的异常,在主线程去捕获,肯定是捕获不到的。对于协程也是一样的,像示例代码 2 那样,try-catch 只能捕获到协程启动阶段可能抛出的异常,也就是如果 scope.launch() 抛异常了,在它外面的 try-catch 可以捕获到。但对于协程运行阶段的代码,也就是 launch() 大括号里面的代码,它是运行在 Default 线程池的,与 try-catch 都不在一个线程中,因此无法被捕获。想要捕获协程运行阶段的代码,可以像示例代码 1 那样把 try-catch 放在协程内部。

那么真的就没办法在协程外部捕获协程内部的异常了吗?答案是有,可以使用 CoroutineExceptionHandler,将其对象传给最外层协程

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught $exception")
    }
    // 这个 try-catch 无法捕获到 launch 内的异常
    try {
        scope.launch(handler) {
            throw RuntimeException()
        }
    } catch (e: Exception) {

    }

    delay(10000)
}

运行输出结果,异常没有抛出:

Caught java.lang.RuntimeException

使用 CoroutineExceptionHandler 一定要注意的是,CoroutineExceptionHandler 对象只能设置给最外层协程,这样最外层协程本身及其所有子协程所抛出的异常才可被捕获。如果设置给内层的某个子协程,即便是该子协程抛出异常也无法被捕获:

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught $exception")
    }

    scope.launch(/*handler*/) {
        // 这样还是会抛异常
        launch(handler) {
            throw RuntimeException()
        }
    }

    delay(10000)
}

3、异常结构化管理的本质

3.1 UncaughtExceptionHandler

捕获线程异常,除了 try-catch,还可以使用 UncaughtExceptionHandler:

fun main() = runBlocking<Unit> {
    val thread = Thread {
        throw RuntimeException("Thread error!")
    }
    // 在线程启动之前设置 UncaughtExceptionHandler
    thread.setUncaughtExceptionHandler { _, e -> println("Caught $e") }
    thread.start()
}

在线程启动之前,可以调用 setUncaughtExceptionHandler() 给该线程设置异常捕获,运行结果如下:

Caught java.lang.RuntimeException: Thread error!

除了给单个线程设置,也可以给所有线程设置默认的 UncaughtExceptionHandler:

fun main() = runBlocking<Unit> {
    Thread.setDefaultUncaughtExceptionHandler { _, e -> println("Default caught: $e") }

    val thread = Thread {
        throw RuntimeException("Thread error!")
    }
    thread.setUncaughtExceptionHandler { _, e -> println("Caught $e") }
    thread.start()

    thread {
        throw IllegalStateException("Wrong User!")
    }
}

运行结果:

Caught java.lang.RuntimeException: Thread error!
Default caught: java.lang.IllegalStateException: Wrong User!

结果显示,对于单独设置了 UncaughtExceptionHandler 的线程,会使用单独设置的 UncaughtExceptionHandler 捕获异常。未设置的线程会使用默认的 UncaughtExceptionHandler 捕获异常。

对于异常处理要有一些深入的思考。比如对于 Android 应用而言,发生未捕获的异常会发生 FC 导致应用崩溃关闭。为了避免 FC 的发生,在写代码的时候,对于可能发生异常的代码,我们会用 try-catch 把它包起来,在 catch 代码块中做一些补救工作,比如尝试解决问题(重新执行函数或者重置变量值等等)或重启应用。而不是单单只为了通过 catch 吞掉这个异常而不发生 FC,因为即便不 FC,抛异常也意味着你的应用没有正常工作。盲目的吞掉异常让软件在异常状态下继续运行,可能会产生无法预料的结果。

对于能提前预料的异常,可以使用 try-catch。但是往往会有没有预判的异常出现,跑出 catch 的捕获,最终被 UncaughtExceptionHandler 接收到。此时线程已经运行结束了,没有机会做补救工作了,只能做一些善后工作,使用通用的解决方案优雅地结束应用。一般有两步:收尾工作(如记录崩溃日志)以及重启或杀死应用:

Thread.setDefaultUncaughtExceptionHandler { _, e ->
    // 记录崩溃日志
    println("Default caught: $e")
    // 结束或重启应用
    exitProcess(1)
}

对于为单个线程设置的 UncaughtExceptionHandler,如果该线程只是在内部完成自己的工作,其崩溃不影响其他线程,可以尝试在 UncaughtExceptionHandler 中重启线程。当然,一个线程独立完成一件事的场景比较少见。

3.2 CoroutineExceptionHandler

让我们再来看协程。如果没有捕获协程的异常,它最终会抛到线程的环境中,通过默认的 UncaughtExceptionHandler 可以捕获协程抛出的异常:

fun main() = runBlocking<Unit> {
    // 协程异常会被线程的 DefaultUncaughtExceptionHandler 捕获
    Thread.setDefaultUncaughtExceptionHandler { _, e ->
        println("Caught in DefaultUncaughtExceptionHandler: $e")
    }

    val scope = CoroutineScope(EmptyCoroutineContext)
	// 协程没有捕获异常
    val job = scope.launch {
        launch {
            throw RuntimeException()
        }
    }

    job.join()
}

使用 CoroutineExceptionHandler 为 job 这个协程树添加异常捕获处理,可以实现类似于为单个线程设置 UncaughtExceptionHandler 的效果:

fun main() = runBlocking<Unit> {
    Thread.setDefaultUncaughtExceptionHandler { _, e ->
        println("Caught in DefaultUncaughtExceptionHandler: $e")
    }

    val scope = CoroutineScope(EmptyCoroutineContext)
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught in CoroutineExceptionHandler: $exception")
    }

    val job = scope.launch(handler) {
        launch {
            throw RuntimeException()
        }
    }

    job.join()
}

异常会被 CoroutineExceptionHandler 捕获:

Caught in CoroutineExceptionHandler: java.lang.RuntimeException

上一节我们说,很多任务都是通过多个线程完成的,因此在单个线程的 UncaughtExceptionHandler 做重启线程的适用场景并不多。但是我们看 CoroutineExceptionHandler 正是为一个协程树做异常善后工作所用的,将一个任务放在协程树中去执行,当某一个协程发生异常后,可以在 CoroutineExceptionHandler 中重启整个协程树,这就具有更广泛的实际意义了。

当然,CoroutineExceptionHandler 并不能替代 DefaultUncaughtExceptionHandler 去对整个应用做未知异常的善后工作,它只能针对一棵协程树。即便是纯协程应用,也要通过 DefaultUncaughtExceptionHandler 来做通用拦截。

现在再来考虑,子协程的异常为什么要交到最外层父协程那里去注册 CoroutineExceptionHandler?因为注册 CoroutineExceptionHandler 的目的是善后,不管是哪个协程发生了异常,都是对整个协程树进行善后。因此设置给最外层协程最方便。

协程异常的结构化管理的本质,是针对协程发生的未知异常善后方案。因为已知异常,直接通过在协程内部 try-catch 就可以修复,只有未知异常才会走到结构化异常处理流程。

4、async 的异常处理

async 的异常处理与 launch 的大致相同,但是因为 async 启动的协程,往往需要通过 await 获取结果,会有两点比较明显的差异:

  1. 如果在 async 内发生异常,那么调用 await 的协程会受到双重影响
  2. async 启动的协程在 await 抛出异常,这个异常往往在协程内部就被 try-catch 捕获了。因此 async 作为最外层父协程存在时,不会将内部异常抛出到线程世界,给最外层的 async 设置的 CoroutineExceptionHandler 一般不会起作用

如果在 async 块中发生了异常,这个异常会被包装在 Deferred 对象中,然后在调用 await 方法时会抛出这个异常。

先来看第一点,示例代码:

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught in CoroutineExceptionHandler: $exception")
    }

    val job = scope.launch(handler) {
        val deferred = async {
            delay(1000)
            throw RuntimeException()
        }
        launch {
            // await 也会抛出 async 内抛出的 RuntimeException
            try {
                deferred.await()
            } catch (e: Exception) {
                println("Caught in await: $e")
            }

            // delay 用来验证 async 抛出的异常触发了结构化取消,导致
            // async -> job -> 当前协程被取消
            try {
                delay(1000)
            } catch (e: Exception) {
                println("Caught in delay: $e")
            }
        }
    }

    job.join()
}

运行结果:

Caught in await: java.lang.RuntimeException
Caught in delay: kotlinx.coroutines.JobCancellationException: Parent job is Cancelling; job=StandaloneCoroutine{Cancelling}@6463030a
Caught in CoroutineExceptionHandler: java.lang.RuntimeException

观察结果:

  • 因为调用 await 时才会抛出 async 内抛出的异常,所以它的 try-catch 先捕获到异常
  • 然后由于 async 抛的异常先让自己,也就是 deferred 被取消,进而导致父协程 job 被取消,进一步导致 delay 所在的协程也要被取消,所以 delay 抛出 CancellationException
  • async 与 await 抛出的异常最终会被最外层协程的 CoroutineExceptionHandler 捕获

以上过程能看出,调用 await 的协程实际上受到双重影响:一是 await 会抛出与 async 抛出的异常导致该协程进入异常处理流程;二是 async 抛出异常使得父协程与兄弟协程都会被取消,await 所在协程还会触发取消流程。

再看第二点差异,示例代码:

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught in CoroutineExceptionHandler: $exception")
    }
    scope.async {
        val deferred = async {
            delay(1000)
            throw RuntimeException("Error!")
        }
        launch(Job()) {
            try {
                deferred.await()
            } catch (e: Exception) {
                println("Caught in await: $e")
            }
        }
    }

    delay(3000)
}

由于 await 抛异常会被 try-catch 捕获,因此 async 就不会向线程世界抛出异常,此时给 async 设置 CoroutineExceptionHandler 是多余的。但倘若没有为 await 添加 try-catch,其异常还是会被最外层的 async 抛到线程世界,需要 CoroutineExceptionHandler:

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Caught in CoroutineExceptionHandler: $exception")
    }
    scope.async(handler) {
        val deferred = async {
            delay(1000)
            throw RuntimeException("Error!")
        }
        launch(Job()) {
            deferred.await()
        }
    }

    delay(3000)
}

运行结果:

Caught in Coroutine: java.lang.RuntimeException: Error!

5、SupervisorJob

5.1 SupervisorJob 的作用

SupervisorJob 源码:

@Suppress("FunctionName")
public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent)

private class SupervisorJobImpl(parent: Job?) : JobImpl(parent) {
    // 子协程被取消,父协程会调用 childCancelled
    override fun childCancelled(cause: Throwable): Boolean = false
}

子协程被取消,父协程会调用 childCancelled()。但对于 SupervisorJob 而言,它直接返回 false 表示子协程发生一般异常(非 CancellationException)时,不取消父协程:

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)
    val supervisorJob = SupervisorJob()
    scope.launch(supervisorJob) {
        throw RuntimeException("Error!")
    }
    delay(100)
    println("Parent job cancelled: ${supervisorJob.isCancelled}")
    delay(1000)
}

在抛出异常后仍然会输出:

Parent job cancelled: false

说明子协程的异常没有导致 supervisorJob 被取消。

5.2 SupervisorJob 的常用方式

使用 SupervisorJob 开发时一种常见的格式:

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)
    val supervisorJob = SupervisorJob()
    val job = scope.launch {
        // coroutineContext 是 scope 的,job 也就是 scope 内的 job 作为
        // SupervisorJob 的父协程,SupervisorJob 又作为这个 launch 启动
        // 协程的父协程,作为连接内外的链条
        launch(SupervisorJob(coroutineContext.job)) {
            throw RuntimeException("Error!")
        }
    }
    delay(100)
    println("Parent job cancelled: ${supervisorJob.isCancelled}")
    println("Job cancelled: ${job.isCancelled}")
    delay(1000)
}

让 SupervisorJob 作为作为 job 的子协程,还作为内层 launch 启动协程的父协程,也就是让 SupervisorJob 作为中间链条。这样内层协程发生异常时不会取消父协程,但 job 取消时会取消所有子协程:

Exception in thread "DefaultDispatcher-worker-2" java.lang.RuntimeException: Error!
...
Parent job cancelled: false
Job cancelled: false

还有一种常用方式是将全新的 SupervisorJob 直接传给 CoroutineScope,这样由该 CoroutineScope 启动的协程,其 SupervisorJob 不会因为异常而被取消:

fun main() = runBlocking<Unit> {
    val supervisorJob = SupervisorJob()
    val scope = CoroutineScope(supervisorJob)
    var child1: Job? = null
    var child2: Job? = null
    val job = scope.launch {
        child1 = launch {
            child2 = launch() {
                throw RuntimeException("Error!")
            }
        }
    }
    delay(100)
    println("SupervisorJob cancelled: ${supervisorJob.isCancelled}")
    println("Job cancelled: ${job.isCancelled}")
    println("ChildJob1 cancelled: ${child1?.isCancelled}")
    println("ChildJob2 cancelled: ${child2?.isCancelled}")
    delay(1000)
}

运行结果:

Exception in thread "DefaultDispatcher-worker-3" java.lang.RuntimeException: Error!
...
SupervisorJob cancelled: false
Job cancelled: true
ChildJob1 cancelled: true
ChildJob2 cancelled: true

此外,SupervisorJob 的子协程内部抛出异常时,即便 SupervisorJob 的直接子协程不是最外层协程,也会由 SupervisorJob 将异常抛到线程中:

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)
    scope.launch {
        val handler = CoroutineExceptionHandler { _, exception ->
            println("Caught in handler: $exception")
        }
        launch(SupervisorJob(coroutineContext.job) + handler) {
            launch {
                throw RuntimeException("Error!")
            }
        }
    }

    delay(1000)
}

按照前面说过的,将异常抛到线程中的应该是最外层协程,但是在中间的协程设置 SupervisorJob 为父 Job 的情况下,是由该 launch 将异常抛出到线程中的:

Caught in handler: java.lang.RuntimeException: Error!

http://www.kler.cn/a/500814.html

相关文章:

  • ROS Action接口
  • SQLAlchemy
  • 高级java每日一道面试题-2025年01月08日-微服务篇-负载平衡的意义什么 ?
  • 《新闻大厦抢先版》V0.18.105+Dlcs官方学习版
  • 【Linux】文件 文件描述符fd
  • 使用sqlplus的easy connect时如何指定是链接到shared server还是dedicated process
  • 国产3D CAD将逐步取代国外软件
  • Excel中身份证号码都变成E+乱码显示如何处理?
  • 2024 Java若依(RuoYi)框架视频教程(课件+示例代码+视频)
  • 【DevOps】Jenkins使用Pipline发布Web项目
  • WEB前端-3.1
  • 抖音矩阵是什么
  • 探索 Cloudflare Workers:高效边缘计算的新选择
  • 浅谈云计算02 | 云计算模式的演进
  • Flutter中Get.snackbar避免重复显示的实现
  • Gitlab-Runner配置
  • ModbusTCP转CCLINKIE在机器人中的革命性应用!
  • ConvNeXt V2: Co-designing and Scaling ConvNets with Masked Autoencoders论文解读
  • SUTD:偏好优化提升文本到音频效果
  • 理解 SQL 中NULL值对IN操作符的影响
  • 蓝桥杯历届真题 # 封闭图形个数(C++,Java)
  • Win32汇编学习笔记10.OD插件
  • Vue.js组件开发-如何使用day.js、luxon或date-fns处理日期时间
  • 【经管数据】ZF数字采购采购明细数据(2015.3-2024.3)
  • Mybatis——Mybatis开发经验总结
  • Vue 常用指令详解(附代码实例)