RabbitMQ 确认模式(Acknowledgements Mode)详解
RabbitMQ 是一款流行的开源消息代理软件,它实现了高级消息队列协议(AMQP)。在消息传递过程中,确保消息被正确处理是至关重要的。RabbitMQ 提供了多种机制来确保消息的可靠性,其中确认模式(Acknowledgements Mode)是一个关键特性。
什么是确认模式?
确认模式(Acknowledgements Mode)允许消费者在成功处理消息后显式地向 RabbitMQ 服务器发送确认信号(ack)。只有在收到确认信号后,RabbitMQ 服务器才会从队列中删除该消息。如果消费者未能发送确认信号(例如,由于消费者崩溃或网络故障),RabbitMQ 会认为消息尚未被处理,并在适当的时候重新发送消息。
RabbitMQ 提供了三种主要的确认模式:
- 手动确认(Manual Acknowledgement):消费者需要显式地发送确认信号。
- 自动确认(Automatic Acknowledgement):消息一旦被消费者接收,立即自动确认。
- 批量确认(Batch Acknowledgement):消费者可以一次确认多条消息。
为什么使用确认模式?
- 确保消息不丢失:即使消费者崩溃,消息也会重新发送。
- 提高可靠性:通过控制确认时机,可以更好地管理消息处理流程。
- 灵活性:可以根据不同的业务需求选择不同的确认模式。
Java 代码示例
下面是一个使用 Java 和 Spring AMQP 实现 RabbitMQ 确认模式的示例。
Maven 依赖
首先,在你的 pom.xml
文件中添加 Spring AMQP 依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 其他依赖 -->
</dependencies>
配置 RabbitMQ
在 application.properties
文件中配置 RabbitMQ 连接信息:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
配置类
配置一个队列、交换机和绑定:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String QUEUE_NAME = "exampleQueue";
public static final String EXCHANGE_NAME = "exampleExchange";
public static final String ROUTING_KEY = "exampleRoutingKey";
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME, true); // 持久化队列
}
@Bean
public DirectExchange exchange() {
return new DirectExchange(EXCHANGE_NAME);
}
@Bean
public Binding binding(Queue queue, DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
}
消息监听器
使用手动确认模式来监听队列:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener.ReturnCallback;
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
import org.springframework.amqp.rabbit.listener.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.config.SimpleRabbitListenerEndpoint;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.rabbitmq.client.Channel;
@Configuration
public class ListenerConfig {
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置为手动确认模式
return factory;
}
@Bean
public MessageConverter jsonMessageConverter() {
return new SimpleMessageConverter();
}
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME, containerFactory = "rabbitListenerContainerFactory")
public void listen(String message, Channel channel, org.springframework.amqp.core.Message rabbitMessage) throws Exception {
try {
// 处理消息
System.out.println("Received <" + message + ">");
// 发送确认信号
channel.basicAck(rabbitMessage.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 发送拒绝信号,并设置为重新入队(requeue)
channel.basicNack(rabbitMessage.getMessageProperties().getDeliveryTag(), false, true);
throw e;
}
}
}
发送消息
编写一个简单的控制器来发送消息到 RabbitMQ:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public String sendMessage(@RequestParam String message) {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, message);
return "Message sent: " + message;
}
}
运行示例
- 启动 Spring Boot 应用程序。
- 使用浏览器或工具(如 Postman)访问
http://localhost:8080/send?message=HelloRabbitMQ
来发送消息。 - 观察控制台输出,确认消息已被接收并处理。
通过这种方式,你可以确保消息在成功处理后才会从队列中删除,从而提高了消息传递的可靠性。
新时代农民工