Sentinel 学习笔记3-责任链与工作流程
本文属于sentinel学习笔记系列。网上看到吴就业老师的专栏,原文地址如下:
https://blog.csdn.net/baidu_28523317/category_10400605.html
上一篇梳理了概念与核心类:Sentinel 学习笔记2- 概念与核心类介绍-CSDN博客
补一个点:
Sph: 定义了获取资源访问权的接口, 代表性接口: Entry entry = sph.entry(String name);
CtSph: Sph接口的默认实现, 负责校验流控规则, 当校验不通过时抛出BlockException异常
SphU: 工具类, 封装了CtSph的调用, 未做特殊处理, 当校验不通过时抛出BlockException异常
SphO: 工具类, 封装了CtSph的调用, 与SphU不同的是, 校验不通过时不会抛出异常, 而是return false;
责任链
当执行到 SphU.entry
接口时,就到了 Sentinel 的核心骨架--ProcessorSlotChain,将不同的 Slot 按照顺序串在一起(责任链模式),从而将不同的功能(限流、降级、系统保护)组合在一起。slot chain 其实可以分为两部分:统计数据构建部分(statistic)和判断部分(rule checking)。
其中:辅助资源指标数据统计的 ProcessorSlot
NodeSelectorSlot、ClusterBuilderSlot、StatisticSlot
判断部分:
- AuthoritySlot:实现黑白名单降级
- SystemSlot:实现系统自适应降级
- FlowSlot:实现限流降级
- DegradeSlot:实现熔断降级
核心结构如下图所示
- NodeSelectorSlot:给当前请求(context)绑定一个DefaultNode,如果不存在则根据 context 创建 DefaultNode,然后维护整个调用树的父子关系,然后将相关数据保存在 Context 中。
- ClusterBuilderSlot:给当前请求(context)绑定一个ClusterNode, 如果不存在首先根据 resourceName 创建 ClusterNode,并将其引用保存在 defaultNode中,然后再根据 origin 创建来源节点(类型为 StatisticNode),并将源节点保存在当前的调用点 Entry 中。一个资源(resource)的所有请求共享同一个ClusterNode。
- StatisticSlot:
StatisticSlot
是 Sentinel 最为重要的类之一,用于根据规则判断结果进行相应的统计操作,先调用后续 Slot 的检查过程,如果检查通过增加各个维度的计数器。- entry 过程:依次执行后面的判断 slot。每个 slot 触发流控的话会抛出异常(BlockException 的子类)。若有 BlockException 抛出,则记录 block 数据;若无异常抛出则算作可通过(pass),记录 pass 数据。
- exit 过程:若无 error(无论是业务异常还是流控异常),记录 complete(success)以及 RT,线程数-1。
Sentinel 的整体工具流程就是使用责任链模式将所有的 ProcessorSlot 按照一定的顺序串成一个单向链表。比如辅助完成资源指标数据统计的 ProcessorSlot 的排序顺序为:
NodeSelectorSlot->ClusterBuilderSlot->StatisticSlot
责任链构建
实现将 ProcessorSlot 串成一个单向链表的是 ProcessorSlotChain,这个 ProcessorSlotChain 是由 SlotChainBuilder 构造的,代码如下:
@Spi(isDefault = true)
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
//sortedSlotList获取所有的处理器对象
List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
for (ProcessorSlot slot : sortedSlotList) {
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
continue;
}
//通过尾添法将职责slot添加到DefaultProcessorSlotChain当中
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
}
- ProcessorSlotChain作为Slot的责任链,负责责任链的构建和执行。
- 责任链上的处理器对象 AbstractLinkedProcessorSlot 通过保存指向下一个处理器的对象的进行关联,整体以链表的形式进行串联。
处理器
ProcessorSlot 比较抽象,前面概念说了Entry表示一个资源的访问权, 通过Sph接口获取,这里的
fireEntry是获取访问权成功后的处理,具体的接口的定义如下:
public interface ProcessorSlot<T> {
/**
* Entrance of this slot.
* 入口方法
* @param context current {@link Context}
* @param resourceWrapper current resource
* @param param generics parameter, usually is a {@link com.alibaba.csp.sentinel.node.Node}
* @param count tokens needed
* @param prioritized whether the entry is prioritized
* @param args parameters of the original call
* @throws Throwable blocked exception or unexpected error
*/
void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
Object... args) throws Throwable;
/**
* Means finish of {@link #entry(Context, ResourceWrapper, Object, int, boolean, Object...)}.
* 调用下一个 ProcessorSlot#entry 方法
* @param context current {@link Context}
* @param resourceWrapper current resource
* @param obj relevant object (e.g. Node)
* @param count tokens needed
* @param prioritized whether the entry is prioritized
* @param args parameters of the original call
* @throws Throwable blocked exception or unexpected error
*/
void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
Object... args) throws Throwable;
/**
* Exit of this slot.
* 出口方法
* @param context current {@link Context}
* @param resourceWrapper current resource
* @param count tokens needed
* @param args parameters of the original call
*/
void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
/**
* Means finish of {@link #exit(Context, ResourceWrapper, int, Object...)}.
* 调用下一个 ProcessorSlot#exit 方法
* @param context current {@link Context}
* @param resourceWrapper current resource
* @param count tokens needed
* @param args parameters of the original call
*/
void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}
方法参数解析:
- context:当前调用链路上下文。
- resourceWrapper:资源 ID。
- param:泛型参数,一般用于传递 DefaultNode。
- count:表示申请占用共享资源的数量,只有申请到足够的共享资源才能继续执行。
- prioritized:表示是否对请求进行优先级排序,SphU#entry 传递过来的值是 false。
- args:调用方法传递的参数,用于实现热点参数限流。
之所以能够将所有的 ProcessorSlot 构造成一个 ProcessorSlotChain,还是依赖这些 ProcessorSlot 继承了 AbstractLinkedProcessorSlot 类。
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
//当前节点的下一个节点
private AbstractLinkedProcessorSlot<?> next = null;
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) { // 触发下一个处理器对象的处理
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
@SuppressWarnings("unchecked")
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T)o;
// 执行具体处理器的逻辑,由具体的处理器自行实现
entry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
if (next != null) {//调用下一个 ProcessorSlot 的 exit 方法
next.exit(context, resourceWrapper, count, args);
}
}
public AbstractLinkedProcessorSlot<?> getNext() {
return next;
}
public void setNext(AbstractLinkedProcessorSlot<?> next) {
this.next = next; // 绑定下一个处理器的逻辑
}
}
ProcessorSlotChain 也继承 AbstractLinkedProcessorSlot,多了2个方法
public abstract class ProcessorSlotChain extends AbstractLinkedProcessorSlot<Object> {
/**
* Add a processor to the head of this slot chain.
* 添加到链表的头节点
* @param protocolProcessor processor to be added.
*/
public abstract void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor);
/**
* Add a processor to the tail of this slot chain.
* 添加到链表末尾
* @param protocolProcessor processor to be added.
*/
public abstract void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor);
}
ProcessorSlotChain 的默认实现类是 DefaultProcessorSlotChain,DefaultProcessorSlotChain 有一个指向链表头节点的 first 字段和一个指向链表尾节点的 end 字段。
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
// first,指向链表头节点
AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
super.fireExit(context, resourceWrapper, count, args);
}
};//尾节点
AbstractLinkedProcessorSlot<?> end = first;
@Override
public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
protocolProcessor.setNext(first.getNext());
first.setNext(protocolProcessor);
if (end == first) {
end = protocolProcessor;
}
}
@Override
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
end.setNext(protocolProcessor);
end = protocolProcessor;
}
说明:
- Sentinel 中的 Slot 需要实现 com.alibaba.csp.sentinel.slotchain.ProcessorSlot 的通用接口。
- 自定义 Slot 一般继承抽象类 AbstractLinkedProcessorSlot 且只要改写 entry/exit 方法实现自定义逻辑。
- Slot 通过 next 变量保存下一个处理器Slot对象。
- 在自定义实现的 entry 方法中需要通过 fireEntry 触发下一个处理器的执行,在 exit 方法中通过fireExit 触发下一个处理器的执行。
责任链执行
com.alibaba.csp.sentinel.CtSph#entryWithPriority()
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// The {@link NullContext} indicates that the amount of context has exceeded the threshold,
// so here init the entry only. No rule checking will be done.
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
// Using default context.
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// Global switch is close, no rule checking will do.
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
//开始构造chain
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
/*
* Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
* so no rule checking will be done.
*/
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {// 驱动责任链上的第一个处理器,进而由处理器自驱动执行下一个处理器
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
说明:
- 整个责任链上处理器的执行通过Invoker对象的驱动,而非责任链对象的驱动。
- DefaultProcessorSlotChain的entry首先头部对象first,进而触发处理器的自驱实现处理器的执行。
- 整体按照entry → fireEntry → transformEntry → entry 的循环顺序依次触发处理器的自驱。