当前位置: 首页 > article >正文

go中的并发处理

. Goroutines

概念:

Goroutines 是 Go 的核心并发机制。它们是由 Go 运行时管理的轻量级线程,具有比操作系统线程更少的开销。每个 goroutine 只需少量的内存(大约 2KB),并且由 Go 运行时负责调度和管理,哪怕是java发展到21的虚拟线程和go比也还是不够轻量

创建:

go func() {
    fmt.Println("Hello from goroutine")
}()

使用 go 关键字前缀一个函数调用即可创建一个新的 goroutine。它将异步执行指定的函数。
调度模型:

Go 使用 M调度模型,其中 M 个用户级线程(goroutines)通过 N 个操作系统线程(OS threads)进行调度。Go 运行时会动态地将 goroutines 分配到操作系统线程上,减少了上下文切换的开销。
底层原理:
Go 的调度器使用协作式调度,依靠 goroutine 的堆栈跟踪(stack traces)和调度策略(如抢占式调度)来管理并发执行。调度器负责在 goroutines 和 OS threads 之间进行合理分配,以实现高效的并发执行。
###Channel

概念:
Channel 是用于 goroutine 之间通信的管道,可以安全地传递数据。它们实现了数据传递的同步机制,避免了传统的锁竞争问题。
创建:

ch := make(chan int)

发送和接收:

// 发送数据
ch <- 42

// 接收数据
value := <-ch
//数据通过 <- 操作符在 channel 中发送和接收。发送操作会阻塞直到有接收方,接收操作会阻塞直到有发送方。

这样就可以简单模拟java的join 操作 ,只有上一个任务执行完才允许接下来的线程执行

func testTrueCache() {
	//如果是无缓冲的 需要立即读取 所以异步写入
	ch := make(chan string)
	go func() {
		fmt.Println("hello")
		ch <- "hello"
	}()
	fmt.Println(<-ch)//不采用读取 那么可能子携程还没有执行主协诚就结束了
	fmt.Print("这个输出第一是在hello之后")
}

缓冲区:

无缓冲 channel:发送和接收操作必须匹配,否则会阻塞。
注意 串行的代码块不能写如通道会立即死锁 ,因为 无缓冲通道在写入数据时必须立刻有其他协程来读取数据,否则会导致阻塞。串行中,写入数据和读取数据是在同一个协程中进行的,阻塞后根本不会执行下一个代码 这会导致死锁。

func testFialChannelCache() {
	// 创建无缓冲管道
	ch := make(chan int) // 正因为无缓冲管道无法存放数据,在向管道写入数据时必须立刻有其他协程来读取数据
	defer close(ch)

	// 启动一个新的协程来读取数据
	go func() {
		n := <-ch
		fmt.Println(n)
	}()

	// 写入数据
	ch <- 123 // 不在串行 写入后必须读取 没有容量可以保存 会报错
}

在这里插入图片描述

缓冲 channel:创建时指定缓冲区大小,发送操作只有在缓冲区满时才会阻塞,接收操作只有在缓冲区空时才会阻塞。
go

ch := make(chan int, 2) // 创建一个缓冲区大小为 2 的 channel

关闭:

可以使用 close 函数关闭 channel,以表明没有更多数据将被发送。

//建议创建号channel后就使用
 defer close(ch)
关闭后的 channel 仍然可以读取数据,但不能再发送数据。读取操作会返回 channel 的零值。

底层原理:

Channel 是基于锁和条件变量实现的。每个 channel 有一个缓冲区和一个互斥锁,用于协调数据的发送和接收操作。

只读通道和只写通道的申明创建

var ch <-chan int // 只读通道,只能接收 int 类型的数据
var ch chan<- int // 只写通道,只能发送 int 类型的数据


WaitGroup

刚才使用无缓冲的通道模拟 java中的join api 效果
概念:

sync.WaitGroup 用于等待一组 goroutine 完成任务,确保所有并发操作完成后才继续执行。

var wg sync.WaitGroup
wg.Add(1) // 增加等待计数
go func() {
    defer wg.Done() // 任务完成时减少计数
    // 任务代码
}()
wg.Wait() // 等待所有任务完成

底层就是维护了一个原子性的任务数 wg.Done() 进行任务数自减1wg.Wait() // 等待所有任务完成 只有为0才会进行执行

这样就更直观的可以发出来 可以保证多个无序携程的有序性

func quicktest() {
	var wait sync.WaitGroup
	// 指定子协程的数量  本质是原子性的自增自减
	wait.Add(1) //自增
	go func() {
		fmt.Println(1)
		// 执行完毕
		wait.Done() //自减
	}()
	// 等待子协程
	wait.Wait() //判断是否为0 阻塞等待 只有为0才会继续执行 这样就保证了执行的有序性
	wait.Add(1)
	go func() {
		fmt.Println(2)
		// 执行完毕
		wait.Done() //自减
	}()
	wait.Wait()

	wait.Add(1)
	go func() {
		fmt.Println(3)
		// 执行完毕
		wait.Done() //自减
	}()
	wait.Wait()
	wait.Add(1)
	go func() {
		fmt.Println(4)
		// 执行完毕
		wait.Done() //自减
	}()

	// 等待子协程
	wait.Wait() //判断是否为0 阻塞等待
	fmt.Println("完成")
}

Select

概念:

select 语句允许一个 goroutine 等待多个 channel 操作,并在其中一个操作准备好时执行相应的代码块。它类似于 Java 中的 Selector,用于处理多个异步事件,和netty一样 用到了seletor 多路复用思想
使用:

go
基本用法 类似 swtich case go中的stich 不会像Java一样需要显示申明return返回

select {
case msg := <-ch1:
    fmt.Println("Received from ch1:", msg)
case msg := <-ch2:
    fmt.Println("Received from ch2:", msg)
case <-time.After(time.Second):
    fmt.Println("Timeout")
}

select 语句会阻塞,直到其中一个 case 准备好。time.After 是一个用于超时的示例,它创建一个定时器 channel。
默认分支:

可以使用 default 分支来避免 select 阻塞。

func main() {
	test()
}
func test() {
	chA := make(chan int)
	chB := make(chan int)
	chC := make(chan int)
	defer func() {
		close(chA)
		close(chB)
		close(chC)
	}()
	// 开启一个新的协程
	go func() {
		// 向A管道写入数据

		chA <- 1

	}()
	go func() {
		// 向A管道写入数据

		chB <- 1

	}()
	go func() {
		// 向A管道写入数据

		chC <- 1

	}()
	// 选择器 这样就可以监控 多个通道 有数据的时候就触发 如果都没有数据 就阻塞
	// 这里的select 语句会阻塞,直到有某个通道有数据可读
	//如果都同时 检查到有数据可读,则随机选择一个进行读取
Loop: //break Loop 后不会执行:后的代码了
	for {
		select {
		case n, ok := <-chA:
			fmt.Println("写入通道A")
			fmt.Println(n, ok)
		case n, ok := <-chB:
			fmt.Println("写入通道B")
			fmt.Println(n, ok)
		case n, ok := <-chC:
			fmt.Println("写入通道C")
			fmt.Println(n, ok)
			//超时时间跳出进行下一次循环
		case <-time.After(time.Second): // 设置1秒的超时时间 //返回只读通通道
			break Loop // 退出循环 避免死循环锁毒素 还可以一直监听

		}

	}

}

底层原理:

select 通过轮询和系统调用来检查多个 channel 的状态,使用系统级的 poll 或 epoll(在 Linux 上)机制来实现高效的 I/O 多路复用。

go中的锁也相对轻量

Mutex(互斥锁):

概念:

sync.Mutex 用于保护临界区,确保在同一时间只有一个 goroutine 可以访问共享资源。

使用:

var mu sync.Mutex
mu.Lock()
// 临界区代码
mu.Unlock()

比如这种代码进行应用

/*
*
这种情况肯定是脏数据的 10个携程并发执行肯定有脏数据 并且还无法确 还没有等待确定有序性

解决方案:
. 加锁 互斥锁l
*/
func main() {
	wait.Add(10) // 等待10个协程完成
	for i := 0; i < 10; i++ {
		go func(data *int) {
			// 模拟访问耗时
			// 加锁 同一时间只有有所的次啊可以执行
			lock.Lock()
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
			// 访问数据
			temp := *data
			// 模拟计算耗时
			time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
			ans := 1
			// 修改数据
			*data = temp + ans

			lock.Unlock() //数据修改完毕了才可以解锁
			fmt.Println(*data)
			wait.Done()
		}(&count)
	}
	wait.Wait()
	fmt.Println("最终结果", count)
}

只有数据处理时候加锁才能保证有序性,多个携程并发处理数据 会造成脏数据

底层原理:基于自旋锁和操作系统的原子操作实现,使用互斥锁来保护数据的一致性。

RWMutex(读写锁):

概念:

sync.RWMutex 允许多个 goroutine 同时读取数据,但在写操作时独占访问。本质就是为了优化读写分离的情况,写锁还是写携程之间互斥,读锁是为了保证有读锁的携程允许时候写锁的携程就堵塞

func main() {
	wait.Add(12) //12个任务携程

	// 读多写少
	go func() {
		for i := 0; i < 3; i++ {
			go Write(&count)
		}
		wait.Done()
	}()
	go func() {
		for i := 0; i < 7; i++ {
			go Read(&count)
		}
		wait.Done()
	}()
	// 等待子协程结束
	wait.Wait()
	fmt.Println("最终结果", count)
}

func Read(i *int) {
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
	rw.RLock()
	fmt.Println("拿到读锁")
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	fmt.Println("释放读锁", *i)
	rw.RUnlock()
	wait.Done()
}

func Write(i *int) {
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	rw.Lock()
	fmt.Println("拿到写锁")
	temp := *i
	time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	*i = temp + 1
	fmt.Println("释放写锁", *i)
	rw.Unlock()
	wait.Done()
}

Context

概念:

context 包用于在 goroutine 中传递取消信号、截止时间和请求范围的信息,类似于 Java 中的 Future 或 ExecutorService 的取消机制。

在 Go 中,context 包提供了上下文管理的功能,主要用于控制 goroutine 的生命周期、传递请求范围的值以及处理超时和取消信号。context 是在并发编程中不可或缺的工具,尤其是在处理网络请求和后台任务时。

Context 的主要用途

  • 控制 goroutine 的生命周期:通过 context,你可以在父任务结束或取消时,通知所有子 goroutine 停止执行,从而避免 goroutine 泄漏。

  • 传递请求范围的值:context 可以在函数调用链之间传递值,这些值通常是与请求有关的信息,比如用户身份、授权令牌、请求截止时间等。

  • 处理超时和取消信号:context 可以设定超时时间或在外部取消信号时终止操作,这对于网络请求和长时间运行的任务非常有用。

Context 的类型

  • context.Background():通常作为主函数的起点使用,表示一个空的上下文。
  • context.TODO():当你还不确定要用什么样的上下文时,可以使用它作为占位符。

Context 的使用模式
在实际编程中,context 通常以以下几种方式使用:

取消任务:
当需要在某个操作完成后取消所有相关的 goroutine 时,使用 context.WithCancel。
超时控制:
当一个操作需要在指定时间内完成时,使用 context.WithTimeout。
截止时间控制:
与超时类似,但使用的是具体的时间点而不是时间间隔,使用 context.WithDeadline。
Context 实例详细讲解
go

package main

import (
    "context"
    "fmt"
    "time"
)

// 模拟一个处理请求的函数
func processRequest(ctx context.Context, duration time.Duration) {
    select {
    case <-time.After(duration):
        fmt.Println("Request processed")
    case <-ctx.Done():
        fmt.Println("Request canceled:", ctx.Err())
    }
}

func main() {
    // 创建一个 context,设置超时时间为 3 秒
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel() // 确保取消函数被调用,以释放资源

    // 启动一个 goroutine 来处理请求,预计处理时间为 5 秒
    go processRequest(ctx, 5*time.Second)

    // 模拟一些其他操作
    time.Sleep(2 * time.Second)
    
    // 主 goroutine 等待 6 秒,以便观察子 goroutine 的处理情况
    time.Sleep(6 * time.Second)
    fmt.Println("Main function ends")
}

详细解释
创建 Context:context.WithTimeout(context.Background(), 3*time.Second) 创建了一个有超时限制的 context,3 秒后自动取消。
处理请求:processRequest 是一个模拟的请求处理函数,它要么在指定的 duration 后完成请求处理,要么在 context 被取消时停止操作。
启动 goroutine:我们在主函数中启动了一个 goroutine 来处理请求,这个 goroutine 模拟了一个 5 秒的处理时间。
观察取消效果:由于 context 设置的超时时间是 3 秒,而请求的处理时间是 5 秒,3 秒后 context 会被自动取消,导致 processRequest 提前退出并输出取消原因。
实际用途
在实际应用中,context 经常用于以下场景:

API 请求的超时控制:确保 API 请求在设定的时间内完成,避免服务阻塞。
批量任务的并发处理:在父任务取消时,自动终止所有子任务,避免资源浪费。
数据库操作:结合 context 来设置数据库查询的超时时间,确保数据库操作不会无限制地阻塞。

go中的池化技术–工作池

即使 Goroutine 很轻量,但在某些情况下,仍需要限制同时运行的 Goroutine 数量:

资源限制:

当每个任务都涉及到大量的资源(如 CPU、内存、网络)时,过多的 Goroutine 会导致资源竞争,影响系统性能。
后端服务限制:如果你的程序需要调用外部服务或数据库,这些服务可能有并发连接的限制,需要控制并发量。
稳定性和可控性:使用工作池可以更好地管理任务的执行,提供任务队列、超时、重试等机制,提升系统的稳定性和可靠性。
工作池的作用:

限制并发数:

通过固定数量的工作 Goroutine,限制同时执行的任务数量。
任务调度:将任务放入队列,等待空闲的 Worker 处理。
结果收集:汇总任务执行的结果,方便后续处理。
错误处理:统一管理任务执行中的错误和异常。

在 Go 中,实现工作池通常有以下几种方式:

使用 Channel 实现简单的工作池:通过创建一定数量的 Worker Goroutine,从任务 Channel 中获取任务并执行。
使用同步包(如 sync.WaitGroup)协调任务执行:确保所有任务执行完毕后再进行后续操作。
使用第三方库:社区中有一些成熟的工作池库,如 antlabs/workerpool、gammazero/workerpool 等,提供了丰富的功能和更好的性能。
接下来,下面的代码示例来演示如何在 Go 中实现工作池。


// 任务类型
type Task struct {
	id int
}

// 模拟处理任务的方法
func (t Task) Process() {
	fmt.Printf("Processing task with id: %d\n", t.id)
	time.Sleep(time.Second) // 模拟任务处理耗时
}

// 工作池函数
func worker(id int, tasks <-chan Task, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range tasks {
		fmt.Printf("Worker %d started task %d\n", id, task.id)
		task.Process()
		fmt.Printf("Worker %d finished task %d\n", id, task.id)
	}
}

func main() {
	const numWorkers = 3 // Worker 的数量
	const numTasks = 10  // 任务的数量

	tasks := make(chan Task, numTasks) // 创建一个任务通道
	var wg sync.WaitGroup

	// 启动 worker
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go worker(i, tasks, &wg)
	}

	// 发送任务到任务通道
	for i := 1; i <= numTasks; i++ {
		tasks <- Task{id: i}
	}

	close(tasks) // 关闭任务通道,不再发送新任务

	// 等待所有 worker 完成
	wg.Wait()
	fmt.Println("All tasks processed")
}


详细解释

  • Task 类型:

这是一个简单的任务类型,其中包含了任务的 ID。Process 方法模拟任务的处理过程。
Worker 函数:

worker 函数是每个 worker 执行的工作。它从 tasks 通道中接收任务,并调用 Process 方法处理任务。sync.WaitGroup 用于等待所有 worker 完成任务。

  • 启动 Worker:

numWorkers 决定了 worker 的数量。在这里,启动了 3 个 worker。
发送任务:

  • 将 numTasks 数量的任务发送到 tasks 通道,任务会被 worker 逐一处理。
    关闭通道:

任务发送完毕后,关闭任务通道,告诉所有 worker 没有更多的任务了。
等待所有 Worker 完成:

使用 sync.WaitGroup 确保主程序在所有 worker 完成任务后再退出。

Java 线程池通过 Executors 工具类来创建不同类型的线程池,Go 则通过自定义的 worker pool 和 goroutine 来管理并发


http://www.kler.cn/a/287900.html

相关文章:

  • SpringBoot之LazyInitializationBeanFactoryPostProcessor类源码学习
  • Chatper 4: Implementing a GPT model from Scratch To Generate Text
  • 如何选择Ubuntu版本
  • RabbitMQ 高可用方案:原理、构建与运维全解析
  • 基于springboot+vue+微信小程序的宠物领养系统
  • Elasticsearch学习(2) :DSL和RestClient实现搜索文档
  • 书生大模型实战营(1)——InterStudio基础知识+Vscode SSH连接远程服务器+Linux基础指令
  • 深度解析MFT损坏:原因、恢复策略与预防措施
  • 知道哪些键值型存储数据结构?这些数据结构的时间、空间复杂度分别是什么?什么时候选⽤?
  • 【C++】C++ 多态的底层实现
  • Python进阶04-网络编程
  • 和字符串有关的经典OJ题——字符串的逆置和字符串的翻转
  • 【TPAMI 2024】Occlusion-Aware Self-Supervised Monocular 6D Object Pose Estimation
  • 音视频解码 AVIO内存输入模式
  • nexus 清理 docker 镜像
  • rv1126-rv1109-mkcramfs-mkfs.cramfs-打包文件系统
  • 干货含源码!如何用Java后端操作Docker(命令行篇)
  • 基于STM32实现智能园艺系统
  • 数据结构代码集训day14(适合考研、自学、期末和专升本)
  • 从零开始,认识游戏设计师(2)游戏源于设计师
  • 新加坡:区块链与加密货币的全球创新中心
  • FATE Board 执行流程探索
  • C++20 是 C++ 语言的一次重大更新
  • 【dp力扣】环绕字符串中唯一的子字符串
  • 【C语言】通讯录的实现(详解)
  • Ansible一键安装Harbor服务