当前位置: 首页 > article >正文

剖析go协程池实现原理

go协程池实现

在go语言编程中有一种池肯定避免不了,那就是-协程池,无论你是日常工作还是面试中面试官都无法避免协程池,掌握协程池你也就算是入门go的并发编程了,打一波广告后面会有专门的文章来介绍如何在go中进行并发编程。

协程本身也是一种资源,但是协程池有自己的特殊性,那是由协程池执行任务的特殊性决定的,协程作为资源使用时其实是在消费其他资源,也就是说必须向协程池提供一个统一的接口,所有需要协程池执行的任务都需要实现该接口,然后被协程池统一调用。

这里我们就要求所有需要被协程执行的函数都要实现Worker接口的Task方法

type Worker interface {
	Task()
}

另外一个特殊的点,在于go协程执行时特别是用户添加任务时最好能够让用户能够感知到协程池当前的工作状态,因此这里采用无缓冲区的chan作为协程池任务传递工具,能直接根据向chan添加任务时的状态感知到当前协程池的工作状态。这里采用最简单的阻塞方式来实现,当协程池忙的情况下直接阻塞直到协程池空闲用户才能将自己的任务添加到协程池的管道中进行执行。

在这里插入图片描述
go中使用协程进行工作,因此会创建并使用协程池进行工作非常的有必要,work 包的目的是展示如何使用无缓冲的通道来创建一个 goroutine 池,这些 goroutine 执行并控制一组工作,让其并发执行。在这种情况下,使用无缓冲的通道要比随意指定一个缓冲区大小的有缓冲的通道好,因为这个情况下既不需要一个工作队列,也不需要一组 goroutine 配合执work 包的目的是展示如何使用无缓冲的通道来创建一个 goroutine 池,这些 goroutine 执行并控制一组工作,让其并发执行。在这种情况下,使用无缓冲的通道要比随意指定一个缓冲区大小的有缓冲的通道好,因为这个情况下既不需要一个工作队列,也不需要一组goroutine 配合执

结构体

协程池工作比较单一,就是调用指定的Task方法,另外因为给出任务时最好能立刻执行或者不能立刻执行需要让用户等待,因此协程池最好使用无缓冲的通道,这样当用户需要执行Task时就能直接从接口中感知到当前协程池是否空闲了。

type Worker interface {
	Task()
}

type Pool struct {
	// 使用无缓冲通道实现协程池
	work chan Worker
	// 辅助计数器,用于协程池同步
	wg   sync.WaitGroup
}

创建协程池

创建协程池需要指定最大并发数,当有新的任务加入时,会立即被执行,而当没有任务时,所有协程池中的协程阻塞竞争等待通道中的"任务"。

// New 创建一个工作池
func New(maxGoroutine int) *Pool {
	p := Pool{
		work: make(chan Worker),
	}
	p.wg.Add(maxGoroutine)
	for i := 0; i < maxGoroutine; i++ {
		go func() {
			// 一直循环取任务,直到work被关闭为止并且通道中的任务执行完毕为止
			for w := range p.work {
				w.Task()
			}
			// 退出时候,减少一个计数器
			p.wg.Done()
		}()
	}
	return &p
}

协程池启动和关闭

触发协程池工作很简单,只需要向协程池等待的通道中放入一个任务即可,当协程池关闭时,所有任务都会被立即执行,当所有任务执行完毕,协程池中的所有协程都会退出。

// Run 将任务放入工作池
func (p *Pool) Run(w Worker) {
	p.work <- w
}

// Shutdown 等待所有goroutine完成工作
func (p *Pool) Shutdown() {
	// 关闭work所有协程完成任务之后会退出for循环
	close(p.work)
	p.wg.Wait()
}

将上述实现汇总之后如下

work/work.go

package work

import "sync"

// Worker interface 必须满足worker的要求才能使用工作池
type Worker interface {
	Task()
}

