【限流器】golang令牌桶限流源码分析
1.令牌桶限流算法
算法思想:系统以一定速率生成令牌,存放于桶中,在达到容量的最大值后停止生成令牌。用户生成请求后从令牌桶中消费令牌才能执行。否则延迟执行或被限制。
使用场景:平滑流量控制;在一定程度上可以处理突发流量,但维护了一个延时队列存放token,同时需要一个定时器定期生成token在性能上有损耗。
限流流程如下:
2.time/rate中的limiter核心函数
a.NewLimiter(v,cap)
初始化系统生成令牌的速度,令牌桶的容量。
func NewLimiter(r Limit, b int) *Limiter {
return &Limiter{
limit: r,
burst: b,
}
}
burst表示最大并发量
limit表示每秒能够生产token的数量
b.WaitN(ctx,n)
func (lim *Limite) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {
lim.mu.Lock()
burst := lim.burst
limit := lim.limit
lim.mu.Unlock()
if n > burst && limit != Inf {
// 大于最大并发量,等待时间不大
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
}
// 检查ctx是否已经结束
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// 计算ctx结束前,可以等待的时间
waitLimit := InfDuration
if deadline, ok := ctx.Deadline(); ok {
waitLimit = deadline.Sub(t)
}
// 预留令牌
r := lim.reserveN(t, n, waitLimit)
if !r.ok {
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
}
// 看是否需要等待生产令牌
delay := r.DelayFrom(t)
if delay == 0 {
return nil
}
ch, stop, advance := newTimer(delay)
//等待生产令牌,期间如果ctx结束会产生预留错误
defer stop()
advance()
select {
case <-ch:
return nil
case <-ctx.Done():
r.Cancel()
return ctx.Err()
}
}
c.reserveN(t, n, waitLimit)
func (lim *Limite) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
defer lim.mu.Unlock()
//等待无限
if lim.limit == Inf {
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: t,
}
} else if lim.limit == 0 {
var ok bool
//不等待
if lim.burst >= n {
//桶里容量>
ok = true
lim.burst -= n
}
return Reservation{
ok: ok,
lim: lim,
tokens: lim.burst,
timeToAct: t,
}
}
//懒加载,两次请求之间生成的token数量
t, tokens := lim.advance(t)
//token不够用,还需要再生成
tokens -= float64(n)
var waitDuration time.Duration
if tokens < 0 {
//生成足够token等待时间
waitDuration = lim.limit.durationFromTokens(-tokens)
}
n的数量<最大并发量 且 等待时间可接受
ok := n <= lim.burst && waitDuration <= maxFutureReserve
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
if ok {
r.tokens = n
r.timeToAct = t.Add(waitDuration)
// 执行等待的限流器,更新数据
lim.last = t
lim.tokens = tokens
lim.lastEvent = r.timeToAct
}
return r
}
d、advance(t time.Time)
懒加载两次请求中间生成的token数量
func (lim *Limite) advance(t time.Time) (newT time.Time, newTokens float64) {
last := lim.last
//上次请求时间
if t.Before(last) {
last = t
}
elapsed := t.Sub(last)
//懒加载两次请求中间生成的token数量
//token_n = 所有token数量/cap
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
//返回桶里的token
return t, tokens
}
e.ctx结束,请求失败使用cancel归还token
预存器:
type Reservation struct {
ok bool
lim *Limite
tokens int
timeToAct time.Time
limit Limit
}
预存器记录了当前的预存token,上一次取token操作时间,预取结果
func (r *Reservation) CancelAt(t time.Time) {
if !r.ok {
return
}
r.lim.mu.Lock()
defer r.lim.mu.Unlock()
if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(t) {
return
}
//获得过的token
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
if restoreTokens <= 0 {
return
}
//获得桶里的token
t, tokens := r.lim.advance(t)
// calculate new number of tokens
tokens += restoreTokens
if burst := float64(r.lim.burst); tokens > burst {
tokens = burst
}
// update state
r.lim.last = t
r.lim.tokens = tokens
if r.timeToAct == r.lim.lastEvent {
prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
if !prevEvent.Before(t) {
r.lim.lastEvent = prevEvent
}
}
}
3.使用限流器生成中间件
func Limiter(limit, cap int) MiddlewareFunc {
li := rate.NewLimiter(rate.Limit(limit), cap)
return func(next HandlerFunc) HandlerFunc {
return func(ctx *Context) {
//实现限流
con, cancel := context.WithTimeout(context.Background(), time.Duration(1)*time.Second)
defer cancel()
err := li.WaitN(con, 1)
if err != nil {
//没有拿到令牌的操作
li.Reserve().Cancel()
ctx.String(http.StatusForbidden, "限流了")
return
}
next(ctx)
}
}
}