Sentinel 源码深度解析
一、引言
长路漫漫~
二、Sentinel 整体架构概述
2.1 架构设计目标
Sentinel 的设计旨在为分布式系统提供全面、灵活且高效的流量治理解决方案。具体目标包括:
- 高可用性:确保在高并发场景下,系统能够稳定运行,不会因为流量过大而崩溃。
- 灵活性:支持多种流量控制规则和熔断降级策略,能够根据不同的业务场景进行灵活配置。
- 可扩展性:采用模块化和插件化的设计,方便用户扩展和定制功能。
- 实时性:能够实时监测和统计流量信息,及时做出流量控制和熔断降级决策。
2.2 整体架构图
Sentinel 的整体架构主要由以下几个部分组成:
- 核心库:包含 Sentinel 的核心逻辑,如流量控制、熔断降级、统计等。
- 规则管理模块:负责规则的存储、加载和更新,支持多种规则源,如内存、文件、远程配置中心等。
- 传输模块:用于规则的推送和拉取,支持与控制台的通信。
- 控制台:提供可视化的管理界面,方便用户配置规则、查看统计信息和监控系统状态。
- 扩展模块:允许用户根据自己的需求开发自定义的插件,如自定义规则检查器、统计器等。
+-------------------+
| 控制台 |
+-------------------+
|
| 规则推送/拉取
v
+-------------------+
| 传输模块 |
+-------------------+
|
| 规则加载/更新
v
+-------------------+
| 规则管理模块 |
+-------------------+
|
| 规则应用
v
+-------------------+
| 核心库 |
| - 流量控制 |
| - 熔断降级 |
| - 统计 |
+-------------------+
|
| 自定义扩展
v
+-------------------+
| 扩展模块 |
+-------------------+
三、核心概念解析
3.1 资源(Resource)
资源是 Sentinel 中最基本的概念,它代表了系统中的一个可被保护的操作或服务。可以是一个方法、一个接口、一个数据库查询等。在代码中,我们可以通过 SphU.entry
方法来标记对资源的访问。例如:
Entry entry = null;
try {
entry = SphU.entry("myResource");
// 执行业务逻辑
} catch (BlockException e) {
// 处理被限流或熔断的情况
} finally {
if (entry != null) {
entry.exit();
}
}
这里的 "myResource"
就是一个资源的名称。
3.2 规则(Rule)
规则是 Sentinel 实现流量控制和熔断降级的依据。Sentinel 支持多种类型的规则,主要包括:
- 流量控制规则(FlowRule):用于控制资源的访问流量,例如限制 QPS、并发线程数等。
- 熔断降级规则(DegradeRule):当资源的某个指标(如平均响应时间、异常比例)达到一定阈值时,对资源进行熔断降级。
- 系统保护规则(SystemRule):根据系统的整体负载情况,对所有资源进行统一的流量控制。
- 热点参数规则(ParamFlowRule):对资源的热点参数进行流量控制。
3.3 入口(Entry)
Entry
表示一次资源访问的入口,每个 Entry
对应一个资源的访问。在进入资源访问时,会创建一个 Entry
对象,并在访问结束时释放该对象。Entry
对象包含了资源的相关信息,如资源名称、入口类型等。
3.4 上下文(Context)
上下文用于存储一次请求的相关信息,包括请求的来源、调用链等。每个请求都会关联一个上下文,Entry
对象也会与上下文进行绑定。上下文的主要作用是在处理请求的过程中传递信息,确保请求在不同的处理环节中能够正确处理。
3.5 节点(Node)
节点用于记录资源的统计信息,如 QPS、响应时间、异常数等。Sentinel 中存在多种类型的节点,主要包括:
- DefaultNode:记录某个资源在当前线程中的统计信息。
- ClusterNode:记录某个资源在整个集群中的统计信息。
- StatisticNode:是
DefaultNode
和ClusterNode
的基类,负责具体的统计逻辑。
四、源码结构分析
4.1 核心模块(core)
核心模块是 Sentinel 的核心代码所在,包含了流量控制、熔断降级、统计等核心逻辑。主要的包结构如下:
com.alibaba.csp.sentinel.context
:包含上下文相关的类,如Context
、ContextUtil
等。com.alibaba.csp.sentinel.entry
:包含Entry
相关的类,如Entry
、SphU
等。com.alibaba.csp.sentinel.node
:包含节点相关的类,如DefaultNode
、ClusterNode
、StatisticNode
等。com.alibaba.csp.sentinel.slotchain
:包含处理槽链相关的类,如ProcessorSlotChain
、ProcessorSlot
等。com.alibaba.csp.sentinel.slots
:包含各种处理槽的实现,如FlowSlot
、DegradeSlot
、StatisticSlot
等。com.alibaba.csp.sentinel.util
:包含一些工具类,如TimeUtil
、StringUtil
等。
4.2 规则管理模块(rule)
规则管理模块负责规则的存储、加载和更新。主要的包结构如下:
com.alibaba.csp.sentinel.slots.block
:包含规则相关的类,如Rule
、FlowRule
、DegradeRule
等。com.alibaba.csp.sentinel.slots.block.flow
:包含流量控制规则相关的类,如FlowRuleManager
、FlowRuleChecker
等。com.alibaba.csp.sentinel.slots.block.degrade
:包含熔断降级规则相关的类,如DegradeRuleManager
、DegradeRuleChecker
等。com.alibaba.csp.sentinel.slots.block.system
:包含系统保护规则相关的类,如SystemRuleManager
、SystemRuleChecker
等。com.alibaba.csp.sentinel.slots.block.param
:包含热点参数规则相关的类,如ParamFlowRuleManager
、ParamFlowRuleChecker
等。
4.3 传输模块(transport)
传输模块用于规则的推送和拉取,支持与控制台的通信。主要的包结构如下:
com.alibaba.csp.sentinel.transport
:包含传输相关的基类和接口。com.alibaba.csp.sentinel.transport.command
:包含命令相关的类,用于与控制台进行交互。com.alibaba.csp.sentinel.transport.config
:包含传输配置相关的类。com.alibaba.csp.sentinel.transport.heartbeat
:包含心跳相关的类,用于保持与控制台的连接。com.alibaba.csp.sentinel.transport.netty
:包含基于 Netty 实现的传输模块。
4.4 控制台模块(dashboard)
控制台模块提供可视化的管理界面,方便用户配置规则、查看统计信息和监控系统状态。主要的包结构如下:
com.alibaba.csp.sentinel.dashboard
:包含控制台的主类和配置类。com.alibaba.csp.sentinel.dashboard.controller
:包含控制器类,处理用户的请求。com.alibaba.csp.sentinel.dashboard.domain
:包含数据模型类,如规则信息、统计信息等。com.alibaba.csp.sentinel.dashboard.repository
:包含数据存储相关的类,如规则存储、统计信息存储等。com.alibaba.csp.sentinel.dashboard.service
:包含服务类,处理业务逻辑。
五、核心流程解析
5.1 资源访问流程
资源访问是 Sentinel 的核心流程之一,主要涉及 SphU.entry
方法和 entry.exit
方法。下面详细分析这个流程:
5.1.1 SphU.entry
方法
public static Entry entry(String name) throws BlockException {
return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}
SphU.entry
方法是进入资源访问的入口,它会调用 Env.sph.entry
方法。Env.sph
是 Sph
接口的实现类,通常是 CtSph
。
public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
return new CtEntry(null, null, context);
}
if (context == null) {
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// 省略部分代码
ProcessorSlotChain chain = lookProcessChain(resourceWrapper);
if (chain == null) {
chain = SlotChainProvider.newSlotChain();
if (chain != null) {
lookProcessChainMap.putIfAbsent(resourceWrapper, chain);
}
}
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException ex) {
e.exit(count, args);
throw ex;
} catch (Throwable ex) {
RecordLog.info("Sentinel unexpected exception", ex);
}
return e;
}
该方法的主要步骤如下:
- 获取当前请求的上下文
Context
,如果上下文为空,则创建一个默认的上下文。 - 查找或创建处理槽链
ProcessorSlotChain
,处理槽链是一个由多个ProcessorSlot
组成的链表,每个ProcessorSlot
负责不同的处理逻辑。 - 创建一个
CtEntry
对象,表示一次资源访问的入口。 - 调用处理槽链的
entry
方法,将请求传递给处理槽链进行处理。 - 如果在处理过程中抛出
BlockException
异常,表示请求被限流或熔断,调用entry.exit
方法退出资源访问,并抛出异常。
5.1.2 entry.exit
方法
public void exit(int count, Object... args) throws ErrorEntryFreeException {
if (context != null) {
if (context instanceof NullContext) {
return;
}
if (this.resourceWrapper == null) {
RecordLog.info("Trying to exit null resource");
return;
}
if (curexit > 0) {
throw new ErrorEntryFreeException("The entry has been released!");
}
curexit++;
try {
this.slotsChain.exit(context, this.resourceWrapper, count, args);
} catch (Throwable e) {
RecordLog.warn("Exception occurred when exiting entry", e);
} finally {
// 省略部分代码
}
}
}
entry.exit
方法用于退出资源访问,它会调用处理槽链的 exit
方法,将请求传递给处理槽链进行处理,处理槽链会依次执行各个 ProcessorSlot
的 exit
方法,完成资源的释放和统计信息的更新。
5.2 处理槽链(ProcessorSlotChain)
处理槽链是 Sentinel 的核心处理流程,它由一系列的 ProcessorSlot
组成,每个 ProcessorSlot
负责不同的处理逻辑。常见的处理槽包括:
StatisticSlot
:负责统计请求的流量信息,如 QPS、响应时间、异常数等。FlowSlot
:负责流量控制,根据流量控制规则判断请求是否可以通过。DegradeSlot
:负责熔断降级,根据熔断降级规则判断资源是否需要熔断。SystemSlot
:负责系统保护,根据系统保护规则对所有资源进行统一的流量控制。AuthoritySlot
:负责黑白名单控制,根据黑白名单规则判断请求是否允许访问。
5.2.1 处理槽链的创建
处理槽链的创建是通过 SlotChainProvider.newSlotChain
方法实现的:
public static ProcessorSlotChain newSlotChain() {
if (builder != null) {
return builder.build();
}
resolveSlotChainBuilder();
if (builder == null) {
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
builder = new DefaultSlotChainBuilder();
}
return builder.build();
}
该方法会先检查是否已经有处理槽链构建器 builder
,如果没有则调用 resolveSlotChainBuilder
方法进行解析。默认情况下,使用 DefaultSlotChainBuilder
来构建处理槽链。
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new SystemSlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());
return chain;
}
}
DefaultSlotChainBuilder
会依次添加各个处理槽到处理槽链中,形成一个完整的处理流程。
5.2.2 处理槽链的执行
处理槽链的执行是通过调用 ProcessorSlotChain.entry
方法实现的:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
if (this.head.next != null) {
this.head.next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
该方法会将请求传递给处理槽链的第一个处理槽的 transformEntry
方法,然后每个处理槽会依次调用下一个处理槽的 transformEntry
方法,直到最后一个处理槽。在处理过程中,如果某个处理槽判断请求不通过,会抛出 BlockException
异常,中断处理流程。
5.3 规则管理流程
规则管理是 Sentinel 的重要功能之一,主要涉及规则的存储、加载和更新。下面详细分析规则管理的流程:
5.3.1 规则的存储
Sentinel 支持多种规则源,如内存、文件、远程配置中心等。不同类型的规则有对应的规则管理器,如 FlowRuleManager
、DegradeRuleManager
等。规则管理器会将规则存储在内存中,例如 FlowRuleManager
中的规则存储在 flowRules
列表中:
private static volatile List<FlowRule> flowRules = new ArrayList<>();
5.3.2 规则的加载
规则的加载可以通过代码静态配置,也可以通过控制台动态配置。代码静态配置的示例如下:
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
rule.setResource("myResource");
rule.setCount(10); // 限流阈值
rule.setGrade(RuleConstant.FLOW_GRADE_QPS); // 限流模式为 QPS
rules.add(rule);
FlowRuleManager.loadRules(rules);
FlowRuleManager.loadRules
方法会将规则列表加载到内存中,并更新规则信息。
5.3.3 规则的更新
规则的更新可以通过控制台动态推送,也可以通过代码重新加载规则。当规则更新时,规则管理器会更新内存中的规则信息,并通知相关的处理槽进行规则的重新加载。例如,FlowRuleManager
中的 updateRules
方法:
public static void updateRules(List<FlowRule> rules) {
flowRules = rules;
for (FlowRuleListener l : listeners) {
l.configUpdate(rules);
}
}
该方法会更新 flowRules
列表,并通知所有的规则监听器 FlowRuleListener
进行规则的更新。
六、统计机制解析
6.1 滑动窗口算法
Sentinel 采用滑动窗口算法进行实时统计,能够高效地记录和计算请求的流量信息。滑动窗口算法的基本思想是将时间划分为多个固定大小的窗口,每个窗口记录一段时间内的请求信息,随着时间的推移,窗口会不断滑动,保证统计结果的实时性。
6.2 统计节点
6.2.1 StatisticNode
StatisticNode
是 DefaultNode
和 ClusterNode
的基类,负责具体的统计逻辑。它内部维护了多个 ArrayMetric
对象,用于存储不同时间粒度的统计数据。以下是 StatisticNode
的部分关键代码:
public class StatisticNode implements Node {
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000);
@Override
public void incrementPassQps() {
rollingCounterInSecond.addPass(1);
rollingCounterInMinute.addPass(1);
}
@Override
public void incrementBlockQps() {
rollingCounterInSecond.addBlock(1);
rollingCounterInMinute.addBlock(1);
}
@Override
public long totalRequest() {
return rollingCounterInSecond.totalRequest() + rollingCounterInMinute.totalRequest();
}
// 其他统计方法...
}
在上述代码中:
rollingCounterInSecond
用于记录秒级的统计数据,它将 1 秒划分为多个小窗口(窗口数量由SampleCountProperty.SAMPLE_COUNT
决定),每个窗口的时间间隔为1000 / SampleCountProperty.SAMPLE_COUNT
毫秒。rollingCounterInMinute
用于记录分钟级的统计数据,将 1 分钟划分为 60 个窗口,每个窗口为 1 秒。
当有请求通过或被阻塞时,StatisticNode
会调用相应的 addPass
或 addBlock
方法更新统计数据。同时,提供了一系列方法用于获取不同维度的统计信息,如总请求数、通过请求数、阻塞请求数等。
6.2.2 DefaultNode
DefaultNode
继承自 StatisticNode
,用于记录某个资源在当前线程中的统计信息。它还维护了一个调用关系,记录了该资源的调用来源。以下是 DefaultNode
的部分代码:
public class DefaultNode extends StatisticNode {
private volatile String origin;
private ClusterNode clusterNode;
public DefaultNode(ResourceWrapper id, ClusterNode clusterNode) {
super();
this.id = id;
this.clusterNode = clusterNode;
}
@Override
public void incrementPassQps() {
super.incrementPassQps();
this.clusterNode.incrementPassQps();
}
@Override
public void incrementBlockQps() {
super.incrementBlockQps();
this.clusterNode.incrementBlockQps();
}
// 其他方法...
}
在 DefaultNode
中,当更新统计信息时,除了更新自身的统计数据外,还会调用 ClusterNode
的相应方法更新集群级别的统计信息。
6.2.3 ClusterNode
ClusterNode
同样继承自 StatisticNode
,用于记录某个资源在整个集群中的统计信息。它是所有 DefaultNode
共享的统计节点,能够汇总各个节点的统计数据。以下是 ClusterNode
的部分代码:
public class ClusterNode extends StatisticNode {
private final AtomicLong totalException = new AtomicLong(0);
@Override
public void addException(int count) {
totalException.addAndGet(count);
super.addException(count);
}
public long totalException() {
return totalException.get();
}
// 其他方法...
}
ClusterNode
除了继承 StatisticNode
的统计功能外,还额外记录了总异常数等信息,方便对整个集群的资源状态进行监控和分析。
6.3 滑动窗口的实现(ArrayMetric
)
ArrayMetric
是 Sentinel 中滑动窗口的具体实现类,它使用数组来存储滑动窗口的统计数据。以下是 ArrayMetric
的部分关键代码:
public class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
@Override
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
@Override
public long pass() {
data.currentWindow();
long pass = 0;
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
pass += window.pass();
}
return pass;
}
// 其他统计方法...
}
在上述代码中:
LeapArray
是滑动窗口的核心数据结构,它维护了一个数组,数组中的每个元素是一个WindowWrap
对象,WindowWrap
包含了一个MetricBucket
对象,用于存储该窗口的统计信息。currentWindow
方法用于获取当前时间对应的窗口,如果该窗口不存在则创建一个新的窗口。addPass
方法用于在当前窗口中增加通过请求的数量。pass
方法用于计算所有有效窗口中通过请求的总数。
通过这种方式,ArrayMetric
能够高效地实现滑动窗口的统计功能,并且可以根据不同的时间粒度进行统计。
七、流量控制实现原理
7.1 流量控制规则(FlowRule
)
流量控制规则 FlowRule
是实现流量控制的依据,它包含了多个属性,用于定义流量控制的策略。以下是 FlowRule
的部分属性:
public class FlowRule extends AbstractRule {
private String resource;
private int grade = RuleConstant.FLOW_GRADE_QPS;
private double count;
private int strategy = RuleConstant.STRATEGY_DIRECT;
private String refResource;
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
private int warmUpPeriodSec = 10;
private int maxQueueingTimeMs = 500;
// Getters and Setters...
}
各属性的含义如下:
resource
:要进行流量控制的资源名称。grade
:限流模式,支持 QPS 限流(RuleConstant.FLOW_GRADE_QPS
)和并发线程数限流(RuleConstant.FLOW_GRADE_THREAD
)。count
:限流阈值,例如 QPS 的上限或并发线程数的上限。strategy
:限流策略,支持直接限流(RuleConstant.STRATEGY_DIRECT
)、关联限流(RuleConstant.STRATEGY_RELATE
)和链路限流(RuleConstant.STRATEGY_CHAIN
)。refResource
:关联资源或入口资源的名称,根据不同的限流策略使用。controlBehavior
:流量控制行为,支持直接拒绝(RuleConstant.CONTROL_BEHAVIOR_DEFAULT
)、预热(RuleConstant.CONTROL_BEHAVIOR_WARM_UP
)和匀速排队(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER
)。warmUpPeriodSec
:预热时间,仅在预热模式下有效。maxQueueingTimeMs
:最大排队时间,仅在匀速排队模式下有效。
7.2 流量控制检查(FlowSlot
)
FlowSlot
是处理流量控制的核心处理槽,它会根据流量控制规则对请求进行检查。以下是 FlowSlot
的 entry
方法的部分代码:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
在 entry
方法中,首先调用 checkFlow
方法进行流量控制检查,如果检查通过,则调用 fireEntry
方法将请求传递给下一个处理槽。checkFlow
方法会调用 FlowRuleChecker
的 checkFlow
方法进行具体的规则检查。
7.3 流量控制策略实现
7.3.1 直接限流
直接限流是最常见的限流策略,根据资源的当前 QPS 或并发线程数与限流阈值进行比较,如果超过阈值则拒绝请求。以下是直接限流的部分检查代码:
private static boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
在上述代码中,canPassCheck
方法会根据规则的配置选择本地检查或集群检查。passLocalCheck
方法会根据限流策略选择合适的统计节点,然后调用规则的 Rater
(评估器)的 canPass
方法进行检查。
7.3.2 关联限流
关联限流是指当关联资源的流量达到一定阈值时,对当前资源进行限流。例如,当某个数据库的查询请求过多时,限制对该数据库的写入请求。关联限流的实现主要通过 selectNodeByRequesterAndStrategy
方法选择关联资源的统计节点进行检查。
7.3.3 链路限流
链路限流是指只对指定链路入口的请求进行限流。例如,只对从某个特定接口进入的请求进行限流。链路限流的实现同样通过 selectNodeByRequesterAndStrategy
方法选择合适的统计节点进行检查。
7.4 流量控制行为实现
7.4.1 直接拒绝
直接拒绝是最简单的流量控制行为,当请求超过限流阈值时,直接抛出 FlowException
异常,拒绝请求。以下是直接拒绝的部分代码:
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
int curCount = avgUsedTokens(node);
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
在上述代码中,如果当前使用的令牌数加上请求的令牌数超过阈值,且请求不是优先请求,则直接返回 false
,表示请求被拒绝。
7.4.2 预热
预热模式主要用于应对系统在刚启动时,由于资源未充分预热而导致的性能问题。在预热期间,限流阈值会逐渐增加,直到达到正常阈值。以下是预热模式的部分代码:
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long passQps = (long) node.passQps();
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
// 省略部分代码
long restToken = storedTokens.get();
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// 计算斜率
double warningQps = Math.nextUp(count / (1.0 + aboveToken * coldFactor / warningToken));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
在上述代码中,syncToken
方法用于同步令牌数量,根据当前存储的令牌数和警告令牌数的关系,计算出不同的限流阈值进行检查。
7.4.3 匀速排队
匀速排队模式会将超过阈值的请求放入队列中,按照固定的速率依次处理。以下是匀速排队模式的部分代码:
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 省略部分代码
long currentTime = TimeUtil.currentTimeMillis();
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
long expectedTime = costTime + latestPassedTime.get();
if (expectedTime <= currentTime) {
// 可以立即通过
latestPassedTime.set(currentTime);
return true;
} else {
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime + costTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// 排队等待
sleep(waitTime);
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
在上述代码中,根据当前请求需要的令牌数和限流阈值计算出预计处理时间,如果预计处理时间在当前时间之后,则计算等待时间,若等待时间不超过最大排队时间,则将请求放入队列中等待处理。
八、熔断降级实现原理
8.1 熔断降级规则(DegradeRule
)
熔断降级规则 DegradeRule
用于在资源的某个指标(如平均响应时间、异常比例)达到一定阈值时,对资源进行熔断降级。以下是 DegradeRule
的部分属性:
收起
java
public class DegradeRule extends AbstractRule {
private String resource;
private int grade = RuleConstant.DEGRADE_GRADE_RT;
private double count;
private int timeWindow;
private int minRequestAmount = DegradeRuleConstant.DEFAULT_MIN_REQUEST_AMOUNT;
private int statIntervalMs = 1000;
// Getters and Setters...
}
各属性的含义如下:
resource
:要进行熔断降级的资源名称。grade
:熔断策略,支持平均响应时间熔断(RuleConstant.DEGRADE_GRADE_RT
)和异常比例熔断(RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO
)。count
:熔断阈值,例如平均响应时间的上限或异常比例的上限。timeWindow
:熔断时长,单位为秒。minRequestAmount
:最小请求数,只有当请求数达到该值时才会进行熔断判断。statIntervalMs
:统计时间间隔,单位为毫秒。
8.2 熔断降级检查(DegradeSlot
)
DegradeSlot
是处理熔断降级的核心处理槽,它会根据熔断降级规则对请求进行检查。以下是 DegradeSlot
的 entry
方法的部分代码:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
performChecking(context, resourceWrapper);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void performChecking(Context context, ResourceWrapper r) throws BlockException {
List<DegradeRule> rules = DegradeRuleManager.getRulesForResource(r.getName());
if (rules == null) {
return;
}
for (DegradeRule rule : rules) {
if (!rule.passCheck(context, r)) {
throw new DegradeException(rule.getLimitApp(), rule);
}
}
}
在 entry
方法中,首先调用 performChecking
方法进行熔断降级检查,如果检查通过,则调用 fireEntry
方法将请求传递给下一个处理槽。
8.2 熔断降级检查(DegradeSlot
)(续)
performChecking
方法会从 DegradeRuleManager
中获取该资源的所有熔断降级规则,并依次对这些规则进行检查。对于每条规则,调用其 passCheck
方法判断当前请求是否可以通过。如果某条规则检查不通过,则抛出 DegradeException
异常,表明该资源已被熔断。
8.3 熔断策略实现
8.3.1 平均响应时间熔断(DEGRADE_GRADE_RT
)
当资源的平均响应时间超过设定的阈值时,会触发熔断。以下是平均响应时间熔断规则的检查逻辑:
@Override
public boolean passCheck(Context context, ResourceWrapper r) {
if (circuitBreaker == null) {
initCircuitBreaker();
}
return circuitBreaker.tryPass(context);
}
private void initCircuitBreaker() {
if (this.grade == RuleConstant.DEGRADE_GRADE_RT) {
circuitBreaker = new ResponseTimeCircuitBreaker(this);
} else if (this.grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
circuitBreaker = new ExceptionRatioCircuitBreaker(this);
} else if (this.grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
circuitBreaker = new ExceptionCountCircuitBreaker(this);
}
}
在 initCircuitBreaker
方法中,根据规则的熔断策略创建相应的熔断器。对于平均响应时间熔断,会创建 ResponseTimeCircuitBreaker
。以下是 ResponseTimeCircuitBreaker
的部分关键代码:
public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker {
private final int minRequestAmount;
private final double maxAllowedRt;
public ResponseTimeCircuitBreaker(DegradeRule rule) {
super(rule);
this.minRequestAmount = rule.getMinRequestAmount();
this.maxAllowedRt = rule.getCount();
}
@Override
public boolean tryPass(Context context) {
if (currentState.get() == State.CLOSED) {
return true;
}
if (currentState.get() == State.OPEN) {
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
return false;
}
@Override
protected void onRequestComplete(Context context) {
DefaultNode node = (DefaultNode) context.getCurNode();
double rt = node.avgRt();
if (node.passQps() < minRequestAmount) {
return;
}
if (rt > maxAllowedRt) {
triggerBreak();
}
}
}
在 tryPass
方法中,会根据熔断器的当前状态判断请求是否可以通过。如果熔断器处于关闭状态(CLOSED
),则请求可以直接通过;如果处于打开状态(OPEN
),则检查是否达到重试超时时间,如果达到则尝试从打开状态转换为半开状态。
在 onRequestComplete
方法中,会在请求完成后获取该资源的平均响应时间 rt
,如果请求数达到最小请求数且平均响应时间超过阈值,则触发熔断,将熔断器状态从关闭状态转换为打开状态。
8.3.2 异常比例熔断(DEGRADE_GRADE_EXCEPTION_RATIO
)
当资源的异常比例超过设定的阈值时,会触发熔断。对于异常比例熔断,会创建 ExceptionRatioCircuitBreaker
。以下是其部分关键代码:
public class ExceptionRatioCircuitBreaker extends AbstractCircuitBreaker {
private final int minRequestAmount;
private final double maxExceptionRatio;
public ExceptionRatioCircuitBreaker(DegradeRule rule) {
super(rule);
this.minRequestAmount = rule.getMinRequestAmount();
this.maxExceptionRatio = rule.getCount();
}
@Override
protected void onRequestComplete(Context context) {
DefaultNode node = (DefaultNode) context.getCurNode();
int totalCount = node.totalRequest();
int exceptionCount = node.exceptionQps();
if (totalCount < minRequestAmount) {
return;
}
double exceptionRatio = (double) exceptionCount / totalCount;
if (exceptionRatio > maxExceptionRatio) {
triggerBreak();
}
}
}
在 onRequestComplete
方法中,会在请求完成后获取该资源的总请求数和异常请求数,计算异常比例。如果请求数达到最小请求数且异常比例超过阈值,则触发熔断。
8.3.3 异常数熔断(DEGRADE_GRADE_EXCEPTION_COUNT
)
当资源的异常数超过设定的阈值时,会触发熔断。对于异常数熔断,会创建 ExceptionCountCircuitBreaker
。其实现逻辑与异常比例熔断类似,只是判断条件为异常数是否超过阈值。
8.4 熔断器状态转换
熔断器有三种状态:关闭(CLOSED
)、打开(OPEN
)和半开(HALF_OPEN
),不同状态之间会根据一定的条件进行转换。
8.4.1 关闭状态(CLOSED
)
在关闭状态下,熔断器允许所有请求通过,正常统计资源的各项指标(如平均响应时间、异常比例等)。当触发熔断条件时,熔断器从关闭状态转换为打开状态。
8.4.2 打开状态(OPEN
)
在打开状态下,熔断器会拒绝所有请求,此时资源处于熔断状态。当达到设定的熔断时长后,熔断器会尝试从打开状态转换为半开状态。
8.4.3 半开状态(HALF_OPEN
)
在半开状态下,熔断器会允许部分请求通过,用于试探资源是否已经恢复正常。如果通过的请求中没有再次触发熔断条件,则将熔断器状态转换为关闭状态,资源恢复正常;如果再次触发熔断条件,则将熔断器状态重新转换为打开状态。
以下是熔断器状态转换的部分代码示例:
protected boolean fromOpenToHalfOpen(Context context) {
if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
entryPassed.set(0);
return true;
}
return false;
}
protected void fromHalfOpenToOpen(int errorCount) {
if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) {
resetStat();
nextRetryTimestamp.set(TimeUtil.currentTimeMillis() + rule.getTimeWindow() * 1000L);
}
}
protected void fromHalfOpenToClose() {
if (currentState.compareAndSet(State.HALF_OPEN, State.CLOSED)) {
resetStat();
}
}
九、系统保护实现原理
9.1 系统保护规则(SystemRule
)
系统保护规则 SystemRule
用于根据系统的整体负载情况,对所有资源进行统一的流量控制。以下是 SystemRule
的部分属性:
public class SystemRule extends AbstractRule {
private double highestSystemLoad = -1;
private double avgRt = -1;
private int maxThread = -1;
private double qps = -1;
private double highestCpuUsage = -1;
// Getters and Setters...
}
各属性的含义如下:
highestSystemLoad
:系统最大负载,当系统负载超过该值时进行流量控制。avgRt
:平均响应时间上限,当系统平均响应时间超过该值时进行流量控制。maxThread
:最大线程数,当系统线程数超过该值时进行流量控制。qps
:系统最大 QPS,当系统 QPS 超过该值时进行流量控制。highestCpuUsage
:系统最大 CPU 使用率,当系统 CPU 使用率超过该值时进行流量控制。
9.2 系统保护检查(SystemSlot
)
SystemSlot
是处理系统保护的核心处理槽,它会根据系统保护规则对请求进行检查。以下是 SystemSlot
的 entry
方法的部分代码:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
SystemRuleManager.checkSystem(resourceWrapper, count);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
在 entry
方法中,首先调用 SystemRuleManager.checkSystem
方法进行系统保护检查,如果检查通过,则调用 fireEntry
方法将请求传递给下一个处理槽。
9.3 系统保护逻辑实现
SystemRuleManager.checkSystem
方法会遍历所有的系统保护规则,对系统的各项指标(如系统负载、平均响应时间、线程数、QPS、CPU 使用率等)进行检查。以下是部分检查逻辑代码:
public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException {
if (!checkSystemStatus()) {
throw new SystemBlockException(resourceWrapper.getName());
}
}
private static boolean checkSystemStatus() {
double currentLoad = SystemStatus.getLoadAverage();
double currentQps = Constants.ENTRY_NODE.successQps() + Constants.ENTRY_NODE.blockQps();
double currentAvgRt = Constants.ENTRY_NODE.avgRt();
int currentThread = Constants.ENTRY_NODE.curThreadNum();
double currentCpuUsage = SystemStatus.getCpuUsage();
for (SystemRule rule : getRules()) {
if (rule.getHighestSystemLoad() >= 0 && currentLoad > rule.getHighestSystemLoad()) {
return false;
}
if (rule.getAvgRt() >= 0 && currentAvgRt > rule.getAvgRt()) {
return false;
}
if (rule.getMaxThread() >= 0 && currentThread > rule.getMaxThread()) {
return false;
}
if (rule.getQps() >= 0 && currentQps > rule.getQps()) {
return false;
}
if (rule.getHighestCpuUsage() >= 0 && currentCpuUsage > rule.getHighestCpuUsage()) {
return false;
}
}
return true;
}
在 checkSystemStatus
方法中,会获取系统的当前负载、QPS、平均响应时间、线程数和 CPU 使用率等指标,然后与系统保护规则中的阈值进行比较。如果任何一个指标超过阈值,则认为系统处于过载状态,返回 false
,表示请求会被系统保护机制拦截。
十、热点参数规则实现原理
10.1 热点参数规则(ParamFlowRule
)
热点参数规则 ParamFlowRule
用于对资源的热点参数进行流量控制。以下是 ParamFlowRule
的部分属性:
public class ParamFlowRule extends AbstractRule {
private String resource;
private int paramIdx;
private double count;
private int durationInSec = 1;
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
private int maxQueueingTimeMs = 500;
private Map<Object, Integer> specificCountMap = new HashMap<>();
// Getters and Setters...
}
各属性的含义如下:
resource
:要进行热点参数限流的资源名称。paramIdx
:需要进行限流的参数索引,从 0 开始。count
:限流阈值,对于大部分参数的限流值。durationInSec
:统计时间窗口,单位为秒。controlBehavior
:流量控制行为,与流量控制规则中的含义相同。maxQueueingTimeMs
:最大排队时间,仅在匀速排队模式下有效。specificCountMap
:特定参数的限流值映射,对于某些特殊参数可以设置不同的限流阈值。
10.2 热点参数检查(ParamFlowSlot
)
ParamFlowSlot
是处理热点参数规则的核心处理槽,它会根据热点参数规则对请求的参数进行检查。以下是 ParamFlowSlot
的 entry
方法的部分代码:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
if (ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
checkFlow(resourceWrapper, count, args);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
private void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
if (args == null) {
return;
}
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesForResource(resourceWrapper.getName());
for (ParamFlowRule rule : rules) {
if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
String triggeredParam = "";
if (args.length > rule.getParamIdx()) {
Object value = args[rule.getParamIdx()];
triggeredParam = String.valueOf(value);
}
throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
}
}
}
在 entry
方法中,首先检查该资源是否有热点参数规则,如果有则调用 checkFlow
方法进行检查。checkFlow
方法会遍历该资源的所有热点参数规则,对请求的参数进行检查。如果某个规则检查不通过,则抛出 ParamFlowException
异常。
10.3 热点参数限流逻辑实现
ParamFlowChecker.passCheck
方法会根据热点参数规则对请求的参数进行限流检查。以下是部分检查逻辑代码:
public static boolean passCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object... args) {
if (args == null || args.length <= rule.getParamIdx()) {
return true;
}
Object value = args[rule.getParamIdx()];
double curCount = getCurCount(rule, value);
if (curCount + count > getCount(rule, value)) {
if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {
return passRateLimiterCheck(rule, value, count);
}
return false;
}
return true;
}
private static double getCurCount(ParamFlowRule rule, Object value) {
ParameterMetric metric = ParamFlowMetricStorage.getParamMetric(rule.getResource());
if (metric == null) {
return 0;
}
return metric.getCurCount(rule.getParamIdx(), value);
}
private static double getCount(ParamFlowRule rule, Object value) {
Integer specificCount = rule.getSpecificCountMap().get(value);
if (specificCount != null) {
return specificCount;
}
return rule.getCount();
}
在 passCheck
方法中,首先获取请求中需要进行限流的参数值,然后调用 getCurCount
方法获取该参数的当前请求数,调用 getCount
方法获取该参数的限流阈值。如果当前请求数加上本次请求数超过限流阈值,则根据流量控制行为进行相应的处理。如果是匀速排队模式,则调用 passRateLimiterCheck
方法进行排队检查;否则,直接返回 false
,表示请求被限流。