Golang——协程同步
本文详细介绍Golang的协程同步的实现和应用场景。
文章目录
- 协程同步是什么?
- 为什么需要协程同步?
- 常见的协程同步机制
- 互斥锁(Mutex)
- 读写锁(RWMutex)
- 等待组(WaitGroup)
- 通道(Channel)
- 原子操作(Atomic Operations)
- 典型的协程同步场景
- 对比:`channel` 和 `sync.Mutex`, `sync.WaitGroup`
- 互斥锁控制并发顺序
- 使用互斥锁实现Goroutine 控制顺序
- 解释
协程同步是什么?
协程同步(Goroutine synchronization)是指在多个 Goroutine 并发执行时,通过某些机制来协调它们之间的执行顺序、共享数据访问和资源管理,从而避免数据竞争(race condition)、死锁、资源冲突等问题。同步的目的是确保在并发程序中,多个 Goroutine 可以正确地共享数据、按预期的顺序执行任务,并且避免因为并发操作导致的不可预测行为。
为什么需要协程同步?
在 Go 语言中,Goroutine 是一种轻量级的线程,多个 Goroutine 可能并发执行并共享资源。在并发执行时,如果没有适当的同步机制,多个 Goroutine 可能会同时访问共享资源,导致数据不一致、程序崩溃或其他并发问题。因此,需要同步机制来确保:
- 共享资源的正确访问:避免多个 Goroutine 同时修改同一资源(如变量、数据结构等),从而导致数据竞态。
- 执行顺序的控制:确保 Goroutine 在特定顺序下执行,满足某些逻辑条件(如等待某些任务完成)。
- 任务完成的等待:在某些场景下,需要等待多个并发任务完成后再继续后续操作。
常见的协程同步机制
互斥锁(Mutex)
互斥锁(sync.Mutex
)用于保护共享资源,确保同一时刻只有一个 Goroutine 能访问临界区(共享资源)。在锁的保护下,其他 Goroutine 必须等待,直到当前 Goroutine 完成对资源的操作并释放锁。
-
优点:
- 简单易用,适合保护临界区。
- 避免多个 Goroutine 同时读写共享资源时的数据竞态。
-
缺点:
- 锁的粒度较粗,不适合高并发场景。
- 可能会导致死锁,尤其是当锁的使用不当时。
读写锁(RWMutex)
sync.RWMutex
是一种更细粒度的锁,它允许多个 Goroutine 同时读取共享资源,但写操作时会阻止所有其他的读取和写入操作。适用于读多写少的场景。
-
优点:
- 适合读多写少的场景,可以允许多个 Goroutine 同时读取共享数据。
- 减少了锁竞争,提高了并发性能。
-
缺点:
- 写操作仍然是独占的,不适用于频繁写操作的场景。
等待组(WaitGroup)
sync.WaitGroup
用于等待一组 Goroutine 完成任务。它提供了 Add
、Done
和 Wait
方法,用来协调多个 Goroutine 的执行顺序。
-
优点:
- 非常适合等待多个并发任务的完成。
- 通过
Add
增加等待的任务数,通过Done
表示任务完成,Wait
阻塞当前 Goroutine 直到所有任务完成。
-
缺点:
- 只能用于同步“任务完成”,不能用于同步临界区的访问。
WaitGroup源码如下:
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
noCopy noCopy
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers do not ensure it. So we allocate 12 bytes and then use
// the aligned 8 bytes in them as state, and the other 4 as storage
// for the sema.
state1 [3]uint32
}
- 一个WaitGroup等待多个goroutine执行完成,main的goroutine可以调用Add()方法设置需要等待的goroutine数量,之后每一个goroutine在运行结束时调用Done(),在这段时间内,我们可以使用Wait()阻塞main的goroutine直到所有的goroutine都执行完成。
- WaitGroup不能进行复制操作【struct里面有noCopy类型,禁止做值拷贝,只能通过指针来传递】,在函数中使用
WaitGroup
时,需要传递它的指针,即*sync.WaitGroup
。
通道(Channel)
Channel
是 Go 的核心特性之一,它不仅用于 Goroutine 间通信,还能通过阻塞机制隐式地实现同步。通过发送和接收操作,Channel
可以协调多个 Goroutine 的执行,确保它们按照特定顺序进行。
-
优点:
- 通过 Channel 传递数据本身就会进行同步,使用非常灵活。
- 可以避免使用显式的锁(如
sync.Mutex
)来控制并发。
-
缺点:
- 不适合所有场景,特别是需要复杂同步时,可能需要更多的设计。
- 需要注意死锁和缓冲区的大小等问题。
原子操作(Atomic Operations)
sync/atomic
包提供了一些原子操作,用于在多个 Goroutine 之间同步访问单个变量。这些操作不需要使用锁,适用于简单的计数器、标志位等场景。
-
优点:
- 对单一变量的原子操作非常高效。
- 适用于计数器、标志位等简单的同步操作。
-
缺点:
- 只适用于单个变量,不适合复杂的数据结构。
- 操作较为低级,可能需要更多的代码来管理并发逻辑。
典型的协程同步场景
- 保护共享数据:当多个 Goroutine 需要读写共享数据时,可以使用
Mutex
或RWMutex
来保护数据的访问。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var mu sync.Mutex
var counter int
for i := 0; i < 1000; i++ {
go func() {
mu.Lock()
counter++
mu.Unlock()
}()
}
time.Sleep(time.Second * 3)
fmt.Println(counter)
}
说明:time.Sleep
可以使用WaitGroup进行替代。
- 等待多个任务完成:使用
WaitGroup
来等待多个并发任务完成后再继续执行后续操作。
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
fmt.Println(i)
}(i)
}
wg.Wait()
}
说明:time.Sleep
可以使用WaitGroup进行替代。
- 协调 Goroutine 执行顺序:使用
Channel
来确保 Goroutine 按照特定顺序执行。
package main
import (
"fmt"
"time"
)
func main() {
// 创建多个无缓冲的 Channel,用来控制 Goroutine 的顺序
step1 := make(chan struct{})
step2 := make(chan struct{})
step3 := make(chan struct{})
// 定义第一个 Goroutine
go func() {
fmt.Println("Goroutine 1: Start")
time.Sleep(1 * time.Second) // 模拟工作
fmt.Println("Goroutine 1: Done")
// 通知 Goroutine 2 可以开始
close(step1)
}()
// 定义第二个 Goroutine
go func() {
// 等待 Goroutine 1 完成
<-step1
fmt.Println("Goroutine 2: Start")
time.Sleep(1 * time.Second) // 模拟工作
fmt.Println("Goroutine 2: Done")
// 通知 Goroutine 3 可以开始
close(step2)
}()
// 定义第三个 Goroutine
go func() {
// 等待 Goroutine 2 完成
<-step2
fmt.Println("Goroutine 3: Start")
time.Sleep(1 * time.Second) // 模拟工作
fmt.Println("Goroutine 3: Done")
// 通知主线程结束
close(step3)
}()
// 等待 Goroutine 3 完成
<-step3
fmt.Println("All Goroutines Finished!")
}
package main
import (
"fmt"
"time"
)
func main() {
// 创建多个无缓冲的 Channel,用来控制 Goroutine 的顺序
step1 := make(chan struct{})
step2 := make(chan struct{})
step3 := make(chan struct{})
// 定义第一个 Goroutine
go func() {
fmt.Println("Goroutine 1: Start")
time.Sleep(1 * time.Second) // 模拟工作
fmt.Println("Goroutine 1: Done")
// 通知 Goroutine 2 可以开始
step1 <- struct{}{}
}()
// 定义第二个 Goroutine
go func() {
// 等待 Goroutine 1 完成
<-step1
fmt.Println("Goroutine 2: Start")
time.Sleep(1 * time.Second) // 模拟工作
fmt.Println("Goroutine 2: Done")
// 通知 Goroutine 3 可以开始
step2 <- struct{}{}
}()
// 定义第三个 Goroutine
go func() {
// 等待 Goroutine 2 完成
<-step2
fmt.Println("Goroutine 3: Start")
time.Sleep(1 * time.Second) // 模拟工作
fmt.Println("Goroutine 3: Done")
// 通知主线程结束
step3 <- struct{}{}
}()
// 等待 Goroutine 3 完成
<-step3
fmt.Println("All Goroutines Finished!")
}
这两个代码的主要区别在于如何实现 Goroutine 之间的同步信号传递:一个使用 close()
关闭通道,另一个使用 <-chan
来发送信号。
-
使用
close
通道:close(stepX)
通知接收方,通道不再发送任何数据。这意味着接收方在收到数据后,可以认为没有更多的工作需要处理。- 这种方式适用于需要明确表示“结束”或“没有更多数据”的场景。
-
使用
<-stepX
信号传递:stepX <- struct{}{}
用来传递一个信号,通常是通过发送一个空的结构体。接收方通过<-stepX
等待信号,表示前一个 Goroutine 已经完成,可以继续执行。- 这种方式更常见用于同步 Goroutine 之间的顺序执行,它不表示“结束”,只是简单的通知和同步。
在 Go 中,无缓冲通道(
chan struct{}
)通常用于同步信号传递。代码中使用的step1
,step2
, 和step3
是用于控制 Goroutine 执行顺序的信号通道。对于这种情形,关闭通道并不是必要的,因为:
无缓冲通道的用途:在你的代码中,通道是用于 Goroutine 之间的同步,而不是用来传输数据或产生多次通信。
通道关闭的场景:关闭通道通常用于发送者完成发送所有数据并且没有更多数据要发送时,或者用于接收方识别通道的结束。在这个例子中,主线程只需要等待第三个Goroutine 发出的信号,而不需要读取或等待更多的值。因此,不关闭通道不会导致问题。
等待通道信号的场景:主线程通过
<-step3
来等待所有 Goroutine 完成。当最后一个 Goroutine 通过step3 <- struct{}{}
完成时,主线程就能结束。无缓冲通道不需要关闭,也不会导致死锁或资源泄漏。总结:
- 通常在有缓冲的通道或多个接收者的情况下,关闭通道的意义更大,因为接收者可能需要知道什么时候没有更多的数据,或者什么时候发送者不再发送数据。这种channel如果未关闭,可能导致它们在垃圾回收机制中未被及时回收。
- 在当前的场景下(无缓冲通道、每个通道仅用于同步信号),没有关闭通道也不会影响程序的正确性。Go 的垃圾回收机制会自动处理那些不再使用的对象和数据结构,包括通道。所以即使没有显式关闭通道,程序结束时,未关闭的通道也会被垃圾回收。
当然,也可以在使用 <-chan
来发送信号后强制关闭通道,如下:
package main
import (
"fmt"
"time"
)
func main() {
// 创建多个无缓冲的 Channel,用来控制 Goroutine 的顺序
step1 := make(chan struct{})
step2 := make(chan struct{})
step3 := make(chan struct{})
// 定义第一个 Goroutine
go func() {
fmt.Println("Goroutine 1: Start")
time.Sleep(1 * time.Second) // 模拟工作
fmt.Println("Goroutine 1: Done")
// 通知 Goroutine 2 可以开始
step1 <- struct{}{}
// 关闭 step1 通道,表示没有更多信号
close(step1)
}()
// 定义第二个 Goroutine
go func() {
// 等待 Goroutine 1 完成
<-step1
fmt.Println("Goroutine 2: Start")
time.Sleep(1 * time.Second) // 模拟工作
fmt.Println("Goroutine 2: Done")
// 通知 Goroutine 3 可以开始
step2 <- struct{}{}
// 关闭 step2 通道,表示没有更多信号
close(step2)
}()
// 定义第三个 Goroutine
go func() {
// 等待 Goroutine 2 完成
<-step2
fmt.Println("Goroutine 3: Start")
time.Sleep(1 * time.Second) // 模拟工作
fmt.Println("Goroutine 3: Done")
// 通知主线程结束
step3 <- struct{}{}
// 关闭 step3 通道,表示没有更多信号
close(step3)
}()
// 等待 Goroutine 3 完成
<-step3
fmt.Println("All Goroutines Finished!")
}
channel同步执行Goroutine请参考:【todo】
- 原子操作:使用
atomic
来实现对单个变量的原子操作,如计数器的增加,但是是无序的。
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var count int32
var wg sync.WaitGroup
// 创建10个 Goroutine 来增加计数器
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
// 使用 atomic 对 count 进行原子增加
atomic.AddInt32(&count, 1)
// 打印当前的数字
fmt.Println(i)
}(i)
}
// 等待所有 Goroutine 完成
wg.Wait()
// 输出最终的 count 值
fmt.Println("Final count:", count)
}
对比:channel
和 sync.Mutex
, sync.WaitGroup
channel
:更灵活,能够传递数据并同步控制执行顺序。通常用于需要显式控制执行顺序的场景,比如一个任务完成后通知另一个任务。sync.Mutex
和sync.RWMutex
:主要用于同步对共享资源的访问,无法直接控制 Goroutine 的执行顺序。sync.WaitGroup
:用于等待多个 Goroutine 完成,可以确保所有 Goroutine 都完成后再执行下一步,但它不控制 Goroutine 的执行顺序。
使用channel可以控制执行顺序,当然也可只使用 sync.Mutex
或 sync.RWMutex
,sync.WaitGroup
来控制 Goroutine 的执行顺序,只不过没有channel那么优雅,参考下节。
互斥锁控制并发顺序
如果单独使用 sync.Mutex
或 sync.RWMutex
和 sync.WaitGroup
来实现 Goroutine 顺序打印 0 到 9,需要巧妙地利用 sync.Mutex
或 sync.RWMutex
来确保 Goroutine 按顺序执行。
使用互斥锁实现Goroutine 控制顺序
我们可以通过 sync.Mutex
来实现一个基本的锁机制,确保每次只有一个 Goroutine 在执行,并按顺序打印数字。
package main
import (
"fmt"
"sync"
)
func main() {
var mu sync.Mutex
// var mu sync.RWMutex
var wg sync.WaitGroup
counter := 0
// 使用 Mutex 来控制顺序打印
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
// 控制顺序打印
mu.Lock()
// 这里使用 counter 来确保按顺序执行
for counter != i {
mu.Unlock()
mu.Lock()
}
// 打印当前数字
fmt.Println(i)
counter++
mu.Unlock()
}(i)
}
wg.Wait() // 等待所有 Goroutine 执行完毕
}
解释
- 我们使用
sync.Mutex
来保护共享变量counter
,确保每个 Goroutine 在它轮到执行时才会打印。 counter
用于跟踪已经执行的顺序,mu.Lock()
和mu.Unlock()
确保只有一个 Goroutine 可以进入临界区。for counter != i
的检查保证每个 Goroutine 在它的数字到达时才开始执行。