当前位置: 首页 > article >正文

RabbitMQ 如何设置限流?

RabbitMQ 的限流(流量控制)主要依赖于 QoS(Quality of Service) 机制,即 prefetch count 参数。这个参数控制每个消费者一次最多能获取多少条未确认的消息,从而避免某个消费者被大量消息压垮。


1. RabbitMQ 限流的主要方式

(1) 基于 prefetch count 进行流量控制

作用:控制 RabbitMQ 一次最多发送多少条消息 给消费者,避免消费者积压太多消息导致内存爆炸。

  • 默认情况下,RabbitMQ 会源源不断地向消费者推送消息,直到消费者崩溃。
  • prefetch count 设为 1,表示消费者一次只获取 1 条消息,处理完再取下一条。

示例(Java Spring Boot 版):

// 生产者 - 用户抢购
public void sendSeckillRequest(String userId, String productId) {
    String message = userId + "," + productId;
    rabbitTemplate.convertAndSend("seckillQueue", message);
}

// 消费者 - 处理秒杀
@RabbitListener(queues = "seckillQueue", containerFactory = "customContainerFactory")
public void handleSeckillRequest(Message message, Channel channel) throws IOException {
    try {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("收到秒杀请求:" + msg);

        // 模拟秒杀业务处理
        Thread.sleep(1000);

        // 手动ACK,表示消息已消费完成
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        // 处理失败时拒绝消息,并放回队列
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
}

对应的RabbitMQ 配置(限制 prefetch count)

@Bean
public SimpleRabbitListenerContainerFactory customContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动ACK
    factory.setPrefetchCount(1); // **每次只取 1 条消息**
    return factory;
}

这样可以限制 RabbitMQ 一次最多给每个消费者发送 1 条消息,等它处理完了才会发送下一条。


(2) 基于 x-max-length 限制队列长度

作用:限制消息队列的最大长度,超出部分的消息会被丢弃。

示例(Java 代码方式创建队列,并限制最大长度 1000):

@Bean
public Queue seckillQueue() {
    return QueueBuilder.durable("seckillQueue")
            .withArgument("x-max-length", 1000) // **最多存 1000 条消息**
            .build();
}

这样,RabbitMQ 最多存 1000 条秒杀请求,超出的会自动丢弃,避免无限堆积。


(3) 基于 x-message-ttl 限制消息存活时间

作用:让 RabbitMQ 的消息有过期时间,超时未消费的消息会被删除。

示例(Java 代码方式):

@Bean
public Queue seckillQueue() {
    return QueueBuilder.durable("seckillQueue")
            .withArgument("x-message-ttl", 5000) // **5 秒后未消费,自动删除**
            .build();
}

这样,RabbitMQ 超过 5 秒未消费的消息会自动删除,避免秒杀请求无限堆积。


(4) 基于 x-max-priority 设置优先级队列

作用:高优先级的消息先被消费。

示例(Java 代码方式):

@Bean
public Queue seckillQueue() {
    return QueueBuilder.durable("seckillQueue")
            .withArgument("x-max-priority", 10) // **优先级范围 0-10**
            .build();
}

生产者在发送消息时,可以为每条消息指定优先级:

rabbitTemplate.convertAndSend("seckillQueue", "普通用户秒杀请求", message -> {
    message.getMessageProperties().setPriority(1); // 普通用户优先级低
    return message;
});

rabbitTemplate.convertAndSend("seckillQueue", "VIP 用户秒杀请求", message -> {
    message.getMessageProperties().setPriority(9); // VIP 用户优先级高
    return message;
});

优先级范围:0~10(具体取决于队列的 x-max-priority 设置)。

  • 0 表示最低优先级
  • 10 表示最高优先级
  • RabbitMQ 会优先发送高优先级的消息给消费者。

这样,RabbitMQ 支持优先级消息,比如可以让 VIP 用户的秒杀请求优先处理。


2. 结合多个限流策略优化秒杀系统

高并发秒杀场景下,RabbitMQ 限流可以这样设计:

限流方式作用
prefetch count = 1限制消费者一次最多消费 1 条消息,防止消息处理过载。
x-max-length = 1000限制队列最大存储 1000 条消息,超出的直接丢弃,防止消息堆积。
x-message-ttl = 5000超过 5 秒未消费的秒杀请求自动删除,避免系统长时间积压请求。
x-max-priority = 10支持优先级消息,比如 VIP 用户的消息先消费。

这样能有效防止 RabbitMQ 队列爆炸,保护数据库,提升秒杀成功率


3. 总结

基于 prefetch count 限制消费速率,防止消费者被消息压垮。
基于 x-max-length 限制队列最大长度,防止秒杀请求无限堆积。
基于 x-message-ttl 让过期消息自动删除,避免长时间存积压请求。
基于 x-max-priority 提高 VIP 用户的处理优先级,提升体验。

这些策略组合使用,可以大幅提升RabbitMQ 在秒杀系统中的稳定性和吞吐能力 


http://www.kler.cn/a/542318.html

相关文章:

  • 02.06、回文链表
  • 7.推荐系统的评价与优化
  • 借助 ListWise 提升推荐系统精排效能:技术、案例与优化策略
  • 基于机器学习时序库pmdarima实现时序预测
  • 人工智能学习(七)之神经网络
  • 【Git】Failed to connect to github.com port 443: Timed out
  • 安卓基础(Intent)
  • 运用 LangChain 编排任务处理流水线,实现多轮对话场景
  • 【C语言标准库函数】标准输入输出函数详解[4]:二进制文件读写函数
  • 通用的将jar制作成docker镜像sh脚本
  • 机器学习 - 数据的特征表示
  • 《Transformer架构完全解析:从零开始读懂深度学习的革命性模型》
  • 【C++指南】解锁C++ STL:从入门到进阶的技术之旅
  • LabVIEW 开发航天项目软件
  • SSM开发(十一) mybatis关联关系多表查询(嵌套查询,举例说明)
  • unity碰撞的监测和监听
  • SpringBoot 项目中使用Log4j2详细(避坑)
  • 在Uniapp中使用阿里云OSS插件实现文件上传
  • 高级java每日一道面试题-2025年02月03日-服务器篇[Nginx篇]-Nginx是如何处理一个HTTP请求的呢 ?
  • 【leetcode】滑动窗口刷题总结
  • Python 爬虫基础教程
  • 物联网水质监测系统设计与实现/基于STM32的水产养殖云监控系统设计
  • Kubernetes 最佳实践:Top 10 常见 DevOps/SRE 面试问题及答案
  • Java、Go、Rust、Node.js 的内存占比及优缺点分析
  • DeepSeek投喂数据(训练AI)
  • DeepSeek模拟阿里面试——Mysql