剖析go协程池实现原理
go协程池实现
在go语言编程中有一种池肯定避免不了,那就是-协程池,无论你是日常工作还是面试中面试官都无法避免协程池,掌握协程池你也就算是入门go的并发编程了,打一波广告后面会有专门的文章来介绍如何在go中进行并发编程。
协程本身也是一种资源,但是协程池有自己的特殊性,那是由协程池执行任务的特殊性决定的,协程作为资源使用时其实是在消费其他资源,也就是说必须向协程池提供一个统一的接口,所有需要协程池执行的任务都需要实现该接口,然后被协程池统一调用。
这里我们就要求所有需要被协程执行的函数都要实现Worker接口的Task方法
type Worker interface {
Task()
}
另外一个特殊的点,在于go协程执行时特别是用户添加任务时最好能够让用户能够感知到协程池当前的工作状态,因此这里采用无缓冲区的chan作为协程池任务传递工具,能直接根据向chan添加任务时的状态感知到当前协程池的工作状态。这里采用最简单的阻塞方式来实现,当协程池忙的情况下直接阻塞直到协程池空闲用户才能将自己的任务添加到协程池的管道中进行执行。
go中使用协程进行工作,因此会创建并使用协程池进行工作非常的有必要,work 包的目的是展示如何使用无缓冲的通道来创建一个 goroutine 池,这些 goroutine 执行并控制一组工作,让其并发执行。在这种情况下,使用无缓冲的通道要比随意指定一个缓冲区大小的有缓冲的通道好,因为这个情况下既不需要一个工作队列,也不需要一组 goroutine 配合执work 包的目的是展示如何使用无缓冲的通道来创建一个 goroutine 池,这些 goroutine 执行并控制一组工作,让其并发执行。在这种情况下,使用无缓冲的通道要比随意指定一个缓冲区大小的有缓冲的通道好,因为这个情况下既不需要一个工作队列,也不需要一组goroutine 配合执
结构体
协程池工作比较单一,就是调用指定的Task方法,另外因为给出任务时最好能立刻执行或者不能立刻执行需要让用户等待,因此协程池最好使用无缓冲的通道,这样当用户需要执行Task时就能直接从接口中感知到当前协程池是否空闲了。
type Worker interface {
Task()
}
type Pool struct {
// 使用无缓冲通道实现协程池
work chan Worker
// 辅助计数器,用于协程池同步
wg sync.WaitGroup
}
创建协程池
创建协程池需要指定最大并发数,当有新的任务加入时,会立即被执行,而当没有任务时,所有协程池中的协程阻塞竞争等待通道中的"任务"。
// New 创建一个工作池
func New(maxGoroutine int) *Pool {
p := Pool{
work: make(chan Worker),
}
p.wg.Add(maxGoroutine)
for i := 0; i < maxGoroutine; i++ {
go func() {
// 一直循环取任务,直到work被关闭为止并且通道中的任务执行完毕为止
for w := range p.work {
w.Task()
}
// 退出时候,减少一个计数器
p.wg.Done()
}()
}
return &p
}
协程池启动和关闭
触发协程池工作很简单,只需要向协程池等待的通道中放入一个任务即可,当协程池关闭时,所有任务都会被立即执行,当所有任务执行完毕,协程池中的所有协程都会退出。
// Run 将任务放入工作池
func (p *Pool) Run(w Worker) {
p.work <- w
}
// Shutdown 等待所有goroutine完成工作
func (p *Pool) Shutdown() {
// 关闭work所有协程完成任务之后会退出for循环
close(p.work)
p.wg.Wait()
}
将上述实现汇总之后如下
work/work.go
package work
import "sync"
// Worker interface 必须满足worker的要求才能使用工作池
type Worker interface {
Task()
}
// Pool 提供一个goroutine池,这个池可以完成任何已提交的woker任务
type Pool struct {
work chan Worker
wg sync.WaitGroup
}
// New 创建一个工作池
func New(maxGoroutine int) *Pool {
p := Pool{
work: make(chan Worker),
}
p.wg.Add(maxGoroutine)
for i := 0; i < maxGoroutine; i++ {
go func() {
// 一直循环取任务,直到work被关闭为止
for w := range p.work {
w.Task()
}
// 退出时候,减少一个计数器
p.wg.Done()
}()
}
return &p
}
// Run 将任务放入工作池
func (p *Pool) Run(w Worker) {
p.work <- w
}
// Shutdown 等待所有goroutine完成工作
func (p *Pool) Shutdown() {
// 关闭work所有协程完成任务之后会退出for循环
close(p.work)
p.wg.Wait()
}
对实现的接口进行功能测试
// This sample program demonstrates how to use the work package
// to use a pool of goroutines to get work done.
package main
import (
"log"
"sync"
"time"
"work"
)
// names provides a set of names to display.
var names = []string{
"steve",
"bob",
"mary",
"therese",
"jason",
}
// namePrinter provides special support for printing names.
type namePrinter struct {
name string
}
// Task implements the Worker interface.
func (m *namePrinter) Task() {
log.Println(m.name)
time.Sleep(time.Second)
}
// main is the entry point for all Go programs.
func main() {
// Create a work pool with 2 goroutines.
p := work.New(2)
var wg sync.WaitGroup
wg.Add(100 * len(names))
for i := 0; i < 100; i++ {
// Iterate over the slice of names.
for _, name := range names {
// Create a namePrinter and provide the
// specific name.
np := namePrinter{
name: name,
}
go func() {
// Submit the task to be worked on. When RunTask
// returns we know it is being handled.
p.Run(&np)
wg.Done()
}()
}
}
wg.Wait()
// Shutdown the work pool and wait for all existing work
// to be completed.
p.Shutdown()
}
***如果感觉文章对你有用欢迎点赞,评论和关注,谢谢! ***
附录
- 参考-《go语言实战》
- 代码仓库:gitee work