当前位置: 首页 > article >正文

微服务: 06-rabbitmq设置延时队列,场景与使用(springboot)

全文目录,一步到位

  • 1.前言简介
    • 1.1 专栏传送门
      • 1.1.1 上文小总结
      • 1.1.2 上文传送门
  • 2. rabbitmq延时队列
      • 2.1.0 查看docker启动的服务(检查rabbitmq)
      • 2.1.1 延时队列的作用
      • 2.1.2 业务中需要使用的场景
    • 2.2 延时队列的准备工作
      • 2.2.0 查看docker
      • 2.2.1 升级rabbitmq, 装入延时插件
      • 2.2.2 将插件放在宿主机的一个位置中
      • 2.2.3 复制宿主机插件到容器
      • 2.2.4 进入rabbitmq的容器内部
      • 2.2.5 检查一下rabbitmq控制台的交换机
    • 2.3 springboot中使用延时队列
      • 2.3.1 创建配置类
      • 2.3.2 发送json信息
      • 2.3.3 查看交换机是否生成(运行后)
    • 2.4 消费者如何消费
      • 2.4.1 正常消费 `@RabbitListener(queues = *)`
      • 2.4.2 什么情况会进入死信
  • 3. 文章的总结与预告
    • 3.1 本文总结
    • 3.2 下文预告


1.前言简介

1.1 专栏传送门

=> 分布式必备服务与配置

1.1.1 上文小总结

前面00-05中 分别提到了rabbitmq的基础部署 配置 使用 以及死信队列的使用
本篇呢 介绍新的队列 延时队列

1.1.2 上文传送门

微服务: 04-springboot中rabbitmq的yml或properties配置,消息回收,序列化方式

2. rabbitmq延时队列

2.1.0 查看docker启动的服务(检查rabbitmq)

docker ps
docker ps -a
没有rabbitmq的可以看专栏前面rabbitmq的00-05即可
在这里插入图片描述

2.1.1 延时队列的作用

延时队列,从名字就可以看出,队列里面的消息会延时一会,也就是等一会才会被消费。
最经典的就是电商中下订单后不支付。
通常,我们会设定一个时间,比如 30 分钟内如果不支付,订单就自动取消。这个功能就可以通过延时队列来实现,下订单后,马上向延时队列发送一条消息,并且设置延迟时间为 30 分钟。然后等 30 分钟之后,消费者开始消费这条消息,可以简单的判断一下比如订单还是未支付状态,就把订单状态改为关闭的。

2.1.2 业务中需要使用的场景

  1. 订单在十分钟之内未支付则自动取消
  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  3. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  4. 用户发起退款,如果三天内没有得到处理则通知相关人员。
  5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

2.2 延时队列的准备工作

2.2.0 查看docker

在这里插入图片描述

2.2.1 升级rabbitmq, 装入延时插件

rabbitmq_delayed_message_exchange-3.12.0.ez
插件: => github插件地址:
插件位置: => csdn0积分下载(永久免费)

2.2.2 将插件放在宿主机的一个位置中

我这里创建的路径是: /usr/local/src/docker/rabbitmq/plugins
在这里插入图片描述

2.2.3 复制宿主机插件到容器

先检查rabbitmq容器是否启动后

docker ps -a

然后复制宿主机路径文件到容器插件内

# 将插件拷贝到容器内部的插件目录
docker cp /usr/local/src/docker/rabbitmq/plugins/rabbitmq_delayed_message_exchange-3.12.0.ez rabbit:/plugins

2.2.4 进入rabbitmq的容器内部

docker exec -it rabbitmq bash
启动rabbitmq的插件
然后重启

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exit
docker restart rabbitmq

2.2.5 检查一下rabbitmq控制台的交换机

找到x-delayed-message类型 则为成功
在这里插入图片描述

2.3 springboot中使用延时队列

2.3.1 创建配置类

配置死信队列和延时队列 创建延时交换机和延时队列

@Configuration
public class RabbitConfig {
    
    @Autowired
    private SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory;

    @Autowired
    private RabbitCallbackConfig rabbitCallbackConfig;
    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //消息到不到队列 自动重新返回到生产者
        rabbitTemplate.setMandatory(true);//其实前面配置加了
        rabbitTemplate.setConfirmCallback(rabbitCallbackConfig);
        rabbitTemplate.setReturnCallback(rabbitCallbackConfig);
        // 使用 JSON 序列化与反序列化
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitListenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter());

        return rabbitTemplate;
    }

    @Bean
    public DirectExchange deadDirectExchange() {
        return ExchangeBuilder.directExchange(RabbitmqConstant.DEAD_LETTER_EXCHANGE).build();
    }

    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(RabbitmqConstant.*_DIVIDE_DEAD_QUEUE).build();
    }

    @Bean
    public Binding deadBinding(Queue deadLetterQueue, DirectExchange deadDirectExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadDirectExchange).with("*DeadKey");
    }

