当前位置: 首页 > article >正文

Go基于协程池的延迟任务调度器

原理

通过用一个goroutine以及堆来存储要待调度的延迟任务,当达到调度时间后,将其添加到协程池中去执行。
主要是使用了chan、Mutex、atomic及ants协程池来实现。

用途

主要是用于高并发及大量定时任务要处理的情况,如果使用Go协程来实现每次延迟任务的调度,那么数量极大的goroutine将会占用内存,导致性能下降,使用协程池实现延迟任务的调度,会改善该情况。
如在物联网设备中,当连接数量达到几十万时,如果使用goroutine来处理心跳或者活跃检测,频繁的创建销毁goroutine会影响性能。

特色

在常见的cron等开源框架中使用的是数组存储待调度的任务,每次循环时都要排序,并且要删除某个任务则时间复杂度是O(n)。

本文通过使用堆及双重Map优化存储待调度的任务,使得添加任务时间复杂度为O(log n),获取任务时间复杂度为O(1),删除时间复杂度为O(1)。

调度器并不会真正的删除取消任务,当取消任务达到执行时间时,会直接continue,是为了提高删除效率,如果要删除取消任务,那么删除的时间复杂度为O(log n),当有极大量任务时,会占用一些内存,通过空间换时间来提高删除效率,下文也提供了删除取消任务的实现,根据不同的场景使用不同的定时任务。

API

创建

NewSchedule(workerNum int, options ...ants.Option) (*Schedule, error) 

//创建协程数是1的延迟任务调度器
s, _ := NewSchedule(1)

创建一个延迟调度任务器,workerNum是协程数量,options是ants协程池的配置,除了WithMaxBlockingTasks不能配置,别的都可以,具体参考:https://github.com/panjf2000/ants

调度一次

func (s *Schedule) ScheduleOne(job func(), duration time.Duration) (TaskId, error) 

//1秒后打印一次时间
taskId, _ := s.ScheduleOne(func() {
		fmt.Println(time.Now())
}, time.Second)

重复调度

func (s *Schedule) Schedule(job func(), duration time.Duration) (TaskId, error) 

//每隔一秒打印一次时间
taskId, _ := s.Schedule(func() {
		fmt.Println(time.Now())
}, time.Second)

取消调度

func (s *Schedule) Schedule(job func(), duration time.Duration) (TaskId, error) 

//每隔一秒打印一次时间
taskId, _ := s.Schedule(func() {
		fmt.Println(time.Now())
}, time.Second)
//休眠3秒后,取消调度
time.Sleep(3 * time.Second)
s.CancelTask(taskId)

停止调度

func (s *Schedule) Schedule(job func(), duration time.Duration) (TaskId, error) 

//每隔一秒打印一次时间
taskId, _ := s.Schedule(func() {
		fmt.Println(time.Now())
}, time.Second)
//休眠3秒后,停用延迟任务调度器
time.Sleep(3 * time.Second)
s.Shutdown()

代码

package schedule

import (
	"container/heap"
	"errors"
	"github.com/panjf2000/ants/v2"
	"math"
	"sync/atomic"
	"time"
)

var (
	// ErrScheduleShutdown 延迟任务调度器已关闭错误
	ErrScheduleShutdown = errors.New("schedule: schedule is already in shutdown")
)

const invalidTaskId = 0

type TaskId uint32
type OriginalTaskId uint32

// Schedule 延迟调度的结构体,提供延迟调度任务的全部方法
// 通过NewSchedule方法创建Schedule,通过Schedule、ScheduleOne方法添加延迟调度任务,通过CancelTask方法取消任务,通过Shutdown停止延迟任务
type Schedule struct {
	//任务堆,按时间排序
	taskHeap taskHeap
	//可执行的任务Map,key是当前的任务id,value是任务的第一次原始id,用于优化取消任务时需要遍历堆去删除
	executeTaskIdMap map[TaskId]OriginalTaskId
	//任务id的Map,key是任务的第一次原始id,value是当前的任务id,用于优化取消任务时需要遍历堆去删除
	originalTaskIdMap map[OriginalTaskId]TaskId
	//调度器是否运行中
	running atomic.Bool
	//下一个任务id
	nextTaskId atomic.Uint32
	//任务运行池
	pool *ants.Pool
	//添加任务Chan
	addTaskChan chan *Task
	//删除任务Chan
	stopTaskChan chan struct{}
	//取消任务Chan
	cancelTaskChan chan OriginalTaskId
}

