当前位置: 首页 > article >正文

Sentinel 源码深度解析

一、引言

长路漫漫~

二、Sentinel 整体架构概述

2.1 架构设计目标

Sentinel 的设计旨在为分布式系统提供全面、灵活且高效的流量治理解决方案。具体目标包括:

  • 高可用性:确保在高并发场景下,系统能够稳定运行,不会因为流量过大而崩溃。
  • 灵活性:支持多种流量控制规则和熔断降级策略,能够根据不同的业务场景进行灵活配置。
  • 可扩展性:采用模块化和插件化的设计,方便用户扩展和定制功能。
  • 实时性:能够实时监测和统计流量信息,及时做出流量控制和熔断降级决策。

2.2 整体架构图

Sentinel 的整体架构主要由以下几个部分组成:

  • 核心库:包含 Sentinel 的核心逻辑,如流量控制、熔断降级、统计等。
  • 规则管理模块:负责规则的存储、加载和更新,支持多种规则源,如内存、文件、远程配置中心等。
  • 传输模块:用于规则的推送和拉取,支持与控制台的通信。
  • 控制台:提供可视化的管理界面,方便用户配置规则、查看统计信息和监控系统状态。
  • 扩展模块:允许用户根据自己的需求开发自定义的插件,如自定义规则检查器、统计器等。
+-------------------+
|      控制台       |
+-------------------+
       |
       | 规则推送/拉取
       v
+-------------------+
|    传输模块       |
+-------------------+
       |
       | 规则加载/更新
       v
+-------------------+
|   规则管理模块    |
+-------------------+
       |
       | 规则应用
       v
+-------------------+
|     核心库        |
|  - 流量控制       |
|  - 熔断降级       |
|  - 统计          |
+-------------------+
       |
       | 自定义扩展
       v
+-------------------+
|    扩展模块       |
+-------------------+

三、核心概念解析

3.1 资源(Resource)

资源是 Sentinel 中最基本的概念,它代表了系统中的一个可被保护的操作或服务。可以是一个方法、一个接口、一个数据库查询等。在代码中,我们可以通过 SphU.entry 方法来标记对资源的访问。例如:

Entry entry = null;
try {
    entry = SphU.entry("myResource");
    // 执行业务逻辑
} catch (BlockException e) {
    // 处理被限流或熔断的情况
} finally {
    if (entry != null) {
        entry.exit();
    }
}

这里的 "myResource" 就是一个资源的名称。

3.2 规则(Rule)

规则是 Sentinel 实现流量控制和熔断降级的依据。Sentinel 支持多种类型的规则,主要包括:

  • 流量控制规则(FlowRule):用于控制资源的访问流量,例如限制 QPS、并发线程数等。
  • 熔断降级规则(DegradeRule):当资源的某个指标(如平均响应时间、异常比例)达到一定阈值时,对资源进行熔断降级。
  • 系统保护规则(SystemRule):根据系统的整体负载情况,对所有资源进行统一的流量控制。
  • 热点参数规则(ParamFlowRule):对资源的热点参数进行流量控制。

3.3 入口(Entry)

Entry 表示一次资源访问的入口,每个 Entry 对应一个资源的访问。在进入资源访问时,会创建一个 Entry 对象,并在访问结束时释放该对象。Entry 对象包含了资源的相关信息,如资源名称、入口类型等。

3.4 上下文(Context)

上下文用于存储一次请求的相关信息,包括请求的来源、调用链等。每个请求都会关联一个上下文,Entry 对象也会与上下文进行绑定。上下文的主要作用是在处理请求的过程中传递信息,确保请求在不同的处理环节中能够正确处理。

3.5 节点(Node)

节点用于记录资源的统计信息,如 QPS、响应时间、异常数等。Sentinel 中存在多种类型的节点,主要包括:

  • DefaultNode:记录某个资源在当前线程中的统计信息。
  • ClusterNode:记录某个资源在整个集群中的统计信息。
  • StatisticNode:是 DefaultNode 和 ClusterNode 的基类,负责具体的统计逻辑。

四、源码结构分析

4.1 核心模块(core)

核心模块是 Sentinel 的核心代码所在,包含了流量控制、熔断降级、统计等核心逻辑。主要的包结构如下:

  • com.alibaba.csp.sentinel.context:包含上下文相关的类,如 ContextContextUtil 等。
  • com.alibaba.csp.sentinel.entry:包含 Entry 相关的类,如 EntrySphU 等。
  • com.alibaba.csp.sentinel.node:包含节点相关的类,如 DefaultNodeClusterNodeStatisticNode 等。
  • com.alibaba.csp.sentinel.slotchain:包含处理槽链相关的类,如 ProcessorSlotChainProcessorSlot 等。
  • com.alibaba.csp.sentinel.slots:包含各种处理槽的实现,如 FlowSlotDegradeSlotStatisticSlot 等。
  • com.alibaba.csp.sentinel.util:包含一些工具类,如 TimeUtilStringUtil 等。

4.2 规则管理模块(rule)

