当前位置: 首页 > article >正文

剖析sentinel的限流和熔断

sentinel的限流和熔断

  • 前言
  • 源码分析
    • 滑动窗口源码
    • 限流源码
    • 熔断源码
  • 完结撒花,sentinel源码还是挺简单的,如有需要收藏的看官,顺便也用发财的小手点点赞哈,如有错漏,也欢迎各位在评论区评论!

前言

平时发起一个请求,在系统内部微服务之间调用就会形成一条调用链路,那对于每个服务要起到保护的作用,如何保护呢?本文就来做一个介绍,笔者平时是使用sentinel居多,是通过其提供的限流和熔断的措施来保护的,于是文本来剖析sentinel的限流和熔断;
先介绍雪崩问题:
例如a1服务——》b服务——》c服务;
a2服务——》b服务——》c服务;
a3服务——》b服务;
d服务——》a3服务;
在a1服务调用b时,往下调用c服务,因服务c故障,会占用着b服务连接至超时时间,此时又会有服务a2调b服务,再调c服务,会因服务c故障,继续堆积连接在b服务上至连接超时,堆积到服务b性能瓶颈也会导致服务b故障,此时a3服务调用b服务也会继续导致a3服务堆积连接,依次导致所有服务故障,这就是雪崩
市面上解决雪崩的方案:
在这里插入图片描述
流量控制是限流的,用于保护下游服务,然后其余三种是下游发生了故障的前提下,保护上游服务的方案,其中舱壁模式,也是线程隔离,下游发生故障时,是为调上游的多个服务,建立属于它们的线程池,仍然会调下游,这样就会占用着线程;而熔断降级则不会,下游有问题,例如上游根据调用下游的失败比例、失败数来决定触发熔断;
所以说流量控制避免雪崩,有了流量控制就不会发生雪崩,而其余三种则解决雪崩问题;

再介绍限流与熔断的概念:
限流是保护当前服务,提前对服务做了qps限流保护;
熔断则是某个服务因没有限流发生了故障,然后上游调用该服务时,不会再调用了,而是在上游定义了关于该服务的降级逻辑,起到保护上游服务的作用,避免因为该服务的故障,导致雪崩问题;

那限流、熔断、雪崩之间的关系?
更像是因为没有限流容易导致服务提供方问题,然后服务消费方用熔断来保护自己,从而避免雪崩;

源码分析

在 Sentinel 里面,所有的资源都对应一个资源名称以及一个 Entry。Entry 可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用 API 显式创建;每一个 Entry 创建的时候,同时也会创建一系列功能插槽(slot chain);
例如服务a——》服务b——》服务c,每个服务就是一个保护资源,默认是一个controller一个保护资源,在一个服务里其实可以有多个保护资源,用SentinelResource注解标识controller往后调的一个个service层的方法即可,sentinel对于保护资源的调用链路是如下图:
在这里插入图片描述
调用链路就是这8个slot,简单来说就是统计访问的qps,判断是否达到触发限流(FlowSlot)或熔断(DegradeSlot)阈值,每个slot抛出blockExeception,就代表访问停止了,就不会触发往后的slot调用,它是使用责任链模式调用的。
在这里插入图片描述
SentinelResource的切面,主要是调SphU.entry方法

//该方法是查找slot链,上面说的slot责任链就是指这个链
com.alibaba.csp.sentinel.CtSph#entryWithPriority
ProcessorSlot<Object> chain = this.lookProcessChain(resourceWrapper);

//lookProcessChain方法:第一次访问资源时,使用copyOnWrite的方式添加到map中,后面就从map获取即可
ProcessorSlotChain chain = (ProcessorSlotChain)chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized(LOCK) {
                chain = (ProcessorSlotChain)chainMap.get(resourceWrapper);
                if (chain == null) {
                    if (chainMap.size() >= 6000) {
                        return null;
                    }

                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap(chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }

//创建的slotChain是通过spi的方式从METAINFO目录获取所有的slot
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
        Iterator var3 = sortedSlotList.iterator();

        while(var3.hasNext()) {
            ProcessorSlot slot = (ProcessorSlot)var3.next();
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain", new Object[0]);
            } else {
            	//获取的每个slot形成一个AbstractLinkedProcessorSlot链表,因为每个slot继承了该类
                chain.addLast((AbstractLinkedProcessorSlot)slot);
            }
        }

        return chain;
    }
//创建完slotChain后,就开始执行每个slot
chain.entry(context, resourceWrapper, (Object)null, count, prioritized, args);

public class DefaultProcessorSlotChain extends ProcessorSlotChain {
    AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
    	//这里是第一个slot,从这里开始调用
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {
            super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
        }

        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            super.fireExit(context, resourceWrapper, count, args);
        }
    };
}


//调用下一个slot是通过当前slot的next属性(就是下一个slot元素)
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
        if (this.next != null) {
            this.next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }

    }

这里详细介绍了slot调用链路的过程,本文主要是关注限流和熔断,所以只需看FlowSlot和DegradeSlot即可;

滑动窗口源码

