Kotlin 使用虚拟线程并在低版本中自动切换到协程
在 Java 虚拟线程正式发布后,Kotlin 开发者迎来了并发编程的新选择。本文将揭示如何构建一个智能化的执行方法,在支持虚拟线程的环境(Java 19+)中享受轻量级线程的优势,同时在低版本运行时自动降级到协程机制,实现真正的版本自适应并发控制。
如何创建一个虚拟线程
在虚拟线程发布后有多种新增的API创建方式通过Executors
和 Thread
等。在这些类中现在了一些方法用于创建虚拟线程,同样的可以通过是否有这些方法来判断是否支持虚拟线程
- Executors
val p = Executors.newVirtualThreadPerTaskExecutor()
- Thread
Thread.ofVirtual().start {
runBlocking {
println("VirtualThread")
}
}
如何检测是否支持虚拟线程
如上面所说,虚拟线程发布后新增了API,通过反射访问API,如果不存在说明不存在API。不存在API就意味着不支持虚拟线程
/**
* 判断是否支持虚拟线程
* */
fun isSupportVirtualThread(): Boolean {
return virtualThreadExecutor != null
}
/**
* 反射获取虚拟线程的ExecutorService
* */
private val virtualThreadExecutor: ExecutorService? by lazy {
runCatching {
val clazz = Executors::class.java
val method = clazz.getMethod("newVirtualThreadPerTaskExecutor")
method.isAccessible = true
method.invoke(null) as ExecutorService
}.getOrNull()
}
开启虚拟线程,不存在则开启协程
通过检查是否支持虚拟线程来判断开启虚拟线程还是协程
if (isSupportVirtualThread()) {
Thread.ofVirtual().start {
runBlocking {
block()
}
}
} else {
CoroutineScope(Dispatchers.IO).launch {
block()
}
}
下面对其进行简单封装一下,方法返回一个封装类,类可以对虚拟线程和协程进行等待等操作
fun <T : Any> CoroutineScope.runLightweightThread(block: suspend () -> T) = runLightweightThread(this) {
block()
}
fun <T : Any> runLightweightThread(
cs: CoroutineScope = CoroutineScope(Dispatchers.IO),
block: suspend () -> T
): LightweightThread<T> {
return if (isSupportVirtualThread()) {
LightweightThread(virtualThreadExecutor!!.submit { runBlocking { block() } })
} else {
LightweightThread(job = cs.async { block() })
}
}
class LightweightThread<T : Any>(
private val thread: Future<*>? = null,
private val job: Deferred<T>? = null
) {
suspend fun coAwait(): T? = withContext(Dispatchers.IO) {
thread?.get()?.let { it as T }
} ?: job?.await()
fun await(): T? {
return runBlocking {
job?.await()
} ?: thread?.get()?.let { it as T }
}
fun join() {
thread?.get()
runBlocking {
job?.join()
}
}
fun cancel() {
thread?.cancel(true)
job?.cancel()
}
}
如何开一个虚拟线程的协程
通过 asCoroutineDispatcher
方法可以将ExecutorService
转为一个协程调度器
fun main() {
CoroutineScope(Dispatchers.VT).launch {
println(123)
}
}
val Dispatchers.VT: CoroutineDispatcher by lazy {
if (isSupportVirtualThread()) {
virtualThreadExecutor!!.asCoroutineDispatcher()
} else {
Dispatchers.IO
}
}