sentinel学习笔记8-系统自适应与黑白名单限流
本文属于sentinel学习笔记系列。网上看到吴就业老师的专栏,写的好值得推荐,我整理的有所删减,推荐看原文。
https://blog.csdn.net/baidu_28523317/category_10400605.html
系统自适应
Sentinel 系统自适应保护从整体维度对应用入口流量进行控制,结合应用的 Load、总体平均 RT、入口 QPS 和线程数等几个维度的监控指标,让系统的入口流量和系统的负载达到一个平衡,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。官网有原理介绍:
system-adaptive-protection | Sentinel
系统自适应限流也属于熔断降级的一种实现,而非限流降级。
系统自适应限流规则
系统自适应限流规则针对所有流量类型为 IN 的资源生效,因此不需要配置规则的资源名称。SystemRule 定义的字段如下:
public class SystemRule extends AbstractRule {
/**
* negative value means no threshold checking.
* 按系统负载限流的阈值,默认 -1,大于 0.0 才生效。
*/
private double highestSystemLoad = -1;
/**
* cpu usage, between [0, 1]
* 按 CPU 使用率限流的阈值,取值[0,1]之间,默认 -1,大于等于 0.0 才生效。
*/
private double highestCpuUsage = -1;
//按 QPS 限流的阈值,默认 -1,大于 0 才生效。
private double qps = -1;
//按平均耗时的限流阈值,默认 -1,大于 0 才生效。
private long avgRt = -1;
//最大并行占用的线程数阈值,默认 -1,大于 0 才生效。
private long maxThread = -1;
如果配置了多个 SystemRule,则每个配置项只取最小值,这在调用 SystemRuleManager#loadRules 方法加载规则完成。
public static void loadSystemConf(SystemRule rule) {
// 是否开启系统自适应限流判断功能
boolean checkStatus = false;
// Check if it's valid.
// highestSystemLoad
if (rule.getHighestSystemLoad() >= 0) {
// 多个规则都配置则取最小值
highestSystemLoad = Math.min(highestSystemLoad, rule.getHighestSystemLoad());
highestSystemLoadIsSet = true;
checkStatus = true;
}
// highestCpuUsage
if (rule.getHighestCpuUsage() >= 0) {
if (rule.getHighestCpuUsage() > 1) {
RecordLog.warn(String.format("[SystemRuleManager] Ignoring invalid SystemRule: "
+ "highestCpuUsage %.3f > 1", rule.getHighestCpuUsage()));
} else { // 多个规则都配置则取最小值
highestCpuUsage = Math.min(highestCpuUsage, rule.getHighestCpuUsage());
highestCpuUsageIsSet = true;
checkStatus = true;
}
}
// avgRt
if (rule.getAvgRt() >= 0) { // 多个规则都配置则取最小值
maxRt = Math.min(maxRt, rule.getAvgRt());
maxRtIsSet = true;
checkStatus = true;
}
// maxThread
if (rule.getMaxThread() >= 0) { // 多个规则都配置则取最小值
maxThread = Math.min(maxThread, rule.getMaxThread());
maxThreadIsSet = true;
checkStatus = true;
}
// qps
if (rule.getQps() >= 0) {
qps = Math.min(qps, rule.getQps());
qpsIsSet = true;
checkStatus = true;
}
checkSystemStatus.set(checkStatus);
}
系统自适应限流判断流程
SystemSlot 是实现系统自适应限流的切入点。DegradeSlot 在 ProcessorSlotChain 链表中被放在 FlowSlot 的后面,作为限流的兜底解决方案,而 SystemSlot 在 ProcessorSlotChain 链表中被放在 FlowSlot 的前面,强制优先考虑系统目前的情况能否处理当前请求,让系统尽可能跑在最大吞吐量的同时保证系统的稳定性。
当 SystemSlot#entry 方法被调用时,由 SystemSlot 调用 SystemRuleManager#checkSystem 方法判断是否需要限流,流程如下图所示:
com.alibaba.csp.sentinel.slots.system.SystemSlot
public class SystemSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
SystemRuleManager.checkSystem(resourceWrapper, count);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
SystemRuleManager#checkSystem 方法从全局的资源指标数据统计节点 Constans.ENTRY_NODE 读取当前时间窗口的指标数据,判断总的 QPS、平均耗时这些指标数据是否达到阈值,或者总占用的线程数是否达到阈值,如果达到阈值则抛出 Block 异常(SystemBlockException)。除此之外,checkSystem 方法还实现了根据系统当前 Load 和 CPU 使用率限流。
public static void checkSystem(ResourceWrapper resourceWrapper, int count) throws BlockException {
if (resourceWrapper == null) {
return;
}
// Ensure the checking switch is on.
// 如果有配置 SystemRule,则 checkSystemStatus 为 true
if (!checkSystemStatus.get()) {
return;
}
// for inbound traffic only 只限流类型为 IN 的流量
if (resourceWrapper.getEntryType() != EntryType.IN) {
return;
}
// total qps qps 限流
double currentQps = Constants.ENTRY_NODE.passQps();
if (currentQps + count > qps) {
throw new SystemBlockException(resourceWrapper.getName(), "qps");
}
// total thread 占用线程数限流
int currentThread = Constants.ENTRY_NODE.curThreadNum();
if (currentThread > maxThread) {
throw new SystemBlockException(resourceWrapper.getName(), "thread");
}
// 平均耗时限流
double rt = Constants.ENTRY_NODE.avgRt();
if (rt > maxRt) {
throw new SystemBlockException(resourceWrapper.getName(), "rt");
}
// load. BBR algorithm. 系统平均负载限流
if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
if (!checkBbr(currentThread)) {
throw new SystemBlockException(resourceWrapper.getName(), "load");
}
}
// cpu usage 使用率限流
if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
throw new SystemBlockException(resourceWrapper.getName(), "cpu");
}
}
linux 经常使用top命令查看系统的平均负载(Load)和 CPU 使用率。Sentinel 通过定时任务每秒钟使用 OperatingSystemMXBean API 获取这两个指标数据的值。
来源访问控制(黑白名单限流)
origin-authority-control | Sentinel
很多时候,我们需要根据调用方来限制资源是否通过,这时候可以使用 Sentinel 的黑白名单控制的功能。黑白名单根据资源的请求来源(origin
)限制资源是否通过,若配置白名单则只有请求来源位于白名单内时才可通过;若配置黑名单则请求来源位于黑名单时不通过,其余的请求通过。
调用方信息通过
ContextUtil.enter(resourceName, origin)
方法中的origin
参数传入。
一些关键类说明:
- AuthoritySlot:实现黑白名称授权功能的切入点(ProcessorSlot)
- AuthorityRule:授权规则类
- AuthorityRuleChecker:授权检测类
- AuthorityRuleManager:授权规则管理者,提供 loadRuls API
- AuthorityException:授权检测异常,继承 BlockException
AuthorityRule
public class AuthorityRule extends AbstractRule {
/**
* Mode: 0 for whitelist; 1 for blacklist.
* 限制模式,默认白名单
*/
private int strategy = RuleConstant.AUTHORITY_WHITE;
还有
- resource:资源名称,从父类继承而来。
- limitApp:限制的来源名称,在 AuthorityRule 中可配置多个,使用‘,’号分隔。
AuthoritySlot
在使用默认的 SlotChainBuilder 情况下,AuthoritySlot 被放在 SystemSlot、FlowSlot、DegradeSlot 的前面,其优先级更高。
public class AuthoritySlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {//访问控制校验
checkBlackWhiteAuthority(resourceWrapper, context);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
//从 AuthorityRuleManager 获取当前配置的所有授权规则
Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
if (authorityRules == null) {
return;
}
//获取为当前资源配置的所有授权规则;
Set<AuthorityRule> rules = authorityRules.get(resource.getName());
if (rules == null) {
return;
}
//遍历
for (AuthorityRule rule : rules) {
//判断是否拒绝当前请求,是则抛出 AuthorityException 异常。
if (!AuthorityRuleChecker.passCheck(rule, context)) {
throw new AuthorityException(context.getOrigin(), rule);
}
}
}
}
AuthorityRuleChecker
AuthorityRuleChecker 负责实现黑白名单的过滤逻辑,首先是从当前 Context 获取调用来源的名称,只有在调用来源不为空且规则配置了黑名单或者白名单的情况下,才会走黑白名单的过滤逻辑com.alibaba.csp.sentinel.slots.block.authority.AuthorityRuleChecker#passCheck
static boolean passCheck(AuthorityRule rule, Context context) {
String requester = context.getOrigin(); // 获取来源
// Empty origin or empty limitApp will pass. 来源为空,或者规则配置的limitApp为空 则不拦截请求
if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) {
return true;
}
// Do exact match with origin name. 字符串查找,这一步起到快速过滤的作用,提升性能
int pos = rule.getLimitApp().indexOf(requester);
boolean contain = pos > -1;
// 存在才精确匹配
if (contain) {
boolean exactlyMatch = false;
// 分隔数组
String[] appArray = rule.getLimitApp().split(",");
for (String app : appArray) {
if (requester.equals(app)) {
// 标志设置为 true
exactlyMatch = true;
break;
}
}
contain = exactlyMatch;
}
// 策略
int strategy = rule.getStrategy();
// 如果是黑名单,且来源存在规则配置的黑名单中,拦截
if (strategy == RuleConstant.AUTHORITY_BLACK && contain) {
return false;
}
// 如果是白名单,且来源不存在规则配置的白名单中,拦截
if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) {
return false;
}
return true;
}
热点参数限流
何为热点?热点即经常访问的数据。很多时候我们希望统计某个热点数据中访问频次最高的 Top K 数据,并对其访问进行限制。比如:
- 商品 ID 为参数,统计一段时间内最常购买的商品 ID 并进行限制
- 用户 ID 为参数,针对一段时间内频繁访问的用户 ID 进行限制
热点参数限流会统计传入参数中的热点参数,并根据配置的限流阈值与模式,对包含热点参数的资源调用进行限流。热点参数限流可以看做是一种特殊的流量控制,仅对包含热点参数的资源调用生效。
Sentinel 利用 LRU 策略统计最近最常访问的热点参数,结合令牌桶算法来进行参数级别的流控。
parameter-flow-control | Sentinel
之前的限流策略都是针对资源维度的,热点参数限流则将维度细化到资源的某个参数上
热点参数限流功能在 Sentinel 源码的扩展功能模块为 sentinel-extension,子模块为 sentinel-parameter-flow-control。
sentinel提供两种限流类型
1.QPS:分为直接限流和匀速限流
- 直接限流:令牌桶原理
- 匀速限流:漏桶原理
2.并发线程
关于令牌桶和漏桶原理可以参考:流量控制与RateLimiter_net8限流-CSDN博客
参数限流规则 ParamFlowRule
public class ParamFlowRule extends AbstractRule {
public ParamFlowRule() {}
public ParamFlowRule(String resourceName) {
setResource(resourceName);
}
/**
* The threshold type of flow control (0: thread count, 1: QPS).
* 限流规则的阈值类型
*/
private int grade = RuleConstant.FLOW_GRADE_QPS;
/**
* Parameter index.
* 参数索引,ParamFlowChecker 根据限流规则的参数索引获取参数的值,下标从 0 开始,
* 例如方法 public String apiHello(String name),该方法只有一个参数,索引为 0 对应 name 参数。
*/
private Integer paramIdx;
/**
* The threshold count.
* 阈值
*/
private double count;
/**
* Traffic shaping behavior (since 1.6.0).
* 流量控制效果,同 FlowRule
*/
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
//实现匀速排队流量控制效果的虚拟队列最大等待时间,超过该值的请求被抛弃
private int maxQueueingTimeMs = 0;
//支持的突发流量总数
private int burstCount = 0;
//统计窗口时间长度(单位为秒)
private long durationInSec = 1;
/**
* Original exclusion items of parameters.参数例外项,
*/
private List<ParamFlowItem> paramFlowItemList = new ArrayList<ParamFlowItem>();
ParamFlowRuleManager
ParamFlowRuleManager主要负责管理和配置热点参数流控规则。通过定义热点参数流控规则,可以限制对特定参数的访问频率,从而保护系统免受热点参数的过度访问。
public final class ParamFlowRuleManager {
//规则缓存
private static final Map<String, List<ParamFlowRule>> PARAM_FLOW_RULES = new ConcurrentHashMap<>();
private final static RulePropertyListener PROPERTY_LISTENER = new RulePropertyListener();
private static SentinelProperty<List<ParamFlowRule>> currentProperty = new DynamicSentinelProperty<>();
static {
currentProperty.addListener(PROPERTY_LISTENER);
}
同其他rulemanager类似,也是有监听器支持动态更新规则。
热点参数限流功能的实现
sentinel-parameter-flow-control 模块通过 Java SPI 注册自定义的 SlotChainBuilder,即注册 HotParamSlotChainBuilder,将 ParamFlowSlot 放置在 StatisticSlot 的后面,这个 ParamFlowSlot 就是实现热点参数限流功能的切入点,当达到设置的阈值时抛出ParamFlowException。
public class ParamFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
return;
}
//验证
checkFlow(resourceWrapper, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
void applyRealParamIdx(/*@NonNull*/ ParamFlowRule rule, int length) {
int paramIdx = rule.getParamIdx();
if (paramIdx < 0) {
if (-paramIdx <= length) {
rule.setParamIdx(length + paramIdx);
} else {
// Illegal index, give it a illegal positive value, latter rule checking will pass.
rule.setParamIdx(-paramIdx);
}
}
}
void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
if (args == null) {//参数为空,checkFlow 方法的最后一个参数是请求参数
return;
}//判断当前资源有没有配置限流规则
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
return;
}
List<ParamFlowRule> rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());
for (ParamFlowRule rule : rules) {
// 从 args 中获取本次限流需要使用的 value
applyRealParamIdx(rule, args.length);
// Initialize the parameter metrics. 当前资源初始化创建 ParameterMetric
ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);
//调用passCheck判断当前请求是否可以放行
if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {
String triggeredParam = "";
if (args.length > rule.getParamIdx()) {
Object value = args[rule.getParamIdx()];
// Assign actual value with the result of paramFlowKey method
if (value instanceof ParamFlowArgument) {
value = ((ParamFlowArgument) value).paramFlowKey();
}
triggeredParam = String.valueOf(value);
}
throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);
}
}
}
}
参数限流中的 Node
- ParameterMetricStorage:用于实现类似 EntranceNode 功能,管理和存储每个资源对应的 ParameterMetric。ParameterMetricStorage#initParamMetricsFor
public static void initParamMetricsFor(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule) {
if (resourceWrapper == null || resourceWrapper.getName() == null) {
return;
}
String resourceName = resourceWrapper.getName();
ParameterMetric metric;
// Assume that the resource is valid.
// 双重检测,线程安全,为资源创建全局唯一的 ParameterMetric
if ((metric = metricsMap.get(resourceName)) == null) {
synchronized (LOCK) {
if ((metric = metricsMap.get(resourceName)) == null) {
metric = new ParameterMetric();
metricsMap.put(resourceWrapper.getName(), metric);
RecordLog.info("[ParameterMetricStorage] Creating parameter metric for: {}", resourceWrapper.getName());
}
}
}
metric.initialize(rule);
}
- ParameterMetric:用于实现类似 ClusterNode 的统计功能。
- com.alibaba.csp.sentinel.slots.block.flow.param.ParameterMetric#initialize
public void initialize(ParamFlowRule rule) {
if (!ruleTimeCounters.containsKey(rule)) {
synchronized (lock) {
if (ruleTimeCounters.get(rule) == null) {
long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);
ruleTimeCounters.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));
}
}
}
if (!ruleTokenCounter.containsKey(rule)) {
synchronized (lock) {
if (ruleTokenCounter.get(rule) == null) {
long size = Math.min(BASE_PARAM_MAX_CAPACITY * rule.getDurationInSec(), TOTAL_MAX_CAPACITY);
ruleTokenCounter.put(rule, new ConcurrentLinkedHashMapWrapper<Object, AtomicLong>(size));
}
}
}
if (!threadCountMap.containsKey(rule.getParamIdx())) {
synchronized (lock) {
if (threadCountMap.get(rule.getParamIdx()) == null) {
threadCountMap.put(rule.getParamIdx(),
new ConcurrentLinkedHashMapWrapper<Object, AtomicInteger>(THREAD_COUNT_MAX_CAPACITY));
}
}
}
}
- ruleTimeCounters:用于实现匀速流量控制效果,key 为参数限流规则(ParamFlowRule),值为参数不同取值对应的上次生产令牌的时间。用于 QPS 限流
- ruleTokenCounter:用于实现匀速流量控制效果,key 为参数限流规则(ParamFlowRule),值为参数不同取值对应的当前令牌桶中的令牌数。用于 QPS 限流
- threadCountMap:key 为参数索引,值为参数不同取值对应的当前并行占用的线程总数。
接下来看看ParamFlowChecker.passCheck,passCheck 返回 true 表示放行,返回 false 表示拒绝。
public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,
Object... args) {
if (args == null) {
return true;
}
// 判断参数索引是否合法
int paramIdx = rule.getParamIdx();
if (args.length <= paramIdx) {
return true;
}
// Get parameter value. 获取参数值,如果值为空则允许通过
Object value = args[paramIdx];
// Assign value with the result of paramFlowKey method
if (value instanceof ParamFlowArgument) {
value = ((ParamFlowArgument) value).paramFlowKey();
}
// If value is null, then pass
if (value == null) {
return true;
}
// 集群限流
if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
return passClusterCheck(resourceWrapper, rule, count, value);
}
//单机限流
return passLocalCheck(resourceWrapper, rule, count, value);
}
先不看集群限流情况,仅看单机本地限流情况。passLocalCheck 方法的源码如下:
private static boolean passLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int count,
Object value) {
try { // 基本数据类型
if (Collection.class.isAssignableFrom(value.getClass())) {
for (Object param : ((Collection)value)) {
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
//数组类
} else if (value.getClass().isArray()) {
int length = Array.getLength(value);
for (int i = 0; i < length; i++) {
Object param = Array.get(value, i);
if (!passSingleValueCheck(resourceWrapper, rule, count, param)) {
return false;
}
}
} else { // 引用类型
return passSingleValueCheck(resourceWrapper, rule, count, value);
}
} catch (Throwable e) {
RecordLog.warn("[ParamFlowChecker] Unexpected error", e);
}
return true;
}
由于参数可能是基本数据类型,也可能是数组类型,或者引用类型,所以 passLocalCheck 方法分三种情况处理,数组需要遍历,先看passSingleValueCheck方法。
static boolean passSingleValueCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
//qps
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
//匀速限流
if (rule.getControlBehavior() == RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER) {
return passThrottleLocalCheck(resourceWrapper, rule, acquireCount, value);
} else {//直接限流
return passDefaultLocalCheck(resourceWrapper, rule, acquireCount, value);
}
// 线程级限流逻辑
} else if (rule.getGrade() == RuleConstant.FLOW_GRADE_THREAD) {
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
//获取当前资源的 ParameterMetric,从而获取线程数
long threadCount = getParameterMetric(resourceWrapper).getThreadCount(rule.getParamIdx(), value);
if (exclusionItems.contains(value)) {
int itemThreshold = rule.getParsedHotItems().get(value);
//并行占用的线程总数+1 大于限流阈值则限流
return ++threadCount <= itemThreshold;
}
long threshold = (long)rule.getCount();
return ++threadCount <= threshold;
}
return true;
}
直接限流(快速失败)
快速失败基于令牌桶算法实现。passDefaultLocalCheck 方法控制每个时间窗口只生产一次令牌,将令牌放入令牌桶,每个请求都从令牌桶中取走令牌,当令牌足够时放行,当令牌不足时直接拒绝。ParameterMetric#tokenCounters 用作令牌桶,timeCounters 存储最近一次生产令牌的时间。
static boolean passDefaultLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
//根据资源获取 ParameterMetric
ParameterMetric metric = getParameterMetric(resourceWrapper);
//从ParameterMetric 获取当前限流规则的令牌桶和最近一次生产令牌的时间
CacheMap<Object, AtomicLong> tokenCounters = metric == null ? null : metric.getRuleTokenCounter(rule);
CacheMap<Object, AtomicLong> timeCounters = metric == null ? null : metric.getRuleTimeCounter(rule);
if (tokenCounters == null || timeCounters == null) {
return true;
}
// Calculate max token count (threshold) 计算限流阈值,即令牌桶最大存放的令牌总数tokenCount
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
long tokenCount = (long)rule.getCount();//默认使用rule的count
if (exclusionItems.contains(value)) {//如果排除的热点参数中包含当前 value,则使用热点参数配置的count
tokenCount = rule.getParsedHotItems().get(value);
}
if (tokenCount == 0) {
return false;
}
//重新计算限流阈值,将当前限流阈值加上允许突增流量的数量。
long maxCount = tokenCount + rule.getBurstCount();
//请求的令牌数超过最大令牌数直接限流
if (acquireCount > maxCount) {
return false;
}
while (true) {//获取当前时间
long currentTime = TimeUtil.currentTimeMillis();
//获得上一次添加令牌的时间,
AtomicLong lastAddTokenTime = timeCounters.putIfAbsent(value, new AtomicLong(currentTime));
if (lastAddTokenTime == null) {
// Token never added, just replenish the tokens and consume {@code acquireCount} immediately.
//如果当前参数值未生产过令牌,则初始化生产令牌
tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
return true;
}
// Calculate the time duration since last token was added.
//获取当前时间与上次生产令牌的时间间隔
long passTime = currentTime - lastAddTokenTime.get();
// A simplified token bucket algorithm that will replenish the tokens only when statistic window has passed.
//如果间隔时间大于一个窗口时间
if (passTime > rule.getDurationInSec() * 1000) {
AtomicLong oldQps = tokenCounters.putIfAbsent(value, new AtomicLong(maxCount - acquireCount));
if (oldQps == null) {
// Might not be accurate here.
lastAddTokenTime.set(currentTime);
return true;
} else {//剩余令牌数
long restQps = oldQps.get();
//需要补充的令牌数,根据时间间隔、限流阈值、窗口时间计算
long toAddCount = (passTime * tokenCount) / (rule.getDurationInSec() * 1000);
// 计算新的令牌总数,并立即使用(扣减 acquireCount 个令牌)
long newQps = toAddCount + restQps > maxCount ? (maxCount - acquireCount)
: (restQps + toAddCount - acquireCount);
//表示acquireCount需要的令牌不足,直接限流
if (newQps < 0) {
return false;
} //CAS更新剩余令牌数
if (oldQps.compareAndSet(restQps, newQps)) {
lastAddTokenTime.set(currentTime);//更新最近一次生产令牌的时间。
return true;
}
Thread.yield();
}
} else {//不需要补充令牌
AtomicLong oldQps = tokenCounters.get(value);
if (oldQps != null) {
long oldQpsValue = oldQps.get();
// 令牌是否足够
if (oldQpsValue - acquireCount >= 0) {
// 从令牌桶中取走令牌,放行
if (oldQps.compareAndSet(oldQpsValue, oldQpsValue - acquireCount)) {
return true;
}
} else {
return false;
}
}
Thread.yield();
}
}
}
快速失败基于令牌桶算法实现。passDefaultLocalCheck 方法控制每个时间窗口只生产一次令牌,将令牌放入令牌桶,每个请求都从令牌桶中取走令牌,当令牌足够时放行,当令牌不足时直接拒绝。
static boolean passThrottleLocalCheck(ResourceWrapper resourceWrapper, ParamFlowRule rule, int acquireCount,
Object value) {
//获取ParameterMetric
ParameterMetric metric = getParameterMetric(resourceWrapper);
//根据rule 获得最后添加令牌的时间记录map
CacheMap<Object, AtomicLong> timeRecorderMap = metric == null ? null : metric.getRuleTimeCounter(rule);
if (timeRecorderMap == null) {
return true;
}
// Calculate max token count (threshold) 计算限流阈值,不支持突增流量。
Set<Object> exclusionItems = rule.getParsedHotItems().keySet();
//获取对应热点参数的令牌数
long tokenCount = (long)rule.getCount();
if (exclusionItems.contains(value)) {
tokenCount = rule.getParsedHotItems().get(value);
}
if (tokenCount == 0) {
return false;
}
// 计算请求通过的时间间隔(根据rule配置的每多少秒可以通过多少请求来计算出一个请求需要多少毫秒)
long costTime = Math.round(1.0 * 1000 * acquireCount * rule.getDurationInSec() / tokenCount);
while (true) {
long currentTime = TimeUtil.currentTimeMillis();
//更新timeRecorder为当前时间并返回旧的记录
AtomicLong timeRecorder = timeRecorderMap.putIfAbsent(value, new AtomicLong(currentTime));
if (timeRecorder == null) {
return true;
}
//AtomicLong timeRecorder = timeRecorderMap.get(value);
//上次通过请求的时间
long lastPassTime = timeRecorder.get();
// 计算当前请求的期望通过时间,最近一次请求的期望通过时间 + 请求通过的时间间隔
long expectedTime = lastPassTime + costTime;
//期望时间已经过了或者与还需要等待的时间小于配置的排队阈值
if (expectedTime <= currentTime || expectedTime - currentTime < rule.getMaxQueueingTimeMs()) {
AtomicLong lastPastTimeRef = timeRecorderMap.get(value);
//CAS修改lastPastTimeRef时间戳
if (lastPastTimeRef.compareAndSet(lastPassTime, currentTime)) {
long waitTime = expectedTime - currentTime;
if (waitTime > 0) { //waitTime>0表示当前请求“放入”虚拟队列等待
lastPastTimeRef.set(expectedTime);
try {
TimeUnit.MILLISECONDS.sleep(waitTime);
} catch (InterruptedException e) {
RecordLog.warn("passThrottleLocalCheck: wait interrupted", e);
}
}
return true;
} else {
Thread.yield();
}
} else {
return false;
}
}
}
总结
系统自适应、来源访问控制限流的实现相对简单,热点参数限流的实现相对复杂。热点参数限流对性能的影响和对内存的占用与参数的取值有多少种可能成正比,限流参数的取值可能性越多,占用的内存就越大,对性能的影响也就越大,在使用热点参数限流功能时,一定要考虑参数的取值。