当前位置: 首页 > article >正文

业务幂等性技术架构体系之消息幂等深入剖析

在系统中当使用消息队列时,无论做哪种技术选型,有很多问题是无论如何也不能忽视的,如:消息必达、消息幂等等。本文以典型的RabbitMQ为例,讲解如何保证消息幂等的可实施解决方案,其他MQ选型均可参考。

一、消息重试机制

消息队列的消息幂等性,主要是由MQ重试机制引起的。因为消息生产者将消息发送到MQ-Server后,MQ-Server 会将消息推送到具体的消息消费者。假设由于网络抖动或出现异常时,MQ-Server根据重试机制就会将消息重新向 消息消费者推送,造成消息消费者多次收到相同消息,造成数据不一致。

在RabbitMQ中,消息重试机制是默认开启的,但只会在consumer出现异常时,才会重复推送。在使用中,异常 的出现有可能是由于消费方又去调用第三方接口,由于网络抖动而造成异常,但是这个异常有可能是暂时的。所以 当消费者出现异常,可以让其重试几次,如果重试几次后,仍然有异常,则需要进行数据补偿。

消费重试配置

# 消费者监听相关配置 
    listener: 
        simple: 
            retry: 
            # 开启消费者(程序出现异常)重试机制,默认开启并一直重试 
            enabled: true 
            # 最大重试次数 
            max‐attempts: 5 
            # 重试间隔时间(毫秒) 
            initial‐interval: 3000

当consumer消息监听类中添加异常,最终接受消息时,可以发现,消息在接收五次后,最终出现异常。

二、消息幂等解决

以上就是我们的消息幂等解决的架构图

下面看代码实现

修改OrderController
/**
     * 此处为了方便演示,不做基础添加数据库操作
     * @return
     */
@PostMapping("/addOrder")
public String addOrder(){

    String uniqueKey = String.valueOf(idWorker.nextId());

    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setMessageId(uniqueKey);
    messageProperties.setContentType("text/plain");
    messageProperties.setContentEncoding("utf-8");
    Message message = new Message("1271700536000909313".getBytes(),messageProperties);
    rabbitTemplate.convertAndSend(RabbitMQConfig.REDUCE_STOCK_QUEUE,message);

    return "success";
}
新增消息监听类
@Component
public class ReduceStockListener {

    @Autowired
    private StockService stockService;

    @Autowired
    private JedisPool jedisPool;

    @Autowired
    private StockFlowService stockFlowService;

    @RabbitListener(queues = RabbitMQConfig.REDUCE_STOCK_QUEUE)
    @Transactional
    public void receiveMessage(Message message){

        //获取消息id
        String messageId = message.getMessageProperties().getMessageId();

        Jedis jedis = jedisPool.getResource();

        System.out.println(messageId);
        try {

            //redis锁去重校验
            if (!"OK".equals(jedis.set(messageId, messageId, "NX", "PX", 300000))){
                System.out.println("重复请求");
                return;
            }

            //mysql状态校验
            if (!(stockFlowService.findByFlag(messageId).size() == 0)){
                System.out.println("数据已处理");
                return;
            }

            String goodsId = null;
            try {
                //获取消息体中goodsId
                goodsId = new String(message.getBody(),"utf-8");
                stockService.reduceStock(goodsId,messageId);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }

            int nextInt = new Random().nextInt(100);
            System.out.println("随机数:"+nextInt);
            if (nextInt%2 ==0){
                int i= 1/0;
            }


        } catch (RuntimeException e) {
            //解锁
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            jedis.eval(script, Collections.singletonList(messageId), Collections.singletonList(messageId));
            System.out.println("出现异常了");
            System.out.println(messageId+":释放锁");
            throw e;
        }
    }
}

生产者的死信队列投递自行完成


http://www.kler.cn/a/504947.html

相关文章:

  • mayavi -> python 3D可视化工具Mayavi的安装
  • VM(虚拟机)和Linux的安装
  • “AI 自动化效能评估系统:开启企业高效发展新征程
  • 51_Lua面向对象编程
  • wordpress 房产网站筛选功能
  • OpenCV基础:矩阵的创建、检索与赋值
  • flutter 安卓端打包
  • Java 如何只测试某个类或方法:Maven与IntelliJ IDEA的不同方法及注意事项
  • iOS - TLS(线程本地存储)
  • 40,【5】CTFHUB WEB SQL 时间盲注
  • 跨境電商防關聯指紋流覽器Linken Sphere使用教程
  • vscode配置opencv4.8环境
  • Open FPV VTX开源之嵌入式OSD配置
  • extends配置项详解
  • 深度学习中的模块复用原则(定义一次还是多次)
  • C语言数据结构编程练习-用指针创建顺序表,进行创销和增删改查操作
  • 屏幕轻触间:触摸交互从 “感知” 到 “智算” 的隐秘路径
  • 爬虫案例:python爬取京东商品数据||京东商品详情SKU价格
  • OpenSeaOtter使用手册-项目简介
  • # MyBatis 基础了解
  • camera 配置预览和拍照streams上报的可用尺寸列表
  • DevOps实用场景:在哪些业务中应用DevOps最有效
  • selenium操作指南,2万字总结
  • 【力扣Hot100】双指针
  • Linux磁盘存储与内存管理命令
  • 【C++学习篇】红黑树 从入门到进阶