当前位置: 首页 > article >正文

RabbitMQ深度探索:消息幂等性问题

  1. RabbitMQ 消息自动重试机制:
    1. 让我们消费者处理我们业务代码的时候,如果抛出异常的情况下,在这时候 MQ 会自动触发重试机制,默认的情况下 RabbitMQ 时无限次数的重试
    2. 需要认为指定重试次数限制问题
  2. 在什么情况下消费者实现重试策略:
    1. 消费者调用第三方接口,但是调用第三方接口失败后,需要实现重试策略,网络延迟只是暂时调不通,重试多次有可能会调通
    2. 消费者获取代码后,因为代码问题抛出数据异常,此时不需要实现重试策略
      1. 我们需要将日志存放起来,后期通过定时任务或者人工补偿形式
      2. 如果是重试多次还是失败消息,需要重新发布消费者版本实现消费
      3. 可以使用死信队列
    3. MQ 在重试的过程中,可能会引发消费者重复消费的问题
    4. MQ 消费者需要解决幂等性问题
      1. 幂等性:保证数据唯一
  3. 解决幂等性问题:
    1. 生产者在投递消息的时候,生成一个唯一 id 放在我们消息中
    2. 消费者获取到该消息,可以根据全局唯一 id 实现去重
    3. 全局唯一 id 根据业务来定的,订单号码作为全局的 id 
    4. 实际上还是需要在 DB 层面解决数据防重复
    5. 业务逻辑是在做 insert 操作使用唯一主键约束
    6. 业务逻辑是在做 update 操作,使用乐观锁
      1. 当消费者业务逻辑代码中抛出异常自动实现重试(默认是无数次重试)
      2. 应该对 RabbitMQ 重试次数实现限制,比如最多重试 5 次,每次间隔 30 秒
      3. 重试多次还是失败的情况下,存放到死信队列或者存放到数据库表中记录后期人工补偿
  4. 如何选择消息重试:
    1. 消费者获取消息后,调用第三方接口,但是调用第三方接口失败后是否要重试?
    2. 消费者获取消息后,如果代码问题抛出数据异常,是否需要重试?
    3. 总结:
      1. 如果消费者处理消息时,因为代码原因抛出异常是需要重新发布版本才能解决,就不要重试
      2. 存放到死信队列或者是数据库记录、后期人工实现补偿
  5. 实现:
    1. yml 文件:
      spring:
        rabbitmq:
          ####连接地址
          host: 127.0.0.1
          ####端口号
          port: 5672
          ####账号
          username: guest
          ####密码
          password: guest
          ### 地址
          virtual-host: boyatopVirtualHost
          listener:
            simple:
              retry:
                #开启消费者进行重试(程序异常的情况)
                enabled: true
                #最大重试次数
                max-attempts: 5
                #重试间隔时间
                initial-interval: 3000
                #手动确认机制
                acknowledge-mode: manual
        datasource:
          url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8
          username: root
          password: root
          driver-class-name: com.mysql.jdbc.Driver
      
      boyatop:
        #备胎交换机
        dlx:
          exchange: boyatop_dlx_exchange
          queue: boyatop_dlx_queue
          routingKey: dlx
        #普通交换机
        order:
          exchange: boyatop_order_exchange
          queue: boyatop_order_queue
          routingKey: order
    2. 配置类:
      @Component
      public class IdempotentExchangeConfig {
          //交换机
          @Value("${boyatop.order.exchange}")
          private  String order_exchange;
      
          //普通队列
          @Value("${boyatop.order.queue}")
          private String order_queue;
      
          //普通队列的 key
          @Value("${boyatop.order.routingKey}")
          private String order_rotingKey;
      
          //死信交换机
          @Value("${boyatop.dlx.exchange}")
          private String dlx_exchange;
      
          //死信队列
          @Value("${boyatop.dlx.queue}")
          private String dlx_queue;
      
          //死信队列的 key
          @Value("${boyatop.dlx.routingKey}")
          private String dlx_routingKey;
      
      
          //定义死信交换机
          @Bean
          public DirectExchange dlxExchange(){
              return new DirectExchange(dlx_exchange);
          }
      
          //定义死信队列
          @Bean
          public Queue dlxQueue(){
              return new Queue(dlx_queue);
          }
      
          //定义普通交换机
          @Bean
          public DirectExchange orderExchange(){
              return new DirectExchange(order_exchange);
          }
      
          //定义普通队列
          @Bean
          public Queue orderQueue(){
              //订单队列绑定死信交换机
              Map<String,Object> arguments = new HashMap<>(2);
              arguments.put("x-dead-letter-exchange",dlx_exchange);
              arguments.put("x-dead-letter-routing-key",dlx_routingKey);
              return new Queue(order_queue,true,false,false,arguments);
      //        return QueueBuilder.durable(order_queue).withArguments(arguments).build();
          }
      
      
          //订单队列绑定交换机
          @Bean
          public Binding bindingOrderExchange(DirectExchange orderExchange, Queue orderQueue){
              return BindingBuilder.bind(orderQueue)
                      .to(orderExchange)
                      .with(order_rotingKey);
          }
      
          //死信队列绑定交换机
          @Bean
          public Binding bindingDlxExchange(DirectExchange dlxExchange, Queue dlxQueue){
              return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlx_routingKey);
          }
      
      }
    3. 实体类:
      @Data
      @NoArgsConstructor
      public class OrderEntity implements Serializable {
          private Integer id;
          private String orderName;
          private String orderId;
      
          public OrderEntity(String orderName, String orderId) {
              this.orderName = orderName;
              this.orderId = orderId;
          }
      }
    4. Mapper:
      public interface OrderMapper {
      
          @Insert("INSERT into order_entity value (null,#{orderName},#{orderId})")
          int addOrder(OrderEntity orderEntity);
      
          @Select("select * from order_entity where order_id = #{orderId} ")
          OrderEntity getOrder(String orderId);
      }
    5. 生产者:
      @Component
      @Slf4j
      public class OrderProducer {
          @Autowired
          private RabbitTemplate rabbitTemplate;
      
          @Value("${boyatop.order.exchange}")
          private  String order_exchange;
      
          //普通队列的 key
          @Value("${boyatop.order.routingKey}")
          private String order_rotingKey;
      
      
          public void sendMsg(String orderName,String orderId){
              OrderEntity orderEntity = new OrderEntity(orderName,orderId);
              rabbitTemplate.convertAndSend(order_exchange,order_rotingKey,orderEntity,message -> {
                  message.getMessageProperties().setExpiration("5000");
                  return message;
              });
          }
      }
    6. 消费者:
      @Component
      @Slf4j
      @RabbitListener(queues = "boyatop_order_queue")
      public class OrderConsumer {
      
          @Autowired
          private OrderMapper orderMapper;
      
          @RabbitHandler
          public void process(OrderEntity orderEntity, Message message, Channel channel){
              try{
                  String orderId = orderEntity.getOrderId();
                  if(StringUtils.isEmpty(orderId)){
                      return;
                  }
      
                  OrderEntity dbOrderEntity = orderMapper.getOrder(orderId);
                  if(dbOrderEntity != null){
                      //出现异常,消息拒收,进入死信队列人为处理
                      channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                  }
      
                  int result = orderMapper.addOrder(orderEntity);
                  //出现异常
                  int i = 1 / 0;
                  channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                  System.out.println("监听内容:" + orderEntity);
              }catch (Exception e){
                  // 记录该消息日志形式  存放数据库db中、后期通过定时任务实现消息补偿、人工实现补偿
                  //将该消息存放到死信队列中,单独写一个死信消费者实现消费。
              }
          }
      }