规则管理模块负责规则的存储、加载和更新。主要的包结构如下:

  • com.alibaba.csp.sentinel.slots.block:包含规则相关的类,如 RuleFlowRuleDegradeRule 等。
  • com.alibaba.csp.sentinel.slots.block.flow:包含流量控制规则相关的类,如 FlowRuleManagerFlowRuleChecker 等。
  • com.alibaba.csp.sentinel.slots.block.degrade:包含熔断降级规则相关的类,如 DegradeRuleManagerDegradeRuleChecker 等。
  • com.alibaba.csp.sentinel.slots.block.system:包含系统保护规则相关的类,如 SystemRuleManagerSystemRuleChecker 等。
  • com.alibaba.csp.sentinel.slots.block.param:包含热点参数规则相关的类,如 ParamFlowRuleManagerParamFlowRuleChecker 等。

4.3 传输模块(transport)

传输模块用于规则的推送和拉取,支持与控制台的通信。主要的包结构如下:

  • com.alibaba.csp.sentinel.transport:包含传输相关的基类和接口。
  • com.alibaba.csp.sentinel.transport.command:包含命令相关的类,用于与控制台进行交互。
  • com.alibaba.csp.sentinel.transport.config:包含传输配置相关的类。
  • com.alibaba.csp.sentinel.transport.heartbeat:包含心跳相关的类,用于保持与控制台的连接。
  • com.alibaba.csp.sentinel.transport.netty:包含基于 Netty 实现的传输模块。

4.4 控制台模块(dashboard)

控制台模块提供可视化的管理界面,方便用户配置规则、查看统计信息和监控系统状态。主要的包结构如下:

  • com.alibaba.csp.sentinel.dashboard:包含控制台的主类和配置类。
  • com.alibaba.csp.sentinel.dashboard.controller:包含控制器类,处理用户的请求。
  • com.alibaba.csp.sentinel.dashboard.domain:包含数据模型类,如规则信息、统计信息等。
  • com.alibaba.csp.sentinel.dashboard.repository:包含数据存储相关的类,如规则存储、统计信息存储等。
  • com.alibaba.csp.sentinel.dashboard.service:包含服务类,处理业务逻辑。

五、核心流程解析

5.1 资源访问流程

资源访问是 Sentinel 的核心流程之一,主要涉及 SphU.entry 方法和 entry.exit 方法。下面详细分析这个流程:

5.1.1 SphU.entry 方法
public static Entry entry(String name) throws BlockException {
    return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}

SphU.entry 方法是进入资源访问的入口,它会调用 Env.sph.entry 方法。Env.sph 是 Sph 接口的实现类,通常是 CtSph

public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
    Context context = ContextUtil.getContext();
    if (context instanceof NullContext) {
        return new CtEntry(null, null, context);
    }
    if (context == null) {
        context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
    }
    // 省略部分代码
    ProcessorSlotChain chain = lookProcessChain(resourceWrapper);
    if (chain == null) {
        chain = SlotChainProvider.newSlotChain();
        if (chain != null) {
            lookProcessChainMap.putIfAbsent(resourceWrapper, chain);
        }
    }
    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException ex) {
        e.exit(count, args);
        throw ex;
    } catch (Throwable ex) {
        RecordLog.info("Sentinel unexpected exception", ex);
    }
    return e;
}

该方法的主要步骤如下:

  1. 获取当前请求的上下文 Context,如果上下文为空,则创建一个默认的上下文。
  2. 查找或创建处理槽链 ProcessorSlotChain,处理槽链是一个由多个 ProcessorSlot 组成的链表,每个 ProcessorSlot 负责不同的处理逻辑。
  3. 创建一个 CtEntry 对象,表示一次资源访问的入口。
  4. 调用处理槽链的 entry 方法,将请求传递给处理槽链进行处理。
  5. 如果在处理过程中抛出 BlockException 异常,表示请求被限流或熔断,调用 entry.exit 方法退出资源访问,并抛出异常。
5.1.2 entry.exit 方法
public void exit(int count, Object... args) throws ErrorEntryFreeException {
    if (context != null) {
        if (context instanceof NullContext) {
            return;
        }
        if (this.resourceWrapper == null) {
            RecordLog.info("Trying to exit null resource");
            return;
        }
        if (curexit > 0) {
            throw new ErrorEntryFreeException("The entry has been released!");
        }
        curexit++;
        try {
            this.slotsChain.exit(context, this.resourceWrapper, count, args);
        } catch (Throwable e) {
            RecordLog.warn("Exception occurred when exiting entry", e);
        } finally {
            // 省略部分代码
        }
    }
}

entry.exit 方法用于退出资源访问,它会调用处理槽链的 exit 方法,将请求传递给处理槽链进行处理,处理槽链会依次执行各个 ProcessorSlot 的 exit 方法,完成资源的释放和统计信息的更新。

5.2 处理槽链(ProcessorSlotChain)

