当前位置: 首页 > article >正文

RabbitMQ 确认模式(Acknowledgements Mode)详解

        RabbitMQ 是一款流行的开源消息代理软件,它实现了高级消息队列协议(AMQP)。在消息传递过程中,确保消息被正确处理是至关重要的。RabbitMQ 提供了多种机制来确保消息的可靠性,其中确认模式(Acknowledgements Mode)是一个关键特性。

什么是确认模式?

        确认模式(Acknowledgements Mode)允许消费者在成功处理消息后显式地向 RabbitMQ 服务器发送确认信号(ack)。只有在收到确认信号后,RabbitMQ 服务器才会从队列中删除该消息。如果消费者未能发送确认信号(例如,由于消费者崩溃或网络故障),RabbitMQ 会认为消息尚未被处理,并在适当的时候重新发送消息。

RabbitMQ 提供了三种主要的确认模式:

  1. 手动确认(Manual Acknowledgement):消费者需要显式地发送确认信号。
  2. 自动确认(Automatic Acknowledgement):消息一旦被消费者接收,立即自动确认。
  3. 批量确认(Batch Acknowledgement):消费者可以一次确认多条消息。
为什么使用确认模式?
  • 确保消息不丢失:即使消费者崩溃,消息也会重新发送。
  • 提高可靠性:通过控制确认时机,可以更好地管理消息处理流程。
  • 灵活性:可以根据不同的业务需求选择不同的确认模式。
Java 代码示例

        下面是一个使用 Java 和 Spring AMQP 实现 RabbitMQ 确认模式的示例。

Maven 依赖

        首先,在你的 pom.xml 文件中添加 Spring AMQP 依赖:

<dependencies>  
    <dependency>  
        <groupId>org.springframework.boot</groupId>  
        <artifactId>spring-boot-starter-amqp</artifactId>  
    </dependency>  
    <!-- 其他依赖 -->  
</dependencies>
配置 RabbitMQ

        在 application.properties 文件中配置 RabbitMQ 连接信息:

spring.rabbitmq.host=localhost  
spring.rabbitmq.port=5672  
spring.rabbitmq.username=guest  
spring.rabbitmq.password=guest
配置类

        配置一个队列、交换机和绑定:

import org.springframework.amqp.core.*;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
  
@Configuration  
public class RabbitMQConfig {  
  
    public static final String QUEUE_NAME = "exampleQueue";  
    public static final String EXCHANGE_NAME = "exampleExchange";  
    public static final String ROUTING_KEY = "exampleRoutingKey";  
  
    @Bean  
    public Queue queue() {  
        return new Queue(QUEUE_NAME, true); // 持久化队列  
    }  
  
    @Bean  
    public DirectExchange exchange() {  
        return new DirectExchange(EXCHANGE_NAME);  
    }  
  
    @Bean  
    public Binding binding(Queue queue, DirectExchange exchange) {  
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);  
    }  
}
消息监听器

        使用手动确认模式来监听队列:

import org.springframework.amqp.rabbit.annotation.RabbitListener;  
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener.ReturnCallback;  
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;  
import org.springframework.amqp.rabbit.listener.config.SimpleRabbitListenerContainerFactory;  
import org.springframework.amqp.rabbit.listener.config.SimpleRabbitListenerEndpoint;  
import org.springframework.amqp.rabbit.connection.ConnectionFactory;  
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;  
import org.springframework.amqp.support.converter.MessageConverter;  
import org.springframework.amqp.support.converter.SimpleMessageConverter;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
  
import com.rabbitmq.client.Channel;  
  
@Configuration  
public class ListenerConfig {  
  
    @Autowired  
    private ConnectionFactory connectionFactory;  
  
    @Bean  
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {  
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();  
        factory.setConnectionFactory(connectionFactory);  
        factory.setMessageConverter(jsonMessageConverter());  
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置为手动确认模式  
        return factory;  
    }  
  
    @Bean  
    public MessageConverter jsonMessageConverter() {  
        return new SimpleMessageConverter();  
    }  
  
    @RabbitListener(queues = RabbitMQConfig.QUEUE_NAME, containerFactory = "rabbitListenerContainerFactory")  
    public void listen(String message, Channel channel, org.springframework.amqp.core.Message rabbitMessage) throws Exception {  
        try {  
            // 处理消息  
            System.out.println("Received <" + message + ">");  
              
            // 发送确认信号  
            channel.basicAck(rabbitMessage.getMessageProperties().getDeliveryTag(), false);  
        } catch (Exception e) {  
            // 发送拒绝信号,并设置为重新入队(requeue)  
            channel.basicNack(rabbitMessage.getMessageProperties().getDeliveryTag(), false, true);  
            throw e;  
        }  
    }  
}
发送消息

        编写一个简单的控制器来发送消息到 RabbitMQ:

import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.web.bind.annotation.GetMapping;  
import org.springframework.web.bind.annotation.RequestParam;  
import org.springframework.web.bind.annotation.RestController;  
  
@RestController  
public class MessageController {  
  
    @Autowired  
    private RabbitTemplate rabbitTemplate;  
  
    @GetMapping("/send")  
    public String sendMessage(@RequestParam String message) {  
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, message);  
        return "Message sent: " + message;  
    }  
}
运行示例
  1. 启动 Spring Boot 应用程序。
  2. 使用浏览器或工具(如 Postman)访问 http://localhost:8080/send?message=HelloRabbitMQ 来发送消息。
  3. 观察控制台输出,确认消息已被接收并处理。

通过这种方式,你可以确保消息在成功处理后才会从队列中删除,从而提高了消息传递的可靠性。


新时代农民工


http://www.kler.cn/news/364012.html

相关文章:

  • ASP.NET Core 8.0 中使用 Hangfire 调度 API
  • 如何对pdf文件进行加密?pdf文件加密全攻略与深度解析(5个方法)
  • SpringBoot中yml文件多环境配置
  • ZYNQ PS_GPIO中断
  • SSL证书有免费的吗?在哪里可以申请到?——附带申请步骤
  • 哥德巴赫猜想渐行渐远
  • 海外媒体发稿:外媒宣发之《时代》杂志 TIME 的魅力
  • 使用 NumPy 和 Matplotlib 实现交互式数据可视化
  • 【Android】AHandler/AMessage/ALooper机制
  • Java:关于哈希表
  • 2024年808数据结构答案
  • css知识点梳理
  • 如何在服务器上部署开源大模型 GLM-4-9B-Chat 并应用到RAG应用中
  • 【传知代码】机器学习在情绪预测中的应用(论文复现)
  • 虚拟机的 NAT 模式 或 Bridged 模式能够被外界IPping通
  • IDEA无法生成自动化序列serialVersionUID及无法访问8080端口异常的解决方案
  • 计算机毕业设计PySpark+大模型农产品推荐系统 农产品爬虫 农产品商城 农产品大数据 农产品数据分析可视化 PySpark Hadoop
  • 【亚马逊云】基于 Amazon EKS 搭建开源向量数据库 Milvus
  • 【ArcGIS Pro实操第4期】绘制三维地图
  • Java如何自定义线程池
  • Python + 查看个人下载次数小工具 - 记录
  • 【.Net】【C#】Program.cs通用代码模板
  • AJAX中get和post的区别
  • 【自动化测试之oracle数据库】MacOs如何安装oracle- client
  • Matlab|电价负荷需求响应-考虑电价变动
  • 线性可分支持向量机的原理推导 9-25对拉格朗日函数L(w,b,α) 关于w求导 公式解析