【Go】-go中的锁机制
目录
一、锁的基础知识
1. 互斥量/互斥锁
2. CAS(compare and swap)
3. 自旋锁
4. 读写锁
5. 乐观锁 & 悲观锁
6. 死锁
二、go中锁机制
1. Mutex-互斥锁
2. RWMutex-读写锁
2.1 RWMutex流程概览
2.2 写锁饥饿问题
2.3. golang的读写锁源码剖析
2.3.1 读锁实现
2.3.2 写锁实现
2.3.3 关键核心机制
3. 常见问题
一、锁的基础知识
1. 互斥量/互斥锁
互斥量(Mutex), 又称为互斥锁, 是一种用来保护临界区的特殊变量, 它可以处于锁定(locked) 状态, 也可以处于解锁(unlocked) 状态。
在编程中,引入了对象互斥锁的概念,来保证共享数据操作的完整性。每个对象都对应于一个可称为" 互斥锁" 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象。
2. CAS(compare and swap)
解决多线程并行情况下使用锁造成性能损耗的一种机制,CAS操作包含三个操作数——内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论哪种情况,它都会在CAS指令之前返回该位置的值。CAS有效地说明了“我认为位置V应该包含值A;如果包含该值,则将B放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。"
CAS机制执行流程:
CAS存在的问题:
1. ABA问题
CAS需要在操作值的时候,检查值有没有发生变化,如果没有发生变化就更新,但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了 —> 这就是所谓的ABA问题。
ABA问题的解决思路其实也很简单,就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A→B→A就会变成1A→2B→3A了。
2. 循环时间长开销大
自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销
3. 只能保证一个共享变量的原子操作
当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁。
3. 自旋锁
自旋锁与互斥锁比较类似,它们都是为了解决对某项资源的互斥使用。无论是互斥锁,还是自旋锁,在任何时刻,最多只能有一个保持者,也就说,在任何时刻最多只能有一个执行单元获得锁。但是两者在调度机制上略有不同。对于互斥锁,如果资源已经被占用,资源申请者只能进入睡眠状态。但是自旋锁不会引起调用者睡眠,如果自旋锁已经被别的执行单元保持,调用者就一直循环在那里看是否该自旋锁的保持者已经释放了锁,"自旋"一词就是因此而得名。
自旋锁可能存在的2个问题:
-
试图递归地获得自旋锁必然会引起死锁:递归程序的持有实例在第二个实例循环,以试图获得相同自旋锁时,不会释放此自旋锁。
在递归程序中使用自旋锁应遵守下列策略:递归程序决不能在持有自旋锁时调用它自己,也决不能在递归调用时试图获得相同的自旋锁。 -
过多占用cpu资源。如果不加限制,由于申请者一直在循环等待,因此自旋锁在锁定的时候,如果不成功,不会睡眠,会持续的尝试,单cpu的时候自旋锁会让其它process动不了. 因此,一般自旋锁实现会有一个参数限定最多持续尝试次数. 超出后, 自旋锁放弃当前time slice. 等下一次机会。
由此可见,自旋锁比较适用于锁使用者保持锁时间比较短的情况。正是由于自旋锁使用者一般保持锁时间非常短,因此选择自旋而不是睡眠是非常必要的,自旋锁的效率远高于互斥锁。
4. 读写锁
读写锁实际是一种特殊的自旋锁,它把对共享资源的访问者划分成读者和写者,读者只对共享资源进行读访问,写者则需要对共享资源进行写操作。
这种锁相对于自旋锁而言,能提高并发性,因为在多处理器系统中,它允许同时有多个读者来访问共享资源,最大可能的读者数为实际的逻辑CPU数。写者是排他性的,一个读写锁同时只能有一个写者或多个读者(与CPU数相关),但不能同时既有读者又有写者。在读写锁保持期间也是抢占失效的。
如果读写锁当前没有读者,也没有写者,那么写者可以立刻获得读写锁,否则它必须自旋在那里,直到没有任何写者或读者。如果读写锁没有写者,那么读者可以立即获得该读写锁,否则读者必须自旋在那里,直到写者释放该读写锁。
5. 乐观锁 & 悲观锁
乐观锁其实主要就是一种思想,因为乐观锁的操作过程中其实没有没有任何锁的参与,乐观锁只是和悲观锁相对,严格的说乐观锁不能称之为锁。
悲观锁:总是假设最坏的情况,每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人想拿这个数据就会阻塞,直到它拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。
乐观锁:总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,只在更新的时候会判断一下在此期间别人有没有去更新这个数据。
乐观锁适用于写比较少的情况下(多读场景),即冲突真的很少发生的时候,这样可以省去了锁的开销,加大了系统的整个吞吐量。但如果是多写的情况,一般会经常产生冲突,这就会导致上层应用会不断的进行retry,这样反倒是降低了性能,所以一般多写的场景下用悲观锁就比较合适。
乐观锁常见的两种实现方式:
1. 版本号机制
CAS机制保证了在更新数据的时候没有被修改为其他数据的同步机制,版本机制就保证了没有被修改过的同步机制
2. CAS机制
当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败。
6. 死锁
死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。
虽然进程在运行过程中,可能发生死锁,但死锁的发生也必须具备一定的条件,死锁的发生必须具备以下四个必要条件:
- 互斥条件:指进程对所分配到的资源进行排它性使用,即在一段时间内某资源只由一个进程占用。如果此时还有其它进程请求资源,则请求者只能等待,直至占有资源的进程用毕释放。
- 请求和保持条件:指进程已经保持至少一个资源,但又提出了新的资源请求,而该资源已被其它进程占有,此时请求进程阻塞,但又对自己已获得的其它资源保持不放。
- 不剥夺条件:指进程已获得的资源,在未使用完之前,不能被剥夺,只能在使用完时由自己释放。
- 环路等待条件:指在发生死锁时,必然存在一个进程——资源的环形链,即进程集合{P0,P1,P2,···,Pn}中的P0正在等待一个P1占用的资源;P1正在等待P2占用的资源,……,Pn正在等待已被P0占用的资源。
二、go中锁机制
在 Golang 里有专门的方法来实现锁,就是 sync 包,这个包有两个很重要的锁类型。一个叫 Mutex, 利用它可以实现互斥锁。一个叫 RWMutex,利用它可以实现读写锁。
sync.Mutex
的锁只有一种锁:Lock()
,它是互斥锁,同一时间只能有一个锁。sync.RWMutex
叫读写锁,它有两种锁:RLock()
和Lock()
:RLock()
叫读锁。它不是绝对锁,可以有多个读者同时获取此锁(调用mu.RLock
)。Lock()
叫写锁,它是个绝对锁,就是说,如果一旦某人拿到了这个锁,别人就不能再获取此锁了。
1. Mutex-互斥锁
Mutex 的实现主要借助了 CAS 指令 + 自旋 + 信号量
数据结构:
type Mutex struct {
state int32
sema uint32
}
上述两个加起来只占 8 字节空间的结构体表示了 Go语言中的互斥锁
状态:
在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态:
- 1位表示是否被锁定
- 1位表示是否有协程已经被唤醒
- 1位表示是否处于饥饿状态
- 剩下29位表示阻塞的协程数
正常模式和饥饿模式
正常模式:正常模式下waiter都是先入先出,在队列中等待的waiter被唤醒后不会直接获取锁,因为要和新来的goroutine 进行竞争,新来的goroutine相对于被唤醒的waiter是具有优势的,新的goroutine 正在cpu上运行,被唤醒的waiter还要进行调度才能进入状态,所以在并发的情况下waiter大概率抢不过新来的goroutine,这个时候waiter会被放到队列的头部,如果等待的时间超过了1ms,这个时候Mutex就会进入饥饿模式。
饥饿模式:当Mutex进入饥饿模式之后,锁的所有权会从解锁的goroutine移交给队列头部的goroutine,这几个时候新来的goroutine会直接放入队列的尾部,这样很好的解决了老的goroutine一直抢不到锁的场景。
对于两种模式,正常模式下的性能是最好的,goroutine可以连续多次获取锁,饥饿模式解决了取锁公平的问题,但是性能会下降,其实是性能和公平的一个平衡模式。所以在lock的源码里面,当队列只剩本省goroutine一个并且等待时间没有超过1ms,这个时候Mutex会重新恢复到正常模式。
Lock函数
// 加锁
// 如果锁已经被使用,调用goroutine阻塞,直到锁可用
func (m *Mutex) Lock() {
// 快速路径:没有竞争直接获取到锁,修改状态位为加锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
// 开启-race之后会进行判断,正常情况可忽略
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 慢路径(以便快速路径可以内联)
m.lockSlow()
}
来看看Lock函数,分为两个部分, 快速路径,先通过CAS尝试直接获取锁,如果能获取到直接返回,否则进入慢路径的方法,这里的代码注释提到了内联
tips:方法内联
简单的说方法内联就是将被调用方函数代码“复制”到调用方函数中,减少函数调用开销,在2018年之前的go版本中,所有的逻辑都在Lock函数中,并没有拆出来,2018年之后Go开发者将slow path拆出来,当lock方法被频繁调用的时候,有两种情况,如果直接获得锁走的是fast path,这个时候内联就只有fast path 的代码,这样会减少方法调用的堆栈空间和时间的消耗 ,如果处于自旋,锁竞争的情况下,走的是slow path,这个时候才会把lock slow 的方法内联进来,这样方便了编译器做内联。
lockSlow 函数
func (m *Mutex) lockSlow() {
var waitStartTime int64 //记录请求锁的初始时间
starving := false //饥饿标记
awoke := false //唤醒标记
iter := 0 //自旋次数
old := m.state //当前锁的状态
for {
//锁处于正常模式还没有释放的时候,尝试自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
//在临界区耗时很短的情况下提高性能
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0
&& atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
//更新锁的状态
old = m.state
continue
}
new := old
// 非饥饿状态进行加锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 等待着数量+1
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 加锁的情况下切换为饥饿模式
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
//goroutine 唤醒的时候进行重置标志
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
// 设置新的状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break
}
// 判断是不是第一次加入队列
// 如果之前就在队列里面等待了,加入到队头
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 阻塞等待
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 检查锁是否处于饥饿状态
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果锁处于饥饿状态,直接抢到锁
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 设置标志,进行加锁并且waiter-1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果是最后一个的话清除饥饿标志
if !starving || old>>mutexWaiterShift == 1 {
// 退出饥饿模式
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
// -race开启检测冲突,可以忽略
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
Unlock函数
//如果对没有lock 的Mutex进行unlock会报错
//unlock和goroutine是没有绑定的,对于一个Mutex,可以一个goroutine加锁,另一个goroutine进行解锁
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// 快速之路,直接解锁,去除加锁位的标记
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 解锁失败进入慢路径
//同样的对慢路径做了单独封装,便于内联
m.unlockSlow(new)
}
}
unlockSlow函数
func (m *Mutex) unlockSlow(new int32) {
//解锁一个未加锁的Mutex会报错(可以想想为什么,Mutex使用状态位进行标记锁的状态的)
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
//正常模式下,没有waiter或者在处理事情的情况下直接返回
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
//如果有等待者,设置mutexWoken标志,waiter-1,更新state
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 饥饿模式下会直接将mutex交给下一个等待的waiter,让出时间片,以便waiter执行
runtime_Semrelease(&m.sema, true, 1)
}
}
同样的,在unlock也有fastpath和slowpath,fastpath尝试解锁,解锁成功就返回,否则进入slowpath,slowpath分为正常模式的处理和饥饿模式的处理,饥饿模式直接将锁的控制权交给队列中等待的waiter,正常模式分两种情况 如果当前没有waiter,只有自己本身,直接解锁返回,如果有waiter,解锁后唤醒下个等待者。
2. RWMutex-读写锁
RWMutex
是一个读/写互斥锁,在某一时刻只能由任意数量的 reader
持有 或者 一个 writer
持有。也就是说,要么放行任意数量的 reader
,多个 reader
可以并行读;要么放行一个 writer
,多个 writer
需要串行写。
RWMutex 对外暴露的方法有五个:
RLock()
:读操作获取锁,如果锁已经被 writer 占用,会一直阻塞直到 writer 释放锁;否则直接获得锁;RUnlock()
:读操作完毕之后释放锁;Lock()
:写操作获取锁,如果锁已经被 reader 或者 writer 占用,会一直阻塞直到获取到锁;否则直接获得锁;Unlock()
:写操作完毕之后释放锁;RLocker()
:返回读操作的 Locker 对象,该对象的 Lock() 方法对应 RWMutex 的 -RLock()
,Unlock() 方法对应 RWMutex 的 RUnlock() 方法。
一旦涉及到多个 reader 和 writer ,就需要考虑优先级问题,是 reader 优先还是 writer 优先。
2.1 RWMutex流程概览
可以想象 RWMutex
有两个队伍,一个是包含 所有reader 和你获得准入权writer 的 队列A,一个是还没有获得准入权 writer 的 队列B。
队列 A 最多只允许有 一个writer,如果有其他 writer,需要在 队列B 等待;
当一个 writer 到了 队列A 后,只允许它 之前的reader 执行读操作,新来的 reader 需要在 队列A 后面排队;
当前面的 reader 执行完读操作之后,writer 执行写操作;
writer 执行完写操作后,让 后面的reader 执行读操作,再唤醒队列B 的一个 writer 到 队列A 后面排队。
初始时刻 队列A 中 writer W1 前面有三个 reader,后面有两个 reader,队列B中有两个 writer
并发读 多个 reader 可以同时获取到读锁,进入临界区进行读操作;writer W1 在 队列A 中等待,同时又来了两个 reader,直接在 队列A 后面排队
写操作 W1 前面所有的 reader 完成后,W1 获得锁,进入临界区操作
获得准入权 W1 完成写操作退出,先让后面排队的 reader 进行读操作,然后从 队列B 中唤醒 W2 到 队列A 排队。W2 从 队列B 到 队列A 的过程中,R8 先到了 队列A,因此 R8 可以执行读操作。R9、R10、R11 在 W2 之后到的,所以在后面排队;新来的 W4 直接在队列B 排队。
从上面的示例可以看出,RWMutex
可以看作是没有优先级,按照先来先到的顺序去执行,只不过是 多个reader 可以 并行去执行罢了。
2.2 写锁饥饿问题
因为读锁是共享的,所以如果当前已经有读锁,那后续goroutine继续加读锁正常情况下是可以加锁成功,但是如果一直有读锁进行加锁,那尝试加写锁的goroutine则可能会长期获取不到锁,这就是因为读锁而导致的写锁饥饿问题
go通过引入以下特性避免出现写锁饥饿:
- 当写锁阻塞时,新的读锁是无法申请的
即在sync.RWMutex
的使用中,一个线程请求了他的写锁(mx.Lock()
)后,即便它还没有取到该锁(可能由于资源已被其他人锁定),后面所有的读锁的申请,都将被阻塞,只有取写锁的请求得到了锁且用完释放后,读锁才能去取。
这种特性可以有效防止写锁饥饿。如果一个线程因为某种原因,导致长时间得不到CPU时间片,这种状态被称之为饥饿。
2.3. golang的读写锁源码剖析
成员变量
结构体
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // 用于writer等待读完成排队的信号量
readerSem uint32 // 用于reader等待写完成排队的信号量
readerCount int32 // 读锁的计数器
readerWait int32 // 等待读锁释放的数量
}
写锁计数
读写锁中允许加读锁的最大数量是4294967296,在go里面对写锁的计数采用了负值进行,通过递减最大允许加读锁的数量从而进行写锁对读锁的抢占
const rwmutexMaxReaders = 1 << 30
2.3.1 读锁实现
读锁加锁逻辑
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// 累加reader计数器,如果小于0则表明有writer正在等待
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 当前有writer正在等待读锁,读锁就加入排队
runtime_SemacquireMutex(&rw.readerSem, false)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
读锁释放逻辑
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
// 如果小于0,则表明当前有writer正在等待
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// 将等待reader的计数减1,证明当前是已经有一个读的,如果值==0,则进行唤醒等待的
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false)
}
}
if race.Enabled {
race.Enable()
}
}
2.3.2 写锁实现
加写锁实现
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// 首先获取mutex锁,同时多个goroutine只有一个可以进入到下面的逻辑
rw.w.Lock()
// 对readerCounter进行进行抢占,通过递减rwmutexMaxReaders允许最大读的数量
// 来实现写锁对读锁的抢占
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 记录需要等待多少个reader完成,如果发现不为0,则表明当前有reader正在读取,当前goroutine
// 需要进行排队等待
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
释放写锁
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// 将reader计数器复位,上面减去了一个rwmutexMaxReaders现在再重新加回去即可复位
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// 唤醒所有的读锁
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false)
}
// 释放mutex
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
2.3.3 关键核心机制
写锁对读锁的抢占
加写锁的抢占
// 在加写锁的时候通过将readerCount递减最大允许加读锁的数量,来实现对加读锁的抢占
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
加读锁的抢占检测
// 如果没有写锁的情况下读锁的readerCount进行Add后一定是一个>0的数字,这里通过检测值为负数
//就实现了读锁对写锁抢占的检测
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false)
}
写锁抢占读锁后后续的读锁就会加锁失败,但是如果想加写锁成功还要继续对已经加读锁成功的进行等待
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
// 写锁发现需要等待的读锁释放的数量不为0,就自己自己去休眠了
runtime_SemacquireMutex(&rw.writerSem, false)
}
写锁既然休眠了,则必定要有一种唤醒机制其实就是每次释放锁的时候,当检查到有加写锁的情况下,就递减readerWait,并由最后一个释放reader lock的goroutine来实现唤醒写锁
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false)
}
3. 常见问题
- 不可复制
和 Mutex 一样,RWMutex 也是不可复制。不能复制的原因和互斥锁一样。一旦读写锁被使用,它的字段就会记录它当前的一些状态。这个时候你去复制这把锁,就会把它的状态也给复制过来。但是,原来的锁在释放的时候,并不会修改你复制出来的这个读写锁,这就会导致复制出来的读写锁的状态不对,可能永远无法释放锁。 - 不可重入
不可重入的原因是,获得锁之后,还没释放锁,又申请锁,这样有可能造成死锁。比如 reader A 获取到了读锁,writer B 等待 reader A 释放锁,reader 还没释放锁又申请了一把锁,但是这把锁申请不成功,他需要等待 writer B。这就形成了一个循环等待的死锁。 - 加锁和释放锁一定要成对出现,不能忘记释放锁,也不能解锁一个未加锁的锁。