处理槽链是 Sentinel 的核心处理流程,它由一系列的 ProcessorSlot 组成,每个 ProcessorSlot 负责不同的处理逻辑。常见的处理槽包括:

  • StatisticSlot:负责统计请求的流量信息,如 QPS、响应时间、异常数等。
  • FlowSlot:负责流量控制,根据流量控制规则判断请求是否可以通过。
  • DegradeSlot:负责熔断降级,根据熔断降级规则判断资源是否需要熔断。
  • SystemSlot:负责系统保护,根据系统保护规则对所有资源进行统一的流量控制。
  • AuthoritySlot:负责黑白名单控制,根据黑白名单规则判断请求是否允许访问。
5.2.1 处理槽链的创建

处理槽链的创建是通过 SlotChainProvider.newSlotChain 方法实现的:

public static ProcessorSlotChain newSlotChain() {
    if (builder != null) {
        return builder.build();
    }
    resolveSlotChainBuilder();
    if (builder == null) {
        RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
        builder = new DefaultSlotChainBuilder();
    }
    return builder.build();
}

该方法会先检查是否已经有处理槽链构建器 builder,如果没有则调用 resolveSlotChainBuilder 方法进行解析。默认情况下,使用 DefaultSlotChainBuilder 来构建处理槽链。

public class DefaultSlotChainBuilder implements SlotChainBuilder {
    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());
        return chain;
    }
}

DefaultSlotChainBuilder 会依次添加各个处理槽到处理槽链中,形成一个完整的处理流程。

5.2.2 处理槽链的执行

处理槽链的执行是通过调用 ProcessorSlotChain.entry 方法实现的:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
    if (this.head.next != null) {
        this.head.next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
    }
}

该方法会将请求传递给处理槽链的第一个处理槽的 transformEntry 方法,然后每个处理槽会依次调用下一个处理槽的 transformEntry 方法,直到最后一个处理槽。在处理过程中,如果某个处理槽判断请求不通过,会抛出 BlockException 异常,中断处理流程。

5.3 规则管理流程

规则管理是 Sentinel 的重要功能之一,主要涉及规则的存储、加载和更新。下面详细分析规则管理的流程:

5.3.1 规则的存储

Sentinel 支持多种规则源,如内存、文件、远程配置中心等。不同类型的规则有对应的规则管理器,如 FlowRuleManagerDegradeRuleManager 等。规则管理器会将规则存储在内存中,例如 FlowRuleManager 中的规则存储在 flowRules 列表中:

private static volatile List<FlowRule> flowRules = new ArrayList<>();
5.3.2 规则的加载

规则的加载可以通过代码静态配置,也可以通过控制台动态配置。代码静态配置的示例如下:

List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
rule.setResource("myResource");
rule.setCount(10); // 限流阈值
rule.setGrade(RuleConstant.FLOW_GRADE_QPS); // 限流模式为 QPS
rules.add(rule);
FlowRuleManager.loadRules(rules);

FlowRuleManager.loadRules 方法会将规则列表加载到内存中,并更新规则信息。

5.3.3 规则的更新

规则的更新可以通过控制台动态推送,也可以通过代码重新加载规则。当规则更新时,规则管理器会更新内存中的规则信息,并通知相关的处理槽进行规则的重新加载。例如,FlowRuleManager 中的 updateRules 方法:

public static void updateRules(List<FlowRule> rules) {
    flowRules = rules;
    for (FlowRuleListener l : listeners) {
        l.configUpdate(rules);
    }
}

该方法会更新 flowRules 列表,并通知所有的规则监听器 FlowRuleListener 进行规则的更新。

六、统计机制解析

6.1 滑动窗口算法

Sentinel 采用滑动窗口算法进行实时统计,能够高效地记录和计算请求的流量信息。滑动窗口算法的基本思想是将时间划分为多个固定大小的窗口,每个窗口记录一段时间内的请求信息,随着时间的推移,窗口会不断滑动,保证统计结果的实时性。

6.2 统计节点

6.2.1 StatisticNode

StatisticNode 是 DefaultNode 和 ClusterNode 的基类,负责具体的统计逻辑。它内部维护了多个 ArrayMetric 对象,用于存储不同时间粒度的统计数据。以下是 StatisticNode 的部分关键代码:

public class StatisticNode implements Node {

    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000);

    @Override
    public void incrementPassQps() {
        rollingCounterInSecond.addPass(1);
        rollingCounterInMinute.addPass(1);
    }

    @Override
    public void incrementBlockQps() {
        rollingCounterInSecond.addBlock(1);
        rollingCounterInMinute.addBlock(1);
    }

    @Override
    public long totalRequest() {
        return rollingCounterInSecond.totalRequest() + rollingCounterInMinute.totalRequest();
    }

    // 其他统计方法...
}

在上述代码中:

  • rollingCounterInSecond 用于记录秒级的统计数据,它将 1 秒划分为多个小窗口(窗口数量由 SampleCountProperty.SAMPLE_COUNT 决定),每个窗口的时间间隔为 1000 / SampleCountProperty.SAMPLE_COUNT 毫秒。
  • rollingCounterInMinute 用于记录分钟级的统计数据,将 1 分钟划分为 60 个窗口,每个窗口为 1 秒。

