当前位置: 首页 > article >正文

sentinel学习笔记5-资源指标数据统计

本文属于sentinel学习笔记系列。网上看到吴就业老师的专栏,写的好值得推荐,我整理的有所删减,推荐看原文。

https://blog.csdn.net/baidu_28523317/category_10400605.html

 资源指标数据统计

回顾下责任链:Sentinel 学习笔记3-责任链与工作流程-CSDN博客

为了便于理解,在正式看 StatisticSlot代码之前,先看看责任链执行过程。

https://zhuanlan.zhihu.com/p/5675336888

 链表中装配的节点对象类型为AbstractLinkedProcessorSlot,也就是图中这些Slot的父类,它声明了两个方法,entry、exit分别作为Slot的入口和出口,并且在方法入参中携带了当前调用链路的上下文信息。请求在进入处理链条后,按先后顺序会经过三类Slot,前置的NodeSelectorSlot、ClusterBuilderSlot用于调用链路的提取,中间StatisticSlot是通用的滑动窗口计数功能,比如RT、通过数、拒绝数、异常数等,后置的ParamFlowSlot、SystemSlot等则为真正的规则验证链条。如下图所示

AbstractLinkedProcessSlot中fireEntry和fireExit,主要是用于在当前节点entry或exit方法执行过程中触发对next节点的调度,这就使得Slot链表触发顺序和完成顺序并不一定相同。每个 ProcessorSlot 都有权决定是先等后续的 ProcessorSlot 执行完成再做自己的事情,还是先完成自己的事情再让后续 ProcessorSlot 执行。比如StatisticSlot 在统计指标数据之前会先调用后续的 ProcessorSlot,根据后续 ProcessorSlot 判断是否需要拒绝该请求的结果决定记录哪些指标数据。

 StatisticSlot :

  • entry:先调用 fireEntry 方法完成调用后续的 ProcessorSlot#entry 方法,根据后续的 ProcessorSlot 是否抛出 BlockException 决定记录哪些指标数据,并将资源并行占用的线程数加 1。
  • exit:若无任何异常,则记录响应成功、请求执行耗时,将资源并行占用的线程数减 1。

entry 方法

正常情况

当后续的 ProcessorSlot 未抛出任何异常时,表示不需要拒绝该请求,放行当前请求。

 public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // Do some checking. 先进行后面的slot进行流控/降级的判断
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // Request passed, add thread count and pass count.
            node.increaseThreadNum();//并行线程数+1
            node.addPassRequest(count);//请求数+count(一般是1)
            // 如果调用来源不为空,也将调用来源的 StatisticNode 的当前并行占用线程数+1,通过请求数+1
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }
            //如果流量类型为 IN,增加资源全局唯一的 ClusterNode( 整个集群node)的统计信息
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }
             //回调所有 ProcessorSlotEntryCallback#onPass
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        }

其中:回调涉及的ProcessorSlotEntryCallback 接口的定义如下

public interface ProcessorSlotEntryCallback<T> {
    //在请求被放行时被回调执行
    void onPass(Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args) throws Exception;
   //在请求被拒绝时被回调执行
    void onBlocked(BlockException ex, Context context, ResourceWrapper resourceWrapper, T param, int count, Object... args);
}
   异常PriorityWaitException 

在需要对请求限流时,只有使用默认流量效果控制器才可能会抛出 PriorityWaitException 异常,说明当前请求已经被休眠了一会了,但请求还是允许通过的,只是不需要为 DefaultNode 记录这个请求的指标数据了,其他的自增,回调还有。

} catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        }
 异常BlockException
  } catch (BlockException e) {
            // Blocked, set block exception to current entry.
            // 没有通过。将异常信息保存到当前的entry里面
            context.getCurEntry().setBlockError(e);

            // Add block count. 增加拒绝数量
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {//调用来源不为空,让调用来源的 StatisticsNode 也记录当前请求被拒绝
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }
            //流量类型为 IN
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                // 统计所有资源指标数据的 ClusterNode 也记录当前请求被拒绝
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            // Handle block event with registered entry callback handlers. 处理回调
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } 
其它异常

