当前位置: 首页 > article >正文

sentinel学习笔记7-熔断降级

     本文属于sentinel学习笔记系列。网上看到吴就业老师的专栏,写的好值得推荐,我整理的有所删减,推荐看原文。

https://blog.csdn.net/baidu_28523317/category_10400605.html

       限流需要我们根据不同的硬件条件做好压测,不好准确评估,限流的阈值都会配置的比压测结果略大,这时就需要结合熔断降级做兜底。在配置了限流规则的基础上,我们还可以为同一资源配置熔断降级规则。当接口的 QPS 未达限流阈值却已经有很多请求超时的情况下,就可能达到熔断降级规则的阈值从而触发熔断,这就能很好地保护服务自身。降级这是一个常用的,对于限流少用从业务上出发考虑的是加机器扩容,不是简单限制掉。本篇主要关注com.alibaba.csp.sentinel.slots.block.degrade包 下面。

具体分析先从断路器开始吧。

断路器

Sentinel中的熔断降级使用断路器实现,先看下断路器概念,来自百科

断路器有分简单与较进阶的版本,简单的断路器只需要知道服务是否可用。而较进阶的版本比起前者更有效率。进阶的断路器带有至少三个状态:

  • 关闭(Closed):断路器在预设的情形下是呈现关闭的状态,而断路器本身“带有”计数功能,每当错误发生一次,计数器也就会进行“累加”的动作,到了一定的错误发生次数断路器就会被“开启”,这个时候亦会在内部启用一个计时器,一旦时间到了就会切换成半开启的状态。
  • 开启(Open):在开启的状态下任何请求都会“直接”被拒绝并且抛出异常讯息。
  • 半开启(Half-Open):在此状态下断路器会允许部分的请求,如果这些请求都能成功通过,那么就意味着错误已经不存在,则会被“切换回”关闭状态并“重置”计数。倘若请求中有“任一”的错误发生,则会回复到“开启”状态,并且重新计时,给予系统一段休息时间。

public interface CircuitBreaker {

    /**
     * Get the associated circuit breaking rule.
     *  获取断路规则
     * @return associated circuit breaking rule
     */
    DegradeRule getRule();

    /**
     * Acquires permission of an invocation only if it is available at the time of invoking.
     *  根据上下文判断请求是否通过
     * @param context context of current invocation
     * @return {@code true} if permission was acquired and {@code false} otherwise
     */
    boolean tryPass(Context context);

    /**
     * Get current state of the circuit breaker.
     * 断路器当前状态
     * @return current state of the circuit breaker
     */
    State currentState();

    /**
     * <p>Record a completed request with the context and handle state transformation of the circuit breaker.</p>
     * <p>Called when a <strong>passed</strong> invocation finished.</p>
     * 请求完成处理
     * @param context context of current invocation
     */
    void onRequestComplete(Context context);

    /**
     * Circuit breaker state. 断路器状态枚举
     */
    enum State {
        /**
         * In {@code OPEN} state, all requests will be rejected until the next recovery time point.
         */
        OPEN,
        /**
         * In {@code HALF_OPEN} state, the circuit breaker will allow a "probe" invocation.
         * If the invocation is abnormal according to the strategy (e.g. it's slow), the circuit breaker
         * will re-transform to the {@code OPEN} state and wait for the next recovery time point;
         * otherwise the resource will be regarded as "recovered" and the circuit breaker
         * will cease cutting off requests and transform to {@code CLOSED} state.
         */
        HALF_OPEN,
        /**
         * In {@code CLOSED} state, all requests are permitted. When current metric value exceeds the threshold,
         * the circuit breaker will transform to {@code OPEN} state.
         */
        CLOSED
    }
}

断路器CircuitBreaker 定义接口, 抽象断路器AbstractCircuitBreaker 主要实现了方法:getRuletryPasscurrentState,慢调用使用ResponseTimeCircuitBreaker,异常数和异常比例使用ExceptionCircuitBreaker ,这两个主要实现了onRequestComplete。

 AbstractCircuitBreaker

虽然不同熔断降级策略的熔断器实现逻辑不同,但差异只是阈值的判断不同或需要统计的指标数据不同,而是否放行请求只需要根据当前熔断器的状态判断,因此,Sentinel为不同熔断降级策略的熔断器提供了一个统一的抽象类——AbstractCircuitBreaker。

