当前位置: 首页 > article >正文

rabbitmq——岁月云实战笔记

1 rabbitmq设计

        生产者并不是直接将消息投递到queue,而是发送给exchange,由exchange根据type的规则来选定投递的queue,这样消息设计在生产者和消费者就实现解耦。

        rabbitmq会给没有type预定义一些exchage,而实际我们却应该使用自己定义的。

1.1 用户注册设计

        用户在官网注册,因为官网与其他各子系统是分库的,因此涉及到用户注册后,用户的账号信息也需要同步各子产品,于是就有了下面的设计。2018的时候SOA设计我还用通过otter进行同步,但是现在觉得还不如使用rabbitmq,因为消息队列有很多作用,而且有些情况是,各个子系统承建时间不一样,各自的创建用户后,也会触发其他的操作,这些otter的小表复制策略就不是那么好了。

1.1.1 生产者        

        岁月云官网,可以看到这个里面只需要一个exchange名称即可,将对象转成字符串作为消息发送过去即可。

1.1.2 消费者

        消费者中定义的监听是针对queue,ignoreDeclarationExceptions是幂等设计,可以确保即使某个实例的声明操作失败(例如,因为另一个实例已经成功声明了相同的资源),整个系统仍然可以正常工作。

        fanout是一种广播,绑定到此eayc_user_add_change的queue都可以收到此消息。因为从官网下发的消息,到各子系统都应该收到,并各自创建。

        下面是子系统acc的配置

        具体消费的代码如下所示,

        下面是子系统ps的配置,与acc使用同一个exchange,但queue是不同的。

1.2 死信队列和延时队列

x-message-ttl定义了消息的时间生存期,有了这特性,就可以拓展一些功能,比如高并发的流量控制。

        下面通过x-message-ttl设置了一个延迟队列,通过DECLARE_DEAD_ROUTING_KEY与死信交换机declareDeadExchange进行匹配路由。

@Configuration
public class RabbitMQDelayConfig {

    @Value("${spring.rabbitmq.declare.exchange}")
    private String DECLARE_EXCHANGE;
    @Value("${spring.rabbitmq.declare.queue}")
    private String DECLARE_QUEUE;
    @Value("${spring.rabbitmq.declare.routing}")
    private String DECLARE_ROUTING_KEY;
    @Value("${spring.rabbitmq.declare.deadExchange}")
    private String DECLARE_EXCHANGE_DEAD;
    @Value("${spring.rabbitmq.declare.deadQueue}")
    private String DECLARE_QUEUE_DEAD;
    @Value("${spring.rabbitmq.declare.deadRouting}")
    private String DECLARE_DEAD_ROUTING_KEY;
    @Value("${spring.rabbitmq.declare.ttl}")
    private int DECLARE_TTL;


    /**
     * 申明自动申报业务交换机:
     */
    @Bean
    public DirectExchange declareExchange() {
        return new DirectExchange(DECLARE_EXCHANGE);
    }

    /**
     * 申明自动申报业务死信交换机:
     */
    @Bean
    public DirectExchange declareDeadExchange() {
        return new DirectExchange(DECLARE_EXCHANGE_DEAD);
    }

    /**
     * 申明自动申报业务队列
     * 并绑定死信队列
     */
    @Bean
    public Queue declareQueue() {
        Map<String, Object> arguments = new HashMap<>(3);
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange", DECLARE_EXCHANGE_DEAD);
        // 设置死信路由键
        arguments.put("x-dead-letter-routing-key", DECLARE_DEAD_ROUTING_KEY);
        // 设置过期时间
        arguments.put("x-message-ttl", DECLARE_TTL);
        return new Queue(DECLARE_QUEUE, true, false, false, arguments);
    }

    /**
     * 申明自动申报业务死信队列
     */
    @Bean
    public Queue declareDeadQueue() {
        return new Queue(DECLARE_QUEUE_DEAD);
    }