当有请求通过或被阻塞时,StatisticNode 会调用相应的 addPass 或 addBlock 方法更新统计数据。同时,提供了一系列方法用于获取不同维度的统计信息,如总请求数、通过请求数、阻塞请求数等。

6.2.2 DefaultNode

DefaultNode 继承自 StatisticNode,用于记录某个资源在当前线程中的统计信息。它还维护了一个调用关系,记录了该资源的调用来源。以下是 DefaultNode 的部分代码:

public class DefaultNode extends StatisticNode {

    private volatile String origin;
    private ClusterNode clusterNode;

    public DefaultNode(ResourceWrapper id, ClusterNode clusterNode) {
        super();
        this.id = id;
        this.clusterNode = clusterNode;
    }

    @Override
    public void incrementPassQps() {
        super.incrementPassQps();
        this.clusterNode.incrementPassQps();
    }

    @Override
    public void incrementBlockQps() {
        super.incrementBlockQps();
        this.clusterNode.incrementBlockQps();
    }

    // 其他方法...
}

在 DefaultNode 中,当更新统计信息时,除了更新自身的统计数据外,还会调用 ClusterNode 的相应方法更新集群级别的统计信息。

6.2.3 ClusterNode

ClusterNode 同样继承自 StatisticNode,用于记录某个资源在整个集群中的统计信息。它是所有 DefaultNode 共享的统计节点,能够汇总各个节点的统计数据。以下是 ClusterNode 的部分代码:

public class ClusterNode extends StatisticNode {

    private final AtomicLong totalException = new AtomicLong(0);

    @Override
    public void addException(int count) {
        totalException.addAndGet(count);
        super.addException(count);
    }

    public long totalException() {
        return totalException.get();
    }

    // 其他方法...
}

ClusterNode 除了继承 StatisticNode 的统计功能外,还额外记录了总异常数等信息,方便对整个集群的资源状态进行监控和分析。

6.3 滑动窗口的实现(ArrayMetric

ArrayMetric 是 Sentinel 中滑动窗口的具体实现类,它使用数组来存储滑动窗口的统计数据。以下是 ArrayMetric 的部分关键代码:

public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    @Override
    public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addPass(count);
    }

    @Override
    public long pass() {
        data.currentWindow();
        long pass = 0;
        List<MetricBucket> list = data.values();
        for (MetricBucket window : list) {
            pass += window.pass();
        }
        return pass;
    }

    // 其他统计方法...
}

在上述代码中:

  • LeapArray 是滑动窗口的核心数据结构,它维护了一个数组,数组中的每个元素是一个 WindowWrap 对象,WindowWrap 包含了一个 MetricBucket 对象,用于存储该窗口的统计信息。
  • currentWindow 方法用于获取当前时间对应的窗口,如果该窗口不存在则创建一个新的窗口。
  • addPass 方法用于在当前窗口中增加通过请求的数量。
  • pass 方法用于计算所有有效窗口中通过请求的总数。

通过这种方式,ArrayMetric 能够高效地实现滑动窗口的统计功能,并且可以根据不同的时间粒度进行统计。

七、流量控制实现原理

7.1 流量控制规则(FlowRule

流量控制规则 FlowRule 是实现流量控制的依据,它包含了多个属性,用于定义流量控制的策略。以下是 FlowRule 的部分属性:

public class FlowRule extends AbstractRule {

    private String resource;
    private int grade = RuleConstant.FLOW_GRADE_QPS;
    private double count;
    private int strategy = RuleConstant.STRATEGY_DIRECT;
    private String refResource;
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
    private int warmUpPeriodSec = 10;
    private int maxQueueingTimeMs = 500;

    // Getters and Setters...
}

各属性的含义如下:

  • resource:要进行流量控制的资源名称。
  • grade:限流模式,支持 QPS 限流(RuleConstant.FLOW_GRADE_QPS)和并发线程数限流(RuleConstant.FLOW_GRADE_THREAD)。
  • count:限流阈值,例如 QPS 的上限或并发线程数的上限。
  • strategy:限流策略,支持直接限流(RuleConstant.STRATEGY_DIRECT)、关联限流(RuleConstant.STRATEGY_RELATE)和链路限流(RuleConstant.STRATEGY_CHAIN)。
  • refResource:关联资源或入口资源的名称,根据不同的限流策略使用。
  • controlBehavior:流量控制行为,支持直接拒绝(RuleConstant.CONTROL_BEHAVIOR_DEFAULT)、预热(RuleConstant.CONTROL_BEHAVIOR_WARM_UP)和匀速排队(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)。
  • warmUpPeriodSec:预热时间,仅在预热模式下有效。
  • maxQueueingTimeMs:最大排队时间,仅在匀速排队模式下有效。

7.2 流量控制检查(FlowSlot

FlowSlot 是处理流量控制的核心处理槽,它会根据流量控制规则对请求进行检查。以下是 FlowSlot 的 entry 方法的部分代码:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    checkFlow(resourceWrapper, context, node, count, prioritized);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
    throws BlockException {
    checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}

在 entry 方法中,首先调用 checkFlow 方法进行流量控制检查,如果检查通过,则调用 fireEntry 方法将请求传递给下一个处理槽。checkFlow 方法会调用 FlowRuleChecker 的 checkFlow 方法进行具体的规则检查。

7.3 流量控制策略实现

7.3.1 直接限流

直接限流是最常见的限流策略,根据资源的当前 QPS 或并发线程数与限流阈值进行比较,如果超过阈值则拒绝请求。以下是直接限流的部分检查代码:

private static boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                    boolean prioritized) {
    String limitApp = rule.getLimitApp();
    if (limitApp == null) {
        return true;
    }
    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }
    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}