除了实现接口中的方法,抽象断路器定义了断路器状态转换的方法,断路器状态无法直接从开启状态到关闭状态,因此有四个状态转换方法:

  • fromCloseToOpen 从关闭到开启
  • fromHalfOpenToOpen 从半开启到开启
  • fromHalfOpenToClose 从半开启到关闭
  • fromOpenToHalfOpen 从开启到半开启

先不贴代码了,回到主流程看看。

熔断降级

熔断策略

sentinel 提供以下几种熔断策略:

  • 慢调用比例 (SLOW_REQUEST_RATIO):选择以慢调用比例作为阈值,需要设置允许的慢调用 RT(即最大的响应时间),请求的响应时间大于该值则统计为慢调用。当单位统计时长(statIntervalMs)内请求数目大于设置的最小请求数目,并且慢调用的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求响应时间小于设置的慢调用 RT 则结束熔断,若大于设置的慢调用 RT 则会再次被熔断。
  • 异常比例 (ERROR_RATIO):当单位统计时长(statIntervalMs)内请求数目大于设置的最小请求数目,并且异常的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。异常比率的阈值范围是 [0.0, 1.0],代表 0% - 100%。
  • 异常数 (ERROR_COUNT):当单位统计时长内的异常数目超过阈值之后会自动进行熔断。经过熔断时长后熔断器会进入探测恢复状态(HALF-OPEN 状态),若接下来的一个请求成功完成(没有错误)则结束熔断,否则会再次被熔断。

   熔断规则DegradeRule

public class DegradeRule extends AbstractRule {

    public DegradeRule() {}

    public DegradeRule(String resourceName) {
        setResource(resourceName);
    }

    /**
     * Circuit breaking strategy (0: average RT, 1: exception ratio, 2: exception count).
     * 降级策略
     */
    private int grade = RuleConstant.DEGRADE_GRADE_RT;

    /**
     * Threshold count. The exact meaning depends on the field of grade.
     * <ul>
     *     <li>In average RT mode, it means the maximum response time(RT) in milliseconds.</li>
     *     <li>In exception ratio mode, it means exception ratio which between 0.0 and 1.0.</li>
     *     <li>In exception count mode, it means exception count</li>
     * <ul/> 限流阈值
     */
    private double count;

    /**
     * Recovery timeout (in seconds) when circuit breaker opens. After the timeout, the circuit breaker will
     * transform to half-open state for trying a few requests.
     * 重置熔断的窗口时间,默认值 0
     */
    private int timeWindow;

    /**
     * Minimum number of requests (in an active statistic time span) that can trigger circuit breaking.
     * 当 grade 配置为 DEGRADE_GRADE_EXCEPTION_RATIO 时,该值表示可触发熔断的最小请求数
     * @since 1.7.0
     */
    private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;

    /**
     * The threshold of slow request ratio in RT mode.
     * RT模式下,该值表示可触发熔断的超过阈值的慢请求数量
     * @since 1.8.0
     */
    private double slowRatioThreshold = 1.0d;

    /**
     * The interval statistics duration in millisecond.
     *
     * @since 1.8.0
     */
    private int statIntervalMs = 1000;

断路器的构建

熔断规则配置由 DegradeRuleManager 加载,跟之前看的flowruleManager差不多

通过DegradeRuleManager.loadRules使降级规则生效时,会将DegradeRule转换为断路器CircuitBreaker。

    private static class RulePropertyListener implements PropertyListener<List<DegradeRule>> {

        private synchronized void reloadFrom(List<DegradeRule> list) {
            //构建断路器
            Map<String, List<CircuitBreaker>> cbs = buildCircuitBreakers(list);
            Map<String, Set<DegradeRule>> rm = new HashMap<>(cbs.size());

            for (Map.Entry<String, List<CircuitBreaker>> e : cbs.entrySet()) {
                assert e.getValue() != null && !e.getValue().isEmpty();

                Set<DegradeRule> rules = new HashSet<>(e.getValue().size());
                for (CircuitBreaker cb : e.getValue()) {
                    rules.add(cb.getRule());
                }
                rm.put(e.getKey(), rules);
            }

            DegradeRuleManager.circuitBreakers = cbs;
            DegradeRuleManager.ruleMap = rm;
        }


        @Override
        public void configUpdate(List<DegradeRule> conf) {
            reloadFrom(conf);
            RecordLog.info("[DegradeRuleManager] Degrade rules has been updated to: {}", ruleMap);
        }

        @Override
        public void configLoad(List<DegradeRule> conf) {
            reloadFrom(conf);
            RecordLog.info("[DegradeRuleManager] Degrade rules loaded: {}", ruleMap);
        }
         private Map<String, List<CircuitBreaker>> buildCircuitBreakers(List<DegradeRule> list) {
            Map<String, List<CircuitBreaker>> cbMap = new HashMap<>(8);
            if (list == null || list.isEmpty()) {
                return cbMap;
            }
            for (DegradeRule rule : list) {
                if (!isValidRule(rule)) {
                    RecordLog.warn("[DegradeRuleManager] Ignoring invalid rule when loading new rules: {}", rule);
                    continue;
                }

                if (StringUtil.isBlank(rule.getLimitApp())) {
                    rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
                }//核心方法
                CircuitBreaker cb = getExistingSameCbOrNew(rule);
                if (cb == null) {
                    RecordLog.warn("[DegradeRuleManager] Unknown circuit breaking strategy, ignoring: {}", rule);
                    continue;
                }

                String resourceName = rule.getResource();

                List<CircuitBreaker> cbList = cbMap.get(resourceName);
                if (cbList == null) {
                    cbList = new ArrayList<>();
                    cbMap.put(resourceName, cbList);
                }
                cbList.add(cb);
            }
            return cbMap;
        }
    }

DegradeRuleManager.RulePropertyListener#buildCircuitBreakers 是构建断路器方法,

 private static CircuitBreaker getExistingSameCbOrNew(/*@Valid*/ DegradeRule rule) {
        List<CircuitBreaker> cbs = getCircuitBreakers(rule.getResource());
        if (cbs == null || cbs.isEmpty()) {//断路器为空直接创建
            return newCircuitBreakerFrom(rule);
        }
        for (CircuitBreaker cb : cbs) {
            if (rule.equals(cb.getRule())) {//返回已有断路器
                // Reuse the circuit breaker if the rule remains unchanged.
                return cb;
            }
        }//创建新的断路器
        return newCircuitBreakerFrom(rule);
    }
    private static CircuitBreaker newCircuitBreakerFrom(/*@Valid*/ DegradeRule rule) {
        switch (rule.getGrade()) {
            case RuleConstant.DEGRADE_GRADE_RT:// 慢调用比例
                return new ResponseTimeCircuitBreaker(rule);
            case RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO: //异常比例
            case RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT:  //异常数
                return new ExceptionCircuitBreaker(rule);
            default:
                return null;
        }
    }

根据不同的降级策略创建不同的断路器。

熔断降级的流程

DegradeSlot 是实现熔断降级的切入点,它作为 ProcessorSlot 插入到 ProcessorSlotChain 链表中,在 entry 方法中调用 CircuitBreaker去判断是否熔断当前请求,如果熔断则抛出 Block 异常 

@Spi(order = Constants.ORDER_DEGRADE_SLOT)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        //在触发后续slot前执行熔断的检查  
        performChecking(context, resourceWrapper);
        //触发后续的slot
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void performChecking(Context context, ResourceWrapper r) throws BlockException {
        //根据资源名称获取断路器CircuitBreaker
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            return;
        }//遍历
        for (CircuitBreaker cb : circuitBreakers) {
            if (!cb.tryPass(context)) {//校验是否通过,不通过抛异常
                throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
            }
        }
    }
public boolean tryPass(Context context) {
        // Template implementation. 允许通行
        if (currentState.get() == State.CLOSED) {
            return true;
        }//尝试通行
        if (currentState.get() == State.OPEN) {
            // For half-open state we allow a request for probing.
            return retryTimeoutArrived() && fromOpenToHalfOpen(context);
        }
        return false;
    }

前面列出断路器有三种状态,CLOSE:正常通行,HALF_OPEN:允许探测通行,OPEN:拒绝通行,这里判断逻辑:

  • 如果熔断器状态为关闭,则返回true,即允许请求通过。

  • 如果熔断器状态为开启,并且已经超过熔断时长以及开启状态成功转换为半开启(探测)状态,则返回true,即允许请求通过。

  • 如果熔断器状态为开启,并且还在熔断时长内,则返回false,禁止请求通过。

public abstract class AbstractCircuitBreaker implements CircuitBreaker {

    protected final DegradeRule rule;
    protected final int recoveryTimeoutMs;

    private final EventObserverRegistry observerRegistry;

    protected final AtomicReference<State> currentState = new AtomicReference<>(State.CLOSED);
    
