当前位置: 首页 > article >正文

RabbitMQ中的批量Confirm模式:提升消息可靠性与性能

在现代分布式系统中,消息队列(Message Queue)扮演着至关重要的角色,用于解耦系统组件、异步处理任务以及提高系统的可扩展性和可靠性。RabbitMQ作为一款广泛使用的消息队列中间件,提供了多种机制来确保消息的可靠传递。其中,Confirm模式是RabbitMQ中用于保证消息可靠投递的重要特性之一。而批量Confirm模式则是在此基础上进一步优化性能的一种方式。

本文将深入探讨RabbitMQ中的批量Confirm模式,包括其工作原理、使用场景、实现方法以及优缺点,帮助开发者更好地理解并应用这一特性。


1. RabbitMQ消息确认机制简介

在RabbitMQ中,消息的可靠投递通常通过以下两种机制实现:

  1. 事务机制:通过AMQP协议的事务机制,确保消息的原子性提交或回滚。但事务机制会带来较大的性能开销。
  2. Confirm模式:通过异步确认机制,生产者发送消息后,RabbitMQ会异步返回一个确认(ack)或未确认(nack)的信号,告知生产者消息是否成功投递。Confirm模式在性能上优于事务机制,因此更常用于生产环境。

Confirm模式分为两种:

  • 单条Confirm模式:每发送一条消息,等待RabbitMQ的确认。
  • 批量Confirm模式:发送一批消息后,等待RabbitMQ的批量确认。

2. 批量Confirm模式的工作原理

批量Confirm模式的核心思想是减少网络通信的开销。生产者将多条消息发送到RabbitMQ后,RabbitMQ会将这些消息的确认信号批量返回,而不是逐条确认。这种方式可以显著提高消息投递的效率。

工作流程:

  1. 生产者开启Confirm模式。
  2. 生产者发送一批消息到RabbitMQ。
  3. RabbitMQ接收到消息后,将消息持久化到磁盘(如果启用了持久化)。
  4. RabbitMQ向生产者发送一个批量确认信号(ack),确认这一批消息已成功处理。
  5. 如果某条消息处理失败,RabbitMQ会发送一个未确认信号(nack),生产者可以根据需要进行重试或记录日志。

3. 批量Confirm模式的使用场景

批量Confirm模式适用于以下场景:

  • 高吞吐量场景:需要发送大量消息,且对消息的实时性要求不高。
  • 性能优化:希望通过减少网络通信开销来提高消息投递的效率。
  • 批量任务处理:例如批量导入数据、批量发送通知等。

4. 实现批量Confirm模式

以下是一个使用RabbitMQ批量Confirm模式的示例代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConfirmListener;

import java.io.IOException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;

public class BatchConfirmProducer {

    private static final String QUEUE_NAME = "batch_confirm_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.200.142"); // RabbitMQ服务器地址
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("test"); // 用户名
        factory.setPassword("test"); // 密码

        // 创建连接和Channel
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 开启Confirm模式
            channel.confirmSelect();

            // 用于存储未确认的消息
            ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

            // 添加Confirm监听器
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    if (multiple) {
                        // 批量确认,删除所有小于等于deliveryTag的消息
                        ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
                        confirmed.clear();
                    } else {
                        // 单个确认,删除指定的消息
                        outstandingConfirms.remove(deliveryTag);
                    }
                    System.out.println("Message with delivery tag " + deliveryTag + " has been acknowledged.");
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    if (multiple) {
                        // 批量未确认,处理所有小于等于deliveryTag的消息
                        ConcurrentNavigableMap<Long, String> nacked = outstandingConfirms.headMap(deliveryTag, true);
                        nacked.forEach((tag, message) -> {
                            System.out.println("Message with delivery tag " + tag + " has been nacked: " + message);
                        });
                        nacked.clear();
                    } else {
                        // 单个未确认,处理指定的消息
                        String message = outstandingConfirms.remove(deliveryTag);
                        System.out.println("Message with delivery tag " + deliveryTag + " has been nacked: " + message);
                    }
                }
            });

            // 发送10条消息
            for (int i = 0; i < 10; i++) {
                String message = "Message " + i;
                long deliveryTag = channel.getNextPublishSeqNo();
                outstandingConfirms.put(deliveryTag, message); // 存储未确认的消息
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println("Sent message: " + message);
            }

            // 等待所有消息被确认
            if (channel.waitForConfirms(5000)) {
                System.out.println("All messages have been confirmed.");
            } else {
                System.out.println("Some messages were not confirmed.");
            }
        }
    }
}

代码解析:

  1. 连接RabbitMQ
    • 使用ConnectionFactory创建与RabbitMQ服务器的连接。
    • 设置主机地址、用户名和密码。
  2. 声明队列
    • 使用channel.queueDeclare声明一个队列。如果队列不存在,则会自动创建。
  3. 开启Confirm模式
    • 使用channel.confirmSelect()将Channel设置为Confirm模式。
  4. 存储未确认的消息
    • 使用ConcurrentNavigableMap存储未确认的消息,以deliveryTag为键。
  5. 添加Confirm监听器
    • 实现ConfirmListener接口,处理acknack事件。
    • handleAck方法中,根据multiple参数判断是批量确认还是单个确认,并清理已确认的消息。
    • handleNack方法中,处理未确认的消息。
  6. 发送消息
    • 使用channel.basicPublish发送消息,并将消息存储到outstandingConfirms中。
  7. 等待确认
    • 使用channel.waitForConfirms等待所有消息被确认。如果超时或部分消息未被确认,则返回false

运行结果:

  1. 运行程序后,您将看到以下输出:

     Sent message: Message 0
     Sent message: Message 1
     Sent message: Message 2
     Sent message: Message 3
     Sent message: Message 4
     Sent message: Message 5
     Sent message: Message 6
     Sent message: Message 7
     Sent message: Message 8
     Sent message: Message 9
     Message with delivery tag 2 has been acknowledged.
     Message with delivery tag 9 has been acknowledged.
     Message with delivery tag 10 has been acknowledged.
     All messages have been confirmed.
    

    如果某些消息未被确认,则会输出类似以下内容:

    Message with delivery tag 5 has been nacked: Message 5
    Some messages were not confirmed.
    

5. 批量Confirm模式的优缺点

优点:

  • 性能提升:减少网络通信开销,提高消息投递效率。
  • 可靠性:确保消息的可靠投递,避免消息丢失。
  • 灵活性:可以根据业务需求调整批量大小。

缺点:

  • 延迟增加:需要等待一批消息发送完成后才能收到确认信号,可能会增加消息投递的延迟。
  • 复杂性:需要处理批量确认的逻辑,增加了代码的复杂性。

6. 最佳实践

  1. 合理设置批量大小:根据业务需求和系统性能,调整批量大小。批量过大会增加延迟,批量过小则无法充分发挥性能优势。
  2. 处理未确认消息:在收到nack信号时,记录日志或进行重试,确保消息不丢失。
  3. 监控与告警:监控消息的确认情况,及时发现并处理异常。
  4. 结合持久化:启用消息持久化,确保即使RabbitMQ重启,消息也不会丢失。

7. 总结

RabbitMQ的批量Confirm模式是一种在保证消息可靠性的同时提升性能的有效方式。通过减少网络通信开销,批量Confirm模式能够显著提高消息投递的效率,适用于高吞吐量的场景。然而,开发者在使用时需要注意合理设置批量大小,并处理未确认消息,以确保系统的稳定性和可靠性。


http://www.kler.cn/a/454575.html

相关文章:

  • springboot配置oracle+达梦数据库多数据源配置并动态切换
  • OpenCV相机标定与3D重建(35)计算两幅图像之间本质矩阵(Essential Matrix)的函数findEssentialMat()的使用
  • 自学记录HarmonyOS Next的HMS AI API 13:语音合成与语音识别
  • ESP-NETIF L2 TAP 接口-物联网嵌入式开发应用
  • 只谈C++11新特性 - 删除函数
  • C程序设计:数据在数组中的交换
  • 王佩丰24节Excel学习笔记——第二十讲:图表基础
  • Elasticsearch 集群
  • WordPress TutorLMS插件 SQL注入漏洞复现(CVE-2024-10400)(附脚本)
  • 秒鲨后端之MyBatis【3】自定义映射resultMap、动态SQL、MyBatis的缓存、MyBatis的逆向工程、分页插件(30000字)
  • D类音频应用EMI管理
  • Day57 图论part07
  • JAVA开发初级入门之-如何快速将Java开发环境搭建,优雅草央千澈快速IDEA与JDK安装配置环境教程一文让你搞定-java开发必修课之一
  • OpenLinkSaas使用手册-简介
  • 【蓝桥杯】压缩字符串
  • Linux-----进程处理(文件IO资源使用)
  • 让 AMD GPU 在大语言模型推理中崭露头角:机遇与挑战
  • Unity如何判断Animator当前播放的动画已经结束
  • Go的Slice如何扩容
  • 游戏引擎学习第57天
  • 「下载」5G智慧园区整体解决方案:架构IOC核心平台层,信息全面集成共享
  • uni-app使用web-view遇到的问题
  • vxe-table 实现跨行按钮同时控制两行的编辑状态
  • Flink CDC MySQL 同步数据到 Kafka实践中可能遇到的问题
  • SpringBoot揭秘:URL与HTTP方法如何定位到Controller
  • Excel中一次查询返回多列