    /**
     * 绑定交换机和队列
     */
    @Bean
    public Binding declareQueueBinding(@Qualifier("declareQueue") Queue queue, @Qualifier("declareExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DECLARE_ROUTING_KEY);
    }

    /**
     * 绑定死信交换机和死信队列
     */
    @Bean
    public Binding declareDeadQueueBinding(@Qualifier("declareDeadQueue") Queue queue, @Qualifier("declareDeadExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DECLARE_DEAD_ROUTING_KEY);
    }


}

        生产者只需要往业务的exchange投递消息即可

// 发送一条消息到rabbitmq延时队列中,处理申报流程超时的情况
            message = new HashMap<>();
            message.put("dataId", taxDeclareDto.getDataId());
            message.put("batchId", req.getBatchId());
            rabbitTemplate.convertAndSend(DECLARE_EXCHANGE, DECLARE_ROUTING_KEY, gson.toJson(message));

         异常情况是监听死信队列,处理对应的逻辑。

   /**
     * 监听消息队列,处理申报流程超时的申请记录
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "${spring.rabbitmq.declare.deadQueue}"),
            exchange = @Exchange(name = "${spring.rabbitmq.declare.deadExchange}")))
    @RabbitHandler
    public void declareTimeout(Message message){
        logger.info("收到rabbitMq申报超时消息:{}", message);
        Map<String, String> map = gson.fromJson((String) message.getPayload(), Map.class);
        if(CheckEmptyUtil.isNotEmpty(map.get("batchId"))) {
            // 如果是批量申报超时,中断批次中所有的申报中的请求
            interruptDeclaresInBatch(map.get("batchId"));
        } else {
            String dataId = map.get("dataId");
            updateTaxDeclareStatus(new TaxDeclareYearStatusUpdateReq(dataId, null,false, StatementConstants.DeclareMessage.TIMEOUT));
        }
    }

1.3 重复消费

        如果根据高内聚低耦合的设计原则,消费者侧应该作重复消费设计,这个问题并不只是rabbitmq的问题,因为只要出现数据重复推送的问题,就会有重复消费的问题。比如有第三方系统定时同步数据到自己的系统,这个同步数据是由第三方承建的,你无法进行约束,必须在自己的系统进行幂等设计。

        springboot默认使用tomcat作为servlet容器,servlet容器使用线程池管理http请求,而controller和service都是单例,是线程不安全的,因此在接收到重复数据的请求时,如果其程序再新启动了异步线程,就会出现重复的情况,如下所示:

        主线程接收消息,做一些转换,然后执行交给异步线程处理。

 @PostMapping("/xx/batchSync")
    public ResponseResult xxBatchSync(@RequestBody CommonRequest commonRequest) {
        log.info("销项发票同步请求:{}",commonRequest.getInfo());
        XxBatchSyncReq xxBatchSyncReq = JsonUtil.toPojo(commonRequest.getInfo(),XxBatchSyncReq.class);
        String zyCompanyId = xxBatchSyncReq.getZyCompanyId();
        if (!CheckEmptyUtil.isEmpty(xxBatchSyncReq.getInvoices())){
            // 账套信息
            Integer asId = accAccountSetService.selectByZyCompanyId(xxBatchSyncReq.getZyCompanyId());
            if (asId==null){
                throw new RuntimeException(String.format("账套信息不存在,企业id:%s",xxBatchSyncReq.getZyCompanyId() ));
            }
            // 异步写入发票数据
            accInvoice4ZYService.xxBatchSync(asId,xxBatchSyncReq);
        }
        return new ResponseResult(true,"销项发票接收成功");
    }

        异步线程的逻辑如下,accInvoiceService.isExist看似基础逻辑没有问题,但是在多线程环境下会有问题,因为线程A添加进入到addInvoice方法添加发票的时候还没有提交,这个时候线程B执行accInvoiceService.isExist的时候判断已经是不存在的,于是他依旧会向下执行。导致出现数据重复写入。由此判断这个重复消费问题并不是消息队列独有的,还是业务处理的问题

    @Override
    @Async("loadDataExecutor")
    public void xxBatchSync(Integer asId, XxBatchSyncReq xxBatchSyncReq) {
        // 发票模板
        AccInvoiceTemplate accInvoiceTemplate = accInvoiceTemplateService.selectOne(asId, InvoiceConstants.InvoiceTemplateType.SALES);
        for (XxInvoiceDto xxInvoiceDto:xxBatchSyncReq.getInvoices()){
            xxInvoiceDto.setAsId(asId);
            if (!accInvoiceService.isExist(asId,xxInvoiceDto.getFpdm(),xxInvoiceDto.getFphm())){
                AccInvoiceDto accInvoiceDto = getAccInvoiceDto(xxInvoiceDto, xxBatchSyncReq.getZyCompanyId(),accInvoiceTemplate);
                addInvoice(accInvoiceDto);
            }
        }
    }

        再看事务逻辑愿望是美好的,接收到批量发票,然后一张张提交。这里就很有问题,

    @Override
    @Transactional
//    @RedisReentrantLock(key = "'acc_invoice_lock_'+#accInvoiceDto.asId")
    public void addInvoice(AccInvoiceDto accInvoiceDto) {
        // 保存发票头
        accInvoiceService.save(accInvoiceDto);
        Integer invoiceId = accInvoiceDto.getId();

        // 保存发票明细信息
        List<AccInvoiceDetail> accInvoiceDetails = accInvoiceDto.getAccInvoiceDetails();
        accInvoiceDetails.stream().forEach(accInvoiceDetail -> {
            accInvoiceDetail.setInvoiceId(invoiceId);
        });
        accInvoiceDetailService.saveBatch(accInvoiceDetails);
    }

         代码作如下调整,下面的代码依然会有问题,

    @Override
    @Async("loadDataExecutor")
    public void xxBatchSync(Integer asId, XxBatchSyncReq xxBatchSyncReq) {
        // 发票模板
        AccInvoiceTemplate accInvoiceTemplate = accInvoiceTemplateService.selectOne(asId, InvoiceConstants.InvoiceTemplateType.SALES);
        for (XxInvoiceDto xxInvoiceDto:xxBatchSyncReq.getInvoices()){
            xxInvoiceDto.setAsId(asId);
            addInvoice(xxBatchSyncReq.getZyCompanyId(),xxInvoiceDto,accInvoiceTemplate);
        }
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void addInvoice(String zyCompanyId,XxInvoiceDto xxInvoiceDto,AccInvoiceTemplate accInvoiceTemplate){
        if (!accInvoiceService.isExist(xxInvoiceDto.getAsId(),xxInvoiceDto.getFpdm(),xxInvoiceDto.getFphm())){
            AccInvoiceDto accInvoiceDto = getAccInvoiceDto(xxInvoiceDto, zyCompanyId,accInvoiceTemplate);
            addInvoice(accInvoiceDto);
        }
    }

        用MySQL来模拟一下,就可以看到问题。

        另起一个事务,因为判断还是不存在,依旧写入进去,导致数据重复。那么为什么呢?Mysql的Repeatable Read事务隔离级别,不会出现脏读、不会出现不可重复读,而间隙锁又解决了幻读的问题,但这个业务问题却需要自己认为去处理。

         解决方案最简的办法就是设置唯一键索引。另外一种办法,可以参考redis——岁月云实战,我们也可以采取加分布式锁的方式来控制数据操作。

2 线上问题

2.1 内存设置问题

k8s部署rabbitmq集群,搭建环境后登录web控制台发现内存飘红。进入到rabbitmq容器中,发现vm_memory_high_watermark.absolute = 100MB,这个就是从其他复制过来没有经过大脑的原因。这个值应该是按照Pod中设置最大内存的75%进行设置

        调整为3GB后,恢复正常。


http://www.kler.cn/a/471554.html

相关文章:

  • 【Qt】QtConcurrent
  • CANN 学习——基于香橙派 KunpengPro(1)
  • Leffa 虚拟试衣论文笔记
  • 『SQLite』常见日期时间函数的使用
  • 在K8S上部署OceanBase的最佳实践
  • C#高级:递归4-根据一颗树递归生成数据列表
  • C# async和await
  • Dexcap复现代码数据预处理全流程(四)——demo_clipping_3d.py
  • 东土科技参股广汽集团飞行汽车初创公司,为低空经济构建新型产业生态
  • 面向对象的思维hong
  • Ubuntu22.04 离线安装:gcc、make、dkms、build-essential
  • 模式识别-Ch3-贝叶斯估计
  • QT给端口扫描工程增加线程2
  • 【JavaWeb】2. 通用基础代码
  • ubuntu为Docker配置代理
  • 永磁同步电机模型预测控制——模型预测研究现状
  • ChatGPT在数据分析与处理中的使用详解
  • 在 a-tree 中报错 parent 期望是对象,但获得是字符串
  • PyTorch 框架实现线性回归:从数据预处理到模型训练全流程
  • vue 导出excel接口请求和axios返回值blob类型处理
  • Go语言的数据库交互
  • 最新版Chrome浏览器加载ActiveX控件之SolidWorks 3D控件
  • EasyExcel.read读取 Excel 文件
  • 第 24 章 网络请求与远程资源
  • WELL健康建筑认证在2025年相关消息
  • 网络安全主动防御技术与应用