当前位置: 首页 > article >正文

Sentinel 学习笔记3-责任链与工作流程

本文属于sentinel学习笔记系列。网上看到吴就业老师的专栏,原文地址如下:

https://blog.csdn.net/baidu_28523317/category_10400605.html

上一篇梳理了概念与核心类:Sentinel 学习笔记2- 概念与核心类介绍-CSDN博客

补一个点:

Sph: 定义了获取资源访问权的接口, 代表性接口: Entry entry = sph.entry(String name);

  CtSph: Sph接口的默认实现, 负责校验流控规则, 当校验不通过时抛出BlockException异常

  SphU: 工具类, 封装了CtSph的调用, 未做特殊处理, 当校验不通过时抛出BlockException异常

  SphO: 工具类, 封装了CtSph的调用, 与SphU不同的是, 校验不通过时不会抛出异常, 而是return false;

责任链

当执行到 SphU.entry 接口时,就到了 Sentinel 的核心骨架--ProcessorSlotChain,将不同的 Slot 按照顺序串在一起(责任链模式),从而将不同的功能(限流、降级、系统保护)组合在一起。slot chain 其实可以分为两部分:统计数据构建部分(statistic)和判断部分(rule checking)。

其中:辅助资源指标数据统计的 ProcessorSlot

NodeSelectorSlot、ClusterBuilderSlot、StatisticSlot

判断部分:

  • AuthoritySlot:实现黑白名单降级
  • SystemSlot:实现系统自适应降级
  • FlowSlot:实现限流降级
  • DegradeSlot:实现熔断降级

核心结构如下图所示

  • NodeSelectorSlot:给当前请求(context)绑定一个DefaultNode,如果不存在则根据 context 创建 DefaultNode,然后维护整个调用树的父子关系,然后将相关数据保存在 Context 中。
  • ClusterBuilderSlot:给当前请求(context)绑定一个ClusterNode, 如果不存在首先根据 resourceName 创建 ClusterNode,并将其引用保存在 defaultNode中,然后再根据 origin 创建来源节点(类型为 StatisticNode),并将源节点保存在当前的调用点 Entry 中。一个资源(resource)的所有请求共享同一个ClusterNode。
  • StatisticSlot:StatisticSlot 是 Sentinel 最为重要的类之一,用于根据规则判断结果进行相应的统计操作,先调用后续 Slot 的检查过程,如果检查通过增加各个维度的计数器。
    • entry 过程:依次执行后面的判断 slot。每个 slot 触发流控的话会抛出异常(BlockException 的子类)。若有 BlockException 抛出,则记录 block 数据;若无异常抛出则算作可通过(pass),记录 pass 数据。
    • exit 过程:若无 error(无论是业务异常还是流控异常),记录 complete(success)以及 RT,线程数-1。

   Sentinel 的整体工具流程就是使用责任链模式将所有的 ProcessorSlot 按照一定的顺序串成一个单向链表。比如辅助完成资源指标数据统计的 ProcessorSlot 的排序顺序为:

NodeSelectorSlot->ClusterBuilderSlot->StatisticSlot

责任链构建

实现将 ProcessorSlot 串成一个单向链表的是 ProcessorSlotChain,这个 ProcessorSlotChain 是由 SlotChainBuilder 构造的,代码如下:

@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        //sortedSlotList获取所有的处理器对象
        List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
        for (ProcessorSlot slot : sortedSlotList) {
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }
            //通过尾添法将职责slot添加到DefaultProcessorSlotChain当中
            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }

        return chain;
    }
}
  • ProcessorSlotChain作为Slot的责任链,负责责任链的构建和执行。
  • 责任链上的处理器对象 AbstractLinkedProcessorSlot 通过保存指向下一个处理器的对象的进行关联,整体以链表的形式进行串联。

处理器

 ProcessorSlot 比较抽象,前面概念说了Entry表示一个资源的访问权, 通过Sph接口获取,这里的

 fireEntry是获取访问权成功后的处理,具体的接口的定义如下:

public interface ProcessorSlot<T> {

    /**
     * Entrance of this slot.
     *  入口方法
     * @param context         current {@link Context}
     * @param resourceWrapper current resource
     * @param param           generics parameter, usually is a {@link com.alibaba.csp.sentinel.node.Node}
     * @param count           tokens needed
     * @param prioritized     whether the entry is prioritized
     * @param args            parameters of the original call
     * @throws Throwable blocked exception or unexpected error
     */
    void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
               Object... args) throws Throwable;

    /**
     * Means finish of {@link #entry(Context, ResourceWrapper, Object, int, boolean, Object...)}.
     *  调用下一个 ProcessorSlot#entry 方法
     * @param context         current {@link Context}
     * @param resourceWrapper current resource
     * @param obj             relevant object (e.g. Node)
     * @param count           tokens needed
     * @param prioritized     whether the entry is prioritized
     * @param args            parameters of the original call
     * @throws Throwable blocked exception or unexpected error
     */
    void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
                   Object... args) throws Throwable;

    /**
     * Exit of this slot.
     *  出口方法
     * @param context         current {@link Context}
     * @param resourceWrapper current resource
     * @param count           tokens needed
     * @param args            parameters of the original call
     */
    void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);

    /**
     * Means finish of {@link #exit(Context, ResourceWrapper, int, Object...)}.
     * 调用下一个 ProcessorSlot#exit 方法
     * @param context         current {@link Context}
     * @param resourceWrapper current resource
     * @param count           tokens needed
     * @param args            parameters of the original call
     */
    void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}

