当前位置: 首页 > article >正文

【限流器】golang令牌桶限流源码分析

1.令牌桶限流算法

算法思想:系统以一定速率生成令牌,存放于桶中,在达到容量的最大值后停止生成令牌。用户生成请求后从令牌桶中消费令牌才能执行。否则延迟执行或被限制。

使用场景:平滑流量控制;在一定程度上可以处理突发流量,但维护了一个延时队列存放token,同时需要一个定时器定期生成token在性能上有损耗。
限流流程如下:
在这里插入图片描述

2.time/rate中的limiter核心函数

a.NewLimiter(v,cap)

初始化系统生成令牌的速度,令牌桶的容量。

func NewLimiter(r Limit, b int) *Limiter {
	return &Limiter{
		limit: r,
		burst: b,
	}
}

burst表示最大并发量
limit表示每秒能够生产token的数量

b.WaitN(ctx,n)

在这里插入图片描述

func (lim *Limite) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {
   lim.mu.Lock()
   burst := lim.burst
   limit := lim.limit
   lim.mu.Unlock()

   if n > burst && limit != Inf {
//   大于最大并发量,等待时间不大
   	return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
   }
   // 检查ctx是否已经结束
   select {
   case <-ctx.Done():
   	return ctx.Err()
   default:
   }
   // 计算ctx结束前,可以等待的时间
   waitLimit := InfDuration
   if deadline, ok := ctx.Deadline(); ok {
   	waitLimit = deadline.Sub(t)
   }
   // 预留令牌
   r := lim.reserveN(t, n, waitLimit)
   if !r.ok {
   	return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
   }
   // 看是否需要等待生产令牌
   delay := r.DelayFrom(t)
   if delay == 0 {
   	return nil
   }
   ch, stop, advance := newTimer(delay)
   //等待生产令牌,期间如果ctx结束会产生预留错误
   defer stop()
   advance()
   select {
   case <-ch:
   	return nil
   case <-ctx.Done():
   	r.Cancel()
   	return ctx.Err()
   }
}

c.reserveN(t, n, waitLimit)

在这里插入图片描述

func (lim *Limite) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()
	defer lim.mu.Unlock()
	//等待无限
	if lim.limit == Inf {
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: t,
		}
	} else if lim.limit == 0 {
		var ok bool
		//不等待
		if lim.burst >= n {
			//桶里容量>
			ok = true
			lim.burst -= n
		}
		return Reservation{
			ok:        ok,
			lim:       lim,
			tokens:    lim.burst,
			timeToAct: t,
		}
	}
//懒加载,两次请求之间生成的token数量
	t, tokens := lim.advance(t)
//token不够用,还需要再生成
	tokens -= float64(n)

	var waitDuration time.Duration
	if tokens < 0 {
		//生成足够token等待时间
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}
n的数量<最大并发量 且 等待时间可接受
	ok := n <= lim.burst && waitDuration <= maxFutureReserve
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = t.Add(waitDuration)

		// 执行等待的限流器,更新数据
		lim.last = t
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	}

	return r
}

d、advance(t time.Time)

懒加载两次请求中间生成的token数量

func (lim *Limite) advance(t time.Time) (newT time.Time, newTokens float64) {
	last := lim.last
	//上次请求时间
	if t.Before(last) {
		last = t
	}

	elapsed := t.Sub(last)
	//懒加载两次请求中间生成的token数量
	//token_n = 所有token数量/cap
	delta := lim.limit.tokensFromDuration(elapsed)
	tokens := lim.tokens + delta
	if burst := float64(lim.burst); tokens > burst {
		tokens = burst
	}
	//返回桶里的token
	return t, tokens
}

e.ctx结束,请求失败使用cancel归还token

预存器:

type Reservation struct {
	ok        bool
	lim       *Limite
	tokens    int
	timeToAct time.Time
	limit     Limit
}

预存器记录了当前的预存token,上一次取token操作时间,预取结果

func (r *Reservation) CancelAt(t time.Time) {
	if !r.ok {
		return
	}

	r.lim.mu.Lock()
	defer r.lim.mu.Unlock()

	if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(t) {
		return
	}
	//获得过的token
	restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
	if restoreTokens <= 0 {
		return
	}
	//获得桶里的token
	t, tokens := r.lim.advance(t)
	// calculate new number of tokens
	tokens += restoreTokens
	if burst := float64(r.lim.burst); tokens > burst {
		tokens = burst
	}
	// update state
	r.lim.last = t
	r.lim.tokens = tokens
	if r.timeToAct == r.lim.lastEvent {
		prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
		if !prevEvent.Before(t) {
			r.lim.lastEvent = prevEvent
		}
	}
}

3.使用限流器生成中间件

func Limiter(limit, cap int) MiddlewareFunc {
	li := rate.NewLimiter(rate.Limit(limit), cap)
	return func(next HandlerFunc) HandlerFunc {
		return func(ctx *Context) {
			//实现限流
			con, cancel := context.WithTimeout(context.Background(), time.Duration(1)*time.Second)
			defer cancel()
			err := li.WaitN(con, 1)
			if err != nil {
				//没有拿到令牌的操作
				li.Reserve().Cancel()
				ctx.String(http.StatusForbidden, "限流了")
				return
			}

			next(ctx)
		}
	}
}


http://www.kler.cn/a/292931.html

相关文章:

  • 高防服务器的费用受到哪些原因影响?
  • HTTP常见的状态码有哪些,都代表什么意思
  • C语言入门到精通(第六版)——第十六章
  • 专题十八_动态规划_斐波那契数列模型_路径问题_算法专题详细总结
  • Mysql数据库里的SSH连接
  • 大数据新视界 -- 大数据大厂之 Impala 性能飞跃:动态分区调整的策略与方法(上)(21 / 30)
  • 【Linux】常见指令及权限相关知识详细梳理
  • 找不同-第15届蓝桥省赛Scratch初级组真题第4题
  • ffmpeg视频转切片m3u8并加密videojs播放hls.js播放dplayer播放(弹幕效果)
  • Flutter中自定义气泡框效果的实现
  • SQL进阶技巧:如何利用SQL解决趣味赛马问题?| 非等值关联匹配问题
  • 第十九章 rust服务器开发:axum框架详解
  • Self-study Python Fish-C Note20 P64to65
  • 开源 AI 智能名片 O2O 商城小程序在营销中的应用
  • 在qt中,用户输入了16进制的字符串,如何按照用户的16进制格式发送
  • C语言第一周课
  • TypeScript(TS) 实现消息通知(发布订阅)
  • 视频监控系统布局策略:EasyCVR视频汇聚平台构建高效、全面的安全防线
  • 微服务配置管理
  • nodejs发邮件如何实现自动化邮件发送功能?
  • jenkins web界面构建job时平台展现的时间是6点,可是当前北京是14点,如何调整这个时间,如何调整 Jenkins 的时间显示
  • ant mobile design组件库的PickerView组件不能滑动
  • 思科IP访问控制列表3
  • SpringBoot配置返回数据不存在null
  • JUnit 5和Mockito进行单元测试!
  • Redis配置