SpringBoot 消息队列RabbitMQ 消息确认机制确保消息发送成功和失败 生产者确认
介绍
有Publisher Confirm(成功)和Publisher Return(失败)两种确认机制。开启确机制认后,在MQ成功收到消息后会返回消息给生产者。
- 消息投递到了MQ ,但是路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功。
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功。
- 持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功
- 其它情况都会返回NACK,告知投递失败
也就是只要返回的是ACK就可以确定是消息发送成功了。
配置生产者确认
spring:
rabbitmq:
host: 128.92.13.281
port: 5673 #通信端口
virtual-host: /csdn #虚拟主机名称
username: c123
password: 123456aa
publisher-confirm-type: correlated
#开启Publisher Confirm 并设置Confirm类型
publisher-returns: true #开启Publisher return机制
- publisher-confirm-type: correlated #MQ异步回调方式返回回执消息
- publisher-confirm-type: none #关闭Confirm机制
- publisher-confirm-type: simple #同步阻塞MQ回执消息
RetrunCallback回调(失败)
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配。
@Configuration
@Slf4j
public class RabbitMqConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate =applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.debug("消息发送失败",returnedMessage.getMessage(),
returnedMessage.getReplyCode(),//代码
returnedMessage.getReplyText(),//消息
returnedMessage.getRoutingKey(),//路由Key
returnedMessage.getExchange() //交换机
//拿到失败消息
);
}
});
}
}
ConfirmCallback回调(成功)
在每一个消息发送时单独指定
private final RabbitTemplate rabbitTemplate;
@GetMapping("/putRecord") //插入记录
public Result userMessage(){
CorrelationData cd= new CorrelationData(UUID.randomUUID().toString());
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("消息回调失败");
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
System.out.println("收到发送确认回执");
if (result.isAck())
{
System.out.println("消息发送成功");
}else{
System.out.println("消息发送失败了"+result.getReason());
}
}
});
rabbitTemplate.convertAndSend("csdn.fanout","acc","发送的消息",cd);
return Result.success("操作成功",null);
}
一般情况下更比较关系NACK消息发送失败。
注意事项
- 生产者确认需要额外的网络和系统资源开销,尽量不要使用
- 如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己业务问题
- 对于nack消息可以有限次数重试,依然失败则记录异常消息