方法参数解析:

  • context:当前调用链路上下文。
  • resourceWrapper:资源 ID。
  • param:泛型参数,一般用于传递 DefaultNode。
  • count:表示申请占用共享资源的数量,只有申请到足够的共享资源才能继续执行。
  • prioritized:表示是否对请求进行优先级排序,SphU#entry 传递过来的值是 false。
  • args:调用方法传递的参数,用于实现热点参数限流。

之所以能够将所有的 ProcessorSlot 构造成一个 ProcessorSlotChain,还是依赖这些 ProcessorSlot 继承了 AbstractLinkedProcessorSlot 类。

public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
    //当前节点的下一个节点
    private AbstractLinkedProcessorSlot<?> next = null;

    @Override
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        if (next != null) { // 触发下一个处理器对象的处理
            next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }
    }

    @SuppressWarnings("unchecked")
    void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
        throws Throwable {
        T t = (T)o;
        // 执行具体处理器的逻辑,由具体的处理器自行实现
        entry(context, resourceWrapper, t, count, prioritized, args);
    }

    @Override
    public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        if (next != null) {//调用下一个 ProcessorSlot 的 exit 方法
            next.exit(context, resourceWrapper, count, args);
        }
    }

    public AbstractLinkedProcessorSlot<?> getNext() {
        return next;
    }

    public void setNext(AbstractLinkedProcessorSlot<?> next) {
        this.next = next; // 绑定下一个处理器的逻辑
    }

}

ProcessorSlotChain 也继承 AbstractLinkedProcessorSlot,多了2个方法

public abstract class ProcessorSlotChain extends AbstractLinkedProcessorSlot<Object> {

    /**
     * Add a processor to the head of this slot chain.
     * 添加到链表的头节点
     * @param protocolProcessor processor to be added.
     */
    public abstract void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor);

    /**
     * Add a processor to the tail of this slot chain.
     * 添加到链表末尾
     * @param protocolProcessor processor to be added.
     */
    public abstract void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor);
}

ProcessorSlotChain 的默认实现类是 DefaultProcessorSlotChain,DefaultProcessorSlotChain 有一个指向链表头节点的 first 字段和一个指向链表尾节点的 end 字段。

public class DefaultProcessorSlotChain extends ProcessorSlotChain {
    // first,指向链表头节点
    AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {

        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
            throws Throwable {
            super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
        }

        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            super.fireExit(context, resourceWrapper, count, args);
        }

    };//尾节点
    AbstractLinkedProcessorSlot<?> end = first;

    @Override
    public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        protocolProcessor.setNext(first.getNext());
        first.setNext(protocolProcessor);
        if (end == first) {
            end = protocolProcessor;
        }
    }

    @Override
    public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        end.setNext(protocolProcessor);
        end = protocolProcessor;
    }

 说明:

  • Sentinel 中的 Slot 需要实现 com.alibaba.csp.sentinel.slotchain.ProcessorSlot 的通用接口。
  • 自定义 Slot 一般继承抽象类 AbstractLinkedProcessorSlot 且只要改写 entry/exit 方法实现自定义逻辑。
  • Slot 通过 next 变量保存下一个处理器Slot对象。
  • 在自定义实现的 entry 方法中需要通过 fireEntry 触发下一个处理器的执行,在 exit 方法中通过fireExit 触发下一个处理器的执行。

责任链执行

  com.alibaba.csp.sentinel.CtSph#entryWithPriority()

  private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
            // so here init the entry only. No rule checking will be done.
            return new CtEntry(resourceWrapper, null, context);
        }

        if (context == null) {
            // Using default context.
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // Global switch is close, no rule checking will do.
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }
        //开始构造chain
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

        /*
         * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
         * so no rule checking will be done.
         */
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }

        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {// 驱动责任链上的第一个处理器,进而由处理器自驱动执行下一个处理器
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }

说明:

  • 整个责任链上处理器的执行通过Invoker对象的驱动,而非责任链对象的驱动。
  • DefaultProcessorSlotChain的entry首先头部对象first,进而触发处理器的自驱实现处理器的执行。
  • 整体按照entry → fireEntry → transformEntry → entry 的循环顺序依次触发处理器的自驱。


http://www.kler.cn/a/454493.html

相关文章:

  • js版本之ES6特性简述【Proxy、Reflect、Iterator、Generator】(五)
  • 微积分复习(微分方程)
  • 阿里云新用户服务器配置
  • Educational Codeforces Round 173 (Rated for Div. 2) - Codeforces
  • SQL 实战:日期与时间函数 – 统计数据的时间跨度与趋势
  • 理解神经网络
  • 关于分布式数据库需要了解的相关知识!!!
  • 【NIFI】实现HANA->ORACLE数据同步
  • SQLMAP注入之MySQL注入总结
  • Windows脚本清理C盘缓存
  • 电脑提示报错NetLoad.dll文件丢失或损坏?是什么原因?
  • (亲测)frp对外提供简单的文件访问服务-frp静态文件效果
  • STUN服务器实现NAT穿透
  • JSON 系列之2:JSON简单查询
  • Java中三大构建工具的发展历程(Ant、Maven和Gradle)
  • Vue中动态样式绑定+CSS变量实现切换明暗主题功能——从入门到进阶
  • 如何利用Python爬虫精准获取苏宁易购商品详情
  • K8s DaemonSet的介绍
  • Android WebView 与 H5 双向通信实现详解
  • 【商城源码的开发环境】
  • VSCode 插件开发实战(三):插件配置项自定义设置
  • 如何在服务器上克隆、pull、push GitHub私有项目
  • GraalVM完全指南:云原生时代下使用GraalVM将Spring Boot 3应用转换为高效Windows EXE文件
  • 12.24 k8s yaml文件类型和介绍
  • 通过WSL 在 Windows 11中实现Linux虚拟环境并连接给项目部署使用的办法
  • FlaskAPI-初识