Golang——并发控制
本文介绍Go并发,同步,顺序执行,设计的一些常见的场景,顺序执行主要用channel实现。在这种同步信号的使用场景中,使用无缓冲通道,可以选择不关闭通道。
文章目录
- 协程同步背景介绍
- 无缓冲通道的作用
- 为什么不需要关闭通道
- 何时需要关闭通道
- 总结
- 常见的同步场景
- 顺序执行10个Goroutine
- 两个 Goroutines 的交替执行,交替打印偶数和奇数
- 两个 server 的任意一个执行完成,就执行第三个
- 两个 server 必须全部执行完成,再执行第三个。
- 三个 server 的必须按 1 2 3 顺序执行
- 一个生产者Goroutine,多个消费者Goroutine(每条消息,一次消费)
- 一个生产者Goroutine,多个消费者Goroutine(每条消息每个消费者都会消费)
协程同步背景介绍
无缓冲通道(unbuffered channel)通常用于同步 Goroutine 的执行顺序。它的作用是让 Goroutines 在指定的时刻按顺序执行,而不是用来传递数据。因此,在这种同步信号传递的场景中,不需要关闭通道,程序依然能够正常运行。
无缓冲通道的作用
无缓冲通道的特性是:
- 发送方必须等待接收方接收数据,才能继续发送下一个数据。
- 它本质上是一个同步机制,用于协调不同 Goroutine 的执行顺序。
这种同步机制不涉及数据传输,只是信号传递。
为什么不需要关闭通道
-
同步控制:在你的代码中,通道仅仅用于同步 Goroutine 的执行,而不用于传输实际的数据。在同步场景下,通道充当的是“信号”的角色。接收方只关心信号的到来,处理完后就继续执行,通道是否关闭不会影响这一行为。
-
没有资源泄漏:由于通道没有存储任何数据,而是仅用于发送和接收信号,通道的关闭不会带来资源泄漏。即使不关闭通道,程序也会正确地运行,因为没有对通道进行进一步的发送操作。
-
避免复杂性:关闭通道通常用于“通知接收方数据已经完成传输”,而在这种情况下,你只是在等待信号,不需要关心通道的关闭状态。因此,不关闭通道反而使得程序逻辑更简洁,避免了复杂的资源管理。
-
垃圾回收机制close():Go 的垃圾回收机制会自动处理那些不再使用的对象和数据结构,包括通道。所以即使没有显式关闭通道,程序结束时,未关闭的通道也会被垃圾回收。
何时需要关闭通道
在其他情况下,关闭通道是必要的,尤其是在以下几种场景:
-
多接收方模式:如果你有多个接收方从同一个通道接收数据,关闭通道可以通知所有接收方没有更多的数据可以接收。
-
通知没有更多数据:如果通道用于传输数据,关闭通道可以标识发送方不再有数据发送,这对于避免接收方阻塞和避免无限等待是非常重要的。
总结
在 无缓冲通道 的同步信号传递场景下:
- 不关闭通道是合理的,因为通道的作用仅仅是控制同步,而不是用于传输数据。
- 通道的关闭通常是在传输数据时,通知接收方“没有更多数据了”,而在同步信号的场景中,这种关闭操作并不必要。
所以,在 顺序执行的同步场景 中,如果你只是通过无缓冲通道传递信号,不关闭通道完全没有问题。
常见的同步场景
顺序执行10个Goroutine
这里选择了关闭channel,其实可以删掉close(),后续的例子不会再关闭channel。
package main
import (
"fmt"
"time"
)
func main() {
// 创建多个无缓冲的 Channel,用来控制 Goroutine 的顺序
steps := make([]chan struct{}, 10)
for i := 0; i < 10; i++ {
steps[i] = make(chan struct{})
}
// 定义 10 个 Goroutine,按照顺序执行
for i := 0; i < 10; i++ {
go func(i int) {
if i == 0 {
// 第一个 Goroutine 不需要等待
fmt.Println("Goroutine 1: Start")
time.Sleep(1 * time.Second) // 模拟工作
fmt.Println("Goroutine 1: Done")
// 通知 Goroutine 2 可以开始
steps[0] <- struct{}{}
close(steps[0])
} else {
// 等待上一个 Goroutine 完成
<-steps[i-1]
fmt.Printf("Goroutine %d: Start\n", i+1)
time.Sleep(1 * time.Second) // 模拟工作
fmt.Printf("Goroutine %d: Done\n", i+1)
// 通知下一个 Goroutine 可以开始
if i < 9 {
steps[i] <- struct{}{}
close(steps[i])
} else {
// 最后一个 Goroutine,通知主线程完成
steps[9] <- struct{}{}
close(steps[9])
}
}
}(i)
}
// 等待最后一个 Goroutine 完成
<-steps[9]
fmt.Println("All Goroutines Finished!")
}
输出:
Goroutine 1: Start
Goroutine 1: Done
Goroutine 2: Start
Goroutine 2: Done
Goroutine 3: Start
Goroutine 3: Done
Goroutine 4: Start
Goroutine 4: Done
Goroutine 5: Start
Goroutine 5: Done
Goroutine 6: Start
Goroutine 6: Done
Goroutine 7: Start
Goroutine 7: Done
Goroutine 8: Start
Goroutine 8: Done
Goroutine 9: Start
Goroutine 9: Done
Goroutine 10: Start
Goroutine 10: Done
All Goroutines Finished!
两个 Goroutines 的交替执行,交替打印偶数和奇数
package main
import (
"fmt"
)
func main() {
// 定义两个channel,分别用于控制打印顺序
ch1 := make(chan struct{})
ch2 := make(chan struct{})
done := make(chan struct{}) // 用于通知主协程完成
// 启动第一个goroutine,负责打印偶数
go func() {
for i := 0; i <= 9; i += 2 {
<-ch1 // 等待信号
fmt.Println(i) // 打印当前偶数
ch2 <- struct{}{} // 通知另一个goroutine
}
}()
// 启动第二个goroutine,负责打印奇数
go func() {
for i := 1; i <= 9; i += 2 {
<-ch2 // 等待信号
fmt.Println(i) // 打印当前奇数
if i == 9 {
done <- struct{}{} // 打印完成通知主协程
} else {
ch1 <- struct{}{} // 通知另一个goroutine
}
}
}()
// 主协程启动打印过程
ch1 <- struct{}{} // 先给ch1发送信号,开始打印偶数
// 主协程等待所有任务完成
<-done
}
输出:
0
1
2
3
4
5
6
7
8
9
两个 server 的任意一个执行完成,就执行第三个
这里执行了5次,以验证随机性,time.Sleep代表server的一些执行的逻辑。
package main
import (
"fmt"
"time"
)
func server1(ch chan string) {
time.Sleep(1 * time.Second)
ch <- "server1"
}
func server2(ch chan string) {
time.Sleep(1 * time.Second)
ch <- "server2"
}
func main() {
for i := 0; i < 5; i++ {
output1 := make(chan string)
output2 := make(chan string)
go server1(output1)
go server2(output2)
select {
case s1 := <-output1:
fmt.Println(s1, "server3")
case s2 := <-output2:
fmt.Println(s2, "server3")
}
fmt.Println("--------------")
}
fmt.Println("5组执行完成")
}
输出:
下面输出5组,前面的server顺序不一定(哪个先完成都行)。
server1 server3
--------------
server2 server3
--------------
server1 server3
--------------
server1 server3
--------------
server1 server3
--------------
5组执行完成
说明:
select如果加上default会直接命中default,不会等待两个通道。
两个 server 必须全部执行完成,再执行第三个。
这里执行了5次,以验证随机性,time.Sleep代表server的一些执行的逻辑。
package main
import (
"fmt"
"time"
)
func server1(ch chan string) {
time.Sleep(1 * time.Second)
ch <- "server1"
}
func server2(ch chan string) {
time.Sleep(1 * time.Second)
ch <- "server2"
}
func main() {
for i := 0; i < 5; i++ {
output1 := make(chan string)
output2 := make(chan string)
go server1(output1)
go server2(output2)
for i := 0; i < 2; i++ {
select {
case s1 := <-output1:
fmt.Println(s1)
case s2 := <-output2:
fmt.Println(s2)
}
}
fmt.Println("server3")
fmt.Println("--------------")
}
fmt.Println("5组执行完成")
}
上面使用了for+select控制2个server完成,也可以使用WaitGroup,如下:
package main
import (
"fmt"
"sync"
"time"
)
func server1(wg *sync.WaitGroup) {
defer wg.Done() // 在函数退出时通知 WaitGroup
time.Sleep(1 * time.Second)
fmt.Println("server1")
}
func server2(wg *sync.WaitGroup) {
defer wg.Done() // 在函数退出时通知 WaitGroup
time.Sleep(1 * time.Second)
fmt.Println("server2")
}
func main() {
for i := 0; i < 5; i++ {
wg := sync.WaitGroup{}
wg.Add(2)
go server1(&wg)
go server2(&wg)
wg.Wait()
fmt.Println("server3")
fmt.Println("--------------")
}
fmt.Println("5组执行完成")
}
输出:
下面输出5组,前面的server顺序不一定(哪个先完成都行)。
server1
server2
server3
--------------
server1
server2
server3
--------------
server2
server1
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
5组执行完成
三个 server 的必须按 1 2 3 顺序执行
这里执行了5次,以验证随机性,time.Sleep代表server的一些执行的逻辑。
package main
import (
"fmt"
"sync"
"time"
)
func server1(ch chan string, wg *sync.WaitGroup) {
defer wg.Done() // 在函数退出时通知 WaitGroup
time.Sleep(1 * time.Second) // 模拟工作
ch <- "server1"
}
func server2(ch chan string, wg *sync.WaitGroup) {
defer wg.Done() // 在函数退出时通知 WaitGroup
time.Sleep(1 * time.Second) // 模拟工作
ch <- "server2"
}
func main() {
for i := 0; i < 5; i++ {
output1 := make(chan string)
output2 := make(chan string)
var wg sync.WaitGroup
// 启动 server1 和 server2
wg.Add(2) // 等待两个 Goroutine 完成
go server1(output1, &wg)
go server2(output2, &wg)
// 确保按顺序输出 server1, server2, server3
s1 := <-output1
s2 := <-output2
fmt.Println(s1)
fmt.Println(s2)
fmt.Println("server3")
fmt.Println("--------------")
}
// 完成5组任务
fmt.Println("5组执行完成")
}
输出:
server1
server2
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
server1
server2
server3
--------------
5组执行完成
一个生产者Goroutine,多个消费者Goroutine(每条消息,一次消费)
此处生产者生产的每个消息,只会有一个消费者消费,并发消费。
package main
import (
"fmt"
"sync"
"time"
)
func producer(ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 1; i <= 5; i++ {
fmt.Printf("Produced: %d\n", i)
ch <- i // 向通道发送数据
time.Sleep(time.Second)
}
close(ch) // 生产者完成后关闭通道
}
func consumer(id int, ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
for task := range ch { // 从通道读取任务直到通道关闭
fmt.Printf("Consumer %d processing task: %d\n", id, task)
time.Sleep(2 * time.Second) // 模拟消费任务的延时
}
}
func main() {
var wg sync.WaitGroup
tasks := make(chan int, 10) // 定义缓冲区大小为10的任务通道
// 启动多个消费者
for i := 1; i <= 3; i++ {
wg.Add(1)
go consumer(i, tasks, &wg)
}
// 启动生产者
wg.Add(1)
go producer(tasks, &wg)
// 等待所有消费者完成工作
wg.Wait()
fmt.Println("All tasks completed.")
}
输出:
Produced: 1
Consumer 2 processing task: 1
Produced: 2
Consumer 1 processing task: 2
Produced: 3
Consumer 3 processing task: 3
Produced: 4
Consumer 2 processing task: 4
Produced: 5
Consumer 1 processing task: 5
All tasks completed.
一个生产者Goroutine,多个消费者Goroutine(每条消息每个消费者都会消费)
此处生产者生产的每个消息,每个消费者都会消费。
方案:
-
我们应该设计一个 多播机制(广播模式),即每个任务都会被多个消费者消费。最简单的方式是使用 复制通道(通过多个 goroutine 消费同一个任务通道)。
-
可以通过 使用多个通道,每个消费者都从这些通道中接收任务,或者使用
sync.WaitGroup
等方式来确保每个消费者都能够完成任务处理。
package main
import (
"fmt"
"sync"
"time"
)
func producer(channels []chan int, wg *sync.WaitGroup) {
defer wg.Done() // 确保 producer 完成时通知 WaitGroup
// 生产 5 个任务
for i := 1; i <= 5; i++ {
fmt.Printf("Produced: %d\n", i)
// 向每个通道发送任务
for _, ch := range channels {
ch <- i
}
time.Sleep(time.Second)
}
// 所有任务发送完毕后,关闭每个通道
for _, ch := range channels {
close(ch)
}
}
func consumer(id int, ch chan int, wg *sync.WaitGroup) {
defer wg.Done() // 确保 consumer 完成时通知 WaitGroup
// 从通道中接收任务并处理
for task := range ch {
fmt.Printf("Consumer %d processing task: %d\n", id, task)
time.Sleep(2 * time.Second) // 模拟任务处理延迟
}
}
func main() {
var wg sync.WaitGroup
numConsumers := 3
channels := make([]chan int, numConsumers)
// 创建多个消费者通道
for i := 0; i < numConsumers; i++ {
channels[i] = make(chan int, 10)
}
// 启动多个消费者
for i := 1; i <= numConsumers; i++ {
wg.Add(1)
go consumer(i, channels[i-1], &wg)
}
// 启动生产者
wg.Add(1)
go producer(channels, &wg)
// 等待所有消费者和生产者完成工作
wg.Wait()
fmt.Println("All tasks completed.")
}
输出:
Produced: 1
Consumer 1 processing task: 1
Consumer 2 processing task: 1
Consumer 3 processing task: 1
Produced: 2
Produced: 3
Consumer 1 processing task: 2
Consumer 2 processing task: 2
Consumer 3 processing task: 2
Produced: 4
Produced: 5
Consumer 2 processing task: 3
Consumer 1 processing task: 3
Consumer 3 processing task: 3
Consumer 1 processing task: 4
Consumer 2 processing task: 4
Consumer 3 processing task: 4
Consumer 3 processing task: 5
Consumer 2 processing task: 5
Consumer 1 processing task: 5
All tasks completed.