Kotlin 协程基础知识总结五 —— 通道、多路复用、并发安全
本篇内容:
- Channel:认识 Channel、容量与迭代、produce 与 actor、Channel 的关闭、BroadcastChannel
- 多路复用:什么是多路复用、复用多个 await、复用多个 Channel、SelectClause、Flow 实现多路复用
- 并发安全:协程的并发工具、Mutex、Semaphore
1、Channel
(P79)什么是 Channel
Channel 实际上是一个并发安全的队列,可以用来连接协程,实现协程间通信。
Channel 在概念上非常类似于 BlockingQueue,关键的区别在于,Channel 不使用阻塞的 put、take 操作,而使用挂起的 send、receive 操作。
(P79)使用 Channel 进行协程通信:
@OptIn(DelicateCoroutinesApi::class)
fun test01() = runBlocking {
val channel = Channel<Int>()
// 生产者协程
val producer = GlobalScope.launch {
for (i in 1..5) {
delay(1000)
channel.send(i)
println("send $i")
}
}
// 消费者协程
val consumer = GlobalScope.launch {
while (true) {
val value = channel.receive()
println("receive $value")
if (value == 5) {
break
}
}
}
joinAll(producer, consumer)
}
生产者每隔 1s 向通道发送一次数据,接收者不断地监听接收数据,运行结果如下:
send 1
receive 1
send 2
receive 2
send 3
receive 3
send 4
receive 4
send 5
receive 5
(P80)Channel 的容量:既然 Channel 是一个队列,队列一定存在缓冲区中,一旦缓冲区满了并且一直没有其他协程调用 receive 取走数据,send 就需要挂起。这样让发送端的节奏放慢,只要执行 send 就挂起,直到其他协程调用 receive 取走数据,让缓冲区有空间保存数据了让 send 继续。
实际上,创建 Channel 对象时可以通过构造函数指定容量大小:
public fun <E> Channel(
capacity: Int = RENDEZVOUS, // 常量 0
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
when (capacity) {
RENDEZVOUS -> {
if (onBufferOverflow == BufferOverflow.SUSPEND)
RendezvousChannel(onUndeliveredElement) // an efficient implementation of rendezvous channel
else
ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // support buffer overflow with buffered channel
}
CONFLATED -> {
require(onBufferOverflow == BufferOverflow.SUSPEND) {
"CONFLATED capacity cannot be used with non-default onBufferOverflow"
}
ConflatedChannel(onUndeliveredElement)
}
UNLIMITED -> LinkedListChannel(onUndeliveredElement) // ignores onBufferOverflow: it has buffer, but it never overflows
BUFFERED -> ArrayChannel( // uses default capacity with SUSPEND
if (onBufferOverflow == BufferOverflow.SUSPEND) CHANNEL_DEFAULT_CAPACITY else 1,
onBufferOverflow, onUndeliveredElement
)
else -> {
if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST)
ConflatedChannel(onUndeliveredElement) // conflated implementation is more efficient but appears to work in the same way
else
ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement)
}
}
容量 capacity 的默认值为 RENDEZVOUS,是常量 0,此时如果数据从缓冲区溢出时采用的措施 onBufferOverflow 也是默认值 BufferOverflow.SUSPEND 的话,那么构造函数返回的就是 RendezvousChannel —— 一种无缓冲通道:
internal open class RendezvousChannel<E>(onUndeliveredElement: OnUndeliveredElement<E>?) : AbstractChannel<E>(onUndeliveredElement) {
protected final override val isBufferAlwaysEmpty: Boolean get() = true
protected final override val isBufferEmpty: Boolean get() = true
protected final override val isBufferAlwaysFull: Boolean get() = true
protected final override val isBufferFull: Boolean get() = true
}
无缓冲通道意味着发送方和接收方必须同时准备好,才能进行通信,否则它们会在通道中进行“会面(rendezvous)”,这也是该通道类型的命名来源。
RendezvousChannel 的作用:
- 同步通信:
RendezvousChannel
提供了一种同步的通信机制,发送方和接收方必须同时准备好才能进行通信。这种方式可以确保通信的安全性和可靠性。
- 阻塞特性:
- 当发送者试图向通道发送数据时,如果没有对应的接收者准备好接收,发送操作会被阻塞,直到有接收者准备好。同样,当接收者尝试接收数据时,如果没有对应的发送者准备好发送,接收操作也会被阻塞。
- 避免数据丢失:
- 由于是无缓冲通道,发送方发送的数据会直接传递给接收方,这样可以避免数据丢失或者发送方发送速度快于接收方处理速度时的数据堆积问题。
- 用途:
RendezvousChannel
适用于需要精确的同步和顺序性的场景,可以确保发送方和接收方之间的数据交换是按照特定顺序进行的。
(P81)迭代 Channel:Channel 本身像一个序列,读取时可以直接获取 Channel 的 Iterator。
代码示例:
@OptIn(DelicateCoroutinesApi::class)
@Test
fun test02() = runBlocking {
val channel = Channel<Int>(Channel.UNLIMITED)
val producer = GlobalScope.launch {
for (x in 1..5) {
channel.send(x * x)
println("send ${x * x}")
}
}
val consumer = GlobalScope.launch {
val iterator = channel.iterator()
while (iterator.hasNext()) {
val element = iterator.next()
println("receive $element")
// 每隔 2s 接收一次数据
delay(2000)
}
}
joinAll(producer, consumer)
}
运行结果:
send 1
send 4
send 9
send 16
send 25
receive 1
receive 4
receive 9
receive 16
receive 25
send 端是在很短的时间内就输出了所有结果(到通道中),receive 端则是每 2s (从通道)读取一次数据并打印出来。这个情景适合在接收部分网络数据就执行刷新的情况。比如说小红书那种瀑布流,每个帖子都有一张封面图,你可以加载一部分封面图然后就通过 UI 刷新出来,然后再加载、再刷新。
(P82)produce 与 actor 是构造生产者与消费者的便捷方法。produce 启动生产者协程得到 ReceiveChannel,其他协程可以使用这个 Channel 接收数据。反之,actor 启动消费者协程:
@OptIn(DelicateCoroutinesApi::class)
@Test
fun test03() = runBlocking {
// 生成生产者 Channel
val receiveChannel: ReceiveChannel<Int> = GlobalScope.produce {
repeat(100) {
delay(1000)
send(it)
}
}
val consumer = GlobalScope.launch {
for (element in receiveChannel) {
println("received $element")
}
}
// 主线程需要等待 consumer 协程执行,receiveChannel 是一个
// Channel 不是协程,因此不用 join
consumer.join()
}
生产者每隔 1s 向 ReceiveChannel 发送一次数据,消费者开启一个协程,通过 in 对 receiveChannel 的迭代器进行遍历,输出 receiveChannel 中的内容。
actor 函数与 produce 类似,不过在 Kotlin 1.5 版本中已经将 actor 废弃,推荐使用 SendChannel 方式了。不过还是来看一下 actor 示例:
@OptIn(DelicateCoroutinesApi::class)
@Test
fun test04() = runBlocking {
val actor: SendChannel<Int> = GlobalScope.actor<Any> {
while (true) {
val element = receive()
println("received $element")
}
}
val producer = GlobalScope.launch {
for (x in 1..5) {
actor.send(x * x)
}
}
producer.join()
}
运行结果:
received 1
received 4
received 9
received 16
received 25
actor 函数被添加了 @ObsoleteCoroutinesApi 注解:
/**
* Marks declarations that are **obsolete** in coroutines API, which means that the design of the corresponding
* declarations has serious known flaws and they will be redesigned in the future.
* Roughly speaking, these declarations will be deprecated in the future but there is no replacement for them yet,
* so they cannot be deprecated right away.
*/
@MustBeDocumented
@Retention(value = AnnotationRetention.BINARY)
@RequiresOptIn(level = RequiresOptIn.Level.WARNING)
public annotation class ObsoleteCoroutinesApi
标记在协程 API 中已经过时的声明,这意味着对应声明的设计存在严重已知缺陷,它们将在未来重新设计。简言之,这些声明将来会被弃用,但目前还没有替代方案,因此暂时无法立即弃用它们。
(P83)Channel 的关闭:
- produce 与 actor 返回的 Channel 都会随着对应的协程执行完毕而关闭。正因如此,Channel 才被称为热数据流
- 对于普通 Channel,调用它的 close 方法会立即停止接收新元素,它的 isClosedForSend 会立即返回 true。由于 Channel 缓冲区的存在,可能此时还有一些元素没有被处理完,因此要等所有元素都被读取之后 isClosedForReceive 才返回 true
- Channel 的生命周期最好由主导方来维护,建议由主导的一方实现关闭
示例代码:
@OptIn(DelicateCoroutinesApi::class)
@Test
fun test05() = runBlocking {
val channel = Channel<Int>(3)
val producer = GlobalScope.launch {
List(3) {
channel.send(it)
println("send $it")
}
channel.close()
println(
"""close channel.
| - ClosedForSend: ${channel.isClosedForSend}
| - ClosedForReceive: ${channel.isClosedForReceive}
""".trimMargin()
)
}
val consumer = GlobalScope.launch {
for (element in channel) {
println("receive $element")
delay(1000)
}
println(
"""After consuming.
| - ClosedForSend: ${channel.isClosedForSend}
| - ClosedForReceive: ${channel.isClosedForReceive}
""".trimMargin()
)
}
joinAll(producer, consumer)
}
运行结果:
receive 0
send 0
send 1
send 2
close channel.
- ClosedForSend: true
- ClosedForReceive: false
receive 1
receive 2
After consuming.
- ClosedForSend: true
- ClosedForReceive: true
发送方作为主导方,在发送完数据后关闭通道,此时 ClosedForSend 立即变为 true,而 ClosedForReceive 由于通道内还有数据未被接收,所以暂时还为 false。在接收方接收通道内所有数据后,再看 ClosedForReceive 就为 true 了。
(P84)BroadcastChannel:前面提到,发送端和接收端在 Channel 中存在一对多的情形,从数据处理本身来讲,虽然有多个接收端,但是同一个元素只会被一个接收端读到。广播则不然,多个接收端不存在互斥行为(发布订阅模式):
@OptIn(DelicateCoroutinesApi::class, ObsoleteCoroutinesApi::class)
@Test
fun test06() = runBlocking {
val broadcastChannel = BroadcastChannel<Int>(Channel.BUFFERED)
// 生产者协程,使用广播通道发送数据
val producer = GlobalScope.launch {
List(3) {
delay(100)
broadcastChannel.send(it)
println("send $it")
}
broadcastChannel.close()
}
// 接收者启三个协程接收数据
List(3) { index ->
GlobalScope.launch {
val receiveChannel = broadcastChannel.openSubscription()
for (element in receiveChannel) {
println("$index receive $element")
}
}
}.joinAll()
}
运行结果:
send 0
1 receive 0
2 receive 0
0 receive 0
1 receive 1
send 1
0 receive 1
2 receive 1
send 2
2 receive 2
1 receive 2
0 receive 2
三个协程异步接收到广播数据。普通 Channel 与广播 Channel 之间可以互相转换:
val channel = Channel<Int>()
val broadcastChannel = channel.broadcast(3)
需要注意的是,BroadcastChannel 从 Kotlin 1.5 开始被弃用,成为过时的 API 了。
2、多路复用
(P85)await 多路复用:数据通信系统或计算机网络系统中,传输媒体的带宽或容量往往大于传输单一信号的需求,为了有效地利用通信线路,希望一个信道同时传输多路信号,这就是所谓的多路复用技术(Multiplexing)。
复用多个 await 是指两个 API 分别从网络和本地缓存获取数据,Select 会进行选择,哪个数据先返回就用哪个数据进行展示:
代码示例,首先项目配置需要 Retrofit 以及转换器依赖:
def retrofit_version = "2.9.0"
implementation "com.squareup.retrofit2:retrofit:$retrofit_version"
implementation "com.squareup.retrofit2:converter-gson:$retrofit_version"
此外由于我们是在单元测试代码中运行,所以需要添加配置让单元测试可以访问网络:
android {
// 测试代码中如果需要访问网络,需要添加此配置
testOptions {
unitTests.returnDefaultValues = true
}
}
然后定义 Api 接口以便使用 Retrofit 去请求网络数据:
const val TAG = "UserApi"
data class User(val name: String, val address: String)
interface UserServiceApi {
@GET("user")
fun loadUser(@Query("name") name: String): Call<User>
@GET("user")
suspend fun getUser(@Query("name") name: String): User
}
val userServiceApi: UserServiceApi by lazy {
val okHttpClient = OkHttpClient.Builder()
.addInterceptor {
it.proceed(it.request()).apply { Log.d(TAG, "request: ${it.request()}") }
}
.build()
val retrofit = Retrofit.Builder()
.client(okHttpClient)
.baseUrl("https://www.xxx.com")
.addConverterFactory(GsonConverterFactory.create())
.build()
retrofit.create(UserServiceApi::class.java)
}
来到测试代码,使用 async 开两个协程,分别从本地缓存和网络请求数据:
private const val cachePath = "E://coroutine.cache"
private val gson = Gson()
fun CoroutineScope.getUserFromLocal(name: String) = async(Dispatchers.IO) {
// 故意增加一个延迟,来对比结果所用
delay(1000)
File(cachePath).readText().let { gson.fromJson(it, User::class.java) }
}
fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
userServiceApi.getUser(name)
}
这里有一个小技巧就是 async 正常应该是在协程环境中使用的,粗暴的方式是直接 GlobalScope.async,但是由于 GlobalScope 会造成内存泄漏,因此稍好一点的方式就是将两个函数从原本的挂起函数改造成 CoroutineScope 的扩展函数。因为扩展函数会通过隐含的 this 去调用指定的方法,比如 CoroutineScope.getUserFromLocal 就是调用 this.async,而 this 就是 CoroutineScope 实例,相当于调用了 CoroutineScope.async 解决了 async 的调用环境问题。
最后就是多路复用的使用,通过 select 结合 onAwait 筛选出返回结果较快的那一组操作:
data class Response<T>(val value: T, val isLocal: Boolean)
fun test01() = runBlocking<Unit> {
GlobalScope.launch {
val localResult = getUserFromLocal("xxx")
val remoteResult = getUserFromRemote("yyy")
val userResponse = select<Response<User>> {
localResult.onAwait { Response(it, true) }
remoteResult.onAwait { Response(it, false) }
}
userResponse.value?.let { println(it) }
}.join()
}
localResult 与 remoteResult 是两个协程,它们在调用 onAwait 时实际会触发 await 获取结果,select 的作用就是选取返回结果较快的那一组作为结果。
(P86)复用多个 Channel:与 await 类似,会接收到最快的那个 Channel 消息:
fun test02() = runBlocking<Unit> {
val channels = listOf(Channel<Int>(), Channel<Int>())
GlobalScope.launch {
delay(100)
channels[0].send(200)
}
GlobalScope.launch {
delay(50)
channels[1].send(100)
}
// select 后的泛型是结果的数据类型,由于可能都没有收到,因此使用了可空类型
val result = select<Int?> {
channels.forEach { channel -> channel.onReceive { it } }
}
println(result)
}
运行结果:
100
因为第二个协程只挂起 50ms 就发送了 100,它比较快,因此最终结果为 100。
(P87)SelectClause
如何确定哪些事件可以被 select 呢?其实所有能被 select 的事件都是 SelectClauseN 类型,包括:
- SelectClause0:对应事件没有返回值, 如 join 没有返回值,那么 onJoin 就是 SelectClauseN 类型。使用时,onJoin 的参数是一个无参函数
- SelectClause1:对应事件有返回值,例如前面的 onAwait 和 onReceive
- SelectClause2:对应事件有返回值,此外还需要一个额外参数,例如 Channel.onSend 有两个参数,第一个是 Channel 类型的值,表示即将发送的值;第二个是发送成功时的回调参数
如果想确认挂起函数是否支持 select,只需要查看其是否存在对应的 SelectClauseN 回调即可。
先看一个 SelectClause0 的示例:
@OptIn(DelicateCoroutinesApi::class)
@Test
fun test03() = runBlocking<Unit> {
val job1 = GlobalScope.launch {
delay(100)
println("job 1")
}
val job2 = GlobalScope.launch {
delay(10)
println("job 2")
}
// select 的泛型是 Unit,可以省略的
select {
job1.onJoin { println("job1 is done") }
job2.onJoin { println("job2 is done") }
}
// 不需要 delay 或 join 就能看到打印结果
// delay(1000)
}
job2 只挂起 10ms,因此 select 会选择 job2.onJoin() 作为结果。onJoin 的类型为 SelectClause0:
/**
* 在join挂起函数的select表达式中选择的子句,表示当作业完成时选择。即使作业异常完成,
* 该子句也永远不会失败。
*/
public val onJoin: SelectClause0
SelectClause1 的 onAwait 和 onReceive 前面已有演示,这里直接看 SelectClause2 的示例:
@OptIn(DelicateCoroutinesApi::class)
@Test
fun test04() = runBlocking<Unit> {
val channels = listOf(Channel<Int>(), Channel<Int>())
println(channels)
launch(Dispatchers.IO) {
select<Unit?> {
launch {
delay(10)
channels[1].onSend(200) { sendChannel ->
println("sent on $sendChannel")
}
}
launch {
delay(100)
channels[0].onSend(100) { sendChannel ->
println("sent on $sendChannel")
}
}
}
}
GlobalScope.launch {
println(channels[0].receive())
}
GlobalScope.launch {
println(channels[1].receive())
}
delay(1000)
}
运行结果:
[RendezvousChannel@5a45133e{EmptyQueue}, RendezvousChannel@5e600dd5{EmptyQueue}]
200
sent on RendezvousChannel@5e600dd5{EmptyQueue}
select 内两个协程,只挂起 10ms 的会先出结果,因此 channel[1] 会发送一个 200 被接收 channel[1] 数据的协程接收到。
onSend 是 SelectClause2 类型的变量:
/**
* 选择发送挂起函数的select表达式中的子句,表示当指定为参数的元素发送到通道时进行选择。
* 当选择该子句时,对该通道的引用会传递到相应的代码块中。如果通道关闭以进行发送(请参阅关闭详情),
* 则select调用会因异常而失败。
*/
public val onSend: SelectClause2<E, SendChannel<E>>
SelectClause2 是一个接口(后面把 SelectClause0 和 SelectClause1 也贴出):
/**
* 选择带有额外类型为P的参数的select表达式的子句,用于选择类型为Q的值。
*/
public interface SelectClause2<in P, out Q> {
/**
* 将此子句与指定的[select]实例和代码块[block]注册:
* @suppress 这是不稳定的 API,可能会发生更改。
*/
@InternalCoroutinesApi
public fun <R> registerSelectClause2(select: SelectInstance<R>, param: P, block: suspend (Q) -> R)
}
/**
* Clause for [select] expression without additional parameters that does not select any value.
*/
public interface SelectClause0 {
/**
* Registers this clause with the specified [select] instance and [block] of code.
* @suppress **This is unstable API and it is subject to change.**
*/
@InternalCoroutinesApi
public fun <R> registerSelectClause0(select: SelectInstance<R>, block: suspend () -> R)
}
/**
* Clause for [select] expression without additional parameters that selects value of type [Q].
*/
public interface SelectClause1<out Q> {
/**
* Registers this clause with the specified [select] instance and [block] of code.
* @suppress **This is unstable API and it is subject to change.**
*/
@InternalCoroutinesApi
public fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (Q) -> R)
}
对 SelectClauseN 的理解,首先需要配合 select 使用,然后可以看成对应的不带 on 的挂起函数,只不过增加了一些额外操作:
- 比如 onJoin 后面接的 Lambda 表达式实际上是对 SelectClause0 接口的唯一方法 registerSelectClause0() 的实现,而对应的 join 就是一个普通的没有参数的挂起方法,它们的功能都是等待协程执行完毕
- 再比如 onAwait 作为 SelectClause1 接口类型的变量,需要在实现 registerSelectClause1() 的代码块中返回指定类型的数据作为结果,而 await 是一个返回指定类型数据的挂起函数,它们的功能都是等待结果
(P88)Flow 实现多路复用:多数情况下,可以通过构造合适的 Flow 实现多路复用的效果:
fun CoroutineScope.getUserFromLocal(name: String) = async(Dispatchers.IO) {
// 故意增加一个延迟,来对比结果所用
delay(1000)
File(cachePath).readText().let { gson.fromJson(it, User::class.java) }
}
fun CoroutineScope.getUserFromRemote(name: String) = async(Dispatchers.IO) {
userServiceApi.getUser(name)
}
fun test05() = runBlocking<Unit> {
val name = "guest"
coroutineScope {
listOf(::getUserFromLocal, ::getUserFromRemote) // List 包含两个函数类型的变量
.map { function -> function.call(name) } // 调用两个 async 函数,得到两个 Deferred
.map { deferred -> flow { emit(deferred.await()) } } // 获取 Deferred 的结果用流发射
.merge().collect { user -> println(user) } // 将两个流合并并收集流中数据
}
}
运行结果会得到本地和网络两种方式的 User 对象(当然,我们没有配置网络获取的网址,所以网络结果就是一个演示效果罢了):
User(name=Jack, address=New York)
User(name=Jason, address=California)
3、并发安全
(P89)不安全的并发访问:我们在使用线程解决并发问题时总会遇到线程安全问题,而 Java 平台上的 Kotlin 协程实现免不了存在并发调度的情况,因此线程安全同样值得留意。
如下代码演示了协程的并发问题:
fun test06() = runBlocking<Unit> {
var count = 0
List(1000) {
GlobalScope.launch { count++ }
}.joinAll()
println(count)
}
运行结果:
978
实际上跟线程是一样的,共享变量 count 被多个线程操作,由于 count++ 不是原子操作,所以某个线程在取 count 的值时,可能不是 count 最新的值,而是自加前的老值,因此最终结果小于 1000。
(P90)协程并发安全
Java 的线程并发机制是可用的,比如将 count 声明为原子类型,自加使用原子操作:
fun test06() = runBlocking<Unit> {
var count = AtomicInteger(0)
List(1000) {
GlobalScope.launch { count.getAndIncrement() }
}.joinAll()
println(count)
}
最终可输出 1000。
除了 Java 提供的机制,Kotlin 也提供了如下并发安全工具:
- Channel:并发安全的消息通道
- Mutex:轻量级锁,它的 lock 与 unlock 从语义上与线程锁比较类似,之所以轻量是因为它在获取不到锁时不会阻塞线程,而是挂起等待锁的释放
- Semaphore:轻量级信号量,信号量可以有多个,协程在获取到信号量后即可执行并发操作。当 Semaphore 的参数为 1 时,效果等价于 Mutex
Mutex 示例:
fun test07() = runBlocking<Unit> {
var count = 0
val mutex = Mutex()
List(1000) {
GlobalScope.launch {
mutex.withLock {
count++
}
}
}.joinAll()
println(count)
}
Semaphore 示例:
fun test08() = runBlocking<Unit> {
var count = 0
val semaphore = Semaphore(1)
List(1000) {
GlobalScope.launch {
semaphore.withPermit {
count++
}
}
}.joinAll()
println(count)
}
除了使用以上机制,也可以尽量避免访问外部可变状态。编写函数时要求它不得访问外部状态,只能基于参数做运算,通过返回值提供运算结果:
fun test09() = runBlocking<Unit> {
// 将 count 移到协程之外就不会有并发安全问题
var count = 0
val result = count + List(1000) {
GlobalScope.async { 1 }
}.sumOf { it.await() }
println(result)
}
这个例子比较极端。