当前位置: 首页 > article >正文

【RabbitMQ】SpringBoot整合RabbitMQ实例

文章目录

    • SpringBoot整合RabbitMQ
      • 简单模式
      • 工作模式
      • 应用示例
      • QueueBuilder
        • 常用属性
        • 使用场景

SpringBoot整合RabbitMQ

引入 amqp 依赖

<!-- amqp 依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置 yml 文件

spring:
  rabbitmq:
    host: 127.0.0.1
	password: guest
	username: guest
	port: 5672

简单模式

写一个监听器实现简单队列模式(注意不要和已有的类命名冲突)

@Component
public class BootRabbitMQListener {
    @RabbitListener(queuesToDeclare = @Queue("test_01"))
    public void onMessage(String msg) {
        System.out.println("消费者收到的消息: " + msg);
    }
}

编写控制器

@RestController
@RequestMapping("/index")
public class IndexController {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/send")
    public String send(){
        rabbitTemplate.convertAndSend("test_01", "hello world");
        return "发送成功";
    }
}

调用接口:http://localhost:8080/index/send

成功接收到消息

在这里插入图片描述

工作模式

工作队列模式实现,需要在监听器中添加消费者。

@Component
public class BootRabbitMQListener {
    @RabbitListener(queuesToDeclare = @Queue("test_01"))
    public void onMessage1(String msg) {
        System.out.println("消费者01收到的消息: " + msg);
    }
    @RabbitListener(queuesToDeclare = @Queue("test_01"))
    public void onMessage2(String msg) {
        System.out.println("消费者02收到的消息: " + msg);
    }
}

调用接口:http://localhost:8080/index/send (多拍几下)

工作队列轮询方式

在这里插入图片描述

实现公平分发,需要做两件事情:

  1. 设置预取值(Prefetch Count):通过设置每个消费者在收到下一个消息之前能处理的最大消息数,可以防止一个消费者处理过多消息而其他消费者空闲。
  2. 手动确认消息:消费者需要手动确认消息,这样 RabbitMQ 才会知道消息已经被处理,可以安全地删除或者将下一条消息发送给该消费者。

修改 yml 配置文件,开启手动确认(手动应答)并设置预取值。

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual # 监听器手动确认
        prefetch: 1 # 预取值 每次只处理一条消息

或者定义一个 SimpleRabbitListenerContainerFactory 的 Bean 设置预取值。

@Configuration
public class RabbitMQConfig {
    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPrefetchCount(1); // 设置预取值为1
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 设置手动确认
        return factory;
    }
}

在消费者类中,可以通过 containerFactory 属性使注解:

@RabbitListener(queuesToDeclare = @Queue("test_01"), containerFactory = "myFactory")

这里使用的是 yml 文件配置方式。

@Component
public class BootRabbitMQListener {
    @RabbitListener(queuesToDeclare = @Queue("test_01"))
    public void onMessage1(Message message, Channel channel) {
        String msg = null;
        try {
            msg = new String(message.getBody(), "utf-8");
            System.out.println("消费者01收到的消息: " + msg);
            // 手动反馈
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            e.printStackTrace();
            try {
                // 错误 放回队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            } catch (Exception e1) {
                e1.printStackTrace();
            }
        } finally {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    @RabbitListener(queuesToDeclare = @Queue("test_01"))
    public void onMessage2(Message message, Channel channel) {
        // 同上修改为消费者02......
    }
}

批量发送信息

@GetMapping("/send")
public String send(){
    for (int i = 0; i < 10; i++){
        rabbitTemplate.convertAndSend("test_01", "hello world"+i);
    }
    return "发送成功";
}

调用接口:http://localhost:8080/index/send

实现公平分发

在这里插入图片描述

应用示例

实现商品购买功能:

  1. 调用 controller 方法向队列中添加抢购对象(用户 ID 和商品 ID)

  2. 监听队列,手动反馈,并获得信息,修改数据库的库存记录

ObjectMapper 是 Jackson 库提供的一个类,用于处理 JSON 数据的序列化和反序列化。它可以将 Java 对象转换为 JSON 字符串,也可以将 JSON 字符串转换为 Java 对象。(与 REST API 或消息队列交互)

// 发送端
ObjectMapper objectMapper = new ObjectMapper();
String message = objectMapper.writeValueAsString(buypro);
// 接收端
ObjectMapper objectMapper = new ObjectMapper();
Buypro buypro = objectMapper.readValue(message, Buypro.class);

SerializationUtils 是 Spring 框架提供的一个工具类,位于 org.springframework.util.SerializationUtils 包中。它主要用于对象的序列化和反序列化操作。主要用于将 Java 对象序列化为字节数组(serialize 方法),或将字节数组反序列化为 Java 对象(deserialize 方法)。它适用于二进制数据的序列化。(网络传输或存储)

// 使用 SerializationUtils 序列化对象
byte[] serializedObject = SerializationUtils.serialize(buypro);
// 使用 SerializationUtils 反序列化对象
Buypro buypro = (Buypro) SerializationUtils.deserialize(message);

实现:

1、创建数据库表以及实体类

// 商品表
@Data
public class Product implements Serializable {
  @TableId(value = "id", type = IdType.AUTO)
  private Long pid;
  private Long stock;
  ......
}
// 订单表
@Data
@TableName("buy_pro")
public class Buypro implements Serializable {
    @TableId(type = IdType.AUTO)
    private Long pid;
    private Long uid;
}

2、数据库连接、mybatis-plus 的依赖引入;yml 相关配置;dao 层构建

3、发送端(使用 ObjectMapper)

@GetMapping("/purchase")
public String purchase(@RequestParam Long uid
                       , @RequestParam Long pid){
    Buypro buypro = new Buypro();
    buypro.setUid(uid);
    buypro.setPid(pid);
    ObjectMapper objectMapper = new ObjectMapper();
    String message = null;
    try {
        message = objectMapper.writeValueAsString(buypro);
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    }
    rabbitTemplate.convertAndSend("purchase_queue", message);
    return "抢购请求已发送";
}

4、接收端及业务逻辑

@Component
public class BootRabbitMQListener {
    @Resource
    private ProductMapper productMapper;
    @RabbitListener(queuesToDeclare = @Queue("purchase_queue"))
    public void receiveMessage(String message) {

        ObjectMapper objectMapper = new ObjectMapper();
        try {
            Buypro buypro = objectMapper.readValue(message, Buypro.class);
            processPurchase(buypro);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private void processPurchase(Buypro buypro) {
        // 根据商品ID查询商品信息
        Product product = productMapper.selectById(buypro.getPid());
        if (product != null && product.getStock() > 0) {
            // 减少库存
            product.setStock(product.getStock() - 1);
            productMapper.updateById(product);
            System.out.println("用户 " + buypro.getUid() + " 抢购了商品 " + buypro.getPid());
        } else {
            System.out.println("商品 " + buypro.getPid() + " 库存不足");
        }
    }
}

QueueBuilder

QueueBuilder 是在与消息队列系统(如 RabbitMQ)集成时,用于以编程方式构建队列(Queue)的工具类或构建器模式的实现。它提供了一种便捷的方法来配置队列的各种属性,使得在应用程序中能够根据具体需求灵活创建不同类型和特性的队列。

  1. 属性配置灵活性:通过一系列方法,可以轻松设置队列的关键属性,如是否持久化、是否排他、是否自动删除等,以满足不同的业务场景和消息处理要求。

  2. 与消息队列系统紧密集成:通常是特定消息队列客户端库(如 Spring AMQP 用于 Spring Boot 与 RabbitMQ 集成)的一部分,能够无缝对接相应的消息队列服务,确保创建的队列在系统中正确生效。

常用属性

创建持久化队列

持久化队列在消息队列服务器重启或发生意外故障后,其队列定义(包括队列名称、属性等)以及队列中尚未被消费的消息不会丢失,会在服务器恢复正常后继续存在,可以继续被消费者消费。

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
Queue durableQueue = QueueBuilder.durable("Queue_name")
    .build();

创建非持久化队列

非持久化队列在某些情况下(如连接关闭、相关消费者进程结束等),队列及其可能剩余的消息通常会被自动删除,不具备在服务器重启等情况下的数据保留能力。

Queue nonDurableQueue = QueueBuilder.nonDurable("Queue_name")
    .build();

创建排他队列

排他队列是连接专属的,只有创建它的连接可以使用它,并且当连接关闭时,该队列会被自动删除。

Queue exclusiveQueue = QueueBuilder.exclusive("Queue_name")
    .build();

创建自动删除队列

自动删除队列在所有消费者都取消订阅或者队列中的消息都被消费完后(满足其中一个条件即可),队列会会被自动删除。

Queue autoDeleteQueue = QueueBuilder.autoDelete("Queue_name")
    .build();

设置队列长度限制

用于设置队列的最大长度限制,即队列中最多能容纳的消息数量。当队列中的消息数量达到该限制时,后续发送到该队列的消息可能会根据消息队列系统的策略进行相应处理(如被丢弃等)。

Queue queueWithLengthLimit = QueueBuilder.durable("Queue_name")
    .maxLength(1000)
    .build();
使用场景

创建好队列对象后,可以根据具体的消息队列集成场景进行后续操作,比如将队列绑定到交换机(在使用交换机的消息队列架构中)

@Configuration
public class RabbitMQConfig {
    // 创建一个持久化队列
    @Bean
    public Queue myQueue() {
        return QueueBuilder.durable("myQueue")
            .build();
    }
    // 创建一个持久化的直接交换机
    @Bean
    public DirectExchange myDirectExchange() {
        return ExchangeBuilder.directExchange("myDirectExchange")
            .durable(true)
            .build();
    }
    // 创建队列与交换机之间的绑定关系
    @Bean
    public Binding binding(DirectExchange myDirectExchange, Queue myQueue) {
        return BindingBuilder.bind(myQueue)
            .to(myDirectExchange)
            .with("routingKey");
    }
}

binding() 方法通过 @Bean 注解创建了一个 Binding 对象,用于将前面创建的 myQueue 持久化队列和 myDirectExchange 交换机进行绑定,并且指定了路由键为 routingKey 。Spring 会自动将已经创建好的 myDirectExchange 和 myQueue 这两个 Bean 作为参数传递给 binding 方法,无需手动干预。


http://www.kler.cn/a/503144.html

相关文章:

  • 前端开发:表格、列表、表单
  • ue5 蒙太奇,即上半身动画和下半身组合在一起,并使用。学习b站库得科技
  • java -jar启动项目报错:XXX.jar中没有主清单属性
  • 【STM32-学习笔记-8-】I2C通信
  • 【论文阅读】MAMBA系列学习
  • 【TI毫米波雷达】DCA1000不使用mmWave Studio的数据采集方法,以及自动化实时数据采集
  • C++(类和对象)
  • 生成式数据增强在大语言模型中的应用与实践
  • UE5.4运行报错解决(关于osg使用-无法解决的外部命令)(未解决)
  • 优秀持久层框架——MyBatis
  • 两分钟解决 :![rejected] master -> master (fetch first) , 无法正常push到远端库
  • Chromium 中的 WebUI
  • Springboot内置Apache Tomcat 安全漏洞(CVE-2024-50379)
  • vue2修改表单只提交被修改的数据的字段传给后端接口
  • JavaScript:简介
  • 春秋云镜——initial
  • 二 RK3568 固件中打开 ADB 调试
  • qt 汉字输出 中文输出 显示乱码 qDebug() 乱码 解决
  • Spring Boot项目中增加MQTT对接
  • DELTA并联机械手视觉方案荣获2024年度机器人应用典型案例奖
  • 在 Linux 下Ubuntu创建同权限用户
  • JavaScript系列(24)--内存管理机制详解
  • 2025年第三届“华数杯”国际赛A题解题思路与代码(Python版)
  • 计算机网络(四)——网络层
  • 利用 Tree Shaking 提升 React.js 性能
  • 江科大STM32入门——读写备份寄存器(BKP)实时时钟(RTC)笔记整理