      protected boolean fromOpenToHalfOpen(Context context) {
        //尝试将状态从OPEN设置为HALF_OPEN
        if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
            // 状态变化通知
            notifyObservers(State.OPEN, State.HALF_OPEN, null);
            Entry entry = context.getCurEntry();
            // 在entry添加一个exitHandler entry.exit()时会调用
            entry.whenTerminate(new BiConsumer<Context, Entry>() {
                @Override
                public void accept(Context context, Entry entry) {
                    // Note: This works as a temporary workaround for https://github.com/alibaba/Sentinel/issues/1638
                    // Without the hook, the circuit breaker won't recover from half-open state in some circumstances
                    // when the request is actually blocked by upcoming rules (not only degrade rules).
                    if (entry.getBlockError() != null) {
                        // Fallback to OPEN due to detecting request is blocked
                        // 如果有发生异常,重新将状态设置为OPEN 请求不同通过
                        currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
                        notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
                    }
                }
            });// 此时状态已设置为HALF_OPEN正常通行
            return true;
        }
        return false;
    }

在调用Entry#exit()时,会触发插槽链条的退出调用。具体到熔断降级DegradeSlot#exit方法。

  public void exit(Context context, ResourceWrapper r, int count, Object... args) {
        Entry curEntry = context.getCurEntry();
        //如果当前其他solt已经有了BlockException直接调用fireExit 不用继续走熔断逻辑了
        if (curEntry.getBlockError() != null) {
            fireExit(context, r, count, args);
            return;
        }
        //通过资源名称获取所有的熔断CircuitBreaker
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers == null || circuitBreakers.isEmpty()) {
            fireExit(context, r, count, args);
            return;
        }

        if (curEntry.getBlockError() == null) {
            // passed request
            for (CircuitBreaker circuitBreaker : circuitBreakers) {
                circuitBreaker.onRequestComplete(context);
            }
        }

        fireExit(context, r, count, args);
    }

断路器状态转换

接上面的exit,通过circuitBreaker.onRequestComplete回调熔断器执行状态切换。

ExceptionCircuitBreaker

ExceptionCircuitBreaker负责异常数/异常比例的熔断,通过滑动窗口统计发生错误数及请求总数

ExceptionCircuitBreaker#onRequestComplete

    public void onRequestComplete(Context context) {
        Entry entry = context.getCurEntry();
        if (entry == null) {
            return;
        }
        Throwable error = entry.getError();
        //异常时间窗口计数器
        SimpleErrorCounter counter = stat.currentWindow().value();
        if (error != null) {// 发生异常,异常数加1
            counter.getErrorCount().add(1);
        } //总数加1
        counter.getTotalCount().add(1);
        //异常状态处理
        handleStateChangeWhenThresholdExceeded(error);
    }

ExceptionCircuitBreaker#handleStateChangeWhenThresholdExceeded

private void handleStateChangeWhenThresholdExceeded(Throwable error) {
        //如果熔断开启,发生错误继续熔断
        if (currentState.get() == State.OPEN) {
            return;
        }
        //断路器为半开启状态
        if (currentState.get() == State.HALF_OPEN) {
            //没有异常,熔断器由半开启转换为关闭,允许所有请求通过
            // 未发生异常 HALF_OPEN >>> CLOSE
            // In detecting request
            if (error == null) {
                fromHalfOpenToClose();
            } else {
                //请求还是发生异常,熔断器由半开起转为开启,熔断所有请求
                // 发生异常 HALF_OPEN >>> OPEN
                fromHalfOpenToOpen(1.0d);
            }
            return;
        }
        //下面为熔断器关闭状态
        List<SimpleErrorCounter> counters = stat.values();
        long errCount = 0;
        long totalCount = 0;
        for (SimpleErrorCounter counter : counters) {
            //计算异常请求数量以及请求总数
            errCount += counter.errorCount.sum();
            totalCount += counter.totalCount.sum();
        }
        //最小请求数内不发生熔断
        if (totalCount < minRequestAmount) {
            return;
        }
        // 当前异常数
        double curCount = errCount;
        if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
            // Use errorRatio
            // 算出当前的异常比例
            curCount = errCount * 1.0d / totalCount;
        }
        // 判断当前异常数或异常比例是否达到设定的阀值
        if (curCount > threshold) {
            transformToOpen(curCount);
        }
    }
ResponseTimeCircuitBreaker

