当前位置: 首页 > article >正文

【RabbitMQ】rabbitmq广播模式的使用

前言:

        项目需要同步另一个系统的数据,对方系统采用MQ的发布/订阅模式方便我们同步数据,即当对方系统中的某条数据修改后,会向绑定他们交换机的每一个队列发布消息。消费者(即我们)监听到消息变动,进行信息消费同步至我们库中。

我们需要做的就是:

        1、创建一个新队列绑定到对方系统的交换机

        2、将监听到的消息进行合理解析,取出消息中的请求头:

              请求头信息为:"R"  ,则代表该生为入学操作;

              请求头信息为:"X"  ,则代表该生为休学操作;

              请求头信息为:"T"  ,则代表该生为退学操作;

        3、接下来根据获取到的请求头内容,来对对方系统传来的数据进行对应操作。

上代码,看思路:

    实现1:

/**
 * @Author: 宁兴星
 * @CreateTime: 2026-01-16  14:05
 * @Description: TODO
 */
@Configuration
public class RabbitMqConfig extends AbstractRabbitMQConfig {


    /**
     * 创建广播模式交换机(扇形)
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EventConstant.STUDENT_EXCHANGE, true, false);
    }

    /**
     * 创建被监听的队列
     */
    @Bean
    public Queue dealerInfoQueue() {
        return new Queue(EventConstant.STUDENT_QUEUE, true, false, false);
    }

    /**
     * 将队列绑定到扇形交换机上,实现广播模式消息接收
     *
     * @param dealerInfoQueue
     * @param fanoutExchange
     * @return
     */
    @Bean
    public Binding binding(Queue dealerInfoQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(dealerInfoQueue).to(fanoutExchange);
    }

    /**
     * 配置消息监听容器工厂
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setPrefetchCount(10);
        return factory;
    }

}

实现2:

       

    /**
     * MQ监听学生数据变更
     *
     * @param message 消息体
     * @param deliveryTag 消息标识
     * @param channel 通道
     * @throws IOException IO异常
     */
    @RabbitListener(queues = EventConstant.STUDENT_QUEUE)
    @Operation(summary = "MQ监听学生数据变更", description = "MQ监听学生数据变更")
    public void handleMessage(Message message,
                            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
                            Channel channel) throws IOException {
        try {
            log.info("同步学生数据,接收到MQ消息: {}", message);
            
            // 解析学生信息
            StudentInfo studentInfo = parseStudentInfo(message);
            log.info("解析后的学生数据: {}", studentInfo);
            
            // 获取操作类型并处理
            String action = getActionFromHeaders(message);
            processStudentAction(action, studentInfo);
            
            // 确认消息处理完成
            channel.basicAck(deliveryTag, false);
            
        } catch (Exception e) {
            log.error("处理学生数据消息异常: ", e);
            // 消息处理失败,重新入队
            channel.basicNack(deliveryTag, false, true);
        }
    }

    /**
     * 解析消息中的学生信息
     */
    private StudentInfo parseStudentInfo(Message message) throws IOException {
        ObjectMapper objectMapper = new ObjectMapper();
        return objectMapper.readValue(message.getBody(), StudentInfo.class);
    }

    /**
     * 从消息头获取action
     */
    private String getActionFromHeaders(Message message) {
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        String action = headers.get("action").toString();
        log.info("操作类型: {}", action);
        return action;
    }

实现3:

     /**
     * 根据不同action处理学生数据
     */
    private void processStudentAction(String action, StudentInfo studentInfo) {
        if (action == null) {
            return;
        }
        
        switch (action) {
            case EventConstant.LIGHT_UP:
                // 编写对应录取方法,此处省略具体信息
                handleLightUp(studentInfo);
                break;
            case EventConstant.OFFLINE:
                // 编写对应休学方法,此处省略具体信息
                handleOffline(studentInfo);
                break;
            case EventConstant.DELETE:
                // 编写对应退学方法,此处省略具体信息
                handleDelete(studentInfo);
                break;
            default:
                log.warn("未知的操作类型: {}", action);
        }
    }

结束啦,如有错误,敬请雅正!


http://www.kler.cn/a/512088.html

相关文章:

  • 鸿蒙子组件根据数据,刷新item Ui的规范
  • 如何将自己本地项目开源到github上?
  • PHP:从入门到进阶的全方位指南
  • Android Studio:Linux环境下安装与配置
  • Reactor 模式在 Edis、Nginx 和 Netty 中的应用与高性能网络模式解析
  • Redisson发布订阅学习
  • Ubuntu 完整卸载 WPS Office (deb包安装版)
  • 【C++篇】红黑树封装 实现map和set
  • 机器人“大脑+小脑”范式:算力魔方赋能智能自主导航
  • C# OpenCvSharp 部署文档矫正,包括文档扭曲/模糊/阴影等情况
  • 【数据库初阶】MySQL中表的约束(上)
  • Python基于OpenCV和PyQt5的人脸识别上课签到系统【附源码】
  • Navicat 导出表结构后运行查询失败ERROR 1064 (42000): You have an error in your SQL syntax;
  • Vue3轮播图左右联动
  • Astropay之坑
  • IJCAI-2024 | 具身导航的花样Prompts!VLN-MP:利用多模态Prompts增强视觉语言导航能力
  • 【TVM教程】为 ARM CPU 自动调优卷积网络
  • WPF MVVM 模式如何监听IsVisibleChanged 事件
  • AI预测福彩3D采取888=3策略+和值012路+胆码预测2025年1月20日新模型预测第1弹
  • OSCP - Proving Grounds - BullyBox
  • 基于SpringBoot的个人博客系统的设计与实现(源码+SQL脚本+LW+部署讲解等)
  • 单行文本框控件
  • (4)Vue 3 + Vite + Axios + Pinia + Tailwind CSS搭建一个基础框架
  • STL—stack与queue
  • 区块链 智能合约安全 | 回滚攻击
  • 【QT】 控件 -- 按钮类(Button)