private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }
    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}

在上述代码中,canPassCheck 方法会根据规则的配置选择本地检查或集群检查。passLocalCheck 方法会根据限流策略选择合适的统计节点,然后调用规则的 Rater(评估器)的 canPass 方法进行检查。

7.3.2 关联限流

关联限流是指当关联资源的流量达到一定阈值时,对当前资源进行限流。例如,当某个数据库的查询请求过多时,限制对该数据库的写入请求。关联限流的实现主要通过 selectNodeByRequesterAndStrategy 方法选择关联资源的统计节点进行检查。

7.3.3 链路限流

链路限流是指只对指定链路入口的请求进行限流。例如,只对从某个特定接口进入的请求进行限流。链路限流的实现同样通过 selectNodeByRequesterAndStrategy 方法选择合适的统计节点进行检查。

7.4 流量控制行为实现

7.4.1 直接拒绝

直接拒绝是最简单的流量控制行为,当请求超过限流阈值时,直接抛出 FlowException 异常,拒绝请求。以下是直接拒绝的部分代码:

@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    int curCount = avgUsedTokens(node);
    if (curCount + acquireCount > count) {
        if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
            long currentTime;
            long waitInMs;
            currentTime = TimeUtil.currentTimeMillis();
            waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
            if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                node.addOccupiedPass(acquireCount);
                sleep(waitInMs);
                // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                throw new PriorityWaitException(waitInMs);
            }
        }
        return false;
    }
    return true;
}

在上述代码中,如果当前使用的令牌数加上请求的令牌数超过阈值,且请求不是优先请求,则直接返回 false,表示请求被拒绝。

7.4.2 预热

预热模式主要用于应对系统在刚启动时,由于资源未充分预热而导致的性能问题。在预热期间,限流阈值会逐渐增加,直到达到正常阈值。以下是预热模式的部分代码:

@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    long passQps = (long) node.passQps();
    long previousQps = (long) node.previousPassQps();
    syncToken(previousQps);
    // 省略部分代码
    long restToken = storedTokens.get();
    if (restToken >= warningToken) {
        long aboveToken = restToken - warningToken;
        // 计算斜率
        double warningQps = Math.nextUp(count / (1.0 + aboveToken * coldFactor / warningToken));
        if (passQps + acquireCount <= warningQps) {
            return true;
        }
    } else {
        if (passQps + acquireCount <= count) {
            return true;
        }
    }
    return false;
}

在上述代码中,syncToken 方法用于同步令牌数量,根据当前存储的令牌数和警告令牌数的关系,计算出不同的限流阈值进行检查。

7.4.3 匀速排队

匀速排队模式会将超过阈值的请求放入队列中,按照固定的速率依次处理。以下是匀速排队模式的部分代码:

@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    // 省略部分代码
    long currentTime = TimeUtil.currentTimeMillis();
    long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
    long expectedTime = costTime + latestPassedTime.get();
    if (expectedTime <= currentTime) {
        // 可以立即通过
        latestPassedTime.set(currentTime);
        return true;
    } else {
        long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
        if (waitTime > maxQueueingTimeMs) {
            return false;
        } else {
            long oldTime = latestPassedTime.addAndGet(costTime);
            try {
                waitTime = oldTime + costTime - TimeUtil.currentTimeMillis();
                if (waitTime > maxQueueingTimeMs) {
                    latestPassedTime.addAndGet(-costTime);
                    return false;
                }
                // 排队等待
                sleep(waitTime);
                return true;
            } catch (InterruptedException e) {
            }
        }
    }
    return false;
}

在上述代码中,根据当前请求需要的令牌数和限流阈值计算出预计处理时间,如果预计处理时间在当前时间之后,则计算等待时间,若等待时间不超过最大排队时间,则将请求放入队列中等待处理。

八、熔断降级实现原理