其它异常并非指业务异常,因为此时业务代码还未执行,而业务代码抛出的异常是通过调用 Tracer#trace 方法记录的。此时不再统计异常。

 } catch (Throwable e) {
            // Unexpected internal error, set error to current entry.
            context.getCurEntry().setError(e);

            throw e;
        }

可以看到不同的异常有不同的统计方式。一般限流导致的异常是BlockException 。

 exit 方法

exit 方法被调用时,要么请求被拒绝,要么请求被放行并且已经执行完成,所以 exit 方法需要知道当前请求是否正常执行完成,这正是 StatisticSlot 在捕获异常时将异常记录到当前 Entry 的原因,exit 方法中通过 Context 可获取到当前 CtEntry,从当前 CtEntry 可获取 entry 方法中写入的异常

  public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        Node node = context.getCurNode();

        if (context.getCurEntry().getBlockError() == null) {
            // Calculate response time (use completeStatTime as the time of completion).
            long completeStatTime = TimeUtil.currentTimeMillis();
            context.getCurEntry().setCompleteTimestamp(completeStatTime);
            long rt = completeStatTime - context.getCurEntry().getCreateTimestamp();

            Throwable error = context.getCurEntry().getError();

            // Record response time and success count.   记录执行耗时与成功总数
            recordCompleteFor(node, count, rt, error);
            recordCompleteFor(context.getCurEntry().getOriginNode(), count, rt, error);
            if (resourceWrapper.getEntryType() == EntryType.IN) { // 流量类型为 in 时
                recordCompleteFor(Constants.ENTRY_NODE, count, rt, error);
            }
        }

        // Handle exit event with registered exit callback handlers. 回调
        Collection<ProcessorSlotExitCallback> exitCallbacks = StatisticSlotCallbackRegistry.getExitCallbacks();
        for (ProcessorSlotExitCallback handler : exitCallbacks) {
            handler.onExit(context, resourceWrapper, count, args);
        }

        // fix bug https://github.com/alibaba/Sentinel/issues/2374
        fireExit(context, resourceWrapper, count, args);
    }

    private void recordCompleteFor(Node node, int batchCount, long rt, Throwable error) {
        if (node == null) {
            return;
        }
        node.addRtAndSuccess(rt, batchCount);
        node.decreaseThreadNum();// 自减当前资源占用的线程数

        if (error != null && !(error instanceof BlockException)) {
            node.increaseExceptionQps(batchCount);
        }
    }

entry没抛出异常,会记录下rt,也就是业务代码执行的响应时间,然后将资源对应的并发线程数-1操作。

 资源指标数据的记录过程

ClusterNode 才是一个资源全局的指标数据统计节点,但是上面的StatisticSlot#entry 方法与 exit 方法中看到。因为 ClusterNode 被 ClusterBuilderSlot 交给了 DefaultNode 掌管,在 DefaultNode 的相关指标数据收集方法被调用时,ClusterNode 的对应方法也会被调用。

public class DefaultNode extends StatisticNode {
      /**
     * Associated cluster node.
     */
    private ClusterNode clusterNode;

...
    @Override
    public void increaseExceptionQps(int count) {
        super.increaseExceptionQps(count);
        this.clusterNode.increaseExceptionQps(count);
    }

    @Override
    public void addRtAndSuccess(long rt, int successCount) {
        super.addRtAndSuccess(rt, successCount);
        this.clusterNode.addRtAndSuccess(rt, successCount);
    }

    @Override
    public void increaseThreadNum() {
        super.increaseThreadNum();
        this.clusterNode.increaseThreadNum();
    }

    @Override
    public void decreaseThreadNum() {
        super.decreaseThreadNum();
        this.clusterNode.decreaseThreadNum();
    }

    @Override
    public void addPassRequest(int count) {
        super.addPassRequest(count);
        this.clusterNode.addPassRequest(count);
    }

假设当前请求被成功处理,StatisticSlot 会调用 DefaultNode#addRtAndSuccess 方法记录请求处理成功、并且记录处理请求的耗时,DefaultNode 先调用父类StatisticNode的 addRtAndSuccess 方法,然后 DefaultNode 会调用 ClusterNode#addRtAndSuccess 方法.ClusterNode 与 DefaultNode 都是 StatisticNode 的子类

StatisticNode 中rollingCounterInSecond 是一个秒级的滑动窗口,rollingCounterInMinute 是一个分钟级的滑动窗口,类型为 ArrayMetric,com.alibaba.csp.sentinel.node.StatisticNode#addRtAndSuccess

