当前位置: 首页 > article >正文

《RabbitMQ篇》消息应答和发布确认

消息应答

消息应答机制:消费者接收信息并处理完之后,告诉rabbitmq该信息已经处理,rabbitmq可以把该信息删除了.

消息自动重新入队:如果处理某个消息的消费者异常关闭了,没有发送ACK确认,rabbitmq会将其重新入队,分发给其他消费者处理。

自动应答

消息发送后即被认为传送成功。该模式弊端,消费者不断的接收消息,如果处理不及时,导致内存耗尽,系统杀死消费者进程。所以适用于消费者可以高效处理信息的情况下。

手动应答:

Channel.basicAck(long deliveryTag, boolean multiple):

确认一条或多条收到的消息。对于经过确认的消息,RabbitMQ 认为该消息成功的处理,可以将其丢弃了。

在手动应答情况下,如果消费者没有关闭,但就是不发送ACK确认,这条消息就会一直在MQ中显示unacked,不会重新分发给其他消费者。

deliveryTag:对应消息的标识。

multiple:如果为True,则应用多消息。

multiple该参数用于开启批量应答:比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的还未应答的消息都会被确认收到消息应答。

Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):

拒绝一条或多条收到的消息。

deliveryTag:对应消息的标识。

multiple:如果为True,则应用多消息。

requeue:如果为True,拒绝的消息将会重新加入队列

重新入队的消息会分发给其他消费者,不重新入队的消息将会丢失/死信。

Channel.basicReject(long deliveryTag, boolean requeue):

拒绝一条消息。与 Channel.basicNack 相比少一个参数Multiple。

发布确认

生产者把消息发送到MQ后,需要MQ返回一个ACK给生产者,这样生产者才知道自己的消息成功发出去了。

实现过程:生产者将信道设置成confirm模式,所有在该信道的消息都会被指派一个唯一的id,消息到达匹配的队列后,broker发送一个确认给生产者,这样生产者就知道消息已经发布到MQ了

RabbitMQ默认没有开启发布确认模式的,需要使用Channel.confirmSelect() 开启发布确认。

Channel.waitForCinfirms():等待Broker确认或拒绝自上次调用以来发布的所有消息。

单个发布确认

单个确认发布:发布一个确认一个,才能接着发第二个。

优缺点:可以确认哪个消息失败,适用于吞吐量不大的程序。

public class ConfirmMessage {

    //消息的个数
    public static final int MESSAGE_COUNT = 1000; 

    public static void main(String[] args) throws Exception {
        publishMessageIndividually();//发布1000个单独确认消息,耗时:546ms
    }
    public static void publishMessageIndividually() throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,true,false,null);
    
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();
    
        //批量发消息
        for (int i = 1; i <= MESSAGE_COUNT; i++) {
            String message = i+"";
            channel.basicPublish("",queueName,null,message.getBytes());
            //单个确认
            boolean flag = channel.waitForConfirms();
            if(flag){
                // TODO:消息发送成功的处理逻辑
            } else {
                // TODO: 消息发送失败的处理逻辑
            }
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时:"+(end-begin)+"ms");
    }
}

批量发布确认

批量发布确认:以批次发布,一批次可以有很多消息,待批次发布完,才确认。

优缺点:速度比单个快,但是不能确认哪个消息失败。

public class ConfirmMessage {

    //消息的个数
    public static final int MESSAGE_COUNT = 1000; 

    public static void main(String[] args) throws Exception {
        publishMessageBatch(); //发布1000个批量确认消息,耗时:117ms
    }
    public static void publishMessageBatch() throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,true,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();
    
        int batchSize = 10;
    
        //批量发消息
        for (int i = 1; i <= MESSAGE_COUNT; i++) {
            String message = i+"";
            channel.basicPublish("",queueName,null,message.getBytes());
            //批量确认
            if(i % batchSize == 0) {
                boolean flag = channel.waitForConfirms();
                if(flag) {
                    // TODO:消息发送成功的处理逻辑
                } else {
                    // TODO: 消息发送失败的处理逻辑
                }
            }
        }
    
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时:"+(end-begin)+"ms");
    
    }
}

异步发布确认

异步确认发布:每个信息都有自己的标识,一直发布即可,发布成功与否,都会有回调信息。

优缺点:最快,又能确认哪个消息发布失败。

public class ConfirmMessage {

    //消息的个数
    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        publishMessageAsync(); //发布1000个异步确认消息,耗时:61ms
    }
    public static void publishMessageAsync() throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,true,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();

        //异步确认
        ConfirmCallback confirmAckCallback = (deliveryTag, multiple) -> {
            // TODO:消息发送成功的处理逻辑
        };
        ConfirmCallback confirmNackCallback = (deliveryTag, multiple) -> {
            // TODO: 消息发送失败的处理逻辑
        };
        channel.addConfirmListener(confirmAckCallback, confirmNackCallback);

        //批量发消息
        for (int i = 1; i <= MESSAGE_COUNT; i++) {
            String message = i+"";
            channel.basicPublish("",queueName,null,message.getBytes());
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"个异步确认消息,耗时:"+(end-begin)+"ms");
    }
}

消息应答和发布确认的区别

消息应答属于消费者,消费完消息告诉MQ已经消费成功。

发布确认属于生产者,生产消息发送到MQ,MQ告诉生产者已经收到消息。


http://www.kler.cn/news/339542.html

相关文章:

  • 使用PuTTY连接到Amazon Linux实例
  • Maven 父子模块的 pom.xml 文件编写
  • 代码随想录day23:贪心part1
  • 初学者如何快速入门人工智能
  • 基于深度学习的材料科学中的自动化实验设计
  • GO网络编程(七):海量用户通信系统5:分层架构
  • docker compose入门4—常用命令
  • SQL Server—了解数据库和数据库的创建
  • 河道垃圾数据集 水污染数据集——无人机视角数据集 共3000张图片,可直接用于河道垃圾、水污染功能检测 已标注yolo格式、voc格式,可直接训练;
  • Linux 系统网络配置
  • Linux中各种查看
  • 图像增强论文精读笔记-Low-Light Image Enhancement via a Deep Hybrid Network
  • Stm32的bootloader无法使用问题
  • Flume面试整理-Flume的核心组件
  • 力扣随机题
  • RNN(循环神经网络)简介及应用
  • 【Windows】在任务管理器中隐藏进程
  • IvorySQL 西安站活动回顾|一键了解IvorySQL新兼容性
  • Collection 和 Collections 有什么区别?
  • 阿里云融合认证中的App端一键登录能力