8.1 熔断降级规则(DegradeRule

熔断降级规则 DegradeRule 用于在资源的某个指标(如平均响应时间、异常比例)达到一定阈值时,对资源进行熔断降级。以下是 DegradeRule 的部分属性:

收起

java

public class DegradeRule extends AbstractRule {

    private String resource;
    private int grade = RuleConstant.DEGRADE_GRADE_RT;
    private double count;
    private int timeWindow;
    private int minRequestAmount = DegradeRuleConstant.DEFAULT_MIN_REQUEST_AMOUNT;
    private int statIntervalMs = 1000;

    // Getters and Setters...
}

各属性的含义如下:

  • resource:要进行熔断降级的资源名称。
  • grade:熔断策略,支持平均响应时间熔断(RuleConstant.DEGRADE_GRADE_RT)和异常比例熔断(RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO)。
  • count:熔断阈值,例如平均响应时间的上限或异常比例的上限。
  • timeWindow:熔断时长,单位为秒。
  • minRequestAmount:最小请求数,只有当请求数达到该值时才会进行熔断判断。
  • statIntervalMs:统计时间间隔,单位为毫秒。

8.2 熔断降级检查(DegradeSlot

DegradeSlot 是处理熔断降级的核心处理槽,它会根据熔断降级规则对请求进行检查。以下是 DegradeSlot 的 entry 方法的部分代码:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    performChecking(context, resourceWrapper);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void performChecking(Context context, ResourceWrapper r) throws BlockException {
    List<DegradeRule> rules = DegradeRuleManager.getRulesForResource(r.getName());
    if (rules == null) {
        return;
    }
    for (DegradeRule rule : rules) {
        if (!rule.passCheck(context, r)) {
            throw new DegradeException(rule.getLimitApp(), rule);
        }
    }
}

在 entry 方法中,首先调用 performChecking 方法进行熔断降级检查,如果检查通过,则调用 fireEntry 方法将请求传递给下一个处理槽。

8.2 熔断降级检查(DegradeSlot)(续)

performChecking 方法会从 DegradeRuleManager 中获取该资源的所有熔断降级规则,并依次对这些规则进行检查。对于每条规则,调用其 passCheck 方法判断当前请求是否可以通过。如果某条规则检查不通过,则抛出 DegradeException 异常,表明该资源已被熔断。

8.3 熔断策略实现

8.3.1 平均响应时间熔断(DEGRADE_GRADE_RT

当资源的平均响应时间超过设定的阈值时,会触发熔断。以下是平均响应时间熔断规则的检查逻辑:

@Override
public boolean passCheck(Context context, ResourceWrapper r) {
    if (circuitBreaker == null) {
        initCircuitBreaker();
    }
    return circuitBreaker.tryPass(context);
}

private void initCircuitBreaker() {
    if (this.grade == RuleConstant.DEGRADE_GRADE_RT) {
        circuitBreaker = new ResponseTimeCircuitBreaker(this);
    } else if (this.grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
        circuitBreaker = new ExceptionRatioCircuitBreaker(this);
    } else if (this.grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
        circuitBreaker = new ExceptionCountCircuitBreaker(this);
    }
}

在 initCircuitBreaker 方法中,根据规则的熔断策略创建相应的熔断器。对于平均响应时间熔断,会创建 ResponseTimeCircuitBreaker。以下是 ResponseTimeCircuitBreaker 的部分关键代码:

public class ResponseTimeCircuitBreaker extends AbstractCircuitBreaker {

    private final int minRequestAmount;
    private final double maxAllowedRt;

    public ResponseTimeCircuitBreaker(DegradeRule rule) {
        super(rule);
        this.minRequestAmount = rule.getMinRequestAmount();
        this.maxAllowedRt = rule.getCount();
    }

    @Override
    public boolean tryPass(Context context) {
        if (currentState.get() == State.CLOSED) {
            return true;
        }
        if (currentState.get() == State.OPEN) {
            return retryTimeoutArrived() && fromOpenToHalfOpen(context);
        }
        return false;
    }

    @Override
    protected void onRequestComplete(Context context) {
        DefaultNode node = (DefaultNode) context.getCurNode();
        double rt = node.avgRt();
        if (node.passQps() < minRequestAmount) {
            return;
        }
        if (rt > maxAllowedRt) {
            triggerBreak();
        }
    }
}

在 tryPass 方法中,会根据熔断器的当前状态判断请求是否可以通过。如果熔断器处于关闭状态(CLOSED),则请求可以直接通过;如果处于打开状态(OPEN),则检查是否达到重试超时时间,如果达到则尝试从打开状态转换为半开状态。

在 onRequestComplete 方法中,会在请求完成后获取该资源的平均响应时间 rt,如果请求数达到最小请求数且平均响应时间超过阈值,则触发熔断,将熔断器状态从关闭状态转换为打开状态。

8.3.2 异常比例熔断(DEGRADE_GRADE_EXCEPTION_RATIO

当资源的异常比例超过设定的阈值时,会触发熔断。对于异常比例熔断,会创建 ExceptionRatioCircuitBreaker。以下是其部分关键代码:

public class ExceptionRatioCircuitBreaker extends AbstractCircuitBreaker {

    private final int minRequestAmount;
    private final double maxExceptionRatio;

    public ExceptionRatioCircuitBreaker(DegradeRule rule) {
        super(rule);
        this.minRequestAmount = rule.getMinRequestAmount();
        this.maxExceptionRatio = rule.getCount();
    }

    @Override
    protected void onRequestComplete(Context context) {
        DefaultNode node = (DefaultNode) context.getCurNode();
        int totalCount = node.totalRequest();
        int exceptionCount = node.exceptionQps();
        if (totalCount < minRequestAmount) {
            return;
        }
        double exceptionRatio = (double) exceptionCount / totalCount;
        if (exceptionRatio > maxExceptionRatio) {
            triggerBreak();
        }
    }
}

在 onRequestComplete 方法中,会在请求完成后获取该资源的总请求数和异常请求数,计算异常比例。如果请求数达到最小请求数且异常比例超过阈值,则触发熔断。

8.3.3 异常数熔断(DEGRADE_GRADE_EXCEPTION_COUNT

当资源的异常数超过设定的阈值时,会触发熔断。对于异常数熔断,会创建 ExceptionCountCircuitBreaker。其实现逻辑与异常比例熔断类似,只是判断条件为异常数是否超过阈值。

8.4 熔断器状态转换

熔断器有三种状态:关闭(CLOSED)、打开(OPEN)和半开(HALF_OPEN),不同状态之间会根据一定的条件进行转换。

8.4.1 关闭状态(CLOSED

在关闭状态下,熔断器允许所有请求通过,正常统计资源的各项指标(如平均响应时间、异常比例等)。当触发熔断条件时,熔断器从关闭状态转换为打开状态。

8.4.2 打开状态(OPEN

在打开状态下,熔断器会拒绝所有请求,此时资源处于熔断状态。当达到设定的熔断时长后,熔断器会尝试从打开状态转换为半开状态。

8.4.3 半开状态(HALF_OPEN

在半开状态下,熔断器会允许部分请求通过,用于试探资源是否已经恢复正常。如果通过的请求中没有再次触发熔断条件,则将熔断器状态转换为关闭状态,资源恢复正常;如果再次触发熔断条件,则将熔断器状态重新转换为打开状态。

以下是熔断器状态转换的部分代码示例:

protected boolean fromOpenToHalfOpen(Context context) {
    if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
        entryPassed.set(0);
        return true;
    }
    return false;
}

protected void fromHalfOpenToOpen(int errorCount) {
    if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) {
        resetStat();
        nextRetryTimestamp.set(TimeUtil.currentTimeMillis() + rule.getTimeWindow() * 1000L);
    }
}

protected void fromHalfOpenToClose() {
    if (currentState.compareAndSet(State.HALF_OPEN, State.CLOSED)) {
        resetStat();
    }
}

九、系统保护实现原理

9.1 系统保护规则(SystemRule

系统保护规则 SystemRule 用于根据系统的整体负载情况,对所有资源进行统一的流量控制。以下是 SystemRule 的部分属性:

public class SystemRule extends AbstractRule {

    private double highestSystemLoad = -1;
    private double avgRt = -1;
    private int maxThread = -1;
    private double qps = -1;
    private double highestCpuUsage = -1;

    // Getters and Setters...
}

各属性的含义如下:

  • highestSystemLoad:系统最大负载,当系统负载超过该值时进行流量控制。
  • avgRt:平均响应时间上限,当系统平均响应时间超过该值时进行流量控制。
  • maxThread:最大线程数,当系统线程数超过该值时进行流量控制。
  • qps:系统最大 QPS,当系统 QPS 超过该值时进行流量控制。
  • highestCpuUsage:系统最大 CPU 使用率,当系统 CPU 使用率超过该值时进行流量控制。

9.2 系统保护检查(SystemSlot

SystemSlot 是处理系统保护的核心处理槽,它会根据系统保护规则对请求进行检查。以下是 SystemSlot 的 entry 方法的部分代码:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    SystemRuleManager.checkSystem(resourceWrapper, count);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

在 entry 方法中,首先调用 SystemRuleManager.checkSystem 方法进行系统保护检查,如果检查通过,则调用 fireEntry 方法将请求传递给下一个处理槽。

9.3 系统保护逻辑实现

SystemRuleManager.checkSystem 方法会遍历所有的系统保护规则,对系统的各项指标(如系统负载、平均响应时间、线程数、QPS、CPU 使用率等)进行检查。以下是部分检查逻辑代码:

public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException {
    if (!checkSystemStatus()) {
        throw new SystemBlockException(resourceWrapper.getName());
    }
}

private static boolean checkSystemStatus() {
    double currentLoad = SystemStatus.getLoadAverage();
    double currentQps = Constants.ENTRY_NODE.successQps() + Constants.ENTRY_NODE.blockQps();
    double currentAvgRt = Constants.ENTRY_NODE.avgRt();
    int currentThread = Constants.ENTRY_NODE.curThreadNum();
    double currentCpuUsage = SystemStatus.getCpuUsage();

    for (SystemRule rule : getRules()) {
        if (rule.getHighestSystemLoad() >= 0 && currentLoad > rule.getHighestSystemLoad()) {
            return false;
        }
        if (rule.getAvgRt() >= 0 && currentAvgRt > rule.getAvgRt()) {
            return false;
        }
        if (rule.getMaxThread() >= 0 && currentThread > rule.getMaxThread()) {
            return false;
        }
        if (rule.getQps() >= 0 && currentQps > rule.getQps()) {
            return false;
        }
        if (rule.getHighestCpuUsage() >= 0 && currentCpuUsage > rule.getHighestCpuUsage()) {
            return false;
        }
    }
    return true;
}

在 checkSystemStatus 方法中,会获取系统的当前负载、QPS、平均响应时间、线程数和 CPU 使用率等指标,然后与系统保护规则中的阈值进行比较。如果任何一个指标超过阈值,则认为系统处于过载状态,返回 false,表示请求会被系统保护机制拦截。

十、热点参数规则实现原理

10.1 热点参数规则(ParamFlowRule

热点参数规则 ParamFlowRule 用于对资源的热点参数进行流量控制。以下是 ParamFlowRule 的部分属性:

public class ParamFlowRule extends AbstractRule {

    private String resource;
    private int paramIdx;
    private double count;
    private int durationInSec = 1;
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
    private int maxQueueingTimeMs = 500;
    private Map<Object, Integer> specificCountMap = new HashMap<>();

    // Getters and Setters...
}

各属性的含义如下:

  • resource:要进行热点参数限流的资源名称。
  • paramIdx:需要进行限流的参数索引,从 0 开始。
  • count:限流阈值,对于大部分参数的限流值。
  • durationInSec:统计时间窗口,单位为秒。
  • controlBehavior:流量控制行为,与流量控制规则中的含义相同。
  • maxQueueingTimeMs:最大排队时间,仅在匀速排队模式下有效。
  • specificCountMap:特定参数的限流值映射,对于某些特殊参数可以设置不同的限流阈值。

10.2 热点参数检查(ParamFlowSlot

ParamFlowSlot 是处理热点参数规则的核心处理槽,它会根据热点参数规则对请求的参数进行检查。以下是 ParamFlowSlot 的 entry 方法的部分代码:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    if (ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
        checkFlow(resourceWrapper, count, args);
    }
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

private void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
    if (args == null) {
        return;
    }
    List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesForResource(resourceWrapper.getName());
    for (ParamFlowRule rule : rules) {
        if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
            String triggeredParam = "";
            if (args.length > rule.getParamIdx()) {
                Object value = args[rule.getParamIdx()];
                triggeredParam = String.valueOf(value);
            }
            throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
        }
    }
}

在 entry 方法中,首先检查该资源是否有热点参数规则,如果有则调用 checkFlow 方法进行检查。checkFlow 方法会遍历该资源的所有热点参数规则,对请求的参数进行检查。如果某个规则检查不通过,则抛出 ParamFlowException 异常。

10.3 热点参数限流逻辑实现

ParamFlowChecker.passCheck 方法会根据热点参数规则对请求的参数进行限流检查。以下是部分检查逻辑代码:

public static boolean passCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count, Object... args) {
    if (args == null || args.length <= rule.getParamIdx()) {
        return true;
    }
    Object value = args[rule.getParamIdx()];
    double curCount = getCurCount(rule, value);
    if (curCount + count > getCount(rule, value)) {
        if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {
            return passRateLimiterCheck(rule, value, count);
        }
        return false;
    }
    return true;
}

private static double getCurCount(ParamFlowRule rule, Object value) {
    ParameterMetric metric = ParamFlowMetricStorage.getParamMetric(rule.getResource());
    if (metric == null) {
        return 0;
    }
    return metric.getCurCount(rule.getParamIdx(), value);
}

private static double getCount(ParamFlowRule rule, Object value) {
    Integer specificCount = rule.getSpecificCountMap().get(value);
    if (specificCount != null) {
        return specificCount;
    }
    return rule.getCount();
}

在 passCheck 方法中,首先获取请求中需要进行限流的参数值,然后调用 getCurCount 方法获取该参数的当前请求数,调用 getCount 方法获取该参数的限流阈值。如果当前请求数加上本次请求数超过限流阈值,则根据流量控制行为进行相应的处理。如果是匀速排队模式,则调用 passRateLimiterCheck 方法进行排队检查;否则,直接返回 false,表示请求被限流。


http://www.kler.cn/a/547826.html

相关文章:

  • resultType,jdbcType,parameterType区别
  • 大数据学习(46) - Flink按键分区处理函数
  • Java版PDF拼接
  • 结合实际讲NR系列5——RRCResume
  • 分享一个使用的音频裁剪chrome扩展-Ringtone Maker
  • 【css实现边框圆角渐变效果】
  • 服务器安全——日志分析和扫描
  • 力扣19题——删除链表的倒数第 N 个结点
  • 股票自动化交易
  • 【20250215】二叉树:144.二叉树的前序遍历
  • Python网络编程
  • 无人机航迹规划: 梦境优化算法(Dream Optimization Algorithm,DOA)求解无人机路径规划MATLAB
  • python学opencv|读取图像(七十)使用cv2.HoughCircles()函数实现图像中的霍夫圆形检测
  • MES管理系统解决方案在制造企业中的实施路径
  • mybatis-lombok工具包介绍
  • 阿里云视频点播,基于thinkphp8上传视频
  • 游戏引擎学习第101天
  • 登录演示和功能拆解
  • Unity-New Input System
  • 【Film Shot】CineScale: Recognising Cinematic Features with AI