//------------------------------>
    /**
     * 延时队列交换机
     */
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("*_exchange", "x-delayed-message", true, false, args);
    }

    /**
     * 延时队列
     */
    @Bean
    public Queue delayQueue() {
//        return new Queue(RabbitConstant.DELAY_QUEUE, true);
        Map<String, Object> arguments = new HashMap<>(3 );
        arguments.put("x-dead-letter-exchange",RabbitmqConstant.DEAD_LETTER_EXCHANGE);
        arguments.put("x-dead-letter-routing-key","*DeadKey");
        arguments.put("x-max-length",2000);
//        arguments.put("x-message-ttl", 100000 );

        return QueueBuilder.durable(RabbitmqConstant.*_DIVIDE_QUEUE).withArguments(arguments).build( );
    }

    /**
     * 给延时队列绑定交换机
     */
    @Bean
    public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with("*Key").noargs();
    }


    @Bean
    public ApplicationRunner runner(ConnectionFactory connectionFactory) {
        return args -> connectionFactory.createConnection().close();
    }

2.3.2 发送json信息

.setHeader("x-delay", 10000) //毫秒 代表10秒后发送消息

Message msg= new Message(1,"天气好");//随便的一个实体类

    rabbitTemplate.convertAndSend(
                "*_*_exchange",
                "*Key",
                dto,
                message -> {
                    message.getMessageProperties().setHeader("x-delay", 10000);
                    return message;
                }
        );

2.3.3 查看交换机是否生成(运行后)

测试服务器中的mq交换机 找到x-delayed-message死信交换机 (名得相同)则成功
报错(如重名等, 手动删除后自动创建即可), 可以看前面00-05文章
在这里插入图片描述

2.4 消费者如何消费

2.4.1 正常消费 @RabbitListener(queues = *)

    @RabbitListener(queues = RabbitmqConstant.*_DIVIDE_QUEUE)
    public void collect(*DivideDto *DivideDto, Channel channel, Message message) throws IOException {
        // goods coupons persons
        log.info("获取到mq消息,消息内容为{}", *DivideDto);
        log.info("获取到mq---mq消息,消息内容为{}", message);
    }

2.4.2 什么情况会进入死信

随机刨除异常 throw new RuntimeException(“测试死信回滚”);
前面文章有介绍 这里不过多介绍 假设设置参数5次报错 进入死信
测试服务器的mq所示
> 在

3. 文章的总结与预告

3.1 本文总结

  1. 延时队列的使用
  2. 为什么要使用延时队列
  3. 升级rabbitmq,加入延时队列插件
  4. springboot中使用延时队列, 查看控制台信息
  5. 接收延时队列, 接收成功为成功
  6. 测试异常情况, 多次报错进入死信队列 单独处理
  7. 注意多次重复消费问题, 消费者进行处理, 特殊业务 请使用单一消费者解决即可

3.2 下文预告

rocketmq的相关代码操作



@author: pingzhuyan
@description: ok
@year: 2024


http://www.kler.cn/a/452500.html

相关文章:

  • 面经zhenyq
  • C++模板:编译时模拟Duck Typing
  • 2024-12-24 NO1. XR Interaction ToolKit 环境配置
  • LeetCode:257. 二叉树的所有路径
  • Kafka可视化工具 Offset Explorer (以前叫Kafka Tool)
  • 基于云计算的资源管理系统
  • 图神经网络_图嵌入_SDNE
  • 【Rust自学】6.4. 简单的控制流-if let
  • 【每日学点鸿蒙知识】指纹识别隐藏背面、数组内部值变化刷新UI、键盘输入类型、跨组件路由、C++20特性支持
  • 智谱BigModel研习社|搭建 AI 搜索引擎 - 使用免费的Web-Search-Pro+脑图Agent智能体
  • Gmsh有限元网格剖分(Python)---点、直线、平面的移动
  • STL 六大组件
  • 私域电商逆袭密码:AI 智能名片小程序与商城系统如何梦幻联动
  • 数据结构--堆的向上调整和向下调整
  • excel中如何筛选一列中重复的内容,相同的内容只保留一次
  • 2、C#基于.net framework的应用开发实战编程 - 设计(二、三) - 编程手把手系列文章...
  • Golang的发展历程
  • MFC/C++学习系列之简单记录11——树控件的使用
  • Java中以某字符串开头且忽略大小写字母如何实现【正则表达式(Regex)】
  • 工厂常用软件系统大全中英文全称对照表及功能介绍应用场景ERP MES WMS SCADA IOT SAP等软件系统介绍
  • 【从零开始入门unity游戏开发之——C#篇29】C#泛型(T)和 泛型约束
  • 前端Python应用指南(三)Django vs Flask:哪种框架适合构建你的下一个Web应用?
  • 在 CentOS 8 系统上安装 Jenkins 的全过程
  • uniapp 项目基础搭建(vue2)
  • Python PDF转换工具箱(PDF转图片,word,拆分,删除,提取)
  • HarmonyOS NEXT 实战之元服务:静态案例效果---查看国际航班服务