kratos源码分析:熔断器
文章目录
- 为什么需要熔断
- Google sre弹性熔断算法
- kratos Breaker源码分析
- 公共接口
- sre实现
- 上报请求结果
- 判定是否熔断
为什么需要熔断
一般来说,当服务器过载(overload)时,需要给client返回服务过载的报错
但是拒接请求也有成本,可能响应错误码本身没啥成本,但处理请求协议栈,构建响应header等也有一笔开销
如果被拒绝的请求数量很大,后端任然会过载,因为其绝大多数CPU都花在拒绝请求上
因此最好的办法是客户端不要将请求发到服务端:当客户端检测到其最近的请求中有很大一部分因“服务过载”错误而被拒绝时,直接在本地失败,不会经过网络IO发给服务端
熔断也可以称为客户端限流
Google sre弹性熔断算法
google sre提供了一种自适应的客户端熔断算法,其维护了过去一段时间内的两个信息:
- requests:往下游发起请求的总数
- accepts:成功的请求数
- 正常情况下,这两个值是相等的
- 但当下游出现异常时,
accepts
会逐渐小于requests
- 一旦requests达到了accepts的
K
倍,客户端就要启动自适应限流,新产生的请求以一定概率被拒绝- 拒绝请求的概率计算公式为: m a x ( 0 , r e q u e s t s − K ∗ a c c e p t s r e q u e s t s + 1 ) max(0, \frac{requests - K * accepts}{requests + 1}) max(0,requests+1requests−K∗accepts)
- 当下游逐渐恢复时,accetps会增加,使得上述公式中分子变为负数,拒绝的概率降为0
可以调整K值,使算法产生不同的效果:
- 减少K值会使得行为更激进,也就是更容易发生熔断
- 增大K值会使得自适应熔断不那么激进
kratos Breaker源码分析
接下来分析kratos中熔断器,其采用了google sre的自适应客户端限流算法
公共接口
熔断器对外暴露3个方法:
Allow
:每次调下游之前判断熔断器状态,根据返回结果决定是否往下游发送请求MarkSuccess
:每次调下游如果成功,上报SuccMarkFailed
:每次调下游如果失败,上报Failed
type CircuitBreaker interface {
Allow() error
MarkSuccess()
MarkFailed()
}
业务中用起来大概是这样:
// 初始化breaker
b := sre.NewBreaker()
// 请求下游前判断熔断器状态
if err = breaker.Allow(); err != nil {
return
}
// 请求下游
err := fn()
// 执行成功或失败将结果告知 breaker
if(err != nil){
breaker.MarkFailed()
}else{
breaker.MarkSuccess()
}
sre实现
- stat:维护请求总数和成功数的滑动窗口
- k:熔断算法的K值
- request:开始熔断的请求数阈值,滑动窗口中请求数量达到
request
才开始熔断 - state:熔断器状态,该字段实际没啥用
type Breaker struct {
// 滑动窗口
stat window.RollingCounter
// 随机数产生器,同于根据概率熔断请求
r *rand.Rand
randLock sync.Mutex
// 熔断算法的K值
k float64
// 开始熔断的请求数阈值
request int64
state int32
}
上报请求结果
func (b *Breaker) MarkSuccess() {
b.stat.Add(1)
}
func (b *Breaker) MarkFailed() {
b.stat.Add(0)
}
- MarkSuccess:内部会将总数+1,成功数+1
- MarkFailed:内部只会将总数+1
本文的重点不是滑动窗口,这里知道其干了啥就好
判定是否熔断
- 调
summary()
拿到滑动窗口中的请求总数和成功数 - 如果没达到熔断条件,返回err=nil。两个判定条件:
- 条件一:滑动窗口中请求总数没达到阈值(
total < b.request
) - 条件二:近期失败的数量不够多(
k * accepts > total
)
- 条件一:滑动窗口中请求总数没达到阈值(
- 否则就需要熔断,根据公式计算熔断概率
dr
- 判定是否命中概率:生成一个0~1之间的随机数,如果小于
dr
说明命中
func (b *Breaker) Allow() error {
// 拿到滑动窗口中的请求总数和成功数
accepts, total := b.summary()
requests := b.k * float64(accepts)
// 没达到熔断条件
if total < b.request || float64(total) < requests {
atomic.CompareAndSwapInt32(&b.state, StateOpen, StateClosed)
return nil
}
// 下面就是需要熔断
atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen)
// 计算熔断概率
dr := math.Max(0, (float64(total)-requests)/float64(total+1))
drop := b.trueOnProba(dr)
// 需要熔断
if drop {
return circuitbreaker.ErrNotAllowed
}
return nil
}
从滑动窗口中获得请求总数total和成功请求数success:
func (b *Breaker) summary() (success int64, total int64) {
b.stat.Reduce(func(iterator window.Iterator) float64 {
for iterator.Next() {
bucket := iterator.Bucket()
// 统计总数
total += bucket.Count
for _, p := range bucket.Points {
// 统计成功的数量
success += int64(p)
}
}
return 0
})
return
}
trueOnProba就是生成一个0~1之间的随机数,看是否小于概率proba
func (b *Breaker) trueOnProba(proba float64) (truth bool) {
b.randLock.Lock()
truth = b.r.Float64() < proba
b.randLock.Unlock()
return
}