当前位置: 首页 > article >正文

【深入理解SpringCloud微服务】Sentinel源码解析——FlowSlot流控规则

Sentinel源码解析——FlowSlot流控规则

  • StatisticNode与StatisticSlot
    • StatisticNode内部结构
    • StatisticSlot
  • FlowSlot流控规则

在前面的文章,我们对Sentinel的原理进行了分析,Sentinel底层使用了责任链模式,这个责任链就是ProcessorSlotChain对象,链中的每个节点都是一个ProcessorSlot,每个ProcessorSlot对应一个规则的处理。

在这里插入图片描述

然后我们又对Sentinel的整体流程进行了源码分析,我们分析了Sentinel的ProcessorSlotChain对象默认的构成:

在这里插入图片描述

但是我们没有对每个slot进行深入的分析,本篇文章就对它们进行深入的了解。

StatisticNode与StatisticSlot

FlowSlot是根据StatisticSlot中的统计数据进行流控规则校验的,而StatisticSlot的统计数据又是维护在StatisticNode对象中,呈现以下关系:

在这里插入图片描述

StatisticNode内部结构

public class StatisticNode implements Node {

    // 统计1秒内数据的滑动时间窗,1秒内有两个时间窗格,每个时间窗格的时间跨度是500ms
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);

    // 统计1分钟内数据的滑动时间窗,1分钟内有60个时间窗格,每个时间窗格的时间跨度时1s
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

    /**
     * 当前并发线程数计数器
     */
    private LongAdder curThreadNum = new LongAdder();

	...

}

StatisticNode有三个成员变量,其中rollingCounterInSecond和rollingCounterInMinute都是滑动时间窗计数器,用于分别统计1秒内和1分钟内的数据。

而curThreadNum则是一个LongAdder类型的并发线程数计数器。

在这里插入图片描述

rollingCounterInSecond和rollingCounterInMinute都是Metric类型,Metric是一个接口,实现类是ArrayMetric。

public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

	...

}

ArrayMetric里面是一个LeapArray<MetricBucket>。

在这里插入图片描述

public abstract class LeapArray<T> {

    protected int windowLengthInMs;
    protected int sampleCount;
    protected int intervalInMs;
    private double intervalInSecond;

    protected final AtomicReferenceArray<WindowWrap<T>> array;

    public LeapArray(int sampleCount, int intervalInMs) {
        ...
		
		// 单个时间窗口长度 500ms
        this.windowLengthInMs = intervalInMs / sampleCount;
        // 以毫秒为单位的计数器统计的时间跨度 1000ms
        this.intervalInMs = intervalInMs;
        // 以秒为单位的计数器统计的时间跨度 1s
        this.intervalInSecond = intervalInMs / 1000.0;
        // 时间窗口个数 2个
        this.sampleCount = sampleCount;
		
		// 时间窗口数组
        this.array = new AtomicReferenceArray<>(sampleCount);
    }

}

在这里插入图片描述

LeapArray中的成员属性都是根据构造器参数sampleCount(时间窗口个数)和intervalInMs(以毫秒为单位的计数器统计的时间跨度)计算得出的。

其中sampleCount和intervalInMs直接作为LeapArray的成员属性保存。

windowLengthInMs单个时间窗口长度,自然是intervalInMs除以sampleCount获得。rollingCounterInSecond的intervalInMs是1000,sampleCount是2,因此windowLengthInMs为500,表示500ms一个时间窗口。

intervalInSecond是以秒为单位的计数器统计的时间跨度,因此是intervalInMs除以1000,这里的intervalInSecond就是1,代表当前计数器统计的时间范围是1秒。

array是一个AtomicReferenceArray,他就是时间窗口数组,每个窗口是一个WindowWrap对象,泛型是MetricBucket,sampleCount就是AtomicReferenceArray的数组长度,因此是两个时间窗口。

在这里插入图片描述

WindowWrap里面的这个MetricBucket就是真正记录统计数值的。

public class MetricBucket {

	// 记录统计数组的数组,每个下标对应一个指标的统计值
    private final LongAdder[] counters;

    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
    }
    
	...
}

MetricBucket里面还有结构,并不是一个单一的value,而是一个LongAdder[],数组中的每个LongAdder对应一个指标的统计,具体有哪些指标,可以查看MetricEvent中的枚举。

public enum MetricEvent {
	// 规则校验通过数
    PASS,
    // 规则校验失败数
    BLOCK,
    // 异常数
    EXCEPTION,
    // 成功数
    SUCCESS,
    // 所有成功调用的响应时间
    RT,
    OCCUPIED_PASS
}

在这里插入图片描述

StatisticSlot

@Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            fireEntry(context, resourceWrapper, node, count, prioritized, args);
            
            // 增加当前线程数
            node.increaseThreadNum();
            // 增加规则校验通过数
            node.addPassRequest(count);
			...
        } catch (PriorityWaitException ex) {
            ...
        } catch (BlockException e) {
            ...
            // 增加流控规则校验失败计数
            node.increaseBlockQps(count);
            ...
        } catch (Throwable e) {
        	// 设置异常到context
			context.getCurEntry().setError(e);

            throw e;
		}
    }

StatisticSlot的entry方法与其他的slot处理流程不大一样,它是先调用fireEntry方法让slot链继续往后执行。然后后面才进行相关指标的统计。
在这里插入图片描述

@Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        Node node = context.getCurNode();

        if (context.getCurEntry().getBlockError() == null) {
            // 计算响应时间rt
            long completeStatTime = TimeUtil.currentTimeMillis();
            context.getCurEntry().setCompleteTimestamp(completeStatTime);
            long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();

			// 从context中取出error,(如果抛异常,上面的entry方法会设置到context中)
            Throwable error = context.getCurEntry().getError();

            
            recordCompleteFor(node, count, rt, error);
            // ...
        }

        // ...

        fireExit(context, resourceWrapper, count, args);
    }

    private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) {
        ...
        // 增加响应时间和成功数
        node.addRtAndSuccess(rt, batchCount);
        // 减去当前线程数
        node.decreaseThreadNum();

        if (error != null && !(error instanceof BlockException)) {
        	// 如果有异常,增加异常数
            node.increaseExceptionQps(batchCount);
        }
    }

StatisticSlot的exit方法最后做的就是增加响应时间和成功数以及减去当前线程数;如果context中有异常(就是entry方法塞进去的)还会增加异常数。

在这里插入图片描述

我们发现StatisticSlot做的这些指标统计,全是调用node对象的方法,这个node对象就是StatisticNode。

node.increaseThreadNum()增加并发线程数:
StatisticNode#increaseThreadNum()

    @Override
    public void increaseThreadNum() {
        curThreadNum.increment();
    }

并发线程数是直接加到StatisticNode中的curThreadNum变量中。

而其他的指标都是加到滑动时间窗计数器里面,我们挑一个增加规则校验通过数的node.addPassRequest(count)来看。
StatisticNode#addPassRequest():

    @Override
    public void addPassRequest(int count) {
        rollingCounterInSecond.addPass(count);
        rollingCounterInMinute.addPass(count);
    }

两个滑动时间窗计数器都增加。

在这里插入图片描述

ArrayMetric#addPass(int)

    @Override
    public void addPass(int count) {
    	// 根据当前时间戳定位对应的时间窗口
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        // 时间窗口中的paas计数加1
        wrap.value().addPass(count);
    }

先是根据当前时间戳定位对应的时间窗口,然后把时间窗口中的pass计数加1。

在这里插入图片描述

看下是如何根据当前时间戳定位对应的时间窗口的:
LeapArray#currentWindow()

    public WindowWrap<T> currentWindow() {
        return currentWindow(TimeUtil.currentTimeMillis());
    }

	public WindowWrap<T> currentWindow(long timeMillis) {
		// 根据当前时间戳计算时间窗数组下标
        int idx = calculateTimeIdx(timeMillis);
        // 计算窗口开始时间windowStart
        long windowStart = calculateWindowStart(timeMillis);
        while (true) {
        	// 根据下标取得时间窗
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                // 定位到的时间窗口为空,创建,窗口开始时间就是windowStart
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                ...
            } else if (windowStart == old.windowStart()) {
            	// 时间窗口的开始时间等于windowStart,表示这个时间窗没过期,返回该时间窗
                return old;
            } else if (windowStart > old.windowStart()) {
            	// 时间窗口的开始时间小于windowStart,表示这个时间窗已过期,重置里面的计数,然后再返回这个时间窗口
                return resetWindowTo(old, windowStart);
            } ...
        }
    }

首先根据当前时间戳计算时间窗数组下标idx,通过下标就可以取得对应的时间窗old = array.get(idx)。

除此以外,还会根据当前时间戳计算一个窗口开始时间windowStart,然后每个时间窗创建的时候都会记录一个开始时间old.windowStart(),两个开始时间一比较,就可得知当前时间窗是否已过期。如果old.windowStart()小于windowStart,那么表示时间窗口old已经过期了。

在这里插入图片描述

根据当前时间戳计算目标窗口下标:
LeapArray#calculateTimeIdx(long)

    private int calculateTimeIdx(long timeMillis) {
    	// 当前时间戳除以当个窗口的时间跨度(500ms),得到timeId
        long timeId = timeMillis / windowLengthInMs;
        // timeId对时间窗数组长度取模,得到下标idx
        return (int)(timeId % array.length());
    }

根据当前时间戳计算窗口开始时间windowStart:
LeapArray#calculateWindowStart(long)

    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
    	// 当前时间戳 - 当前时间戳 % 当个时间窗的时间跨度
        return timeMillis - timeMillis % windowLengthInMs;
    }

在这里插入图片描述

得到时间窗后,就是执行“wrap.value().addPass(count);”这行代码,首先时间窗WindowWrap的泛型是MetricBucket,因此wrap.value()取到的是MetricBucket对象,然后调用MetricBucket的addPass(count)方法。

MetricBucket#addPass(int)

    public void addPass(int n) {
        add(MetricEvent.PASS, n);
    }

    public MetricBucket add(MetricEvent event, long n) {
    	// counters是个LongAdder[]
    	// PASS对应枚举值0
    	// 因此这里就是对MetricBucket中的LongAdder数组counters中下标为0的LongAdder增加n
        counters[event.ordinal()].add(n);
        return this;
    }

我们上面已经说过MetricBucket中是一个LongAdder[]记录不同指标的计数。而Event.PASS对应的枚举值是0,因此这里就是对LongAdder数组counters中下标为0的LongAdder增加n,正好对应的就是paas指标的LongAdder。

在这里插入图片描述

FlowSlot流控规则

FlowSlot的流控规则校验逻辑全在entry方法中,而exit直接调用fireExit方法往下走,因此我们只看FlowSlot的entry方法即可。

FlowSlot#entry(…)

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
		// 流控规则校验
        checkFlow(resourceWrapper, context, node, count, prioritized);

        fireEntry(...);
    }

FlowSlot#checkFlow(…)

    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
        // 流控规则校验
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }

FlowRuleChecker#checkFlow(…)

    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        ...
        // 从ruleProvider中根据资源名称取得对应的流控规则集合
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
            	// 逐个校验每个流控规则,一旦有一个校验不通过,则抛出FlowException
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }

沿着FlowSlot的entry方法一路进来,到FlowRuleChecker的checkFlow方法。主体流程就是先根据资源名称取到对应的流控规则集合,然后再遍历这个流控规则集合,逐一校验每个流程规则,如果有哪一个规则校验没有通过,那么就抛出FlowException。

在这里插入图片描述

FlowRuleChecker#canPassCheck(FlowRule, …)

    public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
        ...
        // 单个流控规则校验
        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    }

    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
        ...
        // 单个流控规则校验
        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }    

“return rule.getRater().canPass(selectedNode, acquireCount, prioritized);”这一行代码有可能进入不同的实现类,视我们选择的“流控效果”而定。

  • DefaultController#canPass(Node, int, boolean):快速失败(滑动时间窗算法)
  • WarmUpController#canPass(Node, int, boolean):Warm Up(令牌桶算法)
  • RateLimiterController#canPass(Node, int, boolean):排队等待(漏桶算法)

在这里插入图片描述

在这里插入图片描述

DefaultController#canPass(Node, int, boolean):

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    	// 当前的QPS(或者并发线程数,看我们选的“阈值类型”是什么)
        int curCount = avgUsedTokens(node);
        // 加上要增加的数目,如果超了,返回false表示校验失败,没超则返回true表示校验通过
        if (curCount + acquireCount > count) {
            ...
            return false;
        }
        return true;
    }

DefaultController#canPass(Node, int, boolean)方法是流控效果为“快速失败”对应的流控规则校验类,使用的时滑动时间窗算法。首先计算获取当前的QPS或者并发线程数,这个视乎我们选的“阈值类型”而定,然后加上当前要申请的数目acquireCount,如果超过了阈值,返回false表示校验失败,没超则返回true表示校验通过。

在这里插入图片描述

DefaultController#avgUsedTokens(Node):

    private int avgUsedTokens(Node node) {
        ...
        // node.curThreadNum()取得的时并发线程数
        // node.passQps()取得的是QPS
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
    }

node.curThreadNum()其实就是取的StatisticNode中curThreadNum属性的值,它是一个LongAdder类型,上面已经介绍过。
、StatisticNode#curThreadNum()

    @Override
    public int curThreadNum() {
        return (int)curThreadNum.sum();
    }

node.passQps()则要进行计算。
StatisticNode#passQps()

    @Override
    public double passQps() {
        return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
    }

在这里插入图片描述

rollingCounterInSecond.pass()取得的是所有的时间窗口pass计数的汇总:
ArrayMetric#pass()

    @Override
    public long pass() {
    	// 这里是判断如果当前的时间窗口过期,则重置它
        data.currentWindow();
        // 所有的时间窗口的paas计数加总到pass
        long pass = 0;
        List<MetricBucket> list = data.values();

        for (MetricBucket window : list) {
            pass += window.pass();
        }
        return pass;
    }

在这里插入图片描述
window.pass()则是取得单个MetricBucket中的pass计数。
MetricBucket#pass():

    public long pass() {
        return get(MetricEvent.PASS);
    }
    public long get(MetricEvent event) {
        return counters[event.ordinal()].sum();
    }

在这里插入图片描述

这样就取得了计数器中每个时间窗口的pass计数加总后的值,但这个并不是QPS,因为有可能这个计数器的时间跨度是大于1秒的,因此还要除以rollingCounterInSecond.getWindowIntervalInSec()。

rollingCounterInSecond.getWindowIntervalInSec()取得的是整个计数器的时间跨度(以秒为单位):
ArrayMetric#getWindowIntervalInSec()

    @Override
    public double getWindowIntervalInSec() {
        return data.getIntervalInSecond();
    }

data是LeapArray类型,进入LeapArray的getIntervalInSecond方法:
LeapArray#getIntervalInSecond()

    public double getIntervalInSecond() {
        return intervalInSecond;
    }

返回的是LeapArray中的intervalInSecond,也就是计数器统计的时间跨度(以秒为单位),那么“rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec()”得到的就是平均每秒的通过数(pass),也就是QPS。

在这里插入图片描述

以下是FlowSlot流控规则校验的整体流程,结合StatisticNode内部结构的那张大图看,思路就非常清晰了。

在这里插入图片描述


http://www.kler.cn/a/457366.html

相关文章:

  • Oracle ASM命令行工具asmcmd命令及其使用方法
  • seata分布式事务详解(AT)
  • kafka开机自启失败问题处理
  • 学技术学英文:Tomcat的线程模型调优
  • Ubuntu 安装英伟达显卡驱动问题记录
  • 单元测试3.0+ @RunWith(JMockit.class)+mock+injectable+Expectations
  • 从2024 re:Invent,看亚马逊云科技的AI布局
  • 【机器学习】分类
  • Element-plus自动导入
  • Crawler实现英语单词的翻译
  • linux内核如何实现TCP的?
  • 【Bug记录】黑马点评使用jmeter进行秒杀抢购时报401以及200后HTTP请求依旧异常的解决办法
  • Cpp::AVL树的机制详解与实现(23)
  • 产品原型设计
  • IntelliJ IDEA 远程调试
  • 在Ubuntu下通过Docker部署Misskey服务器
  • MATLAB语言的数据库编程
  • 基于STM32F103控制L298N驱动两相四线步进电机
  • 【递归与回溯深度解析:经典题解精讲(中篇)】—— LeetCode
  • 新版IDEA配置 Tomcat
  • 期末算法分析程序填空题
  • 32132132123
  • Leetcode经典题20--长度最小的子数组
  • SpringSecurity使用过滤器实现图形验证码
  • matlab smith自适应模糊PID房间湿度控制
  • 基于TCP的Qt网络通信