// NewSchedule 构建一个Schedule
// workerNum 工作的协程数量,options ants协程池的配置,除了WithMaxBlockingTasks不能配置,别的都可以,具体参考:https://github.com/panjf2000/ants
func NewSchedule(workerNum int, options ...ants.Option) (*Schedule, error) {
	//延迟任务的最大任务数量必须不限制
	options = append(options, ants.WithMaxBlockingTasks(0))
	//创建一个协程池
	pool, err := ants.NewPool(workerNum)
	if err != nil {
		return nil, err
	}
	//创建一个延迟调度结构体
	s := &Schedule{
		taskHeap:          make(taskHeap, 0),
		executeTaskIdMap:  make(map[TaskId]OriginalTaskId),
		originalTaskIdMap: make(map[OriginalTaskId]TaskId),
		running:           atomic.Bool{},
		nextTaskId:        atomic.Uint32{},
		pool:              pool,
		addTaskChan:       make(chan *Task),
		stopTaskChan:      make(chan struct{}),
		cancelTaskChan:    make(chan OriginalTaskId),
	}
	//启动调度 会开启一个协程去将即将要调度的任务添加到协程池中运行
	s.start()
	return s, nil
}

// ScheduleOne 添加延迟调度任务,只调度一次
// job 执行的方法 duration 周期间隔,如果是负数立马执行,如果是负数立马且只执行一次
func (s *Schedule) ScheduleOne(job func(), duration time.Duration) (uint32, error) {
	return s.doSchedule(job, duration, true)
}

// Schedule 添加延迟调度任务,重复调度
// job 执行的方法 duration 周期间隔,如果是负数立马且只执行一次
func (s *Schedule) Schedule(job func(), duration time.Duration) (uint32, error) {
	return s.doSchedule(job, duration, false)
}

// doSchedule 添加延迟调度任务的具体实现
func (s *Schedule) doSchedule(job func(), duration time.Duration, onlyOne bool) (uint32, error) {
	if s.running.Load() {
		//如果是负数 只执行一次
		if duration <= 0 {
			onlyOne = true
		}
		nextTaskId := s.getNextTaskId()
		task := new(Task)
		task.job = job
		task.executeTime = time.Now().Add(duration)
		task.onlyOne = onlyOne
		task.duration = duration
		task.originalId = OriginalTaskId(nextTaskId)
		task.id = TaskId(nextTaskId)
		s.addTaskChan <- task
		return uint32(task.originalId), nil
	} else {
		return invalidTaskId, ErrScheduleShutdown
	}
}

// CancelTask 取消延迟调度任务
// taskId 任务id
func (s *Schedule) CancelTask(taskId uint32) {
	if s.running.Load() {
		if taskId != invalidTaskId {
			s.cancelTaskChan <- OriginalTaskId(taskId)
		}
	}
}

// Shutdown 结束延迟任务调度
func (s *Schedule) Shutdown() {
	//通过cas设值
	if s.running.CompareAndSwap(true, false) {
		s.stopTaskChan <- struct{}{}
	}
}

// IsShutdown 延迟任务调度是否关闭
func (s *Schedule) IsShutdown() bool {
	return !s.running.Load()
}

// start 启动延迟任务调度
func (s *Schedule) start() {
	s.running.Store(true)
	go func() {
		for {
			now := time.Now()
			var timer *time.Timer
			//如果没有任务提交,睡眠等待任务
			if s.taskHeap.Len() == 0 {
				timer = time.NewTimer(math.MaxUint16 * time.Hour)
			} else {
				//查看第一个要执行的任务是否是被取消的
				task := s.taskHeap.Peek()
				_, ok := s.executeTaskIdMap[task.id]
				if !ok {
					//是被取消的任务,移除后continue
					heap.Pop(&s.taskHeap)
					continue
				} else {
					//设置执行间隔
					timer = time.NewTimer(task.executeTime.Sub(now))
				}
			}
			select {
			case <-timer.C:
				//到达第一个任务执行时间
				task := heap.Pop(&s.taskHeap).(*Task)
				//提交到线程池执行,返回的error不需要处理,因为任务池是无限大
				_ = s.pool.Submit(task.job)
				//单次执行则删除,多次执行,则更新
				if task.onlyOne {
					s.removeTask(task.originalId, task.id)
				} else {
					s.updateTask(task)
				}
			case originalTaskId := <-s.cancelTaskChan:
				timer.Stop()
				//如果取消的任务id在待执行任务列表中,则删除任务
				if taskId, ok := s.originalTaskIdMap[originalTaskId]; ok {
					s.removeTask(originalTaskId, taskId)
				}
			case task := <-s.addTaskChan:
				timer.Stop()
				//添加任务
				s.addTask(task)
			case <-s.stopTaskChan:
				timer.Stop()
				//关闭资源
				s.close()
				return
			}
		}
	}()
}

// updateTask 更新延迟调度任务
func (s *Schedule) updateTask(executedTask *Task) {
	//拷贝 并设置新的执行时间和ID
	task := *executedTask
	task.executeTime = time.Now().Add(task.duration)
	nextTaskId := s.getNextTaskId()
	task.id = TaskId(nextTaskId)
	//把已执行的任务删除
	s.removeTask(invalidTaskId, executedTask.id)
	//添加新的任务
	s.addTask(&task)
}

// removeTask 移除任务
func (s *Schedule) removeTask(originalTaskId OriginalTaskId, taskId TaskId) {
	//如果原始的任务ID不为空,则为使用者取消的,从任务Map中也删除
	if originalTaskId != invalidTaskId {
		delete(s.originalTaskIdMap, originalTaskId)
	}
	delete(s.executeTaskIdMap, taskId)
}

// addTask 添加任务
func (s *Schedule) addTask(task *Task) {
	s.originalTaskIdMap[task.originalId] = task.id
	s.executeTaskIdMap[task.id] = task.originalId
	heap.Push(&s.taskHeap, task)
}

// getNextTaskId 获取下一个任务id
func (s *Schedule) getNextTaskId() uint32 {
	taskId := s.nextTaskId.Add(1)
	if taskId == invalidTaskId {
		taskId = s.nextTaskId.Add(1)
	}
	return taskId
}

// close 关闭Schedule资源和协程池的资源
func (s *Schedule) close() {
	//关闭所有资源并设置为 nil help gc
	s.taskHeap = nil
	s.executeTaskIdMap = nil
	s.originalTaskIdMap = nil
	s.pool.Release()
	s.pool = nil
	close(s.addTaskChan)
	close(s.cancelTaskChan)
	close(s.stopTaskChan)
	s.addTaskChan = nil
	s.cancelTaskChan = nil
	s.stopTaskChan = nil
}

// Task 调度任务结构体,是一个调度任务的实体信息
type Task struct {
	// 原始id,用于Schedule本身的删除使用,用两层Map的方式优化数组删除的O(n)时间复杂度
	originalId OriginalTaskId
	// 任务id
	id TaskId
	// 执行的时间,每次执行完,如果重复调度就重新计算
	executeTime time.Time
	// 周期间隔
	duration time.Duration
	// 执行的任务
	job func()
	// 是否只执行一次
	onlyOne bool
}

// 任务的堆,使用队只需要在添加的时候进行排序,堆顶是最先要执行的任务
type taskHeap []*Task

// 下面都是堆接口的实现

func (t *taskHeap) Len() int {
	return len(*t)
}
func (t *taskHeap) Less(i, j int) bool {
	return (*t)[i].executeTime.Before((*t)[j].executeTime)
}

func (t *taskHeap) Swap(i, j int) {
	(*t)[i], (*t)[j] = (*t)[j], (*t)[i]
}

func (t *taskHeap) Push(x interface{}) {
	*t = append(*t, x.(*Task))
}

func (t *taskHeap) Pop() interface{} {
	old := *t
	n := len(old)
	x := old[n-1]
	old[n-1] = nil
	*t = old[:n-1]
	return x
}

// Peek 查看堆顶元素,非堆接口的实现
func (t *taskHeap) Peek() *Task {
	return (*t)[0]
}

代码加上详细的中文注解,大约300行。
github地址:
https://github.com/xzc-coder/go-schedule

另一个版本的实现,删除时间复杂度为:O(log n),相对上文中的实现,占用的内存会少,但是删除效率会变低。

package schedule

import (
	"container/heap"
	"errors"
	"github.com/panjf2000/ants/v2"
	"math"
	"sync/atomic"
	"time"
)

var (
	// ErrScheduleShutdown 延迟任务调度器已关闭错误
	ErrScheduleShutdown = errors.New("schedule: schedule is already in shutdown")
)

const invalidTaskId = 0

type TaskId uint32

