golang的sync包浅析
总述
为了更好的使用golang
来进行网络编程,将介绍以下六种除fmt
外的经典包中的方法及其他内容。共有sync
,runtime
,net
,reflect
,io
,time
包,本系列是学习笔记,只涉及浅显的用法,需要深入了解的可以前往go语言中文网以及Golang标准库文档。
首先,我们介绍一下sync
包,这个包常用的接口如下。
进程控制
Locker
这个接口代表一个可以加锁和解锁的对象
type Locker interface {
Lock()
Unlock()
}
sync.WaitGroup
简要介绍
这个类是用来实现go中并发任务同步的。sync.WaitGroup
有以下几个方法:
方法名 | 功能 |
---|---|
(wg * WaitGroup) Add(delta int) | 计数器+delta |
(wg *WaitGroup) Done() | 计数器-1 |
(wg *WaitGroup) Wait() | 阻塞直到计数器变为0 |
sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用Done()方法将计数器减1。通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。
使用这三个方法时,一般时配套承担一个功能:
-
defer wg.Done
,这个语句往往放在将要并发的函数的第一行,来确保这个函数并发进程完成之后计数器会减一 -
wg.Add(1) go work(&wg)
add这个方法,往往是在并发语句之前使用,达到计数器增加的效果(在进程结束后由Done方法来减去)
-
wg.Wait()
,往往放在主进程或者整个进程最后,来确保所有进程执行完之后,才解除阻塞(如果所有的Done
方法都已经调用并且计数器值变为零,Wait
方法会立即返回,解除阻塞)
实例
package main
import (
"fmt"
"sync"
"time"
)
func work(wg *sync.WaitGroup) {
defer wg.Done() // 在任务完成后减少等待计数
fmt.Println("Doing work...")
time.Sleep(1 * time.Second) // 模拟工作
}
func main() {
var wg sync.WaitGroup
// 为每个工作者增加等待计数
for i := 0; i < 5; i++ {
wg.Add(1)
go work(&wg)
}
// 等待所有的工作者完成工作
wg.Wait()
fmt.Println("All work is done.")
}
运行结果:
这是一个子进程
这是一个子进程
这是一个子进程
这是一个子进程
这是一个子进程
这是一个主进程
为什么使用
func work() {
fmt.Println("这是一个子进程")
time.Sleep(1 * time.Second)
}
func main() {
for i := 0; i < 5; i++ {
go work()
}
fmt.Println("这是一个主进程")
}
运行结果:
这是一个子进程
这是一个子进程
这是一个子进程
这是一个主进程
这是一个子进程
这是一个子进程
可以看出,如果不使用这个方法进行优化的话,主进程和子进程结束的时间是随机的,优化之后,可以确保主进程在最后完成。
还有人会说,那将主进程结束的语句加一个defer不就好了?而实际情况是,不但没有达到这个效果,这样玩之后,连子进程个数有多少都无法保证了!
func main() {
defer fmt.Println("这是一个主进程")
for i := 0; i < 5; i++ {
go work()
}
}
运行结果:
这是一个子进程
这是一个子进程
这是一个主进程
补充总结
我们使用sync.WaitGroup
组合时,一般想要将进程控制在一个个的时间区域内,就像下面代码实现的一样:
func work(wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("这是一个工作的子进程")
time.Sleep(1 * time.Second)
}
func work2(wg *sync.WaitGroup) {
defer wg.Done()
fmt.Println("这是一个摸鱼的子进程")
time.Sleep(1 * time.Second)
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go work(&wg)
}
wg.Wait()
fmt.Println("偶滴工作结束了")
var wg2 sync.WaitGroup
for i := 0; i < 5; i++ {
wg2.Add(1)
go work2(&wg2)
}
wg2.Wait()
fmt.Println("呜呜呜,摸鱼结束了")
}
运行结果:
这是一个工作的子进程
这是一个工作的子进程
这是一个工作的子进程
这是一个工作的子进程
这是一个工作的子进程
偶滴工作结束了
这是一个摸鱼的子进程
这是一个摸鱼的子进程
这是一个摸鱼的子进程
这是一个摸鱼的子进程
这是一个摸鱼的子进程
呜呜呜,摸鱼结束了
sync.Cond
Cond
允许 goroutine 在某个条件满足前挂起(等待),并允许其他 goroutine 通知它们条件已经改变,可以继续执行。Cond
需要与 sync.Mutex
或 sync.RWMutex
结合使用来保证条件变量的正确使用。调用 Wait
方法的 goroutine 会释放锁并进入等待状态,直到其他 goroutine 调用 Signal
或 Broadcast
方法来唤醒它们。
方法
func NewCond
func NewCond(l Locker) *Cond
使用锁l创建一个*Cond。
func (*Cond) Broadcast
func (c *Cond) Broadcast()
Broadcast唤醒所有等待c的线程。调用者在调用本方法时,建议(但并非必须)保持c.L的锁定。
func (*Cond)
func (c *Cond) Signal()
Signal唤醒等待c的一个线程(如果存在)。调用者在调用本方法时,建议(但并非必须)保持c.L的锁定。
func (*Cond) Wait
func (c *Cond) Wait()
Wait自行解锁c.L并阻塞当前线程,在之后线程恢复执行时,Wait方法会在返回前锁定c.L。和其他系统不同,Wait除非被Broadcast或者Signal唤醒,不会主动返回。
实例:
var (
mu sync.Mutex
cond = sync.NewCond(&mu)
ready = false
)
// 线程A
go func() {
mu.Lock()
for !ready {
cond.Wait()
}
// 执行任务
mu.Unlock()
}()
// 线程B
go func() {
mu.Lock()
ready = true
cond.Broadcast()
//cond.Signal() // 或使用 cond.Broadcast() 唤醒所有等待的 goroutine
mu.Unlock()
}()
运行结果:
线程A: 等待条件满足...
线程B: 设置条件为true
线程B: 已经唤醒线程A
线程A: 条件满足,开始执行任务
cond.Wait()
方法在执行时会自动解除持有的锁,并使当前 goroutine 进入等待状态,具体来说,这个过程包含以下几个步骤:
- 解除锁:
cond.Wait()
方法会自动释放与Cond
关联的sync.Mutex
锁。这样,其他 goroutine 可以获取这个锁,修改条件变量所依赖的状态。 - 进入等待状态:在释放锁之后,当前 goroutine 会被挂起(进入等待状态),直到条件满足并被唤醒。
- 重新获取锁:当条件被满足并且
cond.Signal()
或cond.Broadcast()
被调用后,cond.Wait()
会重新获取锁,然后继续执行等待之后的代码。
sync.Once
(进阶用法)
sync.Once
是一个用来保证某个操作只被执行一次的类型,这在并发编程中非常有用。它可以用来实现线程安全的单例模式或其他只需初始化一次的操作。这个类只含一个方法:
func (o *Once) Do(f func())
实例:
package main
import (
"fmt"
"sync"
)
var once sync.Once
func initialize() {
fmt.Println("Initialization")
}
func main() {
var wg sync.WaitGroup
// 启动多个 goroutine
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
once.Do(initialize)
}()
}
// 等待所有 goroutine 完成
wg.Wait()
}
运行结果:
Initialization
这个方法确保传入的参数(也是一个函数)只会被被执行一次,简洁有效!
RWNutex(读写锁)
补充——方法总结:
func (*RWMutex) Lock
func (rw *RWMutex) Lock()
Lock方法将rw锁定为写入状态,禁止其他线程读取或者写入。
func (*RWMutex) Unlock
func (rw *RWMutex) Unlock()
Unlock方法解除rw的写入锁状态,如果m未加写入锁会导致运行时错误。
func (*RWMutex) RLock
func (rw *RWMutex) RLock()
RLock方法将rw锁定为读取状态,禁止其他线程写入,但不禁止读取。
func (*RWMutex) RUnlock
func (rw *RWMutex) RUnlock()
Runlock方法解除rw的读取锁状态,如果m未加读取锁会导致运行时错误。
func (*RWMutex) RLocker
func (rw *RWMutex) RLocker() Locker
Rlocker方法返回一个互斥锁,通过调用rw.Rlock和rw.Runlock实现了Locker接口。
共用存储
sync.Pool
Pool是一个可以分别存取的临时对象的集合。Pool中保存的任何item都可能随时不做通告的释放掉。如果Pool持有该对象的唯一引用,这个item就可能被回收。Pool可以安全的被多个线程同时使用。Pool的目的是缓存申请但未使用的item用于之后的重用,以减轻GC的压力。也就是说,让创建高效而线程安全的空闲列表更容易。但Pool并不适用于所有空闲列表。Pool的合理用法是用于管理一组静静的被多个独立并发线程共享并可能重用的临时item。Pool提供了让多个线程分摊内存申请消耗的方法。Pool的一个好例子在fmt包里。该Pool维护一个动态大小的临时输出缓存仓库。该仓库会在过载(许多线程活跃的打印时)增大,在沉寂时缩小。另一方面,管理着短寿命对象的空闲列表不适合使用Pool,因为这种情况下内存申请消耗不能很好的分配。这时应该由这些对象自己实现空闲列表。
创建Pool常使用的方法:
var pool = sync.Pool{
New: func() interface{} {
// 这里定义池中对象的创建方式
return &MyObject{}
},
}
方法:
func (*Pool) Get
func (p *Pool) Get() interface{}
Get方法从池中选择任意一个item,删除其在池中的引用计数,并提供给调用者。Get方法也可能选择无视内存池,将其当作空的。调用者不应认为Get的返回这和传递给Put的值之间有任何关系。
假使Get方法没有取得item:如p.New非nil,Get返回调用p.New的结果;否则返回nil。
func (*Pool) Put
func (p *Pool) Put(x interface{})
Put方法将x放入池中。
实例:
package main
import (
"fmt"
"sync"
"time"
)
// 创建一个 sync.Pool 实例
var pool = sync.Pool{
New: func() interface{} {
// 这里定义池中对象的创建方式
return &MyObject{}
},
}
type MyObject struct {
ID int
Value string
}
func main() {
var wg sync.WaitGroup
// 启动多个 goroutine
for i := 0; i < 5; i++ {
wg.Add(1)
go func(goroutineID int) {
defer wg.Done()
// 从池中获取一个对象
obj := pool.Get().(*MyObject)
obj.ID = goroutineID
obj.Value = fmt.Sprintf("Value for ID %d", goroutineID)
fmt.Printf("Goroutine %d got object with ID: %d and Value: %s\n", goroutineID, obj.ID, obj.Value)
// 模拟处理对象的时间
time.Sleep(time.Millisecond * 500)
// 进行处理:增加一些变更
obj.Value = fmt.Sprintf("Processed Value for ID %d", goroutineID)
fmt.Printf("Goroutine %d processed object with ID: %d to Value: %s\n", goroutineID, obj.ID, obj.Value)
// 放回池中
pool.Put(obj)
fmt.Printf("Goroutine %d put object with ID: %d back to pool\n", goroutineID, obj.ID)
}(i)
}
// 等待所有 goroutine 完成
wg.Wait()
}
- 增加
Value
字段:MyObject
结构体现在有一个Value
字段,用于存储字符串数据。 - 获取对象:每个 goroutine 从
sync.Pool
中获取一个MyObject
实例,并设置其ID
和Value
字段。 - 处理对象数据:每个 goroutine 模拟对对象的处理,这里我们改变
Value
字段的内容以反映处理过程。 - 放回对象:处理完对象后,每个 goroutine 将对象放回池中,供其他 goroutine 重用。
运行结果:
Goroutine 3 got object with ID: 3 and Value: Value for ID 3
Goroutine 4 got object with ID: 4 and Value: Value for ID 4
Goroutine 0 got object with ID: 0 and Value: Value for ID 0
Goroutine 1 got object with ID: 1 and Value: Value for ID 1
Goroutine 2 got object with ID: 2 and Value: Value for ID 2
Goroutine 0 processed object with ID: 0 to Value: Processed Value for ID 0
Goroutine 0 put object with ID: 0 back to pool
Goroutine 2 processed object with ID: 2 to Value: Processed Value for ID 2
Goroutine 1 processed object with ID: 1 to Value: Processed Value for ID 1
Goroutine 1 put object with ID: 1 back to pool
Goroutine 3 processed object with ID: 3 to Value: Processed Value for ID 3
Goroutine 3 put object with ID: 3 back to pool
Goroutine 4 processed object with ID: 4 to Value: Processed Value for ID 4
Goroutine 4 put object with ID: 4 back to pool
Goroutine 2 put object with ID: 2 back to pool
相当于创建一个共享数据池,以供一个进程中的多个go程使用。
sync.Map
Go语言中内置的map不是并发安全的,因此我们使用sync包中的Map。简而言之:sync.Map
是一个并发版本的Go
语言的map
,sync.Map
的一些主要操作:
Load(key interface{}) (interface{}, bool)
:通过键值查找映射中的值,如果没有找到,则返回零值和布尔类型的false。Store(key, value interface{})
:将键值对存入映射。如果键已经在映射中,则新的值会替换旧的值。Delete(key interface{})
:从映射中删除与给定键关联的值。Reserve
:在并发场景中,定义了如何预分配映射的存储空间,能够提高性能。可以使用sync.Map.LoadOrStore(key, value)
:如果键已经在映射中,返回映射中的值;否则,将新值存入映射并返回它。