1.pom依赖
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- JSON处理 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
2.配置信息
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
virtual-host: /
# 发布确认类型
publisher-confirm-type: correlated # 发布确认
publisher-returns: true # 发布返回
listener:
simple:
acknowledge-mode: manual # 手动确认
# 每次只处理一条消息
prefetch: 1
3. 生产方确认监听器
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
// 消息发送确认回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
}
});
// 消息返回回调
rabbitTemplate.setReturnsCallback(returned -> {
System.out.println("消息被退回");
});
4.消费方确认
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void handleMessage(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 处理消息
int a=5/0;
System.out.println(new String(message.getBody()));
// 手动确认消息
channel.basicAck(deliveryTag, false);
System.out.println("消息接收成功");
} catch (Exception e) {
// 消息重新入队
channel.basicNack(deliveryTag, false, true);
System.out.println("消息接收异常");
}
}