go中的并发处理
. Goroutines
概念:
Goroutines 是 Go 的核心并发机制。它们是由 Go 运行时管理的轻量级线程,具有比操作系统线程更少的开销。每个 goroutine 只需少量的内存(大约 2KB),并且由 Go 运行时负责调度和管理,哪怕是java发展到21的虚拟线程和go比也还是不够轻量
创建:
go func() {
fmt.Println("Hello from goroutine")
}()
使用 go 关键字前缀一个函数调用即可创建一个新的 goroutine。它将异步执行指定的函数。
调度模型:
Go 使用 M调度模型,其中 M 个用户级线程(goroutines)通过 N 个操作系统线程(OS threads)进行调度。Go 运行时会动态地将 goroutines 分配到操作系统线程上,减少了上下文切换的开销。
底层原理:
Go 的调度器使用协作式调度,依靠 goroutine 的堆栈跟踪(stack traces)和调度策略(如抢占式调度)来管理并发执行。调度器负责在 goroutines 和 OS threads 之间进行合理分配,以实现高效的并发执行。
###Channel
概念:
Channel 是用于 goroutine 之间通信的管道,可以安全地传递数据。它们实现了数据传递的同步机制,避免了传统的锁竞争问题。
创建:
ch := make(chan int)
发送和接收:
// 发送数据
ch <- 42
// 接收数据
value := <-ch
//数据通过 <- 操作符在 channel 中发送和接收。发送操作会阻塞直到有接收方,接收操作会阻塞直到有发送方。
这样就可以简单模拟java的join 操作 ,只有上一个任务执行完才允许接下来的线程执行
func testTrueCache() {
//如果是无缓冲的 需要立即读取 所以异步写入
ch := make(chan string)
go func() {
fmt.Println("hello")
ch <- "hello"
}()
fmt.Println(<-ch)//不采用读取 那么可能子携程还没有执行主协诚就结束了
fmt.Print("这个输出第一是在hello之后")
}
缓冲区:
无缓冲 channel:发送和接收操作必须匹配,否则会阻塞。
注意 串行的代码块不能写如通道会立即死锁 ,因为 无缓冲通道在写入数据时必须立刻有其他协程来读取数据,否则会导致阻塞。串行中,写入数据和读取数据是在同一个协程中进行的,阻塞后根本不会执行下一个代码 这会导致死锁。
func testFialChannelCache() {
// 创建无缓冲管道
ch := make(chan int) // 正因为无缓冲管道无法存放数据,在向管道写入数据时必须立刻有其他协程来读取数据
defer close(ch)
// 启动一个新的协程来读取数据
go func() {
n := <-ch
fmt.Println(n)
}()
// 写入数据
ch <- 123 // 不在串行 写入后必须读取 没有容量可以保存 会报错
}
缓冲 channel:创建时指定缓冲区大小,发送操作只有在缓冲区满时才会阻塞,接收操作只有在缓冲区空时才会阻塞。
go
ch := make(chan int, 2) // 创建一个缓冲区大小为 2 的 channel
关闭:
可以使用 close 函数关闭 channel,以表明没有更多数据将被发送。
//建议创建号channel后就使用
defer close(ch)
关闭后的 channel 仍然可以读取数据,但不能再发送数据。读取操作会返回 channel 的零值。
底层原理:
Channel 是基于锁和条件变量实现的。每个 channel 有一个缓冲区和一个互斥锁,用于协调数据的发送和接收操作。
只读通道和只写通道的申明创建
var ch <-chan int // 只读通道,只能接收 int 类型的数据
var ch chan<- int // 只写通道,只能发送 int 类型的数据
WaitGroup
刚才使用无缓冲的通道模拟 java中的join api 效果
概念:
sync.WaitGroup 用于等待一组 goroutine 完成任务,确保所有并发操作完成后才继续执行。
var wg sync.WaitGroup
wg.Add(1) // 增加等待计数
go func() {
defer wg.Done() // 任务完成时减少计数
// 任务代码
}()
wg.Wait() // 等待所有任务完成
底层就是维护了一个原子性的任务数 wg.Done() 进行任务数自减1wg.Wait() // 等待所有任务完成 只有为0才会进行执行
这样就更直观的可以发出来 可以保证多个无序携程的有序性
func quicktest() {
var wait sync.WaitGroup
// 指定子协程的数量 本质是原子性的自增自减
wait.Add(1) //自增
go func() {
fmt.Println(1)
// 执行完毕
wait.Done() //自减
}()
// 等待子协程
wait.Wait() //判断是否为0 阻塞等待 只有为0才会继续执行 这样就保证了执行的有序性
wait.Add(1)
go func() {
fmt.Println(2)
// 执行完毕
wait.Done() //自减
}()
wait.Wait()
wait.Add(1)
go func() {
fmt.Println(3)
// 执行完毕
wait.Done() //自减
}()
wait.Wait()
wait.Add(1)
go func() {
fmt.Println(4)
// 执行完毕
wait.Done() //自减
}()
// 等待子协程
wait.Wait() //判断是否为0 阻塞等待
fmt.Println("完成")
}
Select
概念:
select 语句允许一个 goroutine 等待多个 channel 操作,并在其中一个操作准备好时执行相应的代码块。它类似于 Java 中的 Selector,用于处理多个异步事件,和netty一样 用到了seletor 多路复用思想
使用:
go
基本用法 类似 swtich case go中的stich 不会像Java一样需要显示申明return返回
select {
case msg := <-ch1:
fmt.Println("Received from ch1:", msg)
case msg := <-ch2:
fmt.Println("Received from ch2:", msg)
case <-time.After(time.Second):
fmt.Println("Timeout")
}
select 语句会阻塞,直到其中一个 case 准备好。time.After 是一个用于超时的示例,它创建一个定时器 channel。
默认分支:
可以使用 default 分支来避免 select 阻塞。
func main() {
test()
}
func test() {
chA := make(chan int)
chB := make(chan int)
chC := make(chan int)
defer func() {
close(chA)
close(chB)
close(chC)
}()
// 开启一个新的协程
go func() {
// 向A管道写入数据
chA <- 1
}()
go func() {
// 向A管道写入数据
chB <- 1
}()
go func() {
// 向A管道写入数据
chC <- 1
}()
// 选择器 这样就可以监控 多个通道 有数据的时候就触发 如果都没有数据 就阻塞
// 这里的select 语句会阻塞,直到有某个通道有数据可读
//如果都同时 检查到有数据可读,则随机选择一个进行读取
Loop: //break Loop 后不会执行:后的代码了
for {
select {
case n, ok := <-chA:
fmt.Println("写入通道A")
fmt.Println(n, ok)
case n, ok := <-chB:
fmt.Println("写入通道B")
fmt.Println(n, ok)
case n, ok := <-chC:
fmt.Println("写入通道C")
fmt.Println(n, ok)
//超时时间跳出进行下一次循环
case <-time.After(time.Second): // 设置1秒的超时时间 //返回只读通通道
break Loop // 退出循环 避免死循环锁毒素 还可以一直监听
}
}
}
底层原理:
select 通过轮询和系统调用来检查多个 channel 的状态,使用系统级的 poll 或 epoll(在 Linux 上)机制来实现高效的 I/O 多路复用。
锁
go中的锁也相对轻量
Mutex(互斥锁):
概念:
sync.Mutex 用于保护临界区,确保在同一时间只有一个 goroutine 可以访问共享资源。
使用:
var mu sync.Mutex
mu.Lock()
// 临界区代码
mu.Unlock()
比如这种代码进行应用
/*
*
这种情况肯定是脏数据的 10个携程并发执行肯定有脏数据 并且还无法确 还没有等待确定有序性
解决方案:
. 加锁 互斥锁l
*/
func main() {
wait.Add(10) // 等待10个协程完成
for i := 0; i < 10; i++ {
go func(data *int) {
// 模拟访问耗时
// 加锁 同一时间只有有所的次啊可以执行
lock.Lock()
time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
// 访问数据
temp := *data
// 模拟计算耗时
time.Sleep(time.Millisecond * time.Duration(rand.Intn(5000)))
ans := 1
// 修改数据
*data = temp + ans
lock.Unlock() //数据修改完毕了才可以解锁
fmt.Println(*data)
wait.Done()
}(&count)
}
wait.Wait()
fmt.Println("最终结果", count)
}
只有数据处理时候加锁才能保证有序性,多个携程并发处理数据 会造成脏数据
底层原理:基于自旋锁和操作系统的原子操作实现,使用互斥锁来保护数据的一致性。
RWMutex(读写锁):
概念:
sync.RWMutex 允许多个 goroutine 同时读取数据,但在写操作时独占访问。本质就是为了优化读写分离的情况,写锁还是写携程之间互斥,读锁是为了保证有读锁的携程允许时候写锁的携程就堵塞
func main() {
wait.Add(12) //12个任务携程
// 读多写少
go func() {
for i := 0; i < 3; i++ {
go Write(&count)
}
wait.Done()
}()
go func() {
for i := 0; i < 7; i++ {
go Read(&count)
}
wait.Done()
}()
// 等待子协程结束
wait.Wait()
fmt.Println("最终结果", count)
}
func Read(i *int) {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(500)))
rw.RLock()
fmt.Println("拿到读锁")
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
fmt.Println("释放读锁", *i)
rw.RUnlock()
wait.Done()
}
func Write(i *int) {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
rw.Lock()
fmt.Println("拿到写锁")
temp := *i
time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
*i = temp + 1
fmt.Println("释放写锁", *i)
rw.Unlock()
wait.Done()
}
Context
概念:
context 包用于在 goroutine 中传递取消信号、截止时间和请求范围的信息,类似于 Java 中的 Future 或 ExecutorService 的取消机制。
在 Go 中,context 包提供了上下文管理的功能,主要用于控制 goroutine 的生命周期、传递请求范围的值以及处理超时和取消信号。context 是在并发编程中不可或缺的工具,尤其是在处理网络请求和后台任务时。
Context 的主要用途
-
控制 goroutine 的生命周期:通过 context,你可以在父任务结束或取消时,通知所有子 goroutine 停止执行,从而避免 goroutine 泄漏。
-
传递请求范围的值:context 可以在函数调用链之间传递值,这些值通常是与请求有关的信息,比如用户身份、授权令牌、请求截止时间等。
-
处理超时和取消信号:context 可以设定超时时间或在外部取消信号时终止操作,这对于网络请求和长时间运行的任务非常有用。
Context 的类型
- context.Background():通常作为主函数的起点使用,表示一个空的上下文。
- context.TODO():当你还不确定要用什么样的上下文时,可以使用它作为占位符。
Context 的使用模式
在实际编程中,context 通常以以下几种方式使用:
取消任务:
当需要在某个操作完成后取消所有相关的 goroutine 时,使用 context.WithCancel。
超时控制:
当一个操作需要在指定时间内完成时,使用 context.WithTimeout。
截止时间控制:
与超时类似,但使用的是具体的时间点而不是时间间隔,使用 context.WithDeadline。
Context 实例详细讲解
go
package main
import (
"context"
"fmt"
"time"
)
// 模拟一个处理请求的函数
func processRequest(ctx context.Context, duration time.Duration) {
select {
case <-time.After(duration):
fmt.Println("Request processed")
case <-ctx.Done():
fmt.Println("Request canceled:", ctx.Err())
}
}
func main() {
// 创建一个 context,设置超时时间为 3 秒
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel() // 确保取消函数被调用,以释放资源
// 启动一个 goroutine 来处理请求,预计处理时间为 5 秒
go processRequest(ctx, 5*time.Second)
// 模拟一些其他操作
time.Sleep(2 * time.Second)
// 主 goroutine 等待 6 秒,以便观察子 goroutine 的处理情况
time.Sleep(6 * time.Second)
fmt.Println("Main function ends")
}
详细解释
创建 Context:context.WithTimeout(context.Background(), 3*time.Second) 创建了一个有超时限制的 context,3 秒后自动取消。
处理请求:processRequest 是一个模拟的请求处理函数,它要么在指定的 duration 后完成请求处理,要么在 context 被取消时停止操作。
启动 goroutine:我们在主函数中启动了一个 goroutine 来处理请求,这个 goroutine 模拟了一个 5 秒的处理时间。
观察取消效果:由于 context 设置的超时时间是 3 秒,而请求的处理时间是 5 秒,3 秒后 context 会被自动取消,导致 processRequest 提前退出并输出取消原因。
实际用途
在实际应用中,context 经常用于以下场景:
API 请求的超时控制:确保 API 请求在设定的时间内完成,避免服务阻塞。
批量任务的并发处理:在父任务取消时,自动终止所有子任务,避免资源浪费。
数据库操作:结合 context 来设置数据库查询的超时时间,确保数据库操作不会无限制地阻塞。
go中的池化技术–工作池
即使 Goroutine 很轻量,但在某些情况下,仍需要限制同时运行的 Goroutine 数量:
资源限制:
当每个任务都涉及到大量的资源(如 CPU、内存、网络)时,过多的 Goroutine 会导致资源竞争,影响系统性能。
后端服务限制:如果你的程序需要调用外部服务或数据库,这些服务可能有并发连接的限制,需要控制并发量。
稳定性和可控性:使用工作池可以更好地管理任务的执行,提供任务队列、超时、重试等机制,提升系统的稳定性和可靠性。
工作池的作用:
限制并发数:
通过固定数量的工作 Goroutine,限制同时执行的任务数量。
任务调度:将任务放入队列,等待空闲的 Worker 处理。
结果收集:汇总任务执行的结果,方便后续处理。
错误处理:统一管理任务执行中的错误和异常。
在 Go 中,实现工作池通常有以下几种方式:
使用 Channel 实现简单的工作池:通过创建一定数量的 Worker Goroutine,从任务 Channel 中获取任务并执行。
使用同步包(如 sync.WaitGroup)协调任务执行:确保所有任务执行完毕后再进行后续操作。
使用第三方库:社区中有一些成熟的工作池库,如 antlabs/workerpool、gammazero/workerpool 等,提供了丰富的功能和更好的性能。
接下来,下面的代码示例来演示如何在 Go 中实现工作池。
// 任务类型
type Task struct {
id int
}
// 模拟处理任务的方法
func (t Task) Process() {
fmt.Printf("Processing task with id: %d\n", t.id)
time.Sleep(time.Second) // 模拟任务处理耗时
}
// 工作池函数
func worker(id int, tasks <-chan Task, wg *sync.WaitGroup) {
defer wg.Done()
for task := range tasks {
fmt.Printf("Worker %d started task %d\n", id, task.id)
task.Process()
fmt.Printf("Worker %d finished task %d\n", id, task.id)
}
}
func main() {
const numWorkers = 3 // Worker 的数量
const numTasks = 10 // 任务的数量
tasks := make(chan Task, numTasks) // 创建一个任务通道
var wg sync.WaitGroup
// 启动 worker
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, tasks, &wg)
}
// 发送任务到任务通道
for i := 1; i <= numTasks; i++ {
tasks <- Task{id: i}
}
close(tasks) // 关闭任务通道,不再发送新任务
// 等待所有 worker 完成
wg.Wait()
fmt.Println("All tasks processed")
}
详细解释
- Task 类型:
这是一个简单的任务类型,其中包含了任务的 ID。Process 方法模拟任务的处理过程。
Worker 函数:
worker 函数是每个 worker 执行的工作。它从 tasks 通道中接收任务,并调用 Process 方法处理任务。sync.WaitGroup 用于等待所有 worker 完成任务。
- 启动 Worker:
numWorkers 决定了 worker 的数量。在这里,启动了 3 个 worker。
发送任务:
- 将 numTasks 数量的任务发送到 tasks 通道,任务会被 worker 逐一处理。
关闭通道:
任务发送完毕后,关闭任务通道,告诉所有 worker 没有更多的任务了。
等待所有 Worker 完成:
使用 sync.WaitGroup 确保主程序在所有 worker 完成任务后再退出。
Java 线程池通过 Executors 工具类来创建不同类型的线程池,Go 则通过自定义的 worker pool 和 goroutine 来管理并发