当前位置: 首页 > article >正文

RabbitMQ 消息顺序性保证

方式一:Consumer设置exclusive

在这里插入图片描述

注意条件

  • 作用于basic.consume
  • 不支持quorum queue
    在这里插入图片描述
    当同时有A、B两个消费者调用basic.consume方法消费,并将exclusive设置为true时,第二个消费者会抛出异常:
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - queue 'test' in vhost '/' in exclusive use, class-id=60, method-id=20)
	at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
	at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)
	at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:981)
	at com.dms.rabbitmq.TopicSender.lambda$main$2(TopicSender.java:63)
	at java.base/java.lang.Thread.run(Thread.java:840)

Spring AMQP 如何通过exclusive实现顺序消费:

在这里插入图片描述
核心逻辑

while (!DirectMessageListenerContainer.this.started && isRunning()) {
   this.cancellationLock.reset();
   try {
      for (String queue : queueNames) {
         consumeFromQueue(queue);
      }
   }
   catch (AmqpConnectException | AmqpIOException e) {
      long nextBackOff = backOffExecution.nextBackOff();
      if (nextBackOff < 0 || e.getCause() instanceof AmqpApplicationContextClosedException) {
         DirectMessageListenerContainer.this.aborted = true;
         shutdown();
         this.logger.error("Failed to start container - fatal error or backOffs exhausted",
               e);
         this.taskScheduler.schedule(this::stop, Instant.now());
         break;
      }
      this.logger.error("Error creating consumer; retrying in " + nextBackOff, e);
      doShutdown();
      try {
         Thread.sleep(nextBackOff); // NOSONAR
      }
      catch (InterruptedException e1) {
         Thread.currentThread().interrupt();
      }
      continue; // initialization failed; try again having rested for backOff-interval
   }
   DirectMessageListenerContainer.this.started = true;
   DirectMessageListenerContainer.this.startedLatch.countDown();
}
  1. 抛出异常后,会重试
  2. 重试间隔、次数受recoveryInterval(默认无限)、recoveryBackOff控制

方式二:single active consumer

在这里插入图片描述

原理:

在这里插入图片描述

代码示例

Channel ch = ...;
Map<String, Object> arguments = newHashMap<String, Object>();
arguments.put("x-single-active-consumer", true);
ch.queueDeclare("my-queue", false, false, false, arguments);

在这里插入图片描述
参考资料:https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams


http://www.kler.cn/a/538178.html

相关文章:

  • 优化GPT API接口链接的方法
  • AF3 drmsd函数解读
  • DFS+回溯+剪枝(深度优先搜索)——搜索算法
  • 从算法到落地:DeepSeek如何突破AI工具的同质化竞争困局
  • swap内存
  • 扩展知识--缓存和分时复用cpu
  • 多线程下jdk1.7的头插法导致的死循环问题
  • 学JDBC 第二日
  • OSwatch性能分析工具部署
  • 为什么要学习AI/机器学习
  • 2025年02月07日Github流行趋势
  • vnev/Scripts/activate : 无法加载文件
  • 深度学习之DCGAN算法深度解析
  • 微服务组件LoadBalancer负载均衡
  • GnuTLS: 在 pull 函数中出错。 无法建立 SSL 连接。
  • 求组合数,
  • ubuntu18.04 编译安装opencv3.4.8
  • 云计算真的可以提高企业的IT敏捷性吗?
  • 【简单】27.移除元素
  • 《Java核心技术 卷II》本地化的数字格式
  • 3.攻防世界 Confusion1(服务器模板注入SSTI)
  • 直接抓取网页的爬虫技术:限制与合规挑战
  • 订单超时设计(1)--- 如何使用redis实现订单超时实时关闭功能
  • 软件测试就业
  • 前端学习-页面加载事件和页面滚动事件(三十二)
  • vue3:点击子组件进行父子通信