限流算法,基于go的gRPC 实现的
目录
一、单机限流
1、令牌桶算法
3、固定窗口限流算法
4、滑动窗口
二、集群限流
1、分布式固定窗口 (基于redis)
2、分布式滑动窗口
一、单机限流
1、令牌桶算法
令牌桶算法是当流量进入系统前需要获取令牌,没有令牌那么就要进行限流
这个算法是怎么实现的呢
-
定义一个后台协程按照一定的频率去产生token
-
后台协程产生的token 放到固定大小容器里面
-
有流量进入系统尝试拿到token,没有token 就需要限流了
type TokenBucketLimiter struct {
token chan struct{}
stop chan struct{}
}
func NewTokenBucket(capactity int, timeInternal time.Duration) *TokenBucketLimiter {
te := make(chan struct{}, capactity)
stop := make(chan struct{})
ticker := time.NewTicker(timeInternal)
go func() {
defer ticker.Stop()
for {
select {
case <-ticker.C:
select {
case te <- struct{}{}:
default:
}
case <-stop:
return
}
}
}()
return &TokenBucketLimiter{
token: te,
stop: stop,
}
}
func (t *TokenBucketLimiter) BuildServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
select {
case <-ctx.Done():
err = ctx.Err()
return
case <-t.token:
return handler(ctx, req)
case <-t.stop:
err = errors.New("缺乏保护")
return
}
}
}
func (t *TokenBucketLimiter) Stop() {
close(t.stop)
}
3、固定窗口限流算法
什么是固定窗口限流算法
固定窗口限流算法(Fixed Window Rate Limiting Algorithm)是一种最简单的限流算法,其原理是在固定时间窗口(单位时间)内限制请求的数量。该算法将时间分成固定的窗口,并在每个窗口内限制请求的数量。具体来说,算法将请求按照时间顺序放入时间窗口中,并计算该时间窗口内的请求数量,如果请求数量超出了限制,则拒绝该请求。
优点:实现简单
缺点:对于瞬时流量没发处理,也就是临界问题,比如下图在20t前后,在16t以及26t有大量流量进来,在这10t中,已经超过了流量限制,没法限流
实现如下
type fixWindow1 struct {
lastVistTime int64
vistCount int64
interval int64
maxCount int64
}
func NewfixWindow1(macCount int64) *fixWindow1 {
t := &fixWindow1{
maxCount: macCount,
}
return t
}
func (f *fixWindow1) FixWindow1() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
current := time.Now().UnixNano()
lasttime := atomic.LoadInt64(&f.lastVistTime)
if lasttime+f.interval > current {
if atomic.CompareAndSwapInt64(&f.lastVistTime, lasttime, current) {
atomic.StoreInt64(&f.lastVistTime, current)
atomic.StoreInt64(&f.maxCount, 0)
}
}
count := atomic.AddInt64(&f.vistCount, 1)
if count > f.maxCount {
return gen.GetByIDResp{}, errors.New("触发限流")
}
return handler(ctx, req)
}
}
4、滑动窗口
什么是滑动窗口算法:
滑动窗口限流算法是一种常用的限流算法,用于控制系统对外提供服务的速率,防止系统被过多的请求压垮。它将单位时间周期分为n
个小周期,分别记录每个小周期内接口的访问次数,并且根据时间滑动删除过期的小周期。它可以解决固定窗口临界值的问题。
type slideWindow struct {
timeWindow *list.List
interval int64
maxCnt int
lock sync.Mutex
}
func NewSlideWindow(interval time.Duration, maxCnt int) *slideWindow {
t := &slideWindow{
timeWindow: list.New(),
interval: interval.Nanoseconds(),
maxCnt: maxCnt,
}
return t
}
func (s *slideWindow) SlideWinowlimit() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
s.lock.Lock()
now := time.Now().UnixNano()
// 快路径
if s.timeWindow.Len() < s.maxCnt {
resp, err = handler(ctx, req)
s.timeWindow.PushBack(now)
s.lock.Unlock()
return
}
front := s.timeWindow.Front()
for front != nil && front.Value.(int64)+s.interval < now {
s.timeWindow.Remove(front)
front = s.timeWindow.Front()
}
if s.timeWindow.Len() >= s.maxCnt {
s.lock.Unlock()
return &gen.GetByIdReq{}, errors.New("触发限流")
}
s.lock.Unlock()
resp, err = handler(ctx, req)
s.timeWindow.PushBack(now)
return
}
}
二、集群限流
下面是分布式限流,为啥是分布式限流,单机限流只能对单台服务器进行限流,没发对集权进行限流,需要用分布式限流来进行集权限流
1、分布式固定窗口 (基于redis)
type redisFix struct {
serName string
interVal int
limitCnt int
redis redis.Cmdable
}
//go:embed lua/fixwindow.lua
var lua_redis_fix string
func NewRedisFix(serName string, interval int, limitCnt int, redis redis.Cmdable) *redisFix {
t := &redisFix{
serName: serName,
interVal: interval,
limitCnt: limitCnt,
redis: redis,
}
return t
}
func (r *redisFix) RedisFix() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
res, err := r.limit(ctx)
if err != nil {
return &gen.GetByIDResp{}, err
}
if res {
return &gen.GetByIdReq{}, errors.New("触发限流")
}
return handler(ctx, req)
}
}
func (r *redisFix) limit(ctx context.Context) (res bool, err error) {
keys := []string{r.serName}
res, err = r.redis.Eval(ctx, lua_redis_fix, keys, r.interVal, r.limitCnt).Bool()
return
}
lua
local key = KEYS[1]
local limitCnt = tonumber(ARGV[2])
local val = redis.call('get',key)
if val==false then
if limitCnt<1 then
return "true"
else
redis.call('set',key,1,'PX',ARGV[1])
return "false"
end
elseif tonumber(val)<limitCnt then
redis.call('incr',key)
return "false"
else
return "true"
end
2、分布式滑动窗口
//go:embed lua/slidewindow.lua
var slideWindLua string
type redisSlib struct {
serverName string
interVal time.Duration
maxCnt int64
redis redis.Cmdable
}
func NewRedisSlib(interval time.Duration, maxCnt int64, serverName string, clientCmd redis.Cmdable) *redisSlib {
t := &redisSlib{
serverName: serverName,
interVal: interval,
maxCnt: maxCnt,
redis: clientCmd,
}
return t
}
func (r *redisSlib) RedisSlibLimt() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
limt, err := r.limt(ctx)
if err != nil {
return nil, err
}
if limt {
return nil, errors.New("限流")
}
return handler(ctx, req)
}
}
func (r *redisSlib) limt(ctx context.Context) (bool, error) {
now := time.Now().UnixMilli()
return r.redis.Eval(ctx, slideWindLua, []string{r.serverName}, r.interVal.Milliseconds(), r.maxCnt, now).Bool()
}
lua
local key = KEYS[1]
local window = tonumber(ARGV[1])
local maxCnt = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
--- 窗口的最小边界
local min = now-window
redis.call('ZREMRANGEBYSCORE',key,'-inf',min)
local cnt = redis.call('ZCOUNT',key,'-inf','+inf')
if cnt>=maxCnt then
return "true"
else
redis.call('ZADD',key,now,now)
redis.call('PEXPIRE',key,window)
return "false"
end