是用StatisticSlot统计访问资源的线程数、qps。
滑动窗口的原理:时间窗口设定qps阈值,先看看固定时间窗的缺点,一个时间窗口内是不会超出阈值,但一个时间窗的后半个时间窗和下一个时间窗的前半个时间窗加起来会超过阈值。于是将一个时间窗划分为多个时间样本,例如1s的时间窗划分为5个样本时间窗(每个200ms),即使统计到一个时间窗的后半个时间窗和后一个时间窗的前半个时间窗时,会统计每个样本时间窗来是否达到阈值,从而解决该问题;

注意:样本时间窗划分得越小,统计得越准确;

com.alibaba.csp.sentinel.slots.statistic.StatisticSlot#entry
			//访问链表所有的slot
            this.fireEntry(context, resourceWrapper, node, count, prioritized, args);
            //统计线程数
            node.increaseThreadNum();
            //统计qps
            node.addPassRequest(count);

这里有觉得奇怪吗?先是访问所有的slot是否通过后(任何一个slot都不抛出BlockException),再统计资源的线程数和qps,其实是类似于elastic search写文档时,先写内存,然后再写transaction log文件,这样是为了写内存成功后再写入transaction log文件,不至于写入文件后,再回滚内存。同样是为了校验通过后,再统计,不至于统计完才校验失败。

基础知识:

在这里插入图片描述
这里是时间窗口类

在这里插入图片描述
时间窗口类包含的属性,以及包含的时间样本窗口类WindowWrap

public void addPass(int count) {
	//获取当前时间点所在的样本窗口
    WindowWrap<MetricBucket> wrap = this.data.currentWindow();
    //将当前请求的计数量添加到当前样本窗口的统计数据中
    ((MetricBucket)wrap.value()).addPass(count);
}

就不往里跟了,主要是根据当前时间点统计是在当前时间窗的哪个样本时间窗里,然后往里添加

限流源码

在这里插入图片描述
对应的就是这些规则
是在FlowSlot中,根据staticFlow统计的qps、线程数,来判断是否达到阈值,从而触发限流

com.alibaba.csp.sentinel.slots.block.flow.FlowSlot#entry
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
	//检测并应用流控规则
    this.checkFlow(resourceWrapper, context, node, count, prioritized);
    //触发下一个slot
    this.fireEntry(context, resourceWrapper, node, count, prioritized, args);
}


    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider != null && resource != null) {
        //获取到指定资源的所有流控规则
            Collection<FlowRule> rules = (Collection)ruleProvider.apply(resource.getName());
            if (rules != null) {
                Iterator var8 = rules.iterator();
				//这个应用流控规则。若是无法通过则抛出异常,后续规则不再应用
                while(var8.hasNext()) {
                    FlowRule rule = (FlowRule)var8.next();
                    if (!this.canPassCheck(rule, context, node, count, prioritized)) {
                        throw new FlowException(rule.getLimitApp(), rule);
                    }
                }
            }

        }
    }

熔断源码

public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
        this.performChecking(context, resourceWrapper);
        this.fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    void performChecking(Context context, ResourceWrapper r) throws BlockException {
    	//获取当前资源所有熔断器
        List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (circuitBreakers != null && !circuitBreakers.isEmpty()) {
            Iterator var4 = circuitBreakers.iterator();

            CircuitBreaker cb;
            //逐个尝试所有熔断器
            do {
                if (!var4.hasNext()) {
                    return;
                }

                cb = (CircuitBreaker)var4.next();
                //若没有通过当前熔断器,则直接抛出异常
            } while(cb.tryPass(context));

            throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
        }
    }
}

在这里插入图片描述
一般来说,熔断降级其实是对于服务的调用方来说的。在项目中会经常调用其它服务或者是第三方接口,而对于这些接口,一旦它们出现不稳定,就有可能导致自身服务长时间等待,从而出现响应延迟等等问题。此时服务调用方就可基于熔断降级方式解决。一旦第三方接口响应时间过长,那么就可以使用慢调用比例规则,当出现大量长时间响应的情况,那么就直接熔断,不去请求。虽然说熔断降级是针对服务的调用方来说,但是Sentinel本身并没有限制熔断降级一定是调用其它的服务。

完结撒花,sentinel源码还是挺简单的,如有需要收藏的看官,顺便也用发财的小手点点赞哈,如有错漏,也欢迎各位在评论区评论!

原文地址:https://blog.csdn.net/Deligent_lvan/article/details/146267666
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/588083.html

相关文章:

  • “driver-class-name: com.mysql.cj.jdbc.Driver“报错问题的解决
  • Gitee重新远程连接仓库(Linux)
  • 【Redis】缓存穿透、缓存击穿、缓存雪崩
  • Leetcode2272:最大波动的子字符串
  • 文档搜索引擎
  • Gluten 项目贡献指南
  • 行为模式---模版模式
  • S32K144入门笔记(十):TRGMUX的初始化
  • 区块链知识点2
  • 3.水中看月
  • IP 地址
  • 一级运动员最小几岁·棒球1号位
  • 使用OpenResty(基于Nginx和Lua)优化Web服务性能
  • k8s系统学习路径
  • C语言之 条件编译和预处理指令
  • ospf单区域
  • 【MySQL】多表查询(笛卡尔积现象,联合查询、内连接、左外连接、右外连接、子查询)-通过练习快速掌握法
  • 【leetcode hot 100 108】将有序数组转换为二叉搜索树
  • 英语面试常见问题
  • 前缀和算法第一弹(一维前缀和和二维前缀和)