当前位置: 首页 > article >正文

深入解析 Redisson 分布式限流器 RRateLimiter 的原理与实现

文章目录

    • RRateLimiter 介绍
    • 代码实现
    • Lua 脚本
    • 现实场景
    • 1. 初始化限流器
    • 2. 限流器应用场景(客人申请游玩流程)

RRateLimiter 介绍

在分布式系统中,限流(Rate Limiting)是保障系统稳定性、避免过载的重要机制。Redisson 作为一个功能强大的 Redis 客户端,不仅提供了广泛使用的分布式锁,还包含了许多其他实用的分布式工具。其中,RRateLimiter 是 Redisson 提供的分布式限流器,功能强大。本文将详细解析 RRateLimiter 的原理,深入理解其工作机制。

代码实现

首先,通过一个简单的示例了解如何使用 RRateLimiter,它创建了一个限流器并启动多个线程来获取令牌:

import org.redisson.Redisson; // 导入 Redisson 的核心类,用于创建 Redisson 客户端
import org.redisson.api.RRateLimiter; // 导入 RRateLimiter 接口,用于实现分布式限流
import org.redisson.api.RedissonClient; // 导入 RedissonClient 接口,用于与 Redis 进行交互
import org.redisson.config.Config; // 导入 Redisson 的配置类,用于配置 Redis 连接

import java.util.concurrent.CountDownLatch; // 导入 CountDownLatch 类,用于控制线程同步

public class RateLimiterDemo { // 定义一个公共类 RateLimiterDemo
    public static void main(String[] args) throws InterruptedException { // 主方法,程序入口,可能抛出 InterruptedException
        RRateLimiter rateLimiter = createRateLimiter(); // 创建一个 RRateLimiter 实例

        int totalThreads = 20; // 定义总线程数为 20
        CountDownLatch latch = new CountDownLatch(totalThreads); // 创建一个 CountDownLatch 实例,初始计数为 totalThreads

        long startTime = System.currentTimeMillis(); // 记录开始时间,用于计算总耗时
        for (int i = 0; i < totalThreads; i++) { // 循环创建并启动 20 个线程
            new Thread(() -> { // 创建一个新线程
                rateLimiter.acquire(1); // 每个线程尝试获取 1 个令牌,若令牌不足则阻塞等待
                latch.countDown(); // 线程完成后,调用 countDown() 方法减少计数器
            }).start(); // 启动线程
        }
        latch.await(); // 主线程等待,直到所有子线程完成
        System.out.println("Total elapsed time: " + (System.currentTimeMillis() - startTime) + " ms"); // 打印总耗时
    }

    /**
     * 创建并配置 RRateLimiter 的方法
     *
     * @return 配置好的 RRateLimiter 实例
     */
    private static RRateLimiter createRateLimiter() { // 创建并配置 RRateLimiter 的方法
        Config config = new Config(); // 创建一个新的 Redisson 配置实例
        config.useSingleServer() // 配置使用单一 Redis 服务器
              .setAddress("redis://127.0.0.1:6379") // 设置 Redis 服务器地址
              .setTimeout(1000000); // 设置连接超时时间(毫秒)

        RedissonClient redisson = Redisson.create(config); // 根据配置创建一个 Redisson 客户端实例
        RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter"); // 获取名为 "myRateLimiter" 的 RRateLimiter 实例
        rateLimiter.trySetRate(RRateLimiter.RateType.OVERALL, 1, 1, RateIntervalUnit.SECONDS); // 初始化限流器,设置全局速率为每秒 1 个令牌
        return rateLimiter; // 返回配置好的限流器实例
    }
}

Lua 脚本

为了更深入地理解 RRateLimiter 的工作原理,将进一步解析其底层的 Lua 脚本,实现分布式限流的核心逻辑。以下内容将逐行解释 Lua 脚本的功能和实现细节。

redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]); -- 将速率设置到哈希表中,只有当 'rate' 字段不存在时才设置
redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]); -- 将时间区间设置到哈希表中,只有当 'interval' 字段不存在时才设置
return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]); -- 将类型设置到哈希表中,只有当 'type' 字段不存在时才设置,并返回结果
-- ARGV[1] 为请求令牌数
-- ARGV[2] 为请求时间戳
-- ARGV[3] 为请求类型

-- 获取限流器的速率、时间区间和类型
local rate = redis.call("hget", KEYS[1], "rate") -- 从哈希表中获取速率
local interval = redis.call("hget", KEYS[1], "interval") -- 获取时间区间(毫秒)
local type = redis.call("hget", KEYS[1], "type") -- 获取限流器的类型(单机或集群)
assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized") -- 确保限流器已初始化

-- 默认情况下,使用 {name}:value 和 {name}:permits
local valueName = KEYS[2] -- 当前令牌数的键名
local permitsName = KEYS[4] -- 记录请求的有序集合键名

-- 如果类型为 "1"(单机模式),则使用不同的键名
if type == "1" then
    valueName = KEYS[3] -- 单机模式下的令牌数键名
    permitsName = KEYS[5] -- 单机模式下的有序集合键名
end

-- 确保请求的令牌数不超过限流器的速率
assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate")

-- 获取当前剩余的令牌数
local currentValue = redis.call("get", valueName)
-- 第一次请求直接走else
-- 第二次请求因为 valueName 更新有值,走if
if currentValue ~= false then
    -- 获取已过期的请求(初始时间 至 (当前时间(ARGV[2]-时间间隔(interval)) 准备清理失效的令牌数据
    local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)
    local released = 0 -- 初始化拟新增失效令牌数
    -- 遍历过期的请求,释放相应的令牌
    for i, v in ipairs(expiredValues) do
        local random, permits = struct.unpack("fI", v)
        released = released + permits
    end

    -- 如果有释放的令牌,更新当前可用令牌数并移除过期的请求
    if released > 0 then
        redis.call("zrem", permitsName, unpack(expiredValues)) -- 清除 permitsName 中包含 expiredValues 的数据
        currentValue = tonumber(currentValue) + released -- 清理失效令牌后计算总可用令牌数
        redis.call("set", valueName, currentValue) -- 更新可用令牌
    end

    -- 如果当前令牌数不足以满足请求  
    if tonumber(currentValue) < tonumber(ARGV[1]) then
        -- 计算需要等待的时间
        local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), 'withscores', 'limit', 0, 1) -- 找到最近一次的请求时间 nearest 
        local random, permits = struct.unpack("fI", nearest[1]) -- 解压为时间戳+请求令牌数
        -- 返回等待时间,也可以写为 tonumber(nearest[2])+interval-tonumber(ARGV[2])
        return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval) -- nearest[2] 为上行的 random
    else
        -- 当前可用令牌数足够,记录此次请求并减少可用令牌数
        redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1])) -- 记录请求记录
        redis.call("decrby", valueName, ARGV[1]) -- 更新可用令牌数 valueName -= ARGV[1](请求令牌数)
        return nil -- 成功获取令牌
    end
else
    -- 第一次请求,初始化令牌数和有序集合
    redis.call("set", valueName, rate) -- 设置当前令牌数为最大速率值
    redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1])) -- 记录请求记录
    redis.call("decrby", valueName, ARGV[1]) -- 更新可用令牌数 valueName -= ARGV[1](请求令牌数)
    return nil -- 成功获取令牌
end

现实场景

冰雪大世界的热门项目每天吸引着络绎不绝的顾客。为了避免人流过于集中,影响顾客的体验和项目的正常运行,管理团队制定了以下规则:

  • 每小时只接待6位客人。
  • 每位客人在进入项目游玩,一个小时后自动将入场票归还到废票区,确保不影响后续客人的入场。
    限流机制的设置

1. 初始化限流器

项目每天一开始,第一位客人进入游玩时,系统会进行以下操作:

  • 统计系统剩余票数量:记录为(valueName),代表同一时间段内的最大客容量。
  • 记录每次申请的客人及进场时间:存储在(permitsName)中。
  • 刷新实际剩余票数量:更新为(currentValue = valueName),确保系统实时掌握当前剩余的入场票数。
    通过这些步骤,系统为当天的限流工作做好了准备。

2. 限流器应用场景(客人申请游玩流程)

当一位客人申请游玩项目时,系统会按照以下流程操作:

步骤一:查询可用票

  • 计算实际剩余票(currentValue = valueName)。

步骤二:回收废票

  • 从废票区 根据入场记录(permitsName)计算(当前时间-时间间隔)之前的所有废弃入场票(released),这意味着已进入游玩的客人在系统时间间隔后已不再影响项目后续游客的体验,归还的票可以重新使用。
  • 更新实际剩余票(currentValue):将回收的票数加到实际剩余票(currentValue += released)
  • 更新系统剩余票(valueName):(valueName = currentValue),确保系统知道当前有多少可用的入场票,反映最新的入场票状态。

步骤三:判断票数是否足够

  • 检查实际剩余票 (currentValue):与当前游客申请票数(tonumber(ARGV[1]))进行比较。
    • 如果票够用:

      • 记录此次请求:将客人的申请信息和进场时间记录到(permitsName)中。
      • 更新系统剩余票:(valueName -= 申请票数)中扣除相应的票数。
      • 允许客人进入:客人成功进入项目游玩。
    • 如果票不够:

      • 计算等待时间:根据上一位客人的入场时间和设定的时间间隔,计算出客人需要等待时间(上一位客人的入场时间+间隔时间-当前时间)。
      • 告知客人:将计算出的等待时间返回给客人,游客异步再尝试进入。

http://www.kler.cn/a/467971.html

相关文章:

  • .net core修行之路-多线程异步编程概念篇
  • LEED绿色建筑认证在2025年相关消息
  • Docker 容器内部如何访问本机的服务
  • AlphaPi相关硬件驱动提取
  • 操作系统大题整理
  • memcached的基本使用
  • python识别outlook邮件并且将pdf文件作为附件发送邮件
  • 矩阵运算提速——玩转opencv::Mat
  • 电脑键盘打不了字是何原因,如何解决呢
  • 软件需求规格是什么
  • CSS——4. 行内样式和内部样式(即CSS引入方式)
  • 连接Milvus
  • Apache PDFBox添加maven依赖,pdf转成图片
  • 人工智能(AI)简史:推动新时代的科技力量
  • 详解MySQL SQL删除(超详,7K,含实例与分析)
  • PaperAssistant:使用Microsoft.Extensions.AI实现
  • Uniapp中使用`wxml-to-canvas`开发DOM生成图片功能
  • Traceroute 网络诊断工具实战详解
  • 中高级运维工程师运维面试题(九)之 Apache Pulsar
  • MySQL优化器估算SQL语句访问行数的深入分析
  • MIPI_DPU 综合(DPU+MIPI+Demosaic+VDMA 通路)
  • Django Admin中实现字段自动提交功能
  • 文献分享:跨模态的最邻近查询RoarGraph
  • BGP的基本配置
  • OC中isa指针
  • LeetCode-有效的括号(020)