    public void addRtAndSuccess(long rt, int successCount) {
        rollingCounterInSecond.addSuccess(successCount);//秒级
        rollingCounterInSecond.addRT(rt);

        rollingCounterInMinute.addSuccess(successCount);//分钟级
        rollingCounterInMinute.addRT(rt);
    }

com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric#addSuccess

    public void addSuccess(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addSuccess(count);
    }

com.alibaba.csp.sentinel.slots.statistic.data.MetricBucket#addSuccess

    public void addSuccess(int n) {
        add(MetricEvent.SUCCESS, n);
    }

MetricBucket 使用 LongAdder 记录各项指标数据的值,在 MetricEvent 枚举类中定义了 Sentinel 会收集哪些指标数据,

public enum MetricEvent {

    /**
     * Normal pass. 请求被放行
     */
    PASS,
    /**
     * Normal block. 请求被拒绝
     */
    BLOCK,
    EXCEPTION, //异常
    SUCCESS, //处理成功
    RT, //耗时

    /**
     * Passed in future quota (pre-occupied, since 1.5.0).
     * 预通过总数
     */
    OCCUPIED_PASS
}

 数据统计小结:

首先 NodeSelectorSlot 为资源创建 DefaultNode,将 DefaultNode 向下传递,ClusterBuilderSlot 负责给资源的 DefaultNode 加工,添加 ClusterNode 这个零部件,再将 DefaultNode 向下传递给 StatisticSlot。

  • 一个调用链路上只会创建一个 Context,在调用链路的入口创建(一个调用链路上第一个被 Sentinel 保护的资源)。
  • 一个 Context 名称只创建一个 EntranceNode,也是在调用链路的入口创建,调用 Context#enter 方法时创建。
  • 与方法调用的入栈出栈一样,一个线程上调用多少次 SphU#entry 方法就会创建多少个 CtEntry。
  • 一个调用链路上,如果多次调用 SphU#entry 方法传入的资源名称都相同,那么只会创建一个 DefaultNode,如果资源名称不同,会为每个资源名称创建一个 DefaultNode,当前 DefaultNode 会作为调用链路上的前一个 DefaultNode 的子节点。
  • 一个资源有且只有一个 ProcessorSlotChain,一个资源有且只有一个 ClusterNode。
  • 一个 ClusterNode 负责统计一个资源的全局指标数据。
  • StatisticSlot 负责记录请求是否被放行、请求是否被拒绝、请求是否处理异常、处理请求的耗时等指标数据,在 StatisticSlot 调用 DefaultNode 用于记录某项指标数据的方法时,DefaultNode 也会调用 ClusterNode 的相对应方法,完成两份指标数据的收集。
  • DefaultNode 统计当前资源的各项指标数据的维度是同一个 Context(名称相同),而 ClusterNode 统计当前资源各项指标数据的维度是全局。

http://www.kler.cn/a/453707.html

相关文章:

  • debezium独立版使用(不结合kafuka)
  • NLP中的神经网络基础
  • CSS(二):美化网页元素
  • 前端:改变鼠标点击物体的颜色
  • CSS快速入门
  • flask后端开发(7):加载静态文件
  • v3+ts 批量引入组件
  • DDI-GPT:使用知识图谱增强的大模型对药物相互作用进行可解释的预测
  • PPO 可能出现 KL 爆炸等问题的详细分析(KL Explosions in PPO): 中英双语
  • 无问社区-无问AI模型
  • 从零开始掌握Spring MVC:深入解析@Controller与@RequestMapping注解的使用
  • iic通信底层讲解
  • Golang微服务-protobuf
  • Niushop开源商城(漏洞复现)
  • 基于人工智能时代政务智慧转型的实现前景初探
  • 实战分享:开发设计文档模版及编写要点
  • 【高等数学】空间解析几何
  • BFS【东北大学oj数据结构11-3】C++
  • 如何在 Ubuntu 22.04 上安装以及使用 MongoDB
  • 牛客网刷题 ——C语言初阶——BC112小乐乐求和