// Schedule 延迟调度的结构体,提供延迟调度任务的全部方法
// 通过NewSchedule方法创建Schedule,通过Schedule、ScheduleOne方法添加延迟调度任务,通过CancelTask方法取消任务,通过Shutdown停止延迟任务
type Schedule struct {
	//任务堆,按时间排序
	taskHeap taskHeap
	taskMap  map[TaskId]*Task
	//调度器是否运行中
	running atomic.Bool
	//下一个任务id
	nextTaskId atomic.Uint32
	//任务运行池
	pool *ants.Pool
	//添加任务Chan
	addTaskChan chan *Task
	//删除任务Chan
	stopTaskChan chan struct{}
	//取消任务Chan
	cancelTaskChan chan TaskId
}

// NewSchedule 构建一个Schedule
// workerNum 工作的协程数量,options ants协程池的配置,除了WithMaxBlockingTasks不能配置,别的都可以,具体参考:https://github.com/panjf2000/ants
func NewSchedule(workerNum int, options ...ants.Option) (*Schedule, error) {
	//延迟任务的最大任务数量必须不限制
	options = append(options, ants.WithMaxBlockingTasks(0))
	//创建一个协程池
	pool, err := ants.NewPool(workerNum)
	if err != nil {
		return nil, err
	}
	//创建一个延迟调度结构体
	s := &Schedule{
		taskHeap:       make(taskHeap, 0),
		taskMap:        make(map[TaskId]*Task),
		running:        atomic.Bool{},
		nextTaskId:     atomic.Uint32{},
		pool:           pool,
		addTaskChan:    make(chan *Task),
		stopTaskChan:   make(chan struct{}),
		cancelTaskChan: make(chan TaskId),
	}
	//启动调度 会开启一个协程去将即将要调度的任务添加到协程池中运行
	s.start()
	return s, nil
}

// ScheduleOne 添加延迟调度任务,只调度一次
// job 执行的方法 duration 周期间隔,如果是负数立马执行,如果是负数立马且只执行一次
func (s *Schedule) ScheduleOne(job func(), duration time.Duration) (uint32, error) {
	return s.doSchedule(job, duration, true)
}

// Schedule 添加延迟调度任务,重复调度
// job 执行的方法 duration 周期间隔,如果是负数立马且只执行一次
func (s *Schedule) Schedule(job func(), duration time.Duration) (uint32, error) {
	return s.doSchedule(job, duration, false)
}

// doSchedule 添加延迟调度任务的具体实现
func (s *Schedule) doSchedule(job func(), duration time.Duration, onlyOne bool) (uint32, error) {
	if s.running.Load() {
		//如果是负数 只执行一次
		if duration <= 0 {
			onlyOne = true
		}
		nextTaskId := s.getNextTaskId()
		task := new(Task)
		task.job = job
		task.executeTime = time.Now().Add(duration)
		task.onlyOne = onlyOne
		task.duration = duration
		task.id = TaskId(nextTaskId)
		task.index = 0
		s.addTaskChan <- task
		return uint32(task.id), nil
	} else {
		return invalidTaskId, ErrScheduleShutdown
	}
}

// CancelTask 取消延迟调度任务
// taskId 任务id
func (s *Schedule) CancelTask(taskId uint32) {
	if s.running.Load() {
		if taskId != invalidTaskId {
			s.cancelTaskChan <- TaskId(taskId)
		}
	}
}

// Shutdown 结束延迟任务调度
func (s *Schedule) Shutdown() {
	//通过cas设值
	if s.running.CompareAndSwap(true, false) {
		s.stopTaskChan <- struct{}{}
	}
}

// IsShutdown 延迟任务调度是否关闭
func (s *Schedule) IsShutdown() bool {
	return !s.running.Load()
}

// start 启动延迟任务调度
func (s *Schedule) start() {
	s.running.Store(true)
	go func() {
		for {
			now := time.Now()
			var timer *time.Timer
			//如果没有任务提交,睡眠等待任务
			if s.taskHeap.Len() == 0 {
				timer = time.NewTimer(math.MaxUint16 * time.Hour)
			} else {
				task := s.taskHeap.Peek()
				//设置执行间隔
				timer = time.NewTimer(task.executeTime.Sub(now))
			}
			select {
			case <-timer.C:
				//到达第一个任务执行时间
				task := heap.Pop(&s.taskHeap).(*Task)
				//提交到线程池执行,返回的error不需要处理,因为任务池是无限大
				_ = s.pool.Submit(task.job)
				//单次执行则删除,多次执行,则更新
				if task.onlyOne {
					s.removeTask(false, task)
				} else {
					s.updateTask(task)
				}
			case taskId := <-s.cancelTaskChan:
				timer.Stop()
				//如果取消的任务id在待执行任务列表中,则删除任务
				if task, ok := s.taskMap[taskId]; ok {
					s.removeTask(true, task)
				}
			case task := <-s.addTaskChan:
				timer.Stop()
				//添加任务
				s.addTask(task)
			case <-s.stopTaskChan:
				timer.Stop()
				//关闭资源
				s.close()
				return
			}
		}
	}()
}