ResponseTimeCircuitBreaker负责慢调用的熔断,通过滑动窗口统计慢调用数量及总的请求数

ResponseTimeCircuitBreaker#onRequestComplete

 public void onRequestComplete(Context context) {
        //获取当前滑动窗口
        SlowRequestCounter counter = slidingCounter.currentWindow().value();
        Entry entry = context.getCurEntry();
        if (entry == null) {
            return;
        }//请求完成时间
        long completeTime = entry.getCompleteTimestamp();
        if (completeTime <= 0) {
            completeTime = TimeUtil.currentTimeMillis();
        }//计算响应时间rt
        long rt = completeTime - entry.getCreateTimestamp();
        if (rt > maxAllowedRt) {
            //rt时间超时,慢调用数加1
            counter.slowCount.add(1);
        }
        counter.totalCount.add(1);
        //状态转换处理
        handleStateChangeWhenThresholdExceeded(rt);
    }

ResponseTimeCircuitBreaker#handleStateChangeWhenThresholdExceeded

  private void handleStateChangeWhenThresholdExceeded(long rt) {
        //如果熔断开启,拦截所有请求
        if (currentState.get() == State.OPEN) {
            return;
        }
        //如果熔断半开启状态
        if (currentState.get() == State.HALF_OPEN) {
            // In detecting request
            // TODO: improve logic for half-open recovery
            if (rt > maxAllowedRt) {
                //请求RT大于设置的阈值,熔断状态由半开启转换为开启
                fromHalfOpenToOpen(1.0d);
            } else {
                //请求RT小于设置的阈值,熔断状态由半开启转换为关闭
                fromHalfOpenToClose();
            }
            return;
        }

        //下面熔断状态为关闭
        List<SlowRequestCounter> counters = slidingCounter.values();
        long slowCount = 0;
        long totalCount = 0;
        for (SlowRequestCounter counter : counters) {
            //统计慢调用数量和总调用数量
            slowCount += counter.slowCount.sum();
            totalCount += counter.totalCount.sum();
        }
        //总调用小于最小请求阈值,不做熔断
        if (totalCount < minRequestAmount) {
            return;
        }//计算慢请求比例
        double currentRatio = slowCount * 1.0d / totalCount;
        if (currentRatio > maxSlowRequestRatio) {
            //慢调用比例大于阈值,熔断状态由关闭转变为开启
            transformToOpen(currentRatio);
        }
        //慢调用比例等于阈值,慢调用比例等于慢速请求比率最大值
        //熔断状态由关闭转变为开启
        if (Double.compare(currentRatio, maxSlowRequestRatio) == 0 &&
                Double.compare(maxSlowRequestRatio, SLOW_REQUEST_RATIO_MAX_VALUE) == 0) {
            transformToOpen(currentRatio);
        }
    }

先到这里。


http://www.kler.cn/a/453611.html

相关文章:

  • Python机器学习笔记(十三、k均值聚类)
  • 【java面向对象编程】第九弹----抽象类、接口、内部类
  • Docker 安装mysql ,redis,nacos
  • 5.近实时数仓数据更新和ID 管理上的优化方案
  • No.1免费开源ERP:Odoo自定义字段添加到配置页中的技术分享
  • R语言的下载、安装及环境配置(RstudioVSCode)
  • 基于 Python 考研历年国家分数线大数据分析设计与实现
  • 利用Python实现排序算法与Web交互的实验项目
  • 【ARM】PK51关于内存模式的解析与区别
  • Python 端口访问邮件提醒工具
  • AndroidKMP跨平台开发基础1-编译发布
  • AWS、Google Cloud Platform (GCP)、Microsoft Azure、Linode和 桔子数据 的 价格对比
  • 解决 Node.js 单线程限制的有效方法
  • ssh免密登录服务器
  • C# Winfrom chart图 实例练习
  • 「Mysql优化大师一」mysql服务性能剖析工具
  • 大模型推理性能优化之KV Cache解读
  • Qt使用QZipWriter和QZipReader来解压、压缩文件
  • MySql B树 B+树
  • 8.zynq编译应用程序
  • 【windows】组合的 Windows 系统调用表
  • 视频会议是如何实现屏幕标注功能的?
  • 美畅物联丨如何在视频汇聚平台上添加RTMP主动推流设备?
  • 三维场景重建与3D高斯点渲染技术探讨
  • Spring Boot项目开发常见问题及解决方案(上)
  • 阿里推出QVQ 视觉推理模型,解锁视觉智能新维度