// Pool 提供一个goroutine池,这个池可以完成任何已提交的woker任务
type Pool struct {
	work chan Worker
	wg   sync.WaitGroup
}

// New 创建一个工作池
func New(maxGoroutine int) *Pool {
	p := Pool{
		work: make(chan Worker),
	}
	p.wg.Add(maxGoroutine)
	for i := 0; i < maxGoroutine; i++ {
		go func() {
			// 一直循环取任务,直到work被关闭为止
			for w := range p.work {
				w.Task()
			}
			// 退出时候,减少一个计数器
			p.wg.Done()
		}()
	}
	return &p
}

// Run 将任务放入工作池
func (p *Pool) Run(w Worker) {
	p.work <- w
}

// Shutdown 等待所有goroutine完成工作
func (p *Pool) Shutdown() {
	// 关闭work所有协程完成任务之后会退出for循环
	close(p.work)
	p.wg.Wait()
}

对实现的接口进行功能测试

// This sample program demonstrates how to use the work package
// to use a pool of goroutines to get work done.
package main

import (
	"log"
	"sync"
	"time"

	"work"
)

// names provides a set of names to display.
var names = []string{
	"steve",
	"bob",
	"mary",
	"therese",
	"jason",
}

// namePrinter provides special support for printing names.
type namePrinter struct {
	name string
}

// Task implements the Worker interface.
func (m *namePrinter) Task() {
	log.Println(m.name)
	time.Sleep(time.Second)
}

// main is the entry point for all Go programs.
func main() {
	// Create a work pool with 2 goroutines.
	p := work.New(2)

	var wg sync.WaitGroup
	wg.Add(100 * len(names))

	for i := 0; i < 100; i++ {
		// Iterate over the slice of names.
		for _, name := range names {
			// Create a namePrinter and provide the
			// specific name.
			np := namePrinter{
				name: name,
			}

			go func() {
				// Submit the task to be worked on. When RunTask
				// returns we know it is being handled.
				p.Run(&np)
				wg.Done()
			}()
		}
	}

	wg.Wait()

	// Shutdown the work pool and wait for all existing work
	// to be completed.
	p.Shutdown()
}

***如果感觉文章对你有用欢迎点赞,评论和关注,谢谢! ***
在这里插入图片描述

附录

  1. 参考-《go语言实战》
  2. 代码仓库:gitee work

http://www.kler.cn/a/419648.html

相关文章:

  • 主持人婚礼司仪知识点题库300道;大型免费题库;大风车题库
  • 前端开发 之 15个页面加载特效中【附完整源码】
  • Linux CentOS
  • 图解SSL/TLS 建立加密通道的过程
  • Android 应用单元测试涉及 Telephony 环境初始化问题
  • 用于LiDAR测量的1.58um单芯片MOPA(一)
  • (简单5步实现)部署本地AI大语言模型聊天系统:Chatbox AI + grok2.0大模型
  • 6.824/6.5840 Lab 2: Key/Value Server
  • 农业强国助农平台:科技赋能,助力乡村振兴
  • 【学习笔记】检测基于RTOS的设计中的堆栈溢出-第2部分
  • 威胁驱动的网络安全方法论
  • 如何在 Ubuntu 18.04 上设置 Apache 虚拟主机
  • 家校通小程序实战教程03学生管理
  • 【特斯拉的自动驾驶好在哪】
  • kernel crash数据解析
  • 【Linux】————(日志、线程池及死锁问题)
  • 贪心算法专题(四)
  • Linux的奇妙冒险——进程PCB第一讲
  • 前缀和篇——繁星斗斗数字交织中,觅得效率明月辉光(1)
  • 利用oracle spool配置数据导出脚本
  • 5.2.2 动作标记 getproperty
  • Linux的基本操作及虚拟机设置
  • Spring中@Transactional注解与事务传播机制
  • 【小记】如何刷机
  • Linux:内存文件 基础io
  • 【云原生系列】如何判断哪家云服务器提供商更适合我