40分钟学 Go 语言高并发:原子操作与CAS
原子操作与CAS
学习要点:
- atomic包使用
- CAS原理与实现
- ABA问题
- 无锁队列
一、atomic包使用
Go语言的sync/atomic
包提供了一系列原子操作,可以用于实现并发安全的数据结构和算法。常见的原子操作有:
操作 | 描述 |
---|---|
Load* | 原子读取变量的值 |
Store* | 原子写入变量的值 |
Add* | 原子增加变量的值 |
Swap* | 原子交换变量的值 |
CompareAndSwap* | 原子比较并交换变量的值 |
其中,Load*
和Store*
用于原子读写变量,Add*
用于原子增减,Swap*
用于原子交换,CompareAndSwap*
用于原子比较并交换。这些操作都是针对基本数据类型(int32、int64、uint32、uint64、uintptr)的,不支持复合数据类型。
下面是一个使用atomic.LoadInt64
和atomic.StoreInt64
实现并发安全计数器的例子:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var counter int64
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1)
}()
}
wg.Wait()
fmt.Println("Final count:", counter)
}
在这个例子中,我们使用atomic.AddInt64
原子增加计数器的值,确保在多个goroutine并发修改计数器时不会出现数据竞争问题。
二、CAS原理与实现
CAS(Compare-and-Swap)是一种常见的无锁并发原语,它通常由硬件指令实现。CAS操作包括三个操作数:
- 内存位置(V)
- 预期原值(A)
- 新值(B)
CAS语义是:如果内存位置的值与预期原值A相同,则以原子方式将该位置的值更新为新值B,并返回true;否则,返回false。
下面是一个使用CAS实现无锁栈的例子:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
type Node struct {
value interface{}
next *Node
}
type Stack struct {
top *Node
}
func NewStack() *Stack {
return &Stack{}
}
func (s *Stack) Push(value interface{}) {
node := &Node{value: value}
for {
top := s.top
node.next = top
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&s.top)), unsafe.Pointer(top), unsafe.Pointer(node)) {
return
}
}
}
func (s *Stack) Pop() (interface{}, bool) {
for {
top := s.top
if top == nil {
return nil, false
}
next := top.next
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&s.top)), unsafe.Pointer(top), unsafe.Pointer(next)) {
return top.value, true
}
}
}
func main() {
stack := NewStack()
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
stack.Push(n)
}(i)
}
wg.Wait()
for {
value, ok := stack.Pop()
if !ok {
break
}
fmt.Println(value)
}
}
在这个例子中,我们使用atomic.CompareAndSwapPointer
来实现并发安全的栈操作。在Push
操作中,我们先获取栈顶元素,然后尝试将新节点插入到栈顶。在Pop
操作中,我们先获取栈顶元素,然后尝试将栈顶元素弹出。如果CAS操作成功,则说明并发修改不会引起数据竞争。
三、ABA问题
CAS操作虽然可以解决并发安全问题,但也会引入一些新的问题,其中最著名的就是ABA问题。
ABA问题指的是,当一个线程从A变到B再变回A时,另一个线程看到的仍然是A,但实际上这个A已经不是原来的A了。这可能会导致一些意外的结果,比如栈的弹出操作获取到了错误的元素。
解决ABA问题的常见方法有:
- 使用版本号: 在每个数据项中增加一个版本号,在CAS操作时检查版本号是否一致。
- 使用标记位: 在每个数据项中增加一个标记位,表示数据是否已经被修改过。
- 使用双重锁定: 在CAS操作前后加锁,确保数据在CAS期间不会被修改。
下面是一个使用版本号解决ABA问题的例子:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
type Node struct {
value int
version int64
}
type Stack struct {
top *Node
}
func NewStack() *Stack {
return &Stack{}
}
func (s *Stack) Push(value int) {
node := &Node{value: value, version: 1}
for {
top := s.top
node.next = top
if atomic.CompareAndSwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(&s.top)),
unsafe.Pointer(top),
unsafe.Pointer(node)) {
return
}
}
}
func (s *Stack) Pop() (int, bool) {
for {
top := s.top
if top == nil {
return 0, false
}
next := top.next
if atomic.CompareAndSwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(&s.top)),
unsafe.Pointer(top),
unsafe.Pointer(next)) {
return top.value, true
}
}
}
func main() {
stack := NewStack()
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
stack.Push(n)
}(i)
}
wg.Wait()
for {
value, ok := stack.Pop()
if !ok {
break
}
fmt.Println(value)
}
}
在这个例子中,我们在每个节点中增加了一个版本号字段,在CAS操作时检查版本号是否一致。这样可以有效地避免ABA问题的发生。
四、无锁队列
无锁队列是一种基于CAS原理实现的并发安全的队列。相比于使用锁的队列实现,无锁队列具有以下优点:
- 更高的并发性: 无锁队列可以支持多个生产者和多个消费者并发操作,不会因为锁的竞争而出现性能瓶颈。
- 更低的开销: 无锁队列的实现不需要获取和释放锁,因此没有锁的开销。
- 更好的可伸缩性: 无锁队列可以更好地利用多核CPU的并行处理能力,具有更好的可伸缩性。
下面是一个使用CAS实现的无锁队列:
package main
import (
"fmt"
"sync"
"sync/atomic"
"unsafe"
)
type Node struct {
value interface{}
next *Node
}
type Queue struct {
head, tail unsafe.Pointer
}
func NewQueue() *Queue {
node := &Node{}
return &Queue{
head: unsafe.Pointer(node),
tail: unsafe.Pointer(node),
}
}
func (q *Queue) Enqueue(value interface{}) {
node := &Node{value: value}
for {
tail := (*Node)(atomic.LoadPointer(&q.tail))
next := (*Node)(atomic.LoadPointer(&tail.next))
if tail == (*Node)(atomic.LoadPointer(&q.tail)) {
if next == nil {
if atomic.CompareAndSwapPointer(&tail.next, unsafe.Pointer(next), unsafe.Pointer(node)) {
atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), unsafe.Pointer(node))
return
}
} else {
atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), unsafe.Pointer(next))
}
}
}
}
func (q *Queue) Dequeue() (interface{}, bool) {
for {
head := (*Node)(atomic.LoadPointer(&q.head))
tail := (*Node)(atomic.LoadPointer(&q.tail))
next := (*Node)(atomic.LoadPointer(&head.next))
if head == (*Node)(atomic.LoadPointer(&q.head)) {
if head == tail {
if next == nil {
return nil, false
}
atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), unsafe.Pointer(next))
} else {
value := next.value
if atomic.CompareAndSwapPointer(&q.head, unsafe.Pointer(head), unsafe.Pointer(next)) {
return value, true
}
}
}
}
}
func main() {
queue := NewQueue()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
queue.Enqueue(n)
}(i)
}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
value, _ := queue.Dequeue()
fmt.Println(value)
}()
}
wg.Wait()
}
在这个例子中,我们使用CAS操作实现了入队和出队操作的并发安全。队列使用两个指针head
和tail
来表示队头和队尾,通过CAS操作来确保并发修改时不会出现数据竞争。这种无锁队列的实现可以充分利用CPU的并行处理能力,提供更高的并发性能。
综上所述
本章我们详细讨论了Go语言中的原子操作和CAS原理,并给出了相关的实战示例。希望通过这些内容,您能够更好地理解并使用这些并发原语,设计出更加高效和安全的并发程序。
怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!