业务幂等性技术架构体系之消息幂等深入剖析
在系统中当使用消息队列时,无论做哪种技术选型,有很多问题是无论如何也不能忽视的,如:消息必达、消息幂等等。本文以典型的RabbitMQ为例,讲解如何保证消息幂等的可实施解决方案,其他MQ选型均可参考。
一、消息重试机制
消息队列的消息幂等性,主要是由MQ重试机制引起的。因为消息生产者将消息发送到MQ-Server后,MQ-Server 会将消息推送到具体的消息消费者。假设由于网络抖动或出现异常时,MQ-Server根据重试机制就会将消息重新向 消息消费者推送,造成消息消费者多次收到相同消息,造成数据不一致。
在RabbitMQ中,消息重试机制是默认开启的,但只会在consumer出现异常时,才会重复推送。在使用中,异常 的出现有可能是由于消费方又去调用第三方接口,由于网络抖动而造成异常,但是这个异常有可能是暂时的。所以 当消费者出现异常,可以让其重试几次,如果重试几次后,仍然有异常,则需要进行数据补偿。
消费重试配置
# 消费者监听相关配置
listener:
simple:
retry:
# 开启消费者(程序出现异常)重试机制,默认开启并一直重试
enabled: true
# 最大重试次数
max‐attempts: 5
# 重试间隔时间(毫秒)
initial‐interval: 3000
当consumer消息监听类中添加异常,最终接受消息时,可以发现,消息在接收五次后,最终出现异常。
二、消息幂等解决
以上就是我们的消息幂等解决的架构图
下面看代码实现
修改OrderController
/**
* 此处为了方便演示,不做基础添加数据库操作
* @return
*/
@PostMapping("/addOrder")
public String addOrder(){
String uniqueKey = String.valueOf(idWorker.nextId());
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(uniqueKey);
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding("utf-8");
Message message = new Message("1271700536000909313".getBytes(),messageProperties);
rabbitTemplate.convertAndSend(RabbitMQConfig.REDUCE_STOCK_QUEUE,message);
return "success";
}
新增消息监听类
@Component
public class ReduceStockListener {
@Autowired
private StockService stockService;
@Autowired
private JedisPool jedisPool;
@Autowired
private StockFlowService stockFlowService;
@RabbitListener(queues = RabbitMQConfig.REDUCE_STOCK_QUEUE)
@Transactional
public void receiveMessage(Message message){
//获取消息id
String messageId = message.getMessageProperties().getMessageId();
Jedis jedis = jedisPool.getResource();
System.out.println(messageId);
try {
//redis锁去重校验
if (!"OK".equals(jedis.set(messageId, messageId, "NX", "PX", 300000))){
System.out.println("重复请求");
return;
}
//mysql状态校验
if (!(stockFlowService.findByFlag(messageId).size() == 0)){
System.out.println("数据已处理");
return;
}
String goodsId = null;
try {
//获取消息体中goodsId
goodsId = new String(message.getBody(),"utf-8");
stockService.reduceStock(goodsId,messageId);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
int nextInt = new Random().nextInt(100);
System.out.println("随机数:"+nextInt);
if (nextInt%2 ==0){
int i= 1/0;
}
} catch (RuntimeException e) {
//解锁
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
jedis.eval(script, Collections.singletonList(messageId), Collections.singletonList(messageId));
System.out.println("出现异常了");
System.out.println(messageId+":释放锁");
throw e;
}
}
}
生产者的死信队列投递自行完成