当前位置: 首页 > article >正文

RabbitMQ深度探索:SpringBoot 整合 RabbitMQ

  1. 需创建复合项目
  2. 父工程 Maven 依赖:
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
      </parent>
    
    
    <!--  父工程要打成 pom 包-->
      <groupId>com.qcby</groupId>
      <artifactId>springboot-rabbitmq</artifactId>
      <packaging>pom</packaging>
      <modules>
        <module>producer</module>
        <module>sms-consumer</module>
      </modules>
      <version>1.0-SNAPSHOT</version>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
    
        <!-- springboot-web组件 -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.49</version>
        </dependency>
    
        <dependency>
          <groupId>org.projectlombok</groupId>
          <artifactId>lombok</artifactId>
        </dependency>
    
      </dependencies>
  3. 生产者工程代码:
    1. 配置类:
      package com.qcby.config;
      
      import org.springframework.amqp.core.Binding;
      import org.springframework.amqp.core.BindingBuilder;
      import org.springframework.amqp.core.FanoutExchange;
      import org.springframework.amqp.core.Queue;
      import org.springframework.context.annotation.Bean;
      import org.springframework.stereotype.Component;
      
      
      @Component
      public class RabbitMQConfig {
      
          //定义交换机
          private String EXCHANGE_SPRINGBOOT_NAME = "boyatop_ex";
      
          //定义短信队列
          private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
      
          //定义邮件队列
          private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";
      
          //配置邮件队列
          @Bean
          public Queue emailQueue(){
              return new Queue(FANOUT_EMAIL_QUEUE);
          }
      
          //配置消息队列
          @Bean
          public Queue smsQueue(){
              return new Queue(FANOUT_SMS_QUEUE);
          }
      
          //配置交换机
          @Bean
          public FanoutExchange fanoutExchange(){
              return new FanoutExchange(EXCHANGE_SPRINGBOOT_NAME);
          }
      
          //将队列绑定交换机
          @Bean
          public Binding bindingSmsFanoutExchange(Queue smsQueue,FanoutExchange fanoutExchange){
              return BindingBuilder.bind(smsQueue).to(fanoutExchange);
          }
      
          @Bean
          public Binding bindingEmailFanoutExchange(Queue emailQueue,FanoutExchange fanoutExchange){
              return BindingBuilder.bind(emailQueue).to(fanoutExchange);
          }
      }
    2. 消息实体类:
      package com.qcby.entity;
      
      import lombok.AllArgsConstructor;
      import lombok.Data;
      import lombok.NoArgsConstructor;
      
      import java.io.Serializable;
      
      @Data
      @AllArgsConstructor
      @NoArgsConstructor
      public class MsgEntity implements Serializable {
          private String MsgId;
          private String UserId;
          private String phone;
          private String email;
      }
    3. 生产者代码:
      package com.qcby.controller;
      
      import com.qcby.entity.MsgEntity;
      import org.springframework.amqp.core.AmqpTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RestController;
      
      import java.util.UUID;
      
      @RestController
      public class FanoutProducer {
          @Autowired
          private AmqpTemplate amqpTemplate;
      
          /**
           * 发送消息
           *
           * @return
           */
          @RequestMapping("/sendMsg")
          public String sendMsg() {
              /**
               * 1.交换机名称
               * 2.路由key名称
               * 3.发送内容
               */
              MsgEntity msgEntity = new MsgEntity(UUID.randomUUID().toString(),"22","12345","edddd");
              amqpTemplate.convertAndSend("boyatop_ex", "", msgEntity);
              return "success";
          }
      }
    4. yml 文件:
      spring:
        rabbitmq:
          ####连接地址
          host: 127.0.0.1
          ####端口号
          port: 5672
          ####账号
          username: guest
          ####密码
          password: guest
          ### 地址
          virtual-host: boyatopVirtualHost
  4. 消费者工程代码:
    1. 实体类:
      package com.qcby.entity;
      
      import lombok.AllArgsConstructor;
      import lombok.Data;
      import lombok.NoArgsConstructor;
      
      import java.io.Serializable;
      
      @Data
      @AllArgsConstructor
      @NoArgsConstructor
      public class MsgEntity implements Serializable {
          private String MsgId;
          private String UserId;
          private String phone;
          private String email;
      }
    2. 监听队列:
      package com.qcby.controller;
      
      import com.qcby.entity.MsgEntity;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.amqp.rabbit.annotation.RabbitHandler;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;
      
      @Slf4j
      @Component
      @RabbitListener(queues = "fanout_sms_queue")
      public class smsController {
      
          @RabbitHandler
          public void listen(MsgEntity msgEntity){
              log.info("email:" + msgEntity);
              System.out.println(msgEntity);
          }
      }
    3. yml 文件:
      spring:
        rabbitmq:
          ####连接地址
          host: 127.0.0.1
          ####端口号
          port: 5672
          ####账号
          username: guest
          ####密码
          password: guest
          ### 地址
          virtual-host: boyatopVirtualHost
      server:
        port: 8081

生产者如何获取消费结果:

  1. 根据业务来定:
    1. 消费者消费成功结果:能够在数据库中插入一条数据
  2. Roketmq 自带全局消息 id,能够根据该全局消息获取消费结果
    1. 异步返回一个全局 id,前端使用 ajax 定时主动查询
    2. 在roketmq 中,自带根据消息 id 查询是否消费成功
  3. 原理:
    1. 生产者投递消息 MQ  服务器,NQ 服务器端在这时候返回一个全局消息 id ,当消费者消费该消息之后,消费者会给我们 MQ 服务器发送通知,标识该消息消费成功
    2. 生产者获取到该消息全局 id,每隔 2s 时间调用 MQ 服务器接口查询是否有被消费成功

http://www.kler.cn/a/532544.html

相关文章:

  • 洛谷P11655「FAOI-R5」Lovely 139
  • 当WebGIS遇到智慧文旅-以长沙市不绕路旅游攻略为例
  • 深度解读 Docker Swarm
  • Linux - 进程间通信(3)
  • SSRF 漏洞利用 Redis 实战全解析:原理、攻击与防范
  • C++ 自定义字面量
  • WordPress自定义.js文件排序实现方法
  • 【大模型理论篇】DeepSeek-R1:引入冷启动的强化学习
  • VSCode中代码颜色异常
  • 索引的底层数据结构、B+树的结构、为什么InnoDB使用B+树而不是B树呢
  • 使用 Postman 进行 API 测试:从入门到精通
  • 【漫话机器学习系列】079.超参数调优(Hyperparameter Tuning)
  • 了解 ALV 中的 field catalog (ABAP List Viewer)
  • 大数据治理:技术视角的解析
  • java项目当中的全局异常处理
  • WPS计算机二级•幻灯片的音视频表格与图形
  • 算法设计-哈夫曼树(C++)
  • kafka集群性能调优方案
  • DeepSeek 登顶140国应用商店榜首
  • 【BUUCTF杂项题】FLAG
  • C++输入输出(上)
  • CSS --- 设置不自动换行
  • 代码随想录算法训练营第十七天| 二叉树5
  • Linux——文件与磁盘
  • 求组合数(递推法、乘法逆元、卢卡斯定理、分解质因数)
  • 介绍一下Mybatis的Executor执行器