// updateTask 更新延迟调度任务
func (s *Schedule) updateTask(executedTask *Task) {
	//拷贝 并设置新的执行时间和ID
	task := *executedTask
	task.executeTime = time.Now().Add(task.duration)
	//把已执行的任务删除
	s.removeTask(false, executedTask)
	//添加新的任务
	s.addTask(&task)
}

// removeTask 移除任务
func (s *Schedule) removeTask(removeHeap bool, task *Task) {
	//从Map和堆中
	delete(s.taskMap, task.id)
	if removeHeap {
		heap.Remove(&s.taskHeap, task.index)
	}
}

// addTask 添加任务
func (s *Schedule) addTask(task *Task) {
	heap.Push(&s.taskHeap, task)
	s.taskMap[task.id] = task
}

// getNextTaskId 获取下一个任务id
func (s *Schedule) getNextTaskId() uint32 {
	taskId := s.nextTaskId.Add(1)
	if taskId == invalidTaskId {
		taskId = s.nextTaskId.Add(1)
	}
	return taskId
}

// close 关闭Schedule资源和协程池的资源
func (s *Schedule) close() {
	//关闭所有资源并设置为 nil help gc
	s.taskHeap = nil
	s.taskMap = nil
	s.pool.Release()
	s.pool = nil
	close(s.addTaskChan)
	close(s.cancelTaskChan)
	close(s.stopTaskChan)
	s.addTaskChan = nil
	s.cancelTaskChan = nil
	s.stopTaskChan = nil
}

// Task 调度任务结构体,是一个调度任务的实体信息
type Task struct {
	// 任务id
	id TaskId
	// 执行的时间,每次执行完,如果重复调度就重新计算
	executeTime time.Time
	// 周期间隔
	duration time.Duration
	// 执行的任务
	job func()
	// 是否只执行一次
	onlyOne bool
	//所在堆数组的下标位置
	index int
}

// 任务的堆,使用队只需要在添加的时候进行排序,堆顶是最先要执行的任务
type taskHeap []*Task

// 下面都是堆接口的实现

func (t *taskHeap) Len() int {
	return len(*t)
}
func (t *taskHeap) Less(i, j int) bool {
	return (*t)[i].executeTime.Before((*t)[j].executeTime)
}

func (t *taskHeap) Swap(i, j int) {
	(*t)[i], (*t)[j] = (*t)[j], (*t)[i]
	(*t)[i].index = i
	(*t)[j].index = j
}

func (t *taskHeap) Push(x interface{}) {
	*t = append(*t, x.(*Task))
}

func (t *taskHeap) Pop() interface{} {
	old := *t
	n := len(old)
	x := old[n-1]
	old[n-1] = nil
	*t = old[:n-1]
	return x
}

// Peek 查看堆顶元素,非堆接口的实现
func (t *taskHeap) Peek() *Task {
	return (*t)[0]
}


http://www.kler.cn/a/566076.html

相关文章:

  • js基础案例
  • 如何利用Java爬虫获取1688商品详情实战指南
  • Docker迁移/var/lib/docker之后镜像容器丢失问题
  • 初阶数据结构习题【3】(1时间和空间复杂度)——203移除链表元素
  • 渲染 101:Maya 项目的高效渲染利器
  • 使用Docker方式一键部署MySQL和Redis数据库详解
  • C语言数据类型及其使用 (带示例)
  • 数字样机:从技术革新到产业赋能的演进之路
  • 【AI+智造】基于阿里云Ubuntu24.04系统,使用Ollama部署开源DeepSeek模型并集成到企业微信
  • 华为云Deepseek API接口访问速率问题
  • Datawhale 数学建模导论二 笔记5 多模数据与智能模型
  • 服务器禁止操作汇总(Server Prohibits 0peration Summary)
  • 使用 Docker 管理 Alpine 镜像的完整指南
  • 【初阶数据结构】星河中的光影 “排” 象:排序(上)
  • Octave3D 关卡设计插件
  • GaussDB SQL 调优:从执行计划到AI驱动的进阶指南
  • 【搜广推算法的力量:如何用数据驱动用户体验与商业价值?】
  • 1-5gcc
  • 【Electron入门】进程环境和隔离
  • 水仙花数Ⅰ