Golang 并发 Cond条件变量
Golang 并发 Cond条件变量
背景
编写代码过程中, 通常有主协程和多个子协程进行协作的过程,比如通过 WaitGroup 可以实现当所有子协程完成之后, 主协程再继续执行。
如上的场景是主协程等待子协程达到某个状态再继续运行。 但是反过来怎么操作呢,要求一组子协程等待主协达到某个状态时才继续运行。这个时候就需要用到 Cond 了
简介
Cond 是和某个条件相关,在条件还没有满足的时候,所有等待这个条件的协程都会被阻塞住,只有这个条件满足的时候,等待的协程才可能继续进行下去。Cond 在初始化的时候,需要关联一个 Locker 接口的实例,一般会使用 Mutex 或者 RWMutex。Cond 关联的 Locker 实例可以通过 c.L 访问,它内部维护着一个先入先出的等待队列。
Cond 分别有三个方法如下所示:
Wait
会把当前协程放入Cond的等待队列中并阻塞,直到被Signal或者Broadcast方法从等待队列中移除并唤醒,用于子协程阻塞。
Signal
主协程唤醒等待队列中的一个子协程,先唤醒最先阻塞的子协程,被唤醒的子协程继续执行。
Broadcast
主协程唤醒等待队列中的全部协程,所有子协程继续执行。
注意:调用
Signal
和Broadcast
方法,不强求持有c.L的锁,调用Wait方法是必须要持有c.L的锁。
Signal的使用场景
大家都去医院先排队,然后等待叫号,先排队的先叫号。这次模拟有5个病人,分别先排队。 然后护士根据排队先后来叫号;
具体场景是,5个病人在三秒中之内分别排号,护士今天要叫5个号,一秒叫一个,叫完5个号就结束了
代码如下:
func TestCondSignal(t *testing.T) {
c := sync.NewCond(&sync.Mutex{})
num := 0
// 当前叫号是几号
hand_num := 0
for i := 0; i < 5; i++ {
go func(i int) {
// 分别在不同时间排队
time.Sleep(time.Second * time.Duration(rand.Int63n(10)))
c.L.Lock()
num++
// 当前取得号。
cur := num
fmt.Printf("%s %d 号病人取到了 %d 号\n", time.Now().Format("2006-01-02 15:04:05"), i, cur)
// 取到号了,等待叫号
c.Wait()
fmt.Printf("%s %d 号病人排队号是 %d 号,被叫号了\n", time.Now().Format("2006-01-02 15:04:05"), i, cur)
hand_num = cur
c.L.Unlock()
}(i)
}
// 都叫号了
for hand_num != 5 {
// 叫号
c.Signal()
time.Sleep(time.Second * 1)
}
time.Sleep(time.Second * 10)
}
代码输出:
=== RUN TestCondSignal
2024-02-06 13:49:55 0 号病人取到了 1 号
2024-02-06 13:49:56 4 号病人取到了 2 号
2024-02-06 13:49:56 3 号病人取到了 3 号
2024-02-06 13:49:56 0 号病人排队号是 1 号,被叫号了
2024-02-06 13:49:56 1 号病人取到了 4 号
2024-02-06 13:49:57 4 号病人排队号是 2 号,被叫号了
2024-02-06 13:49:58 3 号病人排队号是 3 号,被叫号了
2024-02-06 13:49:59 1 号病人排队号是 4 号,被叫号了
2024-02-06 13:50:02 2 号病人取到了 5 号
2024-02-06 13:50:02 2 号病人排队号是 5 号,被叫号了
--- PASS: TestCondSignal (18.09s)
PASS
结果表明,5个病人,分别在三秒钟内先后取号, 然后护士每过一秒钟按照排队的先后顺序叫一个号(叫号的过程依然有病人取号),先取号的被先叫号。
此场景中,5个病人相当于5个协程, 主协程反复使用Signal() 按照顺序一个个唤醒阻塞的子协程。
Broadcast的使用场景
场景为如下: 运动员跑步比赛,要求8秒内全部运动员准备好,然后等待教练发令, 教练10秒后发令,所有运动员在发令后开始跑。
func TestBroadcast(t *testing.T) {
c := sync.NewCond(&sync.Mutex{})
for i := 0; i < 10; i++ {
go func(i int) {
// 随机一个8秒内的准备时间
time.Sleep(time.Second * time.Duration(rand.Int63n(8)))
fmt.Printf("%s 运动员%d已准备就绪\n", time.Now().Format("2006-01-02 15:04:05"), i)
c.L.Lock()
// 准备完毕,等待教练发令
c.Wait()
c.L.Unlock()
fmt.Printf("%s 运动员%d开跑\n", time.Now().Format("2006-01-02 15:04:05"), i)
}(i)
}
// 主协程等待10秒后发令
time.Sleep(time.Second * 10)
fmt.Printf("%s 教练发令。\n", time.Now().Format("2006-01-02 15:04:05"))
// 教练发令。通知所有运动员开始跑步, 即唤起之前 wait()的所有协程
c.Broadcast()
// 等待跑步
time.Sleep(time.Second * 5)
}
代码输出如下
=== RUN TestBroadcast
2024-02-06 13:56:57 运动员4已准备就绪
2024-02-06 13:56:57 运动员7已准备就绪
2024-02-06 13:56:58 运动员8已准备就绪
2024-02-06 13:56:58 运动员3已准备就绪
2024-02-06 13:56:59 运动员9已准备就绪
2024-02-06 13:57:00 运动员2已准备就绪
2024-02-06 13:57:01 运动员5已准备就绪
2024-02-06 13:57:02 运动员1已准备就绪
2024-02-06 13:57:03 运动员6已准备就绪
2024-02-06 13:57:04 运动员0已准备就绪
2024-02-06 13:57:07 教练发令。
2024-02-06 13:57:07 运动员0开跑
2024-02-06 13:57:07 运动员9开跑
2024-02-06 13:57:07 运动员8开跑
2024-02-06 13:57:07 运动员3开跑
2024-02-06 13:57:07 运动员4开跑
2024-02-06 13:57:07 运动员5开跑
2024-02-06 13:57:07 运动员2开跑
2024-02-06 13:57:07 运动员1开跑
2024-02-06 13:57:07 运动员6开跑
2024-02-06 13:57:07 运动员7开跑
--- PASS: TestBroadcast (15.01s)
如结果所示, 10个运动员在8秒内分别准备好,等待教练发令后,同时开跑。
此场景中,10个运动员相当于10个协程, 同时等待主协程的命令,使用Broadcast() 唤醒所有阻塞的子协程。
注意事项
使用 Cond,最容易踩的坑就是调用 Wait()
方法之前,调用者没有持有锁或没有检查辅助条件。在如上示例代码中,假如把调用 Wait()
方法前后的加锁和释放锁的代码注释掉,运行代码会 导致程序 panic
。原因是调用 Wait 方法
,会先把调用者放入等待队列中,然后释放锁。此时如果在未持有锁时调用释放锁的方法,就会 导致程序 panic
。
Wait方法的使用
- Wait会自动释放c.L锁,并挂起调用者的goroutine,之后恢复执行
- Wait会在返回时对c.L加锁
- 除非被Broadcast或Signal唤醒,否则Wait不会返回
- 由于Wait第一次恢复是,c.L并没有加锁,所以当Wait返回时,调用者通常不能假设条件为真
- 简单来说,只要想使用condition就必须加锁
参考
https://www.jb51.net/article/277047.htm