http://www.kler.cn/a/533500.html

相关文章:

  • 视频融合平台EasyCVR无人机场景视频压缩及录像方案
  • 【C++】多态详细讲解
  • 基于微信小程序的校园水电费管理平台设计与实现
  • 在Mapbox GL JS中“line-pattern”的使用详解
  • 最大矩阵的和
  • 51单片机看门狗系统
  • MongoDB 查询文档
  • 哈夫曼树原理及其C语言实现
  • 时间对象管理相关
  • gesp(C++六级)(13)洛谷:P11375:[GESP202412 六级] 树上游走
  • 因果推断与机器学习—可解释性、公平性和因果机器学习
  • go运算符
  • Redis缓存穿透、击穿、雪崩介绍以及解决方案
  • vscode 设置在编辑器的标签页超出可视范围时自动换行(workbench.editor.wrapTabs)
  • SpringBoot 基于个性化定制的智慧校园管理系统设计与开发 - 论文、开题报告
  • 搭建Python环境:为量化交易做准备
  • Linux之安装MySQL
  • Oh3.2项目升级到Oh5.0(鸿蒙Next)具体踩坑记录二
  • 正则表达式详细介绍
  • 题解:洛谷 P1744 采购特价商品
  • 算法随笔_39: 最多能完成排序的块_方法2
  • embeddingbag词袋
  • 协议的种类
  • RNN/LSTM/GRU 学习笔记
  • java进阶知识点
  • 软件测试丨PyTorch 图像目标检测