当前位置: 首页 > article >正文

记一次事务里发普通消息的线上问题排查过程--图文解析

记一次事务里发普通消息的线上问题排查过程–图文解析

原文链接:https://juejin.cn/post/7345645720043274255

原文作者:转转技术团队

事务+MQ使用不当导致数据不一致问题

项目初期,业务复杂度低,多个业务放在一个系统中,叫做单体系统:

在这里插入图片描述

随着业务的深入,业务的复杂度越来越高,于是把不同的业务放在不同的子系统中:
在这里插入图片描述

客户端发出请求后,请求先进入系统A,系统A处理了业务A后,将处理后的用户请求信息存入MQ,排队进入系统B。好处就是系统A处理完自己的业务后,直接通过MQ把剩余的工作丢出去,然后接收新的用户请求了。注意这是一种异步的操作。

在这里插入图片描述

或者是处理业务A的一部分业务后,先发一个MQ给系统B,然后处理业务A的剩余的其他业务。比如说有人迟到了,则人力系统先记录迟到次数,然后把要扣掉的金额信息发给财务系统进行统计,之后HR系统在继续记录迟到原因等信息。

在这里插入图片描述

本文中的背景就是如此,系统A处理完自己的业务后,发送了一个业务MQ-A消息到系统B,系统B接收到这个消息后进行处理。原文中说:

系统A处理完一笔订单后,会发送MQ,系统B消费系统A的MQ,消费过程中,会去系统A拉取信息,然后更新系统B对应的表信息。

我们可以简单理解为,系统A就是订单系统,系统B就是财务系统,订单系统处理了一部分订单业务后,将处理好的信息通过MQ交给财务系统进行处理。然后继续处理订单的其他业务。

img

现在的问题是,系统A发消息让系统B更新一个叫做cost的字段值,但个别消息处理后发现cost的值为0,本文中的cost是退货流程,cost是不可能为0的,这不正常!

一开始的分析是,经过代码排查发现系统A向系统B不止发送了一次MQ,而是两次!原因是系统A和系统B都有一个cost字段,系统A在初始化自己的cost字段为0后发送tag:create到系统b,让其初始化自己的cost字段,系统A更新自己的cost字段后又发送了tag:update的消息到系统B让其更新自己的cost字段,通常情况下,系统B需要常见两个消费者类来分别处理这两个tag发过来的信息。我们上面说过,因为发送信息是异步的,所以我们无法保证发送顺序以及最终的消费顺序!

可能我们发送初始化cost消息的时候,因为网络延迟发送慢了,让更新cost的先发送过去了。或者是系统B中的针对tag:create的消费者处理慢了,让tag:update的消费者类先处理完了。

在这里插入图片描述

原文中说,原本应该是先初始化cost=0,然后再更新cost=20;结果由于个别消息无法控制执行顺序执行反了,先更新再初始化,导致cost为0!

img

那么如何保证先消费tag:create,在消费tag:update呢?也就是如何保证不同tag下的mq的消息消费顺序呢?这本来就有点扯!既然是不同tag的异步消息,本来在设计的时候就不应该保证消费顺序!但事已如此,也只好用加锁的方式来处理:

img

上面的代码中,先要获得锁,才可以进行update处理。由于只给了代码片段,我简单用图给大家解释下:

简单说就是tag:create消费者类处理完消息后,会向缓存发送一个已经处理的orderID,说明已经初始化结束了,在tag:update执行的时候先去缓存查一下是否有这个orderID,如果有则处理,如果没有则通过自旋的方式每200毫秒查一次。当然,不一定用缓存,检查一下数据库里对应的cost字段是否已经被初始化为0也可以。

在这里插入图片描述

按理说到这里就应该没事了,但是根本不起作用!经过大佬排查,发现是大事务的问题,通过原文图示我们会发现,系统B更新cost字段的时候,根本不是接收系统A发过来的cost数据,而是到系统A的表里查系统A的cost的值,然后同步到自己的表里的cost字段值!

img

由于系统B接收到系统A发过来的消息,并进行处理的时候,系统A的事务是一个大事务,事务没有提交,所以系统A的表中的cost数据并不会发生任何变化!

经过其团队优化后,将发送mq的代码调整到提交事务之后发送,问题解决!

优化之前:

img

优化之后:

img

这也就是为什么大厂不推荐直接使用@Transactional注解来操作事务,就是因为这个注解直接影响整个方法:

@Service
public class MyService {

    @Autowired
    private MyRepository myRepository;

    @Transactional(rollbackFor = Exception.class)
    public void saveDataWithTransaction() {
        // 执行数据库操作,例如保存数据
        //事务代码
        myRepository.saveData();
        myRepository.updateData();
        //事务之外的代码
        //...
        commit;
    }
}

采用手动方式操作注解,可以精确到事务的具体代码行,尽可能的优化大事务:

@Service
public class MyService {
    @Autowired
    private PlatformTransactionManager transactionManager;

    public void saveDataManually() {
        DefaultTransactionDefinition transactionDefinition = new DefaultTransactionDefinition();
        TransactionStatus transactionStatus = transactionManager.getTransaction(transactionDefinition);

        try {
            // 执行数据库操作,例如保存数据
            //事务代码
            myRepository.saveData();
            myRepository.updateData();
            transactionManager.commit(transactionStatus); // 手动提交事务
            //事务之外的代码
            //sendMq 发出消息
            //...
        } catch (Exception e) {
            transactionManager.rollback(transactionStatus); // 手动回滚事务
            throw e;
        }
    }
}

如果方法体内,本来就没有非事务代码,只有一个需要发出mq的方法,那也有解决方案:

参考文章:https://blog.csdn.net/u014427391/article/details/134147899

一种是借助于Spring框架提供的TransactionSynchronizationManager来控制,另外一种方法是借助于Spring框架提供的@TransactionalEventListener来控制事务

TransactionSynchronizationManager:

@Transactional(rollbackFor = Exception.class)
public void register() {
    User user = User.builder()
                .name("管理员")
                .email("123456@qq.com")
                .build();
    userMapper.insert(user);
    // after transaction commit
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
        @Override
        public void afterCommit() {
            // 发送消息给MQ
            sendMQMessage();
        }
    });
}

通过@TransactionalEventListener注解属性中的phase = TransactionPhase.AFTER_COMMIT来设置在事务之后执行。

package com.example.eventlistener.listener;


import cn.hutool.http.HttpRequest;
import cn.hutool.json.JSONUtil;
import com.example.eventlistener.event.SendMsgEvent;
import com.example.eventlistener.mapper.UserMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

import javax.annotation.Resource;

@Component
@Slf4j
public class SendMsgListener {

    @Resource
    private UserMapper userMapper;

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT , classes = SendMsgEvent.class)
    public void msg(Event event) {
        //获取监听到的mq消息并处理逻辑
        log.info("sendMsg: {}" , JSONUtil.toJsonStr(event));

    }
}


另外也可以使用mq中的消息确认机制或者事务消息的方式来实现,消息确认机制简单说就是系统A发送给系统b消息后,mq需要等到系统B消费成功并返回一个确认信息,mq才会认定这次消息消费成功。另外的事务消息简单说就是mq内部设置,消息在消费之前如果存在事务,则消息先被缓存,直到事务提交成功才会消费消息。由于原文中并没有说明使用的是哪一种mq,所以这里不做代码与配置阐述了。

最后,我需要补充一点,如果事务执行成功,发送了消息,消息刚要消费,或者消费到一半mq宕机了怎么办:

消息后,mq需要等到系统B消费成功并返回一个确认信息,mq才会认定这次消息消费成功。另外的事务消息简单说就是mq内部设置,消息在消费之前如果存在事务,则消息先被缓存,直到事务提交成功才会消费消息。由于原文中并没有说明使用的是哪一种mq,所以这里不做代码与配置阐述了。

最后,我需要补充一点,如果事务执行成功,发送了消息,消息刚要消费,或者消费到一半mq宕机了怎么办:

在这里插入图片描述


http://www.kler.cn/a/287871.html

相关文章:

  • 记一次OpenEuler Linux磁盘分区表损坏的数据恢复
  • 怎样利用海外云手机进行引流?
  • Vue.js组件开发-使用地图绘制轨迹
  • 二十三种模式-适配器模式
  • 道旅科技借助云消息队列 Kafka 版加速旅游大数据创新发展
  • pycharm+pyside6+desinger实现查询汉字笔顺GIF动图
  • Jenkins配置使用LDAP的用户和密码登录
  • 前端【CSDN创作优化3】CSDN自定义模块:解决保存CSDN自定义模块时显示fail
  • 行为型设计模式-中介者(mediator)模式-python实现
  • Docker容器详细介绍
  • 传统CV算法——图像特征算法概述
  • 刷题记录(2)
  • 强化学习实践(一):Model Based 环境准备
  • Java入门:07.Java中的面向对象
  • DRF序列化_data传参
  • AI 通过python脚本自动化导出交易软件某一天的分笔成交明细
  • 0基础轻松玩转.NET Web API 8.0【CICD】项目实战
  • FPGA与高速ADC接口简介
  • 谷粒商城实战笔记-275~276-商城业务-订单服务-订单确认页完成
  • 两阶段最小二乘法2SLS和Heckman两阶段回归Stata代码(附示例数据)
  • Hadoop的HA配置与实现(ZooKeeper)
  • 华为设备默认密码
  • 记一次ssh伪终端修改为shell
  • 3.3 语法规则
  • 设计模式--享元模式
  • 春秋云镜(ZZCMS 2023)·CVE-2023-50104