当前位置: 首页 > article >正文

Go语言的并发安全与互斥锁

线程通讯

在程序中不可避免的出现并发或者并行,一般来说对于一个程序大多数是遵循开发语言的启动顺序。例如,对于go语言来说,一般入口为main,main中依次导入import导入的包,并按顺序执行init方法,之后在按照调用顺序执行程序。所以一般情况下程序是串行的。如下所示:
在这里插入图片描述

在很多时候串行并不满足要求,程序同时需要满足,很多客户访问,例如web程序必须要设置为并发的才能满足众多请求。

go语言通过go关键字实现新线程,如下·:

go func(){
	fmt.Println("-------开启一个新线程-----")
}()

线程通讯,go语言实现线程通讯是通过进程通讯实现内存共享。go语言内置chan数据结构实现线程通讯与数据共享。

package main

import (
	"fmt"
	"sync"
	"time"
)

var ch = make(chan int)
var wati sync.WaitGroup

func main() {
	wati.Add(2)
	go producter()
	go customer()
	wati.Wait()
}

func producter() {
	for i := 0; i < 10; i++ {
		ch <- i * 10
		fmt.Println("-----等待1秒")
		time.Sleep(time.Second * 1)
	}
	wati.Done()
	close(ch)
}
func customer() {
	for {

		x, ok := <-ch
		fmt.Println("-----------------custome", x, ok)
		if ok {
			fmt.Println("-----", x)
		} else {
			fmt.Println("----no data")
			break
		}
	}
	wati.Done()
}

chan本身在设计上是并发安全的。这意味着多个协程可以同时安全地对一个chan进行发送和接收操作,而无需额外的同步措施。Go 语言的运行时系统会自动处理这些操作的并发安全性,保证数据的正确传递和协程的正确同步。

线程等待

在多线程中,各个线程是是独立的,并不知道线程完成的次序,所以线程等待也很重要,如下代码:

package main

import (
	"os"
	"sync"
)

var Filestream = make(chan []byte)

func main() {
	var str = []byte("hello")
	go Read(str)
	go Write()
}

func Read(by []byte) {
	Filestream <- by
	close(Filestream)
}

func Write() error {
	file, err := os.Create("test.txt")
	if err != nil {
		return err
	}
	by := <-Filestream
	_, err = file.Write(by)
	if err != nil {
		return err
	}
	return nil
}

上述代码通过chan实现了多线程创建文件,但是实际上执行代码并不能成功,这是由于主线程没有等待其他线程,导致主线程在过早结束,程序结束,通过sync.WaitGroup库实现线程等待,改造后的代码如下:

package main

import (
	"os"
	"sync"
)

var Filestream = make(chan []byte)
var wait sync.WaitGroup

func main() {
	var str = []byte("hello")
	wait.Add(2)
	go Read(str)
	go Write()
	wait.Wait()
}

func Read(by []byte) {
	Filestream <- by
	close(Filestream)
	wait.Done()
}

func Write() error {
	file, err := os.Create("test.txt")
	if err != nil {
		return err
	}
	by := <-Filestream
	_, err = file.Write(by)
	if err != nil {
		return err
	}
	wait.Done()
	return nil
}

并发安全

数据锁

在并发时就会出现资源被竞争的情况,这就是涉及到并发安全了,例如对于一个数组var list []string在对这个数组插入数据时,对于同一时刻进行的操作数据就可能会丢失,因此在操作数据时一定要保证操作的数据结构是并发安全的,例如sync.Map就是线程安全的go语言底层实现了。

那么如何对自定义的数据结构体实现并发安全呢,就要用到互斥锁了。互斥锁是一种用于多线程或多协程编程中的同步原语,其主要目的是保护共享资源,防止多个线程或协程同时访问和修改这些资源,从而避免数据竞争和并发冲突。

在并发编程中,多个线程或协程可能会同时访问和修改同一个共享变量。如果不加以控制,就可能导致数据竞争,即多个操作同时对同一个数据进行读写,从而导致数据的不一致性或错误结果。

互斥锁通过提供一种互斥访问的机制,确保在任何时刻只有一个线程或协程能够访问被保护的共享资源。当一个线程或协程获取了互斥锁后,其他试图获取该锁的线程或协程就会被阻塞,直到锁被释放。

Go语言的互斥锁被sync.Mutex实现。在go语言中提供了sync.Mapmap类型是并发安全的。要实现自己的并发安全需要借助``sync.Mutex`如下:

  • 并发安装的结构体
type safeArr[T any] struct {
	sync.Mutex
	data []T
}

func (self *safeArr[T]) Add(item T) {
	self.Lock()
	self.data = append(self.data, item)
	self.Unlock()
}

func (self *safeArr[T]) Remove(index int) {
	self.Lock()
	if index >= 0 && index < len(self.data) {
		self.data = append(self.data[:index], self.data[index+1:]...)
	}
	self.Unlock()
}

func (self *safeArr[T]) Data() []T {
	self.Lock()
	self.Unlock()
	return self.data
}

注意,data 是不能直接向外部暴露的,不可以使用append直接操作,必须通过并发安装的Add方法。

  • 测试
var arr safeArr[string]

func Test(c *fiber.Ctx) error {
	arr.Add("sss")
	return c.JSON(fiber.Map{
		"data": arr.data,
	})
}


func main() {
    app := fiber.New()

    app.Get("/test", Test)

    go func() {
        for {
            if requestCount == 0 {
                wg.Done()
            }
        }
    }()

    app.Listen(":8081")
}

通过上述代码测试代码测试不用路由请求进来后的线程对并发安全的
safeArr的处理是否满足要求。如下所示:

请添加图片描述
同步的路由线程进来之后,对全局的变量可以实现操作,这里并不能体现它是并发安全的,我们将Add方法的self.Unlock()注释掉,再次启动服务,请求接口如下:

请添加图片描述

如上图所示,在注释掉解锁代码后,再次请求会一直堵塞等待解锁,可以看出上述定义的结构体就是并发安全的了。

互斥锁会使线程变为阻塞线程,等待解锁,而不是直接停掉。

方法锁

如何实现对方法枷锁,同一时刻只允许一个线程使用该方法。对方法加锁和对数据结构加锁一样,再方法内部加锁。

var arr []string

var wu sync.Mutex

func Test(c *fiber.Ctx) error {
	wu.Lock()
	defer wu.Unlock()
	arr = append(arr, "ssss")
	return c.JSON(fiber.Map{
		"data": arr,
	})
}


func main() {
    app := fiber.New()

    app.Get("/test", Test)

    go func() {
        for {
            if requestCount == 0 {
                wg.Done()
            }
        }
    }()

    app.Listen(":8081")
}

如果将defer wu.Unlock()注释掉,也会是线程进入等待状态,如下:

请添加图片描述

但是又会有新的问题,由于线程是独立的,此时存在一个线程处于阻塞状态,但是却可以再次发送新的请求,再阻塞的这段时间内,可以一直发送请求,多次请求会也会造成数据的错误,如下所示:
请添加图片描述
如上图可以发现,互斥锁可以是方法变成并发安全的,但是在线程等待的过程中仍然可以发送请求。

Redis互斥锁

在上一节的方法互斥锁中,需要额外的需求,就是如果当前的线程占用某个资源时,新的线程不会处于阻塞状态而是直接停止,这时就需要有外部的标识记录资源的占用情况,就需要借助r内存数据库如redis了。如下:

  • redis初始化
var lock sync.Mutex

var Client *CustomRedisClient

type CustomRedisClient struct {
	redis.Conn
}

func Init() {
	conn, err := redis.Dial("tcp", config.RedisVar.Host)
	if err != nil {
		panic(errors.New("conn redis failed"))
	}
	_, err = conn.Do("auth", config.RedisVar.Password)
	if err != nil {
		panic(errors.New("redis auth failed"))
	}
	c := CustomRedisClient{conn}
	Client = &c
	fmt.Println("conn redis success")
}

func (c *CustomRedisClient) Cmd(commandName string, args ...interface{}) (reply interface{}, err error) {
	lock.Lock()
	defer lock.Unlock()
	return c.Do(commandName, args...)
}
  • redis互斥锁

func Lock(name string) (error, *string) {
	apply, err := Client.Cmd("GET", name)
	if apply != nil || err != nil {
		return errors.New("locking"), nil
	}
	uid := utils.UUID()
	fmt.Printf("name: %v, uid: %v \n", name, uid)
	_, err = Client.Cmd("SET", name, uid, "EX", 5)
	if err != nil {
		Client.Cmd("DEL", name)
		return errors.New("redis set err"), nil
	}
	return nil, &uid
}

func Unlock(name string, uid string) error {
	reply, err := Client.Cmd("GET", name)
	if reply == nil && err == nil {
		return nil
	}
	s := string(reply.([]byte))
	if s != uid {
		Client.Cmd("DEL", name)
		return errors.New("can not unlock")
	}
	_, err = Client.Cmd("DEL", name)
	if err != nil {
		Client.Cmd("DEL", name)
		return err
	}
	return nil
}

使用上述的redis互斥锁就可以显示,当某个资源被线程占用时,另一个资源进来会直接停掉。

对代码改造使用新的互斥锁,如下:

func Test(c *fiber.Ctx) error {
	err, s := redis_util.Lock("test")
	if err != nil {
		if errors.Is(err, errors.New("locking")) {
			return c.JSON(fiber.Map{
				"data": "locking",
			})
		} else {
			return c.JSON(fiber.Map{
				"data": err.Error(),
			})
		}
	}
	defer redis_util.Unlock("test", *s)

	time.Sleep(time.Second * 15)
	arr = append(arr, "ssss")
	return c.JSON(fiber.Map{
		"data": arr,
	})
}

请添加图片描述

如上图所示,当资源被占用是会返回locking,需要注意的是redis存储的互斥标识的时间一定要大于或等于程序的执行时间,不然程序还未执行玩,redis占用标识就销毁了,导致错误。


http://www.kler.cn/a/392020.html

相关文章:

  • MATLAB语言的循环实现
  • Transformer入门教程全解析(一)
  • RK3568 Android 13 内置搜狗输入法小计
  • STM32+WIFI获取网络时间+8位数码管显示+0.96OLED显
  • 【C++】B2106 矩阵转置
  • springboot + vue+elementUI图片上传流程
  • 使用cloudflare搭建私人docker镜像站
  • 【深圳大学/大学物理实验2】弗兰克-赫兹实验预习题参考
  • PL/SQL执行.sql文件
  • Go语言实现用户登录Web应用
  • 解析“ChatGPT网络错误”:从网络专线到IP地址的根源与解决方案
  • 券商隔夜单自动下单交易接口
  • 基于表格滚动截屏(表格全部展开,没有滚动条)
  • 【LeetCode】【算法】240. 搜索二维矩阵II
  • 鸿蒙next版开发:ArkTS组件通用属性(布局约束)
  • QT常用控件
  • 127.WEB渗透测试-信息收集-ARL(18)
  • 自动化爬虫Selenium
  • 【启程Golang之旅】从零开始构建可扩展的微服务架构
  • 算法魅力-二分查找实战
  • 服务号消息折叠折射出的腾讯傲慢:上云会不会也一样?
  • 红日靶机(七)笔记
  • Ue5 umg学习(二)图像控件,锚点
  • 在PHP8内,用Jenssegers MongoDB扩展来实现Laravel与MongoDB的集成
  • 2024年第四届数字化社会与智能系统国际学术会议(DSInS 2024)
  • 百度世界2024:AI应用的浪潮时刻