当前位置: 首页 > article >正文

算法编程题-golang语言协程池

算法编程题-golang语言协程池

      • 协程池
      • 代码实现

实现线程池或者协程池是面试经常需要手写的题型。本文将介绍协程池如何实现。

协程池

池化技术是很重要的一种思想,将一些频繁使用但是创建开销比较大的对象自定义保存起来,反复使用,典型的有线程池、内存池、mysql连接池等等。协程是用户级别更加轻量化的线程,使用协程池来维护一个永远工作不回收的协程池也可以优化业务逻辑的效率。
笔者的协程在思路上借鉴了一些Java线程池七大参数的思想,分为核心协程和非核心协程,初始时只有核心协程工作,如果工作数量比较多还会起一些非核心协程来工作,但是这些非核心协程如果长时间没有工作也会被销毁,这其中涉及到的工作数量比较多的阈值、非核心协程多少时间不工作会被销毁都是可以由用户调整的参数。工作其实就是函数的抽象,本文的协程池通过对于信道的封装定义了一个future对象,用来保存一个任务的运行结果。协程池在启动后,会写一个协程daemon,用来监测协程池的状态,决定是否要起非核心协程。此外,还实现了一个优雅关闭协程池的功能,保证所有协程在完成自己手上的工作后立即退出,然后主程序里面将所有的信道关闭。任务的分发则是通过原生的信道来实现的。
不可否认,这里实现的这个协程池只是一个demo,后续可以参考golang优秀的第三方协程池库进行相应的优化。

代码实现

代码实现如下:

type Future struct {
	ch chan interface{}
}

func (f *Future) Get() interface{} {
	var res interface{}
	res, ok := <- f.ch
	for !ok {
		time.Sleep(1 * time.Millisecond)
		res, ok = <- f.ch
	}
	return res
}

type Work struct {
	work func(args []interface{}) interface{}
	args []interface{}
	res  *Future
}


type Worker struct {
	taskChannel chan Work
	isCore      bool
	quit        chan struct{}
	keepAliveTime  time.Duration
	wg *sync.WaitGroup
}

func NewWorker(taskChannle chan Work, isCore bool, quit chan struct{}, keepAliveTime time.Duration, wg *sync.WaitGroup) *Worker {
	return &Worker{taskChannel: taskChannle, isCore: isCore, quit: quit, keepAliveTime: keepAliveTime, wg: wg}
}


func (w *Worker) Work() {
	defer w.wg.Done()
	timer := time.NewTimer(w.keepAliveTime)
	for {
		select {
		case <- timer.C:
			if !w.isCore {
				return
			}
		case <- w.quit: 
			return
		case task := <- w.taskChannel:
			res := task.work(task.args)
			task.res.ch <- res
			timer.Reset(w.keepAliveTime)
			continue
		}
	}
}


type ThreadPool struct {
	workers  []*Worker
	coreThreads int
	maxThreads int
	keepAliveTime time.Duration
	taskChannel chan Work
	quit chan struct{}
	threshold int
	discount int
	wg *sync.WaitGroup
}

func NewThreadPool(coreThreads, maxThreads int, keepAliveTime time.Duration, threshold, discount int) *ThreadPool {
	taskChannel := make(chan Work, maxThreads * 2 + 1)
	quit := make(chan struct{}, maxThreads)
	workers := make([]*Worker, 0)
	var wg sync.WaitGroup
	for i := 0; i < coreThreads; i++ {
		worker := NewWorker(taskChannel, true, quit, keepAliveTime, &wg)
		go worker.Work()
		workers = append(workers, worker)
		wg.Add(1)
	}
	t := &ThreadPool{workers: workers, coreThreads: coreThreads, maxThreads: maxThreads, keepAliveTime: keepAliveTime, taskChannel: taskChannel, quit: quit, threshold: threshold, discount: discount, wg: &wg}
	go t.daemon()
	return t
}

// Submit 往协程池中提交任务
func (t *ThreadPool) Submit(function func(args []interface{}) interface{}, args []interface{}) *Future {
	resChannel := make(chan interface{}, 1)
	future := &Future{ch: resChannel}
	work := Work{work: function, args: args, res: future}
	t.taskChannel <- work
	return future
}

// 线程池后台监控程序
func (t *ThreadPool) daemon() {
	if len(t.taskChannel) > t.threshold {
		// 准备在起的协程数
		threadNum := min(t.maxThreads - len(t.workers), len(t.taskChannel) / t.discount)
		for i := 0; i < threadNum; i++ {
			worker := NewWorker(t.taskChannel, false, t.quit, t.keepAliveTime, t.wg)
			go worker.Work()
			t.workers = append(t.workers, worker)
			t.wg.Add(1)
		}
	}
}

// Close 优雅关闭协程池
func (t *ThreadPool) Close() {
	for i := 0; i < t.maxThreads; i++ {
		t.quit <- struct{}{}
	}
	t.wg.Wait()
	close(t.quit)
	close(t.taskChannel)
}

一个简单的单元测试例子:

func TestThreadPool(t *testing.T) {
	tp := NewThreadPool(2, 5, 2 * time.Second, 8, 4)
	args := []interface{}{3, 4}
	for i := 0; i < 10; i++ {
		tp.Submit(func(args []interface{}) interface{} {
			a := args[0].(int)
			b := args[1].(int)
			c := a + b
			time.Sleep(5 * time.Second)
			return c
		}, args)
	}

	f := tp.Submit(func(args []interface{}) interface{} {
		a := args[0].(int)
		b := args[1].(int)
		c := a + b
		time.Sleep(5 * time.Second)
		return c
	}, args)
	res := f.Get()
	t.Logf("get 4+3=%d", res.(int))
	tp.Close()
}


http://www.kler.cn/a/398564.html

相关文章:

  • 哈希表学习分享
  • vuex和pinia的区别
  • 活着就好20241118
  • 基础IO2
  • JS学习日记(jQuery库)
  • vue3 element el-table实现表格动态增加/删除/编辑表格行,带有校验规则
  • Java--反射
  • vTESTstudio系列15--vTESTstudio-Doors的需求和测试用例的管理
  • 5-对象的访问权限
  • 单片机 串口实验 实验五
  • MongoDB创建只读用户并授权指定集合的查询权限
  • 利用Excel批量生成含二维码的设备管理标签卡片
  • 多目标优化算法:多目标鹅算法(MOGOOSE)求解UF1-UF10,提供完整MATLAB代码
  • 计算机网络-理论部分(一):概览
  • 两周学习js总结
  • 详解八大排序(三)------(快速排序)
  • LLM性能优化中的一些概念扫盲
  • LabVIEW中的UDP与TCP比较
  • React Native 全栈开发实战班 - 网络与数据之网络请求基础
  • 实习冲刺练习 第二十四天
  • 《Django 5 By Example》阅读笔记:p54-p75
  • 无需制作PE系统盘,完成更换固态,数据迁移
  • Windows docker下载minio出现“Using default tag: latestError response from daemon”
  • Matlab使用深度网络设计器为迁移学习准备网络
  • Spark读MySQL数据rdd分区数受什么影响,读parquet、hdfs、hive、Doris、Kafka呢?
  • spring-gateway网关聚合swagger实现多个服务接口切换