Go语言的并发安全与互斥锁
线程通讯
在程序中不可避免的出现并发或者并行,一般来说对于一个程序大多数是遵循开发语言的启动顺序。例如,对于go语言来说,一般入口为main
,main中依次导入import
导入的包,并按顺序执行init
方法,之后在按照调用顺序执行程序。所以一般情况下程序是串行的。如下所示:
在很多时候串行并不满足要求,程序同时需要满足,很多客户访问,例如web程序必须要设置为并发的才能满足众多请求。
go语言通过go
关键字实现新线程,如下·:
go func(){
fmt.Println("-------开启一个新线程-----")
}()
线程通讯,go语言实现线程通讯是通过进程通讯实现内存共享。go语言内置chan
数据结构实现线程通讯与数据共享。
package main
import (
"fmt"
"sync"
"time"
)
var ch = make(chan int)
var wati sync.WaitGroup
func main() {
wati.Add(2)
go producter()
go customer()
wati.Wait()
}
func producter() {
for i := 0; i < 10; i++ {
ch <- i * 10
fmt.Println("-----等待1秒")
time.Sleep(time.Second * 1)
}
wati.Done()
close(ch)
}
func customer() {
for {
x, ok := <-ch
fmt.Println("-----------------custome", x, ok)
if ok {
fmt.Println("-----", x)
} else {
fmt.Println("----no data")
break
}
}
wati.Done()
}
chan本身在设计上是并发安全的。这意味着多个协程可以同时安全地对一个chan进行发送和接收操作,而无需额外的同步措施。Go 语言的运行时系统会自动处理这些操作的并发安全性,保证数据的正确传递和协程的正确同步。
线程等待
在多线程中,各个线程是是独立的,并不知道线程完成的次序,所以线程等待也很重要,如下代码:
package main
import (
"os"
"sync"
)
var Filestream = make(chan []byte)
func main() {
var str = []byte("hello")
go Read(str)
go Write()
}
func Read(by []byte) {
Filestream <- by
close(Filestream)
}
func Write() error {
file, err := os.Create("test.txt")
if err != nil {
return err
}
by := <-Filestream
_, err = file.Write(by)
if err != nil {
return err
}
return nil
}
上述代码通过chan
实现了多线程创建文件,但是实际上执行代码并不能成功,这是由于主线程没有等待其他线程,导致主线程在过早结束,程序结束,通过sync.WaitGroup
库实现线程等待,改造后的代码如下:
package main
import (
"os"
"sync"
)
var Filestream = make(chan []byte)
var wait sync.WaitGroup
func main() {
var str = []byte("hello")
wait.Add(2)
go Read(str)
go Write()
wait.Wait()
}
func Read(by []byte) {
Filestream <- by
close(Filestream)
wait.Done()
}
func Write() error {
file, err := os.Create("test.txt")
if err != nil {
return err
}
by := <-Filestream
_, err = file.Write(by)
if err != nil {
return err
}
wait.Done()
return nil
}
并发安全
数据锁
在并发时就会出现资源被竞争的情况,这就是涉及到并发安全了,例如对于一个数组var list []string
在对这个数组插入数据时,对于同一时刻进行的操作数据就可能会丢失,因此在操作数据时一定要保证操作的数据结构是并发安全的,例如sync.Map
就是线程安全的go语言底层实现了。
那么如何对自定义的数据结构体实现并发安全呢,就要用到互斥锁了。互斥锁是一种用于多线程或多协程编程中的同步原语,其主要目的是保护共享资源,防止多个线程或协程同时访问和修改这些资源,从而避免数据竞争和并发冲突。
在并发编程中,多个线程或协程可能会同时访问和修改同一个共享变量。如果不加以控制,就可能导致数据竞争,即多个操作同时对同一个数据进行读写,从而导致数据的不一致性或错误结果。
互斥锁通过提供一种互斥访问的机制,确保在任何时刻只有一个线程或协程能够访问被保护的共享资源。当一个线程或协程获取了互斥锁后,其他试图获取该锁的线程或协程就会被阻塞,直到锁被释放。
Go语言的互斥锁被sync.Mutex
实现。在go语言中提供了sync.Map
map类型是并发安全的。要实现自己的并发安全需要借助``sync.Mutex`如下:
- 并发安装的结构体
type safeArr[T any] struct {
sync.Mutex
data []T
}
func (self *safeArr[T]) Add(item T) {
self.Lock()
self.data = append(self.data, item)
self.Unlock()
}
func (self *safeArr[T]) Remove(index int) {
self.Lock()
if index >= 0 && index < len(self.data) {
self.data = append(self.data[:index], self.data[index+1:]...)
}
self.Unlock()
}
func (self *safeArr[T]) Data() []T {
self.Lock()
self.Unlock()
return self.data
}
注意,data 是不能直接向外部暴露的,不可以使用
append
直接操作,必须通过并发安装的Add
方法。
- 测试
var arr safeArr[string]
func Test(c *fiber.Ctx) error {
arr.Add("sss")
return c.JSON(fiber.Map{
"data": arr.data,
})
}
func main() {
app := fiber.New()
app.Get("/test", Test)
go func() {
for {
if requestCount == 0 {
wg.Done()
}
}
}()
app.Listen(":8081")
}
通过上述代码测试代码测试不用路由请求进来后的线程对并发安全的
safeArr
的处理是否满足要求。如下所示:
同步的路由线程进来之后,对全局的变量可以实现操作,这里并不能体现它是并发安全的,我们将Add
方法的self.Unlock()
注释掉,再次启动服务,请求接口如下:
如上图所示,在注释掉解锁代码后,再次请求会一直堵塞等待解锁,可以看出上述定义的结构体就是并发安全的了。
互斥锁会使线程变为阻塞线程,等待解锁,而不是直接停掉。
方法锁
如何实现对方法枷锁,同一时刻只允许一个线程使用该方法。对方法加锁和对数据结构加锁一样,再方法内部加锁。
var arr []string
var wu sync.Mutex
func Test(c *fiber.Ctx) error {
wu.Lock()
defer wu.Unlock()
arr = append(arr, "ssss")
return c.JSON(fiber.Map{
"data": arr,
})
}
func main() {
app := fiber.New()
app.Get("/test", Test)
go func() {
for {
if requestCount == 0 {
wg.Done()
}
}
}()
app.Listen(":8081")
}
如果将defer wu.Unlock()
注释掉,也会是线程进入等待状态,如下:
但是又会有新的问题,由于线程是独立的,此时存在一个线程处于阻塞状态,但是却可以再次发送新的请求,再阻塞的这段时间内,可以一直发送请求,多次请求会也会造成数据的错误,如下所示:
如上图可以发现,互斥锁可以是方法变成并发安全的,但是在线程等待的过程中仍然可以发送请求。
Redis互斥锁
在上一节的方法互斥锁中,需要额外的需求,就是如果当前的线程占用某个资源时,新的线程不会处于阻塞状态而是直接停止,这时就需要有外部的标识记录资源的占用情况,就需要借助r内存数据库如redis了。如下:
- redis初始化
var lock sync.Mutex
var Client *CustomRedisClient
type CustomRedisClient struct {
redis.Conn
}
func Init() {
conn, err := redis.Dial("tcp", config.RedisVar.Host)
if err != nil {
panic(errors.New("conn redis failed"))
}
_, err = conn.Do("auth", config.RedisVar.Password)
if err != nil {
panic(errors.New("redis auth failed"))
}
c := CustomRedisClient{conn}
Client = &c
fmt.Println("conn redis success")
}
func (c *CustomRedisClient) Cmd(commandName string, args ...interface{}) (reply interface{}, err error) {
lock.Lock()
defer lock.Unlock()
return c.Do(commandName, args...)
}
- redis互斥锁
func Lock(name string) (error, *string) {
apply, err := Client.Cmd("GET", name)
if apply != nil || err != nil {
return errors.New("locking"), nil
}
uid := utils.UUID()
fmt.Printf("name: %v, uid: %v \n", name, uid)
_, err = Client.Cmd("SET", name, uid, "EX", 5)
if err != nil {
Client.Cmd("DEL", name)
return errors.New("redis set err"), nil
}
return nil, &uid
}
func Unlock(name string, uid string) error {
reply, err := Client.Cmd("GET", name)
if reply == nil && err == nil {
return nil
}
s := string(reply.([]byte))
if s != uid {
Client.Cmd("DEL", name)
return errors.New("can not unlock")
}
_, err = Client.Cmd("DEL", name)
if err != nil {
Client.Cmd("DEL", name)
return err
}
return nil
}
使用上述的redis互斥锁就可以显示,当某个资源被线程占用时,另一个资源进来会直接停掉。
对代码改造使用新的互斥锁,如下:
func Test(c *fiber.Ctx) error {
err, s := redis_util.Lock("test")
if err != nil {
if errors.Is(err, errors.New("locking")) {
return c.JSON(fiber.Map{
"data": "locking",
})
} else {
return c.JSON(fiber.Map{
"data": err.Error(),
})
}
}
defer redis_util.Unlock("test", *s)
time.Sleep(time.Second * 15)
arr = append(arr, "ssss")
return c.JSON(fiber.Map{
"data": arr,
})
}
如上图所示,当资源被占用是会返回locking
,需要注意的是redis存储的互斥标识的时间一定要大于或等于程序的执行时间,不然程序还未执行玩,redis占用标识就销毁了,导致错误。