记一次事务里发普通消息的线上问题排查过程--图文解析
记一次事务里发普通消息的线上问题排查过程–图文解析
原文链接:https://juejin.cn/post/7345645720043274255
原文作者:转转技术团队
事务+MQ使用不当导致数据不一致问题
项目初期,业务复杂度低,多个业务放在一个系统中,叫做单体系统:
随着业务的深入,业务的复杂度越来越高,于是把不同的业务放在不同的子系统中:
客户端发出请求后,请求先进入系统A,系统A处理了业务A后,将处理后的用户请求信息存入MQ,排队进入系统B。好处就是系统A处理完自己的业务后,直接通过MQ把剩余的工作丢出去,然后接收新的用户请求了。注意这是一种异步的操作。
或者是处理业务A的一部分业务后,先发一个MQ给系统B,然后处理业务A的剩余的其他业务。比如说有人迟到了,则人力系统先记录迟到次数,然后把要扣掉的金额信息发给财务系统进行统计,之后HR系统在继续记录迟到原因等信息。
本文中的背景就是如此,系统A处理完自己的业务后,发送了一个业务MQ-A消息到系统B,系统B接收到这个消息后进行处理。原文中说:
系统A
处理完一笔订单后,会发送MQ,系统B
消费系统A
的MQ,消费过程中,会去系统A
拉取信息,然后更新系统B
对应的表信息。
我们可以简单理解为,系统A就是订单系统,系统B就是财务系统,订单系统处理了一部分订单业务后,将处理好的信息通过MQ交给财务系统进行处理。然后继续处理订单的其他业务。
现在的问题是,系统A发消息让系统B更新一个叫做cost的字段值,但个别消息处理后发现cost的值为0,本文中的cost是退货流程,cost是不可能为0的,这不正常!
一开始的分析是,经过代码排查发现系统A向系统B不止发送了一次MQ,而是两次!原因是系统A和系统B都有一个cost字段,系统A在初始化自己的cost字段为0后发送tag:create到系统b,让其初始化自己的cost字段,系统A更新自己的cost字段后又发送了tag:update的消息到系统B让其更新自己的cost字段,通常情况下,系统B需要常见两个消费者类来分别处理这两个tag发过来的信息。我们上面说过,因为发送信息是异步的,所以我们无法保证发送顺序以及最终的消费顺序!
可能我们发送初始化cost消息的时候,因为网络延迟发送慢了,让更新cost的先发送过去了。或者是系统B中的针对tag:create的消费者处理慢了,让tag:update的消费者类先处理完了。
原文中说,原本应该是先初始化cost=0,然后再更新cost=20;结果由于个别消息无法控制执行顺序执行反了,先更新再初始化,导致cost为0!
那么如何保证先消费tag:create,在消费tag:update呢?也就是如何保证不同tag下的mq的消息消费顺序呢?这本来就有点扯!既然是不同tag的异步消息,本来在设计的时候就不应该保证消费顺序!但事已如此,也只好用加锁的方式来处理:
上面的代码中,先要获得锁,才可以进行update处理。由于只给了代码片段,我简单用图给大家解释下:
简单说就是tag:create消费者类处理完消息后,会向缓存发送一个已经处理的orderID,说明已经初始化结束了,在tag:update执行的时候先去缓存查一下是否有这个orderID,如果有则处理,如果没有则通过自旋的方式每200毫秒查一次。当然,不一定用缓存,检查一下数据库里对应的cost字段是否已经被初始化为0也可以。
按理说到这里就应该没事了,但是根本不起作用!经过大佬排查,发现是大事务的问题,通过原文图示我们会发现,系统B更新cost字段的时候,根本不是接收系统A发过来的cost数据,而是到系统A的表里查系统A的cost的值,然后同步到自己的表里的cost字段值!
由于系统B接收到系统A发过来的消息,并进行处理的时候,系统A的事务是一个大事务,事务没有提交,所以系统A的表中的cost数据并不会发生任何变化!
经过其团队优化后,将发送mq的代码调整到提交事务之后发送,问题解决!
优化之前:
优化之后:
这也就是为什么大厂不推荐直接使用@Transactional注解来操作事务,就是因为这个注解直接影响整个方法:
@Service
public class MyService {
@Autowired
private MyRepository myRepository;
@Transactional(rollbackFor = Exception.class)
public void saveDataWithTransaction() {
// 执行数据库操作,例如保存数据
//事务代码
myRepository.saveData();
myRepository.updateData();
//事务之外的代码
//...
commit;
}
}
采用手动方式操作注解,可以精确到事务的具体代码行,尽可能的优化大事务:
@Service
public class MyService {
@Autowired
private PlatformTransactionManager transactionManager;
public void saveDataManually() {
DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
TransactionStatus transactionStatus = transactionManager.getTransaction(transactionDefinition);
try {
// 执行数据库操作,例如保存数据
//事务代码
myRepository.saveData();
myRepository.updateData();
transactionManager.commit(transactionStatus); // 手动提交事务
//事务之外的代码
//sendMq 发出消息
//...
} catch (Exception e) {
transactionManager.rollback(transactionStatus); // 手动回滚事务
throw e;
}
}
}
如果方法体内,本来就没有非事务代码,只有一个需要发出mq的方法,那也有解决方案:
参考文章:https://blog.csdn.net/u014427391/article/details/134147899
一种是借助于Spring框架提供的TransactionSynchronizationManager
来控制,另外一种方法是借助于Spring框架提供的@TransactionalEventListener
来控制事务
TransactionSynchronizationManager:
@Transactional(rollbackFor = Exception.class)
public void register() {
User user = User.builder()
.name("管理员")
.email("123456@qq.com")
.build();
userMapper.insert(user);
// after transaction commit
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
// 发送消息给MQ
sendMQMessage();
}
});
}
通过@TransactionalEventListener注解属性中的phase = TransactionPhase.AFTER_COMMIT来设置在事务之后执行。
package com.example.eventlistener.listener;
import cn.hutool.http.HttpRequest;
import cn.hutool.json.JSONUtil;
import com.example.eventlistener.event.SendMsgEvent;
import com.example.eventlistener.mapper.UserMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import javax.annotation.Resource;
@Component
@Slf4j
public class SendMsgListener {
@Resource
private UserMapper userMapper;
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT , classes = SendMsgEvent.class)
public void msg(Event event) {
//获取监听到的mq消息并处理逻辑
log.info("sendMsg: {}" , JSONUtil.toJsonStr(event));
}
}
另外也可以使用mq中的消息确认机制或者事务消息的方式来实现,消息确认机制简单说就是系统A发送给系统b消息后,mq需要等到系统B消费成功并返回一个确认信息,mq才会认定这次消息消费成功。另外的事务消息简单说就是mq内部设置,消息在消费之前如果存在事务,则消息先被缓存,直到事务提交成功才会消费消息。由于原文中并没有说明使用的是哪一种mq,所以这里不做代码与配置阐述了。
最后,我需要补充一点,如果事务执行成功,发送了消息,消息刚要消费,或者消费到一半mq宕机了怎么办:
消息后,mq需要等到系统B消费成功并返回一个确认信息,mq才会认定这次消息消费成功。另外的事务消息简单说就是mq内部设置,消息在消费之前如果存在事务,则消息先被缓存,直到事务提交成功才会消费消息。由于原文中并没有说明使用的是哪一种mq,所以这里不做代码与配置阐述了。
最后,我需要补充一点,如果事务执行成功,发送了消息,消息刚要消费,或者消费到一半mq宕机了怎么办: