当前位置: 首页 > article >正文

SpringBoot整合RabbitMQ应用

本文主要介绍SpringBoot中如何使用RabbitMQ,相关概念及基础使用参考RabbitMQ简单使用

常用配置及方法

展示rabbitmq各个模式在springboot中如何使用之前,先介绍rabbitmq在springboot中的一些常用配置及方法:

注册队列

//使用配置类以bean方式声明队列
@Configuration
public class QueueConfig {

    @Bean
    public Queue durableQueue() {
        return QueueBuilder
                .durable("simple_queue")
                .exclusive()
                .autoDelete()
                .ttl(5000)
                .expires(5000)
                .maxLength(1000)
                .maxLengthBytes(1000)
                .maxPriority(1)
                .deadLetterExchange("")
                .deadLetterRoutingKey("")
                .deliveryLimit(1)
                .lazy()
                .leaderLocator(null)
                .overflow(null)
                .quorum()
                .singleActiveConsumer()
                .build();
    }
    
}

以下是 RabbitMQ 在 Spring Boot 中使用 QueueBuilder 创建队列时,各方法的详细作用:

1. durable(String queueName):声明一个持久化队列。该队列会在 RabbitMQ 服务重启后仍然存在,队列元数据和内容不会丢失。

2. exclusive():声明队列为排他队列。该队列只能由当前连接的客户端使用,并且连接断开时队列会被删除。

3. autoDelete():设置队列为自动删除。当队列不再被任何消费者使用时,RabbitMQ 会自动删除该队列。

4. ttl(int milliseconds):为队列中消息设置TTL,即过期时间。超过 TTL 时间的消息会变为死信。

5. expires(int milliseconds):设置队列的存活时间。如果队列在指定时间内未被使用(未绑定消费者或未接收消息),RabbitMQ 将自动删除该队列。

6. maxLength(int length):设置队列的最大消息数量。队列中消息数量不能超过设置条数,超过的消息会变为死信(Dead Letter)。

7. maxLengthBytes(int size):设置队列的最大消息体总大小。队列中消息体总大小不能超过参数size字节,超出的消息会变为死信。

8. maxPriority(int priorityLevel):设置队列的最大优先级。优先级范围为 0 到 传入的参数priorityLevel。比如设置10,优先级范围则是0-10。开启此项后,存入此队列的每个消息都可以设置优先级属性,优先级越大越被优先消费。

9. deadLetterExchange(String exchangeName):指定死信队列的交换机。消息在变为死信时会被转发到该交换机。当传入空字符串 ""时,表示没有绑定死信交换机。

10. deadLetterRoutingKey(String routingKey):指定死信队列的路由键。当消息被转发到死信交换机时,会使用该路由键。传入空字符串 ""时,表示未指定。

11. deliveryLimit(int limit):设置消息的最大传递次数。消息如果被拒绝(basic.rejectbasic.nack)的次数超过此值,将变为死信。

12. lazy():设置队列为惰性队列。消息会尽量存储在磁盘而不是内存中,适合处理大量不频繁访问的消息。

13. leaderLocator(String policy):设置队列的主节点定位策略。用于集群模式下队列的主节点分配,例如通过策略指定主节点位置。此处为 null,表示未设置。

14. overflow(String policy):设置队列溢出策略。参数为"reject-publish"代表新消息会被拒绝。参数为"drop-head"代表会丢弃最早的消息(头部)。参数为null表示未设置溢出策略。

15. quorum():将队列设置为仲裁队列(Quorum Queue)。表示队列使用分布式架构(需要 RabbitMQ 3.8+ 支持),适合高可靠性场景。

16. singleActiveConsumer():启用单个活动消费者模式。即使队列有多个消费者连接,只有一个消费者会接收消息,其他消费者处于备用状态。

17. build():构建最终的队列配置对象,将前面所有配置参数应用到队列中。


注册交换机

//使用配置类以bean方式声明交换机
@Configuration
public class QueueConfig {
    
    @Bean
    public DirectExchange durableExchange() {
        return ExchangeBuilder
                .directExchange("direct_exchange")
                .durable(true)
                .admins()
                .alternate("")
                .autoDelete()
                .delayed()
                .ignoreDeclarationExceptions()
                .internal()
                .suppressDeclaration()
                .withArgument("","")
                .withArguments(null)
                .build();
    }
}

以下是 RabbitMQ 在 Spring Boot 中使用 ExchangeBuilder 创建交换机时,各方法的详细作用说明:

1. directExchange(String exchangeName):声明一个 direct 类型的交换机。参数为交换机名称。

2. durable(boolean durable):设置交换机是否持久化。true-持久化,false-非持久化,默认为true

3. admins(Object... admins):设置交换机的 RabbitAdmin 管理员。指定哪些 RabbitAdmin 实例可管理该交换机的声明、删除等操作。admins 是可变参数,可以指定一个或多个 RabbitAdmin

4. alternate(String alternateExchangeName):指定备用交换机(Alternate Exchange)。如果消息无法路由到绑定的队列,则会被转发到备用交换机。默认值:未指定备用交换机。

5. autoDelete():设置交换机为自动删除。当交换机不再被绑定到任何队列时,RabbitMQ 会自动删除该交换机。

6. delayed():设置交换机为延迟交换机。延迟交换机支持消息延迟投递到队列(需要 RabbitMQ 的延迟消息插件)。仅在 RabbitMQ 开启延迟功能插件时有效。

7. ignoreDeclarationExceptions():忽略交换机声明时发生的异常。声明交换机时,如果遇到异常(如交换机已存在但配置冲突),会自动忽略该异常,而不是抛出错误。

8. internal():将交换机设置为内部交换机。内部交换机只能由其他交换机进行消息路由,而无法直接接收生产者发送的消息。适用于消息的复杂路由逻辑。

9. suppressDeclaration():抑制交换机声明。声明不会在 RabbitMQ 上实际创建该交换机。用于测试或其他需要逻辑中声明但不实际创建的场景。

10. withArgument(String key, Object value):为交换机添加单个自定义参数。用于设置额外的参数,例如延迟交换机的 x-delayed-type

11. withArguments(Map<String, Object> arguments):为交换机添加多个自定义参数。

12. build():构建最终的交换机对象。将前面所有设置组合,返回一个可用于声明的交换机对象。


注册绑定关系

//使用配置类以bean方式声明交换机与队列的绑定关系
@Configuration
public class QueueConfig {

    //省略队列与交换机,参考上面
    
    //绑定交换机与队列
    @Bean
    public Binding durableBinding() {
        return new Binding(
                "durable_queue", //队列名
                Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
                "durable_exchange", //交换机名
                "durable-key",  //路由键
                null    //额外参数
        );
    }

}

Binding 类是 RabbitMQ 中用于描述交换机(Exchange)和队列(Queue)之间绑定关系的对象。它允许我们将一个队列绑定到交换机,或者将交换机绑定到交换机,并且在此过程中指定路由键(Routing Key)以及其他额外的参数。以下是该类的构造方法及参数的详细解析。

构造方法:

public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
			@Nullable Map<String, Object> arguments)

这个构造方法用于创建一个绑定对象。它接收五个参数:

参数 1: destination (String)

  • 类型: String
  • 含义: 绑定的目的地,可以是队列或交换机。此参数表示该绑定的目标对象名称。
  • 用途:
    • 如果绑定的目标是队列,则 destination 为队列名称。
    • 如果绑定的目标是交换机,则 destination 为交换机名称。

参数 2: destinationType (DestinationType)

  • 类型: DestinationType
  • 含义: 指定 destination 是一个队列(QUEUE)还是交换机(EXCHANGE)。DestinationType 是一个枚举类型,包含了两个值:
    • QUEUE: 绑定的目标是一个队列。
    • EXCHANGE: 绑定的目标是一个交换机。
  • 用途: 用于确定目标对象的类型,是队列还是交换机。这是决定绑定逻辑的重要信息。

参数 3: exchange (String)

  • 类型: String
  • 含义: 要绑定的交换机名称。该参数指定了源交换机的名称。
  • 用途: 这是绑定关系中的源交换机。如果目标是一个队列,则交换机将通过路由键来路由消息到队列。如果目标是一个交换机,则该交换机会将消息路由到其他交换机或队列。

参数 4: routingKey (String)

  • 类型: String
  • 含义: 用于路由消息的路由键(Routing Key)。
  • 用途:
    • 在绑定队列和交换机时,routingKey 决定了消息通过交换机时的路由规则。例如,在 direct 类型的交换机中,路由键用于匹配队列;在 topic 类型的交换机中,路由键支持模式匹配。

参数 5: arguments (Map<String, Object>)

  • 类型: Map<String, Object>
  • 含义: 可选的附加参数。用于配置绑定的额外属性,如 x-dead-letter-exchangex-message-ttl 等自定义属性。
  • 用途:
    • 可以传递额外的配置项,这些配置项会根据 RabbitMQ 的扩展功能被使用。例如,可以通过设置参数指定死信队列、消息过期时间、最大队列长度等。
    • arguments 是一个键值对的集合,其中的键是 RabbitMQ 允许的特殊参数名称,而值是相应的配置项。

MQ模板

RabbitTemplate 是 Spring AMQP 提供的一个强大且灵活的工具类,用于与 RabbitMQ 进行消息交互。它提供了方便的 API 来发送和接收消息,并支持异步操作、消息确认、回退、事务等功能,且易于配置和扩展。通过合理的配置和方法调用,RabbitTemplate 可以满足大多数与 RabbitMQ 集成的应用场景。

1. 基本功能和作用

RabbitTemplate 是一个用于发送和接收消息的同步工具,它实现了 RabbitMQ 的消息发送、消息接收和异步处理的功能。主要用于生产者和消费者之间的消息传递。

  • 发送消息:通过 RabbitTemplate 可以将消息发送到 RabbitMQ 中的指定交换机和队列。
  • 接收消息RabbitTemplate 还支持从队列中同步接收消息,并进行处理。

2. 常用配置

创建 RabbitTemplate Bean

通常,RabbitTemplate 会通过 Java 配置类创建并注入到 Spring 容器中。常见的创建方式如下:

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setExchange("your-exchange");
    rabbitTemplate.setRoutingKey("your-routing-key");
    return rabbitTemplate;
}
  • connectionFactory:连接工厂,用于创建到 RabbitMQ 的连接。
  • setExchange:指定消息发送的交换机(如果在发送时没有指定,RabbitTemplate 会使用默认的交换机)。
  • setRoutingKey:设置消息发送的路由键,通常是队列名或自定义的键。

注意: 默认情况下,springboot会根据连接信息为我们自动配置RabbitTemplate,所以一般不需要单独做上面的配置,直接使用RabbitTemplate即可。

核心方法

RabbitTemplate 提供了多种方法来发送和接收消息,最常用的包括:

  • send():发送消息到指定的交换机和路由键。
  • convertAndSend():将一个对象转换为消息并发送(通常是 JSON 或其他格式的对象)。
  • receive():接收一个消息并返回(同步方法)。
  • receiveAndConvert():接收一个消息并转换为对象。

示例代码

发送消息:

rabbitTemplate.convertAndSend("exchange", "routing-key", "消息内容");

接收消息:

Object response = rabbitTemplate.receiveAndConvert("queue-name");

发送/接收带有回调的消息

RabbitTemplate 还支持通过回调处理响应。例如,发送消息并等待消费者的回复:

Message responseMessage = rabbitTemplate.convertSendAndReceive("exchange", "routing-key", message);
  • convertSendAndReceive:将消息发送到指定交换机和路由键并等待回复。
  • sendAndReceive:可以发送消息并在接收到响应后进行处理。它可以传递一个 CorrelationData 对象,用于跟踪消息。

3. 常用配置和功能

消息确认和回退

通过配置 RabbitTemplate,可以启用消息确认机制。RabbitTemplate 提供了 setConfirmCallbacksetReturnsCallback 来处理消息确认和消息退回:

  • ConfirmCallback: 当消息成功发送到 RabbitMQ 交换机时,回调被触发。
  • ReturnsCallback: 如果消息无法路由到目标队列并被 RabbitMQ 退回,回调会被触发。
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (ack) {
        // 消息成功发送
    } else {
        // 处理失败的情况
    }
});

rabbitTemplate.setReturnsCallback(returned -> {
    // 处理消息被退回的情况
});

设置超时和重试

RabbitTemplate 允许设置超时、重试和消息过期等策略。例如:

  • setReplyTimeout():设置接收回复消息的超时时间。
  • setRetryTemplate():自定义消息重试策略。
rabbitTemplate.setReplyTimeout(10000); // 设置回复超时为10秒

使用临时队列和回调队列

通过 RabbitTemplate,可以指定临时队列作为消息回调队列。这对于 RPC 模式非常有用,消息发送后,消费者处理完消息可以通过回调队列返回结果。

rabbitTemplate.setUseTemporaryReplyQueues(true);
rabbitTemplate.setReplyAddress("callback-queue");

消息序列化与转换

RabbitTemplate 具有将消息体从 Java 对象转换为消息体字节数组的能力。可以通过自定义序列化器来实现不同的数据格式(例如 JSON、XML 或其他格式)。

rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
  • Jackson2JsonMessageConverter:使用 Jackson 将 Java 对象和 JSON 进行相互转换。

4. 常见的高级配置

设置消息的持久化

你可以设置消息的持久性,确保消息在 RabbitMQ 中不丢失。通常通过在队列中设置持久性或者通过 RabbitTemplate 设置消息持久性:

rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = new Message("Persistent Message".getBytes(), messageProperties);
rabbitTemplate.convertAndSend("exchange", "routing-key", message);

事务支持

通过 RabbitTemplate 可以开启事务模式。事务模式可以确保消息的发送和接收过程中的原子性。

rabbitTemplate.execute(channel -> {
    channel.basicPublish("exchange", "routing-key", true, false, null);
    return null;
});

高级异步支持

Spring AMQP 提供的 RabbitTemplate 支持异步消息发送。通过设置 async 配置项,RabbitTemplate 可以异步发送消息。


5. 常见使用场景

  • 简单的消息发送和接收RabbitTemplate 用于同步地发送和接收消息,如简单的生产者-消费者模式。
  • RPC模式:结合 sendAndReceive 和回调队列,用于实现 RPC(远程过程调用)模式。
  • 消息确认与回退:使用 setConfirmCallbacksetReturnsCallback 配置消息的确认和失败回退处理。
  • 消息持久化与事务:通过设置 DeliveryMode.PERSISTENT 或配置事务机制,保证消息的可靠性。


监听注解

在 Spring Boot 中,@RabbitListener@RabbitHandler 是用于SpringBoot消费者处理 RabbitMQ 消息的两个重要注解。它们常常一起使用,但它们的用途和使用方式各有不同。以下是对这两个注解的详细解析。

@RabbitListener

@RabbitListener 用于标记一个方法或类,用来监听指定的队列并处理来自 RabbitMQ 的消息。

基本用法

  1. 在类级别使用
    配置一个类监听某些队列,该类的具体消息处理方法需要使用 @RabbitHandler 注解标注。

    @RabbitListener(queues = "example_queue")
    public class ExampleListener {
        @RabbitHandler
        public void handleMessage(String message) {
            System.out.println("Received: " + message);
        }
    }
    
  2. 在方法级别使用
    将一个方法直接注册为队列的监听器。此时不需要 @RabbitHandler

    public class ExampleListener {
        @RabbitListener(queues = "example_queue")
        public void handleMessage(String message) {
            System.out.println("Received: " + message);
        }
    }
    

关键属性

  1. queues

    • 指定要监听的队列名称。
    • 如果队列不存在,可能会根据配置自动创建。
    • 示例:
      @RabbitListener(queues = "example_queue")
      
  2. queuesToDeclare

    • 用于声明需要监听的队列。
    • 示例:
      @RabbitListener(queuesToDeclare = @Queue(value = "example_queue", durable = "true"))
      
  3. bindings

    • 声明绑定关系(队列、交换机、路由键)。
    • 示例:
      @RabbitListener(bindings = @QueueBinding(
          value = @Queue(value = "example_queue", durable = "true"),
          exchange = @Exchange(value = "example_exchange", type = "direct"),
          key = "routing_key"
      ))
      
  4. concurrency

    • 设置消费者的并发数。
    • 示例:
      @RabbitListener(queues = "example_queue", concurrency = "3-10")
      
  5. containerFactory

    • 指定使用的 RabbitListenerContainerFactory
    • 示例:
      @RabbitListener(queues = "example_queue", containerFactory = "myContainerFactory")
      
  6. errorHandler

    • 自定义错误处理逻辑。
    • 示例:
      @RabbitListener(queues = "example_queue", errorHandler = "myErrorHandler")
      
  7. messageConverter

    • 自定义消息转换器,用于将消息体转换为目标类型。
    • 示例:
      @RabbitListener(queues = "example_queue", messageConverter = "jsonMessageConverter")
      

@RabbitHandler

@RabbitHandler 是用于标注消息处理方法的注解。它通常与 @RabbitListener 配合使用,用于处理不同类型的消息。

基本用法

  • 配合类级别的 @RabbitListener 注解使用时,一个类中可以有多个 @RabbitHandler 方法,具体执行哪个方法取决于消息体的类型。
@RabbitListener(queues = "example_queue")
public class ExampleListener {

    @RabbitHandler
    public void handleTextMessage(String message) {
        System.out.println("Received text: " + message);
    }

    @RabbitHandler
    public void handleOrderMessage(Order order) {
        System.out.println("Received order: " + order);
    }

    @RabbitHandler
    public void handleDefault(Object message) {
        System.out.println("Received generic: " + message);
    }
}

在上面的代码中:

  • 如果消息体是 String 类型,调用 handleTextMessage
  • 如果消息体是 Order 类型,调用 handleOrderMessage
  • 其他类型的消息将调用 handleDefault

@RabbitHandler 的规则

  1. 一个类可以有多个 @RabbitHandler 方法
  2. 必须与类级别的 @RabbitListener 一起使用
  3. 根据消息体的类型或内容自动选择处理方法

两个注解的对比

特性@RabbitListener@RabbitHandler
标注位置类级别或方法级别方法级别
作用定义队列监听逻辑定义具体的消息处理逻辑
是否支持多类型消息不支持(直接监听的方法只能处理一种类型的消息)支持(根据类型选择不同的处理方法)
典型用法配置队列绑定、容器工厂等在同一个类中处理多种类型的消息

@RabbitListener:用于定义监听队列的逻辑,支持队列绑定、并发配置等。

@RabbitHandler:用于定义消息的具体处理逻辑,可以根据消息类型实现多态处理。

推荐用法

  • 如果消息只有一种类型,直接在方法级别使用 @RabbitListener
  • 如果消息有多种类型,在类级别使用 @RabbitListener,并配合多个 @RabbitHandler 实现不同类型消息的处理逻辑。


springboot中各模式的应用

新建springboot项目,pom相关依赖项如下

spring-boot版本:

<parent>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-parent</artifactId>
     <version>2.7.10</version>
</parent>

其他依赖

<!-- 核心启动器 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>        

<!-- web启动器 -->
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- amqp支持 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

简单模式

yaml

只需配置rabbitmq的基本连接信息即可

server:
  port: 8084

spring:
  application:
    name: RabbitMQ-Demo
  rabbitmq:
    host: 192.168.137.200
    port: 5672
    username: admin
    password: admin
    virtual-host: /

配置类

新建一个配置类,使用spring的方式声明一个简单模式队列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {

    //简单模式队列
    @Bean("simpleQueue")
    public Queue SimpleQueue() {
        return QueueBuilder
                .durable("simple_queue") //持久化一个叫simple_queue的队列
                .build();
    }

}

生产者

创建消息对象Message,通过构造方法将消息内容及消息属性MessageProperties存入其中,并使用RabbitTemplate模板发布消息

springboot会根据yaml的配置自动创建连接工厂ConnectionFactoryRabbitTemplate默认使用ConnectionFactory来连接rabbitmq服务进行消息发布

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;


@RestController
@RequestMapping("send")
public class SimpleProducer {

    //注入模板
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("simple")
    public void execute(@RequestParam String data){

        //MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
        MessageProperties messageProperties = new MessageProperties();
        //设置消息类型为文本: text/plain
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);

        //消息对象
        Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);

        /**
         * convertAndSend():将指定的消息对象转换为 AMQP 消息并发送到指定的交换机,支持通过指定的路由键进行消息路由。
         *  1.三个参数从左往右依次为交换机、路由键、消息对象
         *  2.在使用简单、工作、广播模式时不需要指定交换机,使用的是默认的交换机(AMQP default),
         *     路由键处填写队列名即可(会自动匹配队列)
         *  3.direct、topic等模式则填写对应的交换机和路由键
         * */
        rabbitTemplate.convertAndSend("","simple_queue",message);
    }


}

消费者

@RabbitListener注解可以指定要监听的队列,@RabbitHandler用于标记方法为消息处理方法。一旦监听的队列有消息时,监听类会自动调用消息处理方法进行处理

注意:一个@RabbitListener注解标记的类中可以有多个@RabbitHandler标记的方法,但参数类型不能相同。@RabbitListener注解也可以不标记在类上,直接标记在方法上。

只要simple_queue队列中有消息存入,如下代码就会自动触发:

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


/**
 * 1.@RabbitListener(queues = "simple_queue")
 *      需要simple_queue队列已存在
 *
 * 2.@RabbitListener(queuesToDeclare = @Queue(value = "simple_queue",durable = "true",autoDelete = "false"))
 *      不需要simple_queue存在,没有会自动创建,且指定了队列会持久化,不自动删除
 *      使用此处的注解,可以不用上面的配置类即可声明simple_queue队列
 *
 * 3.@RabbitHandler注解配合@RabbitListener使用,标记方法为消息处理方法。但当生产者与消费者的消息类型不一致会报错
 *
 * */
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "simple_queue"))
public class SimpleCustomer {

    @RabbitHandler
    public static void excute(String message) {
        System.out.println(" 简单模式消费者接收消息: " + message + "!");
    }

}

使用测试

运行主启动类,请求地址:

localhost:8084/send/simple?data=简单模式测试消息

控制台输出:

简单模式消费者接收消息: 简单模式测试消息!


工作模式

yaml

只需配置rabbitmq的基本连接信息即可

server:
  port: 8084

spring:
  application:
    name: RabbitMQ-Demo
  rabbitmq:
    host: 192.168.137.200
    port: 5672
    username: admin
    password: admin
    virtual-host: /

配置类

注册工作模式队列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {

    //工作模式队列
    @Bean("workQueue")
    public Queue WorkQueue() {
        return QueueBuilder
                .durable("work_queue")
                .build();
    }

}

生产者

创建消息对象Message,通过构造方法将消息内容及消息属性MessageProperties存入其中,并使用RabbitTemplate模板发布消息

springboot会根据yaml的配置自动创建连接工厂ConnectionFactoryRabbitTemplate默认使用ConnectionFactory来连接rabbitmq进行消息发布。这里循环发10条消息:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;


@RestController
@RequestMapping("send")
public class WorkProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("work")
    public void execute(@RequestParam String data){

        //MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
        MessageProperties messageProperties = new MessageProperties();
        //设置消息类型为文本: text/plain
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);

        //消息对象
        Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);

        //发10条消息
        for (int i = 0; i < 10; i++) {

            /**
             * convertAndSend():将指定的消息对象转换为 AMQP 消息并发送到指定的交换机,支持通过指定的路由键进行消息路由。
             *  1.在使用简单、工作、广播模式时不需要指定交换机,使用的是默认的交换机(AMQP default),路由键处填写队列名即可(自动匹配队列)
             *  2.direct、topic等模式则填写对应的交换机和路由键
             * */
            rabbitTemplate.convertAndSend("","work_queue",message);

        }
    }


}

轮询模式消费

  • RabbitMQ 将消息以 轮询的方式 均匀地分发给所有消费者,消息的分配模式是一个消费者分配一条,直至消息消费完成。
  • 每个消费者都会轮流收到消息,而不会考虑消费者当前的工作负载。

创建两个消费者,或者一个消费者中使用两个消息处理方法

消费者一

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 1.@RabbitListener(queues = "work_queue")
 *      需要work_queue队列已存在
 *
 * 2.@RabbitListener(queuesToDeclare = @Queue(value = "work_queue",durable = "true",autoDelete = "false"))
 *      不需要work_queue存在,没有会自动创建,且指定了队列会持久化,不自动删除
 *
 * 3.当@RabbitListener注解在类上时,生产者与消费者的消息类型不一致会报错,此注解放在方法上则不会
 *
 * */
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "work_queue"))
public class WorkConsumerOne {

    @RabbitHandler
    public static void excute(String message) {
        System.out.println(" 工作模式消费者-1接收消息: " + message + "!");
    }

}

消费者二

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 1.@RabbitListener(queues = "work_queue")
 *      需要work_queue队列已存在
 *
 * 2.@RabbitListener(queuesToDeclare = @Queue(value = "work_queue",durable = "true",autoDelete = "false"))
 *      不需要work_queue存在,没有会自动创建,且指定了队列会持久化,不自动删除
 *
 * 3.当@RabbitListener注解在类上时,生产者与消费者的消息类型不一致会报错,此注解放在方法上则不会
 *
 * */
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "work_queue"))
public class WorkConsumerTwo {

    @RabbitHandler
    public static void excute(String message) {
        System.out.println(" 工作模式消费者-2接收消息: " + message + "!");
    }

}

整合多个消息处理方法的消费者,与上面两个消费者的方式二选一即可:

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 如果消息处理方法的参数类型一致,将@RabbitListener标记在方法上
 * */
@Component
public class WorkConsumers {

    @RabbitListener(queuesToDeclare = @Queue(value = "work_queue"))
    public static void excute1(String message) {
        System.out.println(" 工作模式消费者-1接收消息: " + message + "!");
    }

    @RabbitListener(queuesToDeclare = @Queue(value = "work_queue"))
    public static void excute2(String message) {
        System.out.println(" 工作模式消费者-2接收消息: " + message + "!");
    }
}

使用测试

运行主启动类后,调用地址:

localhost:8084/send/work?data=工作模式测试消息

控制台输出如下,可发现两个消费者都接收到同等数量的消息,都是5个。

工作模式消费者-2接收消息: 工作模式测试消息!
 工作模式消费者-1接收消息: 工作模式测试消息!
 工作模式消费者-1接收消息: 工作模式测试消息!
 工作模式消费者-2接收消息: 工作模式测试消息!
 工作模式消费者-1接收消息: 工作模式测试消息!
 工作模式消费者-2接收消息: 工作模式测试消息!
 工作模式消费者-1接收消息: 工作模式测试消息!
 工作模式消费者-2接收消息: 工作模式测试消息!
 工作模式消费者-1接收消息: 工作模式测试消息!
 工作模式消费者-2接收消息: 工作模式测试消息!

公平模式消费

  • 公平分配遵循能者多劳的原则,核心是基于 消费者的繁忙程度 分发消息。
  • RabbitMQ 通过 消息确认(ACK)机制 来检测消费者是否空闲。
  • 如果消费者在当前未完成前一个任务,则不会分配新的任务给该消费者。

公平模式也叫能者多劳,消息处理速度快的消费者会得到更多消息

在上面的基础上,yaml增加监听配置,完成后如下

server:
  port: 8084

spring:
  application:
    name: RabbitMQ-Demo
  rabbitmq:
    host: 192.168.137.200
    port: 5672
    username: admin
    password: admin
    virtual-host: /

    #监听配置
    listener:
      simple:
        prefetch: 1 # 队列一次只拿一条给消费者,限流.    prefetch:1 即qos=1, 默认250

        #开启消费者手动确认消息,在spring boot中提供了三种确认模式:
        #  NONE - 使用rabbitmq的自动确认
        #  AUTO - 使用rabbitmq的手动确认, springboot会自动发送确认回执 (默认)
        #  MANUAL - 使用rabbitmq的手动确认, 且必须手动执行确认操作
        #  默认的AUTO模式中, 处理消息的方法抛出异常, 则表示消息没有被正确处理, 该消息会被重新发送.
        acknowledge-mode: manual

生产者代码不变

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;


@RestController
@RequestMapping("send")
public class WorkProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("work")
    public void execute(@RequestParam String data){

        //MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
        MessageProperties messageProperties = new MessageProperties();
        //设置消息类型为文本: text/plain
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);

        //消息对象
        Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);

        //发10条消息
        for (int i = 0; i < 10; i++) {

            /**
             * convertAndSend():将指定的消息对象转换为 AMQP 消息并发送到指定的交换机,支持通过指定的路由键进行消息路由。
             *  1.在使用简单、工作、广播模式时不需要指定交换机,使用的是默认的交换机(AMQP default),路由键处填写队列名即可(自动匹配队列)
             *  2.direct、topic等模式则填写对应的交换机和路由键
             * */
            rabbitTemplate.convertAndSend("","work_queue",message);

        }
    }

}

创建两个消费者,或者一个消费者中使用两个消息处理方法

消费者一,添加手动确认方法,并加入线程阻塞模拟处理慢:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

/**
 * 1.@RabbitListener(queues = "work_queue")
 *      需要work_queue队列已存在
 *
 * 2.@RabbitListener(queuesToDeclare = @Queue(value = "work_queue",durable = "true",autoDelete = "false"))
 *      不需要work_queue存在,没有会自动创建,且指定了队列会持久化,不自动删除
 *
 * 3.当@RabbitListener注解在类上时,生产者与消费者的消息类型不一致会报错,此注解放在方法上则不会
 *
 * */
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "work_queue"))
public class WorkConsumerOne {

    @RabbitHandler
    public static void excute(String message, Message msg, Channel channel) throws IOException {

        System.out.println(" 工作模式消费者-1接收消息: " + message + "!");

        try {
            /**
             * 手动ack,如果此方法不执行,则此消费者会一直等待,不会接收到新消息
             * */
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);

            //模拟让消费者1处理变慢
            Thread.sleep(300);

        } catch (Exception e) {
            e.printStackTrace();

            /**
             * 出现异常则拒绝消息。
             * 如果第二个参数为true,标识可以同时拒绝多个消息,false则只能一个
             * 如果第三个参数为True,则会把消息返回队列重新发送
             * */
            channel.basicNack(msg.getMessageProperties().getDeliveryTag(),true,true);
        }
    }

}

消费者二,正常处理:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

/**
 * 1.@RabbitListener(queues = "work_queue")
 *      需要work_queue队列已存在
 *
 * 2.@RabbitListener(queuesToDeclare = @Queue(value = "work_queue",durable = "true",autoDelete = "false"))
 *      不需要work_queue存在,没有会自动创建,且指定了队列会持久化,不自动删除
 *
 * 3.当@RabbitListener注解在类上时,生产者与消费者的消息类型不一致会报错,此注解放在方法上则不会
 *
 * */
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "work_queue"))
public class WorkConsumerTwo {

    @RabbitHandler
    public static void excute(String message, Message msg, Channel channel) throws IOException {

        System.out.println(" 工作模式消费者-2接收消息: " + message + "!");

        try {
            /**
             * 手动ack,如果此方法不执行,则此消费者会一直等待,不会接收到新消息
             * */
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);

        } catch (Exception e) {
            e.printStackTrace();

            /**
             * 出现异常则拒绝消息。
             * 如果第二个参数为true,标识可以同时拒绝多个消息,false则只能一个
             * 如果第三个参数为True,则会把消息返回队列重新发送
             * */
            channel.basicNack(msg.getMessageProperties().getDeliveryTag(),true,true);
        }
    }

}

整合多个消息处理方法的消费者,与上面两个消费者的方式二选一即可:

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;


/**
 * 如果消息处理方法的参数类型一致,将@RabbitListener标记在方法上
 * */
@Component
public class WorkConsumers {

    @RabbitListener(queuesToDeclare = @Queue(value = "work_queue"))
    public static void excute1(String message, Message msg, Channel channel) throws IOException {

        System.out.println(" 工作模式消费者-1接收消息: " + message + "!");

        try {
            /**
             * 手动ack,如果此方法不执行,则此消费者会一直等待,不会接收到新消息
             * */
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);

            //模拟让消费者1处理变慢
            Thread.sleep(300);

        } catch (Exception e) {
            e.printStackTrace();

            /**
             * 出现异常则拒绝消息。
             * 如果第二个参数为true,标识可以同时拒绝多个消息,false则只能一个
             * 如果第三个参数为True,则会把消息返回队列重新发送
             * */
            channel.basicNack(msg.getMessageProperties().getDeliveryTag(),true,true);
        }
    }



    @RabbitListener(queuesToDeclare = @Queue(value = "work_queue"))
    public static void excute2(String message, Message msg, Channel channel) throws IOException {

        System.out.println(" 工作模式消费者-2接收消息: " + message + "!");

        try {
            /**
             * 手动ack,如果此方法不执行,则此消费者会一直等待,不会接收到新消息
             * */
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);

        } catch (Exception e) {
            e.printStackTrace();

            /**
             * 出现异常则拒绝消息。
             * 如果第二个参数为true,标识可以同时拒绝多个消息,false则只能一个
             * 如果第三个参数为True,则会把消息返回队列重新发送
             * */
            channel.basicNack(msg.getMessageProperties().getDeliveryTag(),true,true);
        }
    }
}

使用测试

运行主启动类后,调用地址:

localhost:8084/send/work?data=工作模式测试消息

控制台输出如下,由于消费者一添加了阻塞,所以处理慢于消费者二,消费者二会得到更多消息。

 工作模式消费者-1接收消息: 工作模式测试消息!
 工作模式消费者-2接收消息: 工作模式测试消息!
 工作模式消费者-2接收消息: 工作模式测试消息!
 工作模式消费者-2接收消息: 工作模式测试消息!
 工作模式消费者-2接收消息: 工作模式测试消息!
 工作模式消费者-2接收消息: 工作模式测试消息!
 工作模式消费者-2接收消息: 工作模式测试消息!
 工作模式消费者-2接收消息: 工作模式测试消息!
 工作模式消费者-2接收消息: 工作模式测试消息!
 工作模式消费者-1接收消息: 工作模式测试消息!


广播模式

yaml

server:
  port: 8084

spring:
  application:
    name: RabbitMQ-Demo
  rabbitmq:
    host: 192.168.137.200
    port: 5672
    username: admin
    password: admin
    virtual-host: /

配置类

声明交换机、三个队列,并进行绑定

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {


    //广播模式队列
    @Bean("fanoutQueue1")
    public Queue fanoutQueue1() {
        return QueueBuilder
                .durable("fanout1_queue")
                .build();
    }
    @Bean("fanoutQueue2")
    public Queue fanoutQueue2() {
        return QueueBuilder
                .durable("fanout2_queue")
                .build();
    }
    @Bean("fanoutQueue3")
    public Queue fanoutQueue3() {
        return QueueBuilder
                .durable("fanout3_queue")
                .build();
    }

    //广播模式交换机
    @Bean("fanoutExchange")
    public Exchange fanoutExchange() {
        return ExchangeBuilder
                .fanoutExchange("fanout_exchange")
                .durable(true)
                .build();
    }



    //绑定fanout模式交换机与fanout模式队列
    @Bean
    public Binding fanoutBinding1() {
        return new Binding(
                "fanout1_queue", //队列名
                Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
                "fanout_exchange", //交换机名
                "",  //路由键,广播模式不需要路由键
                null    //额外参数,可用于配置死信队列的绑定等
        );
    }
    @Bean
    public Binding fanoutBinding2() {
        return new Binding(
                "fanout2_queue", //队列名
                Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
                "fanout_exchange", //交换机名
                "",  //路由键,广播模式不需要路由键
                null    //额外参数,可用于配置死信队列的绑定等
        );
    }
    @Bean
    public Binding fanoutBinding3() {
        return new Binding(
                "fanout3_queue", //队列名
                Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
                "fanout_exchange", //交换机名
                "",  //路由键,广播模式不需要路由键,空字符即可
                null    //额外参数,可用于配置死信队列的绑定等
        );
    }

}

生产者

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;

@RestController
@RequestMapping("send")
public class FanoutProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("fanout")
    public void execute(@RequestParam String data){

        //MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
        MessageProperties messageProperties = new MessageProperties();
        //设置消息类型为文本: text/plain
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);

        //消息对象
        Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);

        /**
         * convertAndSend():将指定的消息对象转换为 AMQP 消息并发送到指定的交换机,支持通过指定的路由键进行消息路由。
         *  这里指定交换机为配置类中声明的,路由键传空字符串是因为广播模式不需要路由键
         * */
        rabbitTemplate.convertAndSend("fanout_exchange","",message);


    }

}

消费者

可将多个消费者整合到一个类中,或者为每个消费者单独定义一个类。两种二选一使用即可

整合多个消费者到一个类

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutConsumer {

//    @RabbitListener(bindings = @QueueBinding(
//            value = @Queue(value = "fanout1_queue"), //声明的队列
//            exchange = @Exchange(value = "fanout_exchange",type = "fanout") //声明的交换机,需指定类型,否则默认为direct
//    ))
    //与上面注释掉的注解二选一使用即可,使用上面注释掉的注解则可以不用在配置类中配置队列、交换机以及绑定关系,该注解会帮我们完成注册
    @RabbitListener(queues = "fanout1_queue")
    public void receive1(String message) {
        System.out.println("广播模式消费者1接收到消息:" + message);
    }



//    @RabbitListener(bindings = @QueueBinding(
//            value = @Queue(value = "fanout2_queue"), //声明的队列
//            exchange = @Exchange(value = "fanout_exchange",type = "fanout") //声明的交换机,需指定类型,否则默认为direct
//    ))
    //与上面注释掉的注解二选一使用即可,使用上面注释掉的注解则可以不用在配置类中配置队列、交换机以及绑定关系,该注解会帮我们完成注册
    @RabbitListener(queues = "fanout2_queue")
    public void receive2(String message) {
        System.out.println("广播模式消费者2接收到消息:" + message);
    }



//    @RabbitListener(bindings = @QueueBinding(
//            value = @Queue(value = "fanout3_queue"), //声明的队列
//            exchange = @Exchange(value = "fanout_exchange",type = "fanout") //声明的交换机,需指定类型,否则默认为direct
//    ))
    //与上面注释掉的注解二选一使用即可,使用上面注释掉的注解则可以不用在配置类中配置队列、交换机以及绑定关系,该注解会帮我们完成注册
    @RabbitListener(queues = "fanout3_queue")
    public void receive3(String message) {
        System.out.println("广播模式消费者3接收到消息:" + message);
    }

}

或为每个消费者单独定义一个类

@RabbitListener(queues = "fanout1_queue")
@Component
public class FanoutConsumerOne {
	
	@RabbitHandler
	public void receive(String message) {
        System.out.println("广播模式消费者1接收到消息:" + message);
    }

}


@RabbitListener(queues = "fanout2_queue")
@Component
public class FanoutConsumerTwo {

    @RabbitHandler
    public void receive(String message) {
        System.out.println("广播模式消费者2接收到消息:" + message);
    }

}


@RabbitListener(queues = "fanout3_queue")
@Component
public class FanoutConsumerThree {

    @RabbitHandler
    public void receive(String message) {
        System.out.println("广播模式消费者3接收到消息:" + message);
    }

}

使用测试

运行主启动类后,调用地址:

localhost:8084/send/fanout?data=广播模式测试消息

控制台输出如下,每个消费者都得到了消息。

广播模式消费者2接收到消息:广播模式测试消息
广播模式消费者3接收到消息:广播模式测试消息
广播模式消费者1接收到消息:广播模式测试消息


Direct模式

yaml

配置基本连接信息

server:
  port: 8084

spring:
  application:
    name: RabbitMQ-Demo
  rabbitmq:
    host: 192.168.137.200
    port: 5672
    username: admin
    password: admin
    virtual-host: /

配置类

使用ExchangeBuilder.directExchange()声明direct模式交换机

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class QueueConfig {

    //direct模式交换机
    @Bean("directExchange")
    public Exchange DirectExchange() {
        return ExchangeBuilder
                .directExchange("direct_exchange")
                .durable(true)
                .build();
    }

    //direct模式队列
    @Bean("directQueue1")
    public Queue DirectQueue1() {
        return QueueBuilder
                .durable("direct1_queue")
                .build();
    }
    @Bean("directQueue2")
    public Queue DirectQueue2() {
        return QueueBuilder
                .durable("direct2_queue")
                .build();
    }



    //绑定direct模式交换机与direct模式队列
    @Bean
    public Binding directBinding1() {
        return new Binding(
                "direct1_queue", //队列名
                Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
                "direct_exchange", //交换机名
                "info",  //路由键
                null    //额外参数,可用于配置死信队列的绑定
        );
    }
    @Bean
    public Binding directBinding2() {
        return new Binding(
                "direct2_queue", //队列名
                Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
                "direct_exchange", //交换机名
                "error",  //路由键
                null    //额外参数,可用于配置死信队列的绑定
        );
    }

}

生产者

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;


@RestController
@RequestMapping("send")
public class DirectProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("direct")
    public void execute(@RequestParam String data, @RequestParam String key){

        //MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
        MessageProperties messageProperties = new MessageProperties();
        //设置消息类型为文本: text/plain
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);

        //消息对象
        Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);

        /**
         * convertAndSend():将指定的消息对象转换为 AMQP 消息并发送到指定的交换机,支持通过指定的路由键进行消息路由。
         *  这里指定交换机为配置类中声明的,路由键由请求传入
         * */
        rabbitTemplate.convertAndSend("direct_exchange",key,message);


    }

}

消费者

将多个消费者整合到一个类中,也可以为每一个消费者单独定义一个类,这里使用整合方式

整合方式

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DirectConsumer {


//    @RabbitListener(bindings = {
//            @QueueBinding(
//                    value = @Queue(value = "direct1_queue"), // 创建队列
//                    key = {"info"}, // 路由key,绑定多个键:key = {"info", "error"}
//                    exchange = @Exchange(name = "direct_exchange", type = "direct") //交换机,也可用ExchangeTypes.DIRECT来设置交换机类型
//            )})
    //与上面注释掉的注解二选一使用即可,使用上面注释掉的注解则可以不用在配置类中配置队列、交换机以及绑定关系,该注解会帮我们完成注册
    @RabbitListener(queues = "direct1_queue")
    public void receive1(String message) {

        System.out.println("Direct模式消费者1接收到消息:" + message);

    }
    


//    @RabbitListener(bindings = {
//            @QueueBinding(
//                    value = @Queue(value = "direct2_queue"), // 创建队列
//                    key = {"error"}, // 路由key
//                    exchange = @Exchange(name = "direct_exchange", type = "direct") //交换机,也可用ExchangeTypes.DIRECT来设置交换机类型
//            )})
    //与上面注释掉的注解二选一使用即可,使用上面注释掉的注解则可以不用在配置类中配置队列、交换机以及绑定关系,该注解会帮我们完成注册
    @RabbitListener(queues = "direct2_queue")
    public void receive2(String message) {

        System.out.println("Direct模式消费者2接收到消息:" + message);

    }


}

单独定义

@Component
@RabbitListener(queues = "direct1_queue")
public class DirectConsumerOne {

    @RabbitHandler
    public void receive1(String message) {
        System.out.println("Direct模式消费者1接收到消息:" + message);
    }
}
@Component
@RabbitListener(queues = "direct2_queue")
public class DirectConsumerTwo {

    @RabbitHandler
    public void receive1(String message) {
        System.out.println("Direct模式消费者2接收到消息:" + message);
    }
}

使用测试

运行主启动类

调用如下地址:

localhost:8084/send/direct?data=路由模式测试消息&key=info

因为消费者一所监听的队列绑定交换机的路由键为info,所以控制台输出:

Direct模式消费者1接收到消息:路由模式测试消息

再调用如下地址:

localhost:8084/send/direct?data=路由模式测试消息&key=error

因为消费者二所监听的队列绑定交换机的路由键为error,所以控制台输出:

Direct模式消费者2接收到消息:路由模式测试消息


topic模式

yaml

server:
  port: 8084

spring:
  application:
    name: RabbitMQ-Demo
  rabbitmq:
    host: 192.168.137.200
    port: 5672
    username: admin
    password: admin
    virtual-host: /

配置类

使用ExchangeBuilder.topicExchange()声明topic模式交换机

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class QueueConfig {

    //topic模式交换机
    @Bean("topicExchange")
    public Exchange topicExchange() {
        return ExchangeBuilder
                .topicExchange("topic_exchange")
                .durable(true)
                .build();
    }

    //topic模式队列
    @Bean("topicQueue1")
    public Queue topicQueue1() {
        return QueueBuilder
                .durable("topic1_queue")
                .build();
    }
    @Bean("topicQueue2")
    public Queue topicQueue2() {
        return QueueBuilder
                .durable("topic2_queue")
                .build();
    }



    //绑定topic模式交换机与topic模式队列
    @Bean
    public Binding topicBinding1() {
        return new Binding(
                "topic1_queue", //队列
                Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
                "topic_exchange", //交换机
                "#.message.#",  //路由键
                null    //额外参数,可用于配置死信队列的绑定
        );
    }
    @Bean
    public Binding topicBinding2() {
        return new Binding(
                "topic2_queue", //队列
                Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
                "topic_exchange", //交换机
                "message.*",  //路由键
                null    //额外参数,可用于配置死信队列的绑定
        );
    }

}

生产者

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;


@RestController
@RequestMapping("send")
public class TopicProducer {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("topic")
    public void execute(@RequestParam String data, @RequestParam String key){

        //MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
        MessageProperties messageProperties = new MessageProperties();
        //设置消息类型为文本: text/plain
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);

        //消息对象
        Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);

        /**
         * convertAndSend():将指定的消息对象转换为 AMQP 消息并发送到指定的交换机,支持通过指定的路由键进行消息路由。
         *  这里指定交换机为配置类中声明的,路由键由请求传入
         * */
        rabbitTemplate.convertAndSend("topic_exchange",key,message);


    }


}

消费者

将多个消费者整合到一个类中,也可以为每一个消费者单独定义一个类,这里使用整合方式

整合方式:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicConsumer {

//    @RabbitListener(bindings = {
//            @QueueBinding(
//                    value = @Queue(value = "topic1_queue"), // 创建队列
//                    key = {"#.message.#"}, // 路由key
//                    exchange = @Exchange(name = "topic_exchange", type = "topic") //交换机
//            )})
    //与上面注释掉的注解二选一使用即可,使用上面注释掉的注解则可以不用在配置类中配置队列、交换机以及绑定关系
    @RabbitListener(queues = "topic1_queue")
    public void receive1(String message) {

        System.out.println("topic模式消费者1接收到消息:" + message);

    }



//    @RabbitListener(bindings = {
//            @QueueBinding(
//                    value = @Queue(value = "topic2_queue"), // 创建队列
//                    key = {"message.*"}, // 路由key
//                    exchange = @Exchange(name = "topic_exchange", type = "topic") //交换机
//            )})
    //与上面注释掉的注解二选一使用即可,使用上面注释掉的注解则可以不用在配置类中配置队列、交换机以及绑定关系
    @RabbitListener(queues = "topic2_queue")
    public void receive2(String message) {

        System.out.println("topic模式消费者2接收到消息:" + message);

    }

}

为每个消费者单独定义一个类的方式:

//消费者1
@Component
public class TopicConsumerOne {

    @RabbitListener(queues = "topic1_queue")
    public void receive1(String message) {

        System.out.println("topic模式消费者1接收到消息:" + message);

    }

}

//消费者2
@Component
public class TopicConsumerTwo {

    @RabbitListener(queues = "topic2_queue")
    public void receive2(String message) {

        System.out.println("topic模式消费者2接收到消息:" + message);

    }

}

使用测试

topic模式的通配符匹配规则:

  • *: 匹配不多不少恰好1个词
  • #: 匹配零个、一个或多个词

运行主启动类,当调用地址为:

localhost:8084/send/topic?data=路由模式测试消息&key=message.info

请求传入的路由键为message.info,消费者1绑定的路由键为**#.message.#,消费者2绑定的路由键为message.***,二者皆可匹配,所以控制台输出:

topic模式消费者1接收到消息:路由模式测试消息
topic模式消费者2接收到消息:路由模式测试消息

当把请求修改为如下地址再次调用时,将只有消费者1可以匹配:

localhost:8084/send/topic?data=路由模式测试消息&key=info.message

控制台只会输出:

topic模式消费者1接收到消息:路由模式测试消息


rpc模式

rpc模式配置步骤

  1. 创建rpc模式的消息队列queue

  2. 配置rpc模式专用的RabbitTemplate模板,为其指定**回调队列(可使用临时队列或固定队列)**并设置CorrelationId

  3. 配置监听器MessageListenerContainer监听回调队列

  4. 生产者使用RabbitTemplate模板的sendAndReceive方法来发布消息并等待接收回调消息

  5. 消费者接收消息后,获取CorrelationId并设置为回调消息的属性,然后使用RabbitTemplate模板的send方法返回回调消息

yaml

server:
  port: 8084

spring:
  application:
    name: RabbitMQ-Demo
  rabbitmq:
    host: 192.168.137.200
    port: 5672
    username: admin
    password: admin
    virtual-host: /

    #监听配置
    listener:
      simple:
        #开启消费者手动确认消息,在spring boot中提供了三种确认模式:
        #  NONE - 使用rabbitmq的自动确认
        #  AUTO - 使用rabbitmq的手动确认, springboot会自动发送确认回执 (默认)
        #  MANUAL - 使用rabbitmq的手动确认, 且必须手动执行确认操作
        #  默认的AUTO模式中, 处理消息的方法抛出异常, 则表示消息没有被正确处理, 该消息会被重新发送.
        acknowledge-mode: manual

配置类

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class QueueConfig {

    @Autowired
    private ConnectionFactory connectionFactory;


    //rpc模式队列
    @Bean("rpcQueue")
    public Queue rpcQueue() {
        return QueueBuilder
                .durable("rpc_queue")
                .build();
    }

    //用于rpc模式进行消息回调的队列
    @Bean("replyQueue")
    public Queue replyQueue() {
        return QueueBuilder
                .durable("reply_queue")
                .build();
    }


    @Bean
    public RabbitTemplate replyTemplate() {

        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //决定是否使用临时队列接收回调消息。设置为 false 使用固定队列。
        rabbitTemplate.setUseTemporaryReplyQueues(false);
        //指定固定回调队列,配合 setUseTemporaryReplyQueues(false) 使用。
        rabbitTemplate.setReplyAddress("reply_queue");
        //开启CorrelationId自动关联到消息的功能,如不设置为true则需要手动创建CorrelationData对象并关联到消息上
        rabbitTemplate.setUserCorrelationId(true);
        //设置等待回调消息的超时时间,超时后抛出异常。
        rabbitTemplate.setReplyTimeout(60000);

        return rabbitTemplate;
    }


    /**
     * RPC回调队列监听器,监听reply_queue回调队列,接收消费者的回调消息并将其交付给生产者的 sendAndReceive 方法。
     *
     **/
    @Bean
    public SimpleMessageListenerContainer createReplyListenerContainer() {
        SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.setQueueNames("reply_queue");
        listenerContainer.setMessageListener(replyTemplate());
        return listenerContainer;
//      listenerContainer.setConsumerBatchEnabled(true);//设置消费多线程处理
//      listenerContainer.setDeBatchingEnabled(true);//开启多线程处理
//      listenerContainer.setBatchSize(500);//设置监听器一次批量处理的消息数量
//      listenerContainer.setConcurrentConsumers(500);  //设置线程数
//      listenerContainer.setMaxConcurrentConsumers(1000); //最大线程数
    }

}

生产者

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;
import java.util.UUID;


@RestController
@RequestMapping("send")
public class RpcProducer {

    //使用注册的rpc模式模板
    @Autowired
    private RabbitTemplate replyTemplate;

    @GetMapping("rpc")
    public void execute(@RequestParam String data){

        //MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
        MessageProperties messageProperties = new MessageProperties();
        //设置消息类型为文本: text/plain
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        //设置消息的回调id,作用是标识此消息来自于哪个生产者,以便消费者进行回调时,通过此id将消息返回给来源的生产者
        messageProperties.setCorrelationId(UUID.randomUUID().toString());
        //消息对象
        Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);


        /**
         * sendAndReceive:向rpc队列投递消息并同步等待消费结果。
         *  从左到右参数为:
         *      交换机:rpc模式可不用指定交换机,这里传空字符传
         *      路由键:设置为rpc模式的队列名
         *      消息内容
         *      回调id:用于标识消息来自那个生产者,没有此id消息将无法回调给生产者
         * */
        Message returnMsg = replyTemplate.sendAndReceive("","rpc_queue",message);

        /**
         * 此处与配置类 rabbitTemplate.setUserCorrelationId(true) 的设置相关联,如果未设置true或设置了false,
         * 则需要使用如下方式创建CorrelationData对象,并将此对象传入发送方法的第四个参数
         * */
        //CorrelationData correlationData = new CorrelationData();
        //correlationData.setId(messageProperties.getCorrelationId());
        //Message returnMsg = replyTemplate.sendAndReceive("","rpc_queue",message,correlationData);

        //输出消费者回调回来的消息
        System.out.println("生产者接收回调消息:"+new String(returnMsg.getBody()));

    }

}

消费者

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;


@Component
public class RpcConsumer {

    @Autowired
    private RabbitTemplate replyTemplate;


    @RabbitListener(queues = "rpc_queue")
    public void receive(String message, Message msg, Channel channel) throws IOException {

        //接收来自生产者发布到队列的消息
        System.out.println("rpc模式消费者接收到消息:" + message);

        try{

            //封装一个消费者返回给生产者的消息,也就是rpc的回调消息
            MessageProperties messageProperties = new MessageProperties();
            //要从消息中取出生产者发布消息时设置的CorrelationId,此id标识消息来自哪个生产者,回调时会根据此id来给对应生产者返回消息
            messageProperties.setCorrelationId(msg.getMessageProperties().getCorrelationId());
            Message reMsg = new Message("返回给生产者的rpc回调消息".getBytes(StandardCharsets.UTF_8),messageProperties);

            //回调方法,指定路由键为配置类中注册的回调队列
            replyTemplate.send("","reply_queue",reMsg);

            //手动确认消息接收成功
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);

        }catch(Exception e){

            //出现异常就拒绝消息
            channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);

        }

    }

}

使用测试

运行主启动类,调用地址:

localhost:8084/send/rpc?data=rpc模式测试消息

控制台输出如下,消费者输出了消息,生产者也接收到了回调:

rpc模式消费者接收到消息:rpc模式测试消息
生产者接收回调消息:返回给生产者的rpc回调消息


死信队列

yaml

server:
  port: 8084

spring:
  application:
    name: RabbitMQ-Demo
  rabbitmq:
    host: 192.168.137.200
    port: 5672
    username: admin
    password: admin
    virtual-host: /

    #监听配置
    listener:
      simple:
        #开启消费者手动确认消息,在spring boot中提供了三种确认模式:
        #  NONE - 使用rabbitmq的自动确认
        #  AUTO - 使用rabbitmq的手动确认, springboot会自动发送确认回执 (默认)
        #  MANUAL - 使用rabbitmq的手动确认, 且必须手动执行确认操作
        #  默认的AUTO模式中, 处理消息的方法抛出异常, 则表示消息没有被正确处理, 该消息会被重新发送.
        acknowledge-mode: manual

配置类

注册普通队列时使用deadLetterExchange()为其指定死信交换机

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class QueueConfig {

    /**
     * 用于存放超时消息的死信队列
     * */
    @Bean("deadQueue")
    public Queue DeadQueue() {

        return QueueBuilder.durable("dead_queue").build();
    }

    /**
     * 死信交换机
     * */
    @Bean("deadExchange")
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("dead_exchange").durable(true).build();
    }

    /**
     * 绑定死信交换机与死信队列
     * */
    @Bean
    public Binding deadLetterBinding() {
        return new Binding(
                "dead_queue",
                Binding.DestinationType.QUEUE,
                "dead_exchange",
                "dead-key",
                null
        );
    }


    /**
     * 创建普通队列,并绑定死信交换机,用于消息过期后把消息放入死信队列
     *
     * */
    @Bean("normalQueue")
    public Queue normalQueue() {
        return QueueBuilder
                .durable("normal_queue")
                //设置队列内所有消息的超时时间为5秒,并指定此队列的死信交换机
                .ttl(5000).deadLetterExchange("dead_exchange").deadLetterRoutingKey("dead-key")
                .build();
    }
    

}

生产者

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;


@RestController
@RequestMapping("send")
public class DlxProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("dlx")
    public void execute(@RequestParam String data){

        //MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
        MessageProperties messageProperties = new MessageProperties();
        //设置消息类型为文本: text/plain
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        //消息对象
        Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);

        /**
         * convertAndSend():将指定的消息对象转换为 AMQP 消息并发送到指定的交换机,支持通过指定的路由键进行消息路由。
         *  1.在使用简单、工作、广播模式时不需要指定交换机,使用的是默认的交换机(AMQP default),路由键处填写队列名即可(自动匹配队列)
         *  2.direct、topic等模式则填写对应的交换机和路由键
         * */
        rabbitTemplate.convertAndSend("","normal_queue",message);
    }


}

普通队列消费者

用于监听普通队列的消息,手动制造异常来让消息被拒绝,并投放到死信队列中

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import java.io.IOException;


@Component
public class NormalConsumer {

//    @RabbitListener(
//                    queuesToDeclare = @Queue(
//                            value = "normal_queue", // 普通队列名称
//                            durable = "true",      // 队列持久化
//                            autoDelete = "false", // 不自动删除
//                            arguments = {          // 配置死信队列的参数, 也可以设置其他队列属性
//                                    @Argument(name = "x-dead-letter-exchange", value = "dead_exchange"), // 指定死信交换机
//                                    @Argument(name = "x-dead-letter-routing-key", value = "dead-key"), // 指定死信路由键
//                                    @Argument(name = "x-message-ttl", value = "5000", type = "java.lang.Integer") // 设置消息过期时间为5秒
//                            }
//                    )
//    )
    //与上面的注解二选一使用,使用上面的注解后,可以不用再配置类中进行normal_queueu队列的配置,但死信队列的配置不能去掉
    @RabbitListener(queues = "normal_queue")
    public void receive1(String message, Message msg, Channel channel) throws IOException {

        try{

            //制造一个异常模拟消费者拒绝消息
            int i = 1/0;

            //手动确认消息接收成功
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);

        }catch(Exception e){

            System.out.println("出现异常,消费者拒绝接收消息: " + message+",异常信息为:"+e.getMessage());

            //出现异常就拒绝消息
            channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);

        }

    }

}

死信队列消费者

用于监听死信队列,当死信队列中存在消息时,会被此消费者消费

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DlxConsumer {

    @RabbitListener(queues = "dead_queue")
    public void receive1(String message) {
        System.out.println("死信队列消费者接收到消息: " + message);
    }

}

使用测试

运行主启动类,调用地址

localhost:8084/send/dlx?data=死信队列的测试消息

控制台输出:

出现异常,消费者拒绝接收消息: 死信队列的测试消息,异常信息为:/ by zero
死信队列消费者接收到消息: 死信队列的测试消息

消息会被普通消费者拒绝,存入死信队列并被死信队列的消费者消费



消息可靠性保证

在 RabbitMQ 中,为了保证消息的可靠性,主要通过以下机制来实现消息从生产者到消费者的可靠传递:

  1. 消息确认机制,用于保证消息可以从生产者投递到交换机中。
  2. 消息回退机制,用于保证消息可以从交换机投递到队列中。
  3. 消息持久化,确保消息在 RabbitMQ 中持久化存储,不会因服务崩溃或重启而丢失。
  4. 消费者的消息确认,消费者通过basicAckbasicNack等方法手动确认或拒绝消息。
  5. 死信队列消息被消费者拒绝并指定不重新入队列、消息在队列中过期、队列达到最大长度限制 等三种情况会将消息投入死信队列,可通过监听死信队列对失效消息进行处理
  6. 事务机制,使用 RabbitMQ 提供的事务机制(channel.txSelect()txCommit()txRollback()),但性能开销较大已不建议使用,使用 消息回退与确认机制 替代 事务机制,性能更优。
  7. 消息重试,使用Spring AMQP提供的重试机制,在消息消费失败后重新发送给消费者。

关于消费者的消息确认(在工作模式示例中)、死信队列已在上面展示过不再进行展示,事务机制已基本不用也不进行展示。下面主要演示消息确认回退机制、消息持久化、消息重试


消息确认机制示例

yaml

server:
  port: 8084

spring:
  application:
    name: RabbitMQ-Demo
  rabbitmq:
    host: 192.168.137.200
    port: 5672
    username: admin
    password: admin
    virtual-host: /

    #开启消息确认机制,保证消息可从生产者正确投递到交换机中
    publisher-confirm-type: correlated

在 Spring Boot 中,publisher-confirm-type 是用于配置 RabbitMQ 的生产者确认模式的属性,决定了 RabbitMQ 如何处理生产者发布消息的确认逻辑。以下是该配置的详细解析:

配置可选值

1. NONE(默认值)

  • 说明
    • 不启用生产者发布确认机制。
    • 生产者不会收到任何消息投递的成功或失败回调。
  • 适用场景
    • 对消息可靠性要求不高,或者使用其他机制保障可靠性。
  • 优缺点
    • 优点:性能高,没有额外的回调开销。
    • 缺点:如果消息丢失或未成功投递,生产者无法感知。

2. CORRELATED

  • 说明
    • 启用简单的发布确认模式。
    • 配合 ConfirmCallback方法,可以监听消息是否成功到达交换机。
  • 适用场景
    • 需要知道消息是否成功到达交换机。
    • 不需要追踪消息的详细流转状态。
  • 优缺点
    • 优点:简单易用,适合大多数场景。
    • 缺点:无法区分消息是否成功路由到队列。

3. SIMPLE

  • 说明

    • 启用完整的发布确认机制。
    • 配合 ConfirmCallbackReturnCallback方法,可以监听消息是否成功到达交换机以及是否成功路由到队列。
    • 消息可靠性更高。
  • 适用场景

    • 对消息可靠性要求较高的场景。
    • 需要捕获消息未成功路由到队列的详细信息。
  • 优缺点

    • 优点:功能强大,适用于复杂的业务场景。
    • 缺点:需要额外处理 ReturnCallback 的逻辑,稍复杂。

配置类

注册一个队列和带有消息确认函数rabbitmq模板,配置类中注释掉的交换机及绑定先不要放开,以观察测试效果。

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.nio.charset.StandardCharsets;


@Configuration
public class QueueConfig {

    //连接工厂
    @Autowired
    private ConnectionFactory connectionFactory;

    //direct模式队列
    @Bean("confirmCallQueue")
    public Queue confirmCallQueue() {
        return QueueBuilder.durable("confirmCall_queue").build();
    }


//    @Bean("confirmCallExchange")
//    public Exchange confirmCallExchange() {
//        return ExchangeBuilder.directExchange("confirmCall_exchange").durable(true).build();
//    }
//
//    @Bean
//    public Binding confirmCallBinding() {
//        return new Binding("confirmCall_queue", Binding.DestinationType.QUEUE, "confirmCall_exchange", "confirmCall-key", null);
//    }


    //配置带有消息确认函数的rabbitmq模板
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        /**
         * 消息确认方法:
         *  如果消息未成功从消费者到达交换机,ack参数为false,cause为失败原因
         *  correlationData中含有回调消息,但需要在生产者代码中手动添加才可在此处获取,可从中定义并取出消息内容等投递失败的具体信息。可取参数如下:
         *      1.message:被退回的消息本体,包含消息的内容和消息属性
         *      2.replyCode:RabbitMQ返回的退回代码,表示消息退回的原因,通过此代码可以判断失败类型
         *      3.replyText:RabbitMQ返回的退回描述文本。用于补充说明消息退回的具体原因。
         *      4.exchange:消息被投递的交换机名称
         *      5.routingKey:消息被投递时使用的路由键
         *
         * */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {

            if(ack){

                System.out.println("消息已正确发送到交换机");

            }else{

                ReturnedMessage rm = correlationData.getReturned();
                Message message = rm.getMessage();

                //从消息中获取消息属性
                MessageProperties messageProperties = message.getMessageProperties();

                //消息内容
                String data = new String(message.getBody(), StandardCharsets.UTF_8);

                //返回的退回代码:
                // 312: No route. 消息无法路由到任何队列。
                // 313: Message expired. 消息在队列中过期。
                // 404: Exchange not found. 指定的交换机不存在。
                int replyCode = rm.getReplyCode();

                //RabbitMQ 返回的退回描述文本。用于补充说明消息退回的具体原因。配合 replyCode 提供更清晰的退回原因描述。
                //"NO_ROUTE": 消息无法路由到任何队列。
                //"MESSAGE_EXPIRED": 消息过期。
                String replyText = rm.getReplyText();

                //消息被投递的交换机名称
                String exchange = rm.getExchange();

                //消息被投递时使用的路由键
                String routingKey = rm.getRoutingKey();

                System.out.println("消息未正确发送到交换机,失败原因是:"+cause+"\n投递失败消息的消息内容为:"+data+", 退回代码为:"+replyCode+", 退回描述文本为:"+replyText+", "+
                        "交换机名称为:"+exchange+", 路由键是:"+routingKey);

            }
        });


        return rabbitTemplate;
    }

}

生产者

指定一个不存在的交换机,以测试消息确认机制是否生效

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
import java.util.UUID;


@RestController
@RequestMapping("send")
public class ConfirmProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("confirm")
    public void execute(@RequestParam String data, @RequestParam String key){

        //MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
        MessageProperties messageProperties = new MessageProperties();
        //设置消息类型为文本: text/plain
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        //消息对象
        Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);

        //关联消息确认方法中的correlationData参数,为其提供确认回调信息
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        //ReturnedMessage构造参数从左到右分别为:消息内容message、退回代码replyCode、退回描述replyText、队列名或路由键routingKey
        correlationData.setReturned(new ReturnedMessage(message,404,"NOT_FOUND","",key));

        //发送时,由于配置类中注释掉了交换机,这里相当于是指定了一个不存在的交换机,以测试消息确认机制生效结果
        rabbitTemplate.convertAndSend("confirmCall_exchange",key,message,correlationData);
        
    }

}

消费者

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
@RabbitListener(queues = "confirmCall_queue")
public class ConfirmConsumer {

    @RabbitHandler
    public void receive1(String message) {
        System.out.println("消费者接收到消息:" + message);
    }

}

使用测试

运行主启动类

当消息未成功发送到交换机的情况

调用地址:

localhost:8084/send/confirm?data=生产者消息确认机制的测试消息&key=confirmCall-key

控制台输出具体错误信息,可以看到,在确认方法ConfirmCallbackcause参数中,已包含了reply-codereply-text等信息

消息未正确发送到交换机,失败原因是:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirmCall_exchange' in vhost '/', class-id=60, method-id=40)

投递失败消息的消息内容为:生产者消息确认机制的测试消息, 退回代码为:404, 退回描述文本为:NOT_FOUND, 交换机名称为:, 路由键是:confirmCall-key

消息成功发送到交换机的情况

在这里放开配置类中注释的如下部分:

@Bean("confirmCallExchange")
public Exchange confirmCallExchange() {
	return ExchangeBuilder.directExchange("confirmCall_exchange").durable(true).build();
}

@Bean
public Binding confirmCallBinding() {
	return new Binding("confirmCall_exchange", Binding.DestinationType.QUEUE, "confirmCall_queue", "confirmCall-key", null);
}

然后再次调用地址:

localhost:8084/send/confirm?data=生产者消息确认机制的测试消息&key=confirmCall-key

控制台输出:

消息已正确发送到交换机
消费者接收到消息:生产者消息确认机制的测试消息

因为注释放开,交换机已存在,所以消息被成功路由并被消费者消费掉。



消息回退机制示例

yaml

server:
  port: 8084

spring:
  application:
    name: RabbitMQ-Demo
  rabbitmq:
    host: 192.168.137.200
    port: 5672
    username: admin
    password: admin
    virtual-host: /

    #开启消息退回机制,保证消息可从交换机投递到队列中
    publisher-returns: true
    template:
      # 启用强制路由(与 ReturnCallback 配合使用)
      mandatory: true

publisher-returns: true 的作用

启用消息的退回机制(ReturnCallback)。当消息无法成功路由到队列时(例如路由键不匹配,或队列不存在),RabbitMQ 会触发 ReturnCallback 回调方法,通知生产者消息未能被路由。

适用场景:

  • 交换机到队列的路由失败场景。
  • 消息丢失防护:通过 ReturnCallback 机制,可以捕获未被路由的消息并处理(例如重新发送或记录日志)。

mandatory 的作用

  • 如果 mandatorytrue,消息在无法路由到队列时,会触发 ReturnCallback
  • 如果 mandatoryfalse,消息无法路由时将被丢弃,且生产者不会收到任何通知。

配置类

注册一个交换机和带有消息回退函数rabbitmq模板,配置类中注释掉的队列及绑定先不要放开,以观察测试效果。

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class QueueConfig {

    //连接工厂
    @Autowired
    private ConnectionFactory connectionFactory;

    //direct模式交换机
    @Bean("returnsCallExchange")
    public Exchange returnsCallExchange() {
        return ExchangeBuilder.directExchange("returnsCall_exchange").durable(true).build();
    }

    //direct模式队列
//    @Bean("returnsCallQueue")
//    public Queue returnsCallCallQueue() {
//        return QueueBuilder.durable("returnsCall_queue").build();
//    }
//
//    @Bean
//    public Binding returnsCallBinding() {
//        return new Binding("returnsCall_queue", Binding.DestinationType.QUEUE, "returnsCall_exchange", "returnsCall-key", null);
//    }


    //配置带有消息回退函数的rabbitmq模板
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

        // 配置 ReturnCallback,returnedMessage中包含退回信息
        rabbitTemplate.setReturnsCallback(returnedMessage -> {

            // 获取退回消息的相关信息:

            //原始消息内容,包含消息体和消息属性。
            String messageBody = new String(returnedMessage.getMessage().getBody());
            //RabbitMQ 的退回代码,指示失败的具体原因。
            int replyCode = returnedMessage.getReplyCode();
            //RabbitMQ 提供的退回描述信息。
            String replyText = returnedMessage.getReplyText();
            //消息被发送到的交换机名称。
            String exchange = returnedMessage.getExchange();
            //消息使用的路由键。
            String routingKey = returnedMessage.getRoutingKey();

            System.out.println("消息退回:");
            System.out.println("内容:" + messageBody);
            System.out.println("退回码:" + replyCode);
            System.out.println("原因:" + replyText);
            System.out.println("交换机:" + exchange);
            System.out.println("路由键:" + routingKey);

            // 可在此处理退回的消息,比如记录日志或重发等
        });

        // 设置消息强制路由
        rabbitTemplate.setMandatory(true);

        return rabbitTemplate;
    }

}

生产者

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;


@RestController
@RequestMapping("send")
public class ReturnProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("return")
    public void execute(@RequestParam String data, @RequestParam String key){

        //MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
        MessageProperties messageProperties = new MessageProperties();
        //设置消息类型为文本: text/plain
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        //消息对象
        Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);

        //发送时,由于配置类中注释掉了队列,这里相当于是指定了一个不存在的队列,以测试消息回退机制生效结果
        rabbitTemplate.convertAndSend("returnsCall_exchange",key,message);

    }

}

使用测试

测试消息未成功从交换机发送到队列的情况

运行主启动类,调用地址:

localhost:8084/send/return?data=生产者消息回退机制的测试消息&key=returnsCall-key

因为队列在配置类中被注释掉,消息无法路由,触发回退方法,所以控制台输出:

消息退回:
内容:生产者消息回退机制的测试消息
退回码:312
原因:NO_ROUTE
交换机:returnsCall_exchange
路由键:returnsCall-key

测试消息成功从交换机发送到队列的情况

放开配置类的如下注释部分:

    @Bean("returnsCallQueue")
    public Queue returnsCallCallQueue() {
        return QueueBuilder.durable("returnsCall_queue").build();
    }

    @Bean
    public Binding returnsCallBinding() {
        return new Binding("returnsCall_queue", Binding.DestinationType.QUEUE, "returnsCall_exchange", "returnsCall-key", null);
    }

并添加如下消费者:

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
@RabbitListener(queues = "returnsCall_queue")
public class ReturnConsumer {

    @RabbitHandler
    public void receive1(String message) {
        System.out.println("消费者接收到消息:" + message);
    }

}

运行主启动类,再调用地址:

localhost:8084/send/return?data=生产者消息回退机制的测试消息&key=returnsCall-key

因为队列在配置类中被注册,消息可以路由到队列中,回退方法不会被触发,消费被消费者消费,所以控制台输出:

消费者接收到消息:生产者消息回退机制的测试消息

消息确认与回退对比

回调类型触发条件适用场景
ConfirmCallback消息是否成功到达交换机判断消息是否到达交换机
ReturnCallback消息未能路由到任何队列时触发捕获消息路由失败的详细信息


消息持久化示例

消息持久化是指 RabbitMQ 将消息存储到磁盘,而不是仅保存在内存中。持久化的消息即使在 RabbitMQ 服务重启后仍然可用。

需要配置持久化的对象

  1. 交换机(Exchange)持久化。
  2. 队列(Queue)持久化。
  3. 消息本身持久化。

yaml

消息持久化只需要基本连接配置信息即可,无需额外配置

server:
  port: 8084

spring:
  application:
    name: RabbitMQ-Demo
  rabbitmq:
    host: 192.168.137.200
    port: 5672
    username: admin
    password: admin
    virtual-host: /

配置类

配置消息持久化,需要先配置其使用的交换机及队列为持久化(简单模式等未使用交换机的模式可以不用配交换机持久化,因为其使用默认交换机)

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class QueueConfig {


    @Bean
    public Queue durableQueue() {
        return QueueBuilder
            .durable("durable_queue")//队列持久化
            .build();
    }

    @Bean
    public DirectExchange durableExchange() {
        return ExchangeBuilder.directExchange("durable_exchange")
                .durable(true)  // 交换机持久化
                .build();
    }

    //绑定交换机与队列
    @Bean
    public Binding durableBinding() {
        return new Binding(
                "durable_queue", //队列名
                Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
                "durable_exchange", //交换机名
                "durable-key",  //路由键
                null    //额外参数
        );
    }

}

生产者

调用消息属性的setDeliveryMode(MessageDeliveryMode.PERSISTENT)为消息指定持久化,MessageDeliveryMode.PERSISTENT对应的值为2

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;


@RestController
@RequestMapping("send")
public class DurableProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("durable")
    public void execute(@RequestParam String data, @RequestParam String key){

        //MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
        MessageProperties messageProperties = new MessageProperties();
        //设置消息类型为文本: text/plain
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        //设置消息持久化
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

        //消息对象
        Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);

        /**
         * convertAndSend():将指定的消息对象转换为 AMQP 消息并发送到指定的交换机,支持通过指定的路由键进行消息路由。
         *  这里指定交换机为配置类中声明的,路由键由请求传入
         * */
        rabbitTemplate.convertAndSend("durable_exchange",key,message);
        
    }

}

使用测试

运行主启动类,调用地址:

localhost:8084/send/durable?data=持久化的测试消息&key=durable-key

访问管理页面,地址为你的ip:15672

http://192.168.137.200:15672/#/queues

查看队列,有1条未消费消息:

持久化的未消费消息

点击队列名进入队列详情页面,点击Get messgae按钮,查看消息,其持久化属性已被设置(蓝色框):

队列详情

使用linux命令systemctl restart rabbitmq-server重启rabbitmq服务,再回到管理页面查看,消息依然存在,持久化成功:

消息依然存在



消息重试示例

为什么需要消息重试?

  • 消费者处理消息时可能遇到短暂性错误(如服务不可用、网络波动等)。
  • 重试可以在一定时间内自动再次尝试处理消息,避免消息丢失。

注意事项

  • 消息重试针对的是消费者端在处理消息时出现异常、未能正确消费的情况,在消息未确认之前,RabbitMQ 会将消息重新投递到消费者。如果是消费者手动拒绝(执行 basicRejectbasicNack 方法)则不会触发重试机制。
  • 如果超过最大重试次数,消息会被丢弃或转入死信队列(如果配置了死信队列)。

yaml

server:
  port: 8084

spring:
  application:
    name: RabbitMQ-Demo
  rabbitmq:
    host: 192.168.137.200
    port: 5672
    username: admin
    password: admin
    virtual-host: /

    #在监听配置中开启消息重试
    listener:
      simple:
        retry:
          enabled: true               # 开启消费者重试
          max-attempts: 5             # 最大重试次数
          initial-interval: 3000      # 重试间隔时间(毫秒)
          #multiplier: 2.0            # 重试间隔的倍增因子,大于0时,下一次重试的延迟时间为:重试间隔时间 * 倍增因子 
          #max-interval: 10000        # 最大重试间隔时间(毫秒)

生产者

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;


@RestController
@RequestMapping("send")
public class RetryProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("retry")
    public void execute(@RequestParam String data){

        //MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
        MessageProperties messageProperties = new MessageProperties();
        //设置消息类型为文本: text/plain
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);

        //消息对象
        Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);

        /**
         * convertAndSend():将指定的消息对象转换为 AMQP 消息并发送到指定的交换机,支持通过指定的路由键进行消息路由。
         *  1.在使用简单、工作、广播模式时不需要指定交换机,使用的是默认的交换机(AMQP default),路由键处填写队列名即可(自动匹配队列)
         *  2.direct、topic等模式则填写对应的交换机和路由键
         * */
        rabbitTemplate.convertAndSend("","retry_queue",message);
    }
    
}

消费者

简化配置,不在配置类中以bean的形式创建队列,直接在消费者中使用注解声明

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;


@Component
//如果retry_queue队列不存在会自动创建
@RabbitListener(queuesToDeclare = @Queue(value = "retry_queue"))
public class RetryCustomer {

    @RabbitHandler
    public static void excute(String message) {


        System.out.println(
                LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
                        +"处理消息:" + message
        );

        System.out.println();

        int i = 1/0;


    }

}

使用测试

启动后,调用地址:

localhost:8084/send/retry?data=消息重试机制的测试消息

消费者控制台会输出5次处理信息,然后抛出异常并丢弃队列中的消息(不想丢弃可以结合死信队列处理):

2024-11-21 08:54:42处理消息:消息重试机制的测试消息

2024-11-21 08:54:45处理消息:消息重试机制的测试消息

2024-11-21 08:54:48处理消息:消息重试机制的测试消息

2024-11-21 08:54:51处理消息:消息重试机制的测试消息

2024-11-21 08:54:54处理消息:消息重试机制的测试消息


消息优先级

消息优先级(Message Priority)是 RabbitMQ 提供的一种机制,用于在消费者消费消息时,根据消息的重要性或优先级先处理高优先级的消息。

RabbitMQ 消息优先级的特点

  1. 队列优先级声明
    • 消息优先级在 RabbitMQ 中是以队列为单位配置的,所以要先在队列上开启优先级配置。
    • 一个队列可以设置一个最大优先级(x-max-priority),即优先级范围是 0 到最大优先级(通常不超过 255)。
  2. 消息优先级范围
    • 消息优先级默认为 0
    • 如果队列没有声明优先级,所有消息按先进先出(FIFO)处理。
  3. 优先级策略
    • 消息优先级越高,越早被消费者处理。
    • 优先级设置得过大可能会增加内存消耗和队列扫描的性能开销。
  4. 工作机制
    • 队列在存储消息时,会根据优先级对消息进行排序,高优先级的消息先被处理。

注意事项

  1. 优先级对性能的影响
    • 优先级队列可能会影响性能,因为 RabbitMQ 需要维护一个优先级排序。
    • 推荐在实际业务中设置适当的优先级范围,如 010
  2. 队列必须声明支持优先级
    • 如果队列未设置 x-max-priority,即使消息设置了优先级也不会生效。
  3. 优先级范围
    • 如果消息的优先级超出队列的 x-max-priority,则优先级被限制在最大值。
  4. 消息排序机制
    • RabbitMQ 并非严格意义的优先级队列,高优先级消息并不总是最先被处理,只能保证“尽量优先”。

yaml

server:
  port: 8084

spring:
  application:
    name: RabbitMQ-Demo
  rabbitmq:
    host: 192.168.137.200
    port: 5672
    username: admin
    password: admin
    virtual-host: /

配置类

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class QueueConfig {

    //简单模式队列
    @Bean("priorityQueue")
    public Queue priorityQueue() {
        return QueueBuilder
                .durable("priority_queue") //持久化一个叫priority_queue的队列
                .maxPriority(10) //开启优先级配置,设置优先级范围为:0 - 10
                .build();
    }

}

生产者

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;


@RestController
@RequestMapping("send")
public class PriorityProducer {

    //注入模板
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("priority")
    public void execute(@RequestParam String data, @RequestParam Integer number){

        //MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
        MessageProperties messageProperties = new MessageProperties();
        //设置消息类型为文本: text/plain
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        //设置消息的优先级
        messageProperties.setPriority(number);

        //消息对象
        Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);

        //发布消息
        rabbitTemplate.convertAndSend("","priority_queue",message);
    }

}

消费者

需要注意的是,在生产者产生消息的速度大于消费者消费速度、且有消息堆积的情况下,设置队列及消息优先级才有意义,否则,生产者刚生产一条就被消费者消费了,无法体现优先级。

这里先注释掉@RabbitListener注解,让消息堆积在队列中,以测试优先级效果:

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class PriorityCustomer {

    //@RabbitListener(queues = "priority_queue")
    public static void excute(String message) {

        System.out.println(" Priority消费者接收到消息: " + message + "!");

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

使用测试

运行主启动类,分别调用下面地址(参数number代表消息的优先级大小,传入生产者代码),产生3条消息存于队列中:

localhost:8084/send/priority?data=优先级测试消息1&number=1
localhost:8084/send/priority?data=优先级测试消息2&number=2
localhost:8084/send/priority?data=优先级测试消息3&number=3

发送后看管理界面,看到有3条消息在队列中等待,蓝色的Pri标志代表队列是优先级队列

优先级队列3条消息

然后放开消费者@RabbitListener注解的注释,重启springboot主启动类查看控制台输出:

Priority消费者接收到消息: 优先级测试消息3!
Priority消费者接收到消息: 优先级测试消息2!
Priority消费者接收到消息: 优先级测试消息1!

优先级数字大的先被消费,所以打印顺序为3、2、1。



延迟队列

延迟队列的概念

延迟队列是一种特殊的消息队列,消息会在特定时间之后被投递给消费者,而不是立即投递。它广泛应用于以下场景:

  • 订单未支付自动取消。
  • 定时任务调度。
  • 延迟消息通知。

实现延迟队列的方法

方法一:使用 RabbitMQ 插件实现延迟队列

RabbitMQ 提供了 Delayed Message Plugin 插件,支持基于交换机的延迟消息功能。

方法二:使用 TTL + 死信队列实现延迟队列

  1. TTL(Time-To-Live):

    • 为消息或队列设置生存时间,消息超过生存时间后会变成死信。
  2. 死信队列:

    • 死信消息会被转发到绑定的死信交换机,再由死信交换机路由到指定的队列。

两种方法对比

方法优点缺点
TTL + 死信队列不依赖插件,兼容性强。实现复杂,无法实现单条消息不同延迟时间。
Delayed Message Plugin配置简单,支持单条消息设置不同延迟时间。依赖插件,需要额外安装,可能存在兼容性问题。
  • TTL + 死信队列适合场景较为固定的延迟需求,如所有消息有相同的延迟时间。
  • Delayed Message Plugin更灵活,适合需要动态设置消息延迟时间的场景。
  • 根据业务需求选择合适的实现方式,同时注意性能影响和插件的兼容性。

插件方式实现

插件下载

访问地址:

https://www.rabbitmq.com/community-plugins#routing

点击下图红框处的Releases

延迟队列插件下载1

跳转github后,选择对应的版本下载,我这里选择当前最新版4.0.2的.ez文件 (ez文件对应linux系统)

延迟队列下载插件2

将下载后的ez文件放入/usr/lib/rabbitmq/lib/rabbitmq_server-4.0.3/plugins目录中,注意4.0.3是版本号,你的可能不同。

放入后,使用以下命令启用插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

运行结果

[root@Centos7 ~]# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@localhost:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
  rabbitmq_delayed_message_exchange
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@localhost...
The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

started 1 plugins.

验证插件是否启用成功:

rabbitmq-plugins list

在输出中找到 rabbitmq_delayed_message_exchange,并确保其状态为 [E*](已启用)。

[root@Centos7 ~]# rabbitmq-plugins list
Listing plugins with pattern ".*" ...
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@localhost
 |/
[  ] rabbitmq_amqp1_0                  4.0.3
[  ] rabbitmq_auth_backend_cache       4.0.3
[  ] rabbitmq_auth_backend_http        4.0.3
[  ] rabbitmq_auth_backend_ldap        4.0.3
[  ] rabbitmq_auth_backend_oauth2      4.0.3
[  ] rabbitmq_auth_mechanism_ssl       4.0.3
[  ] rabbitmq_consistent_hash_exchange 4.0.3
[E*] rabbitmq_delayed_message_exchange 4.0.2
[  ] rabbitmq_event_exchange           4.0.3
[  ] rabbitmq_federation               4.0.3
[  ] rabbitmq_federation_management    4.0.3
[  ] rabbitmq_federation_prometheus    4.0.3
[  ] rabbitmq_jms_topic_exchange       4.0.3
[E*] rabbitmq_management               4.0.3
[e*] rabbitmq_management_agent         4.0.3
[  ] rabbitmq_mqtt                     4.0.3
[  ] rabbitmq_peer_discovery_aws       4.0.3
[  ] rabbitmq_peer_discovery_common    4.0.3
[  ] rabbitmq_peer_discovery_consul    4.0.3
[  ] rabbitmq_peer_discovery_etcd      4.0.3
[  ] rabbitmq_peer_discovery_k8s       4.0.3
[  ] rabbitmq_prometheus               4.0.3
[  ] rabbitmq_random_exchange          4.0.3
[  ] rabbitmq_recent_history_exchange  4.0.3
[  ] rabbitmq_sharding                 4.0.3
[  ] rabbitmq_shovel                   4.0.3
[  ] rabbitmq_shovel_management        4.0.3
[  ] rabbitmq_shovel_prometheus        4.0.3
[  ] rabbitmq_stomp                    4.0.3
[  ] rabbitmq_stream                   4.0.3
[  ] rabbitmq_stream_management        4.0.3
[  ] rabbitmq_top                      4.0.3
[  ] rabbitmq_tracing                  4.0.3
[  ] rabbitmq_trust_store              4.0.3
[e*] rabbitmq_web_dispatch             4.0.3
[  ] rabbitmq_web_mqtt                 4.0.3
[  ] rabbitmq_web_mqtt_examples        4.0.3
[  ] rabbitmq_web_stomp                4.0.3
[  ] rabbitmq_web_stomp_examples       4.0.3

重启rabbitmq服务

使用如下命令重启

systemctl restart rabbitmq-server

重启后,下面编写代码测试。

yaml

server:
  port: 8084

spring:
  application:
    name: RabbitMQ-Demo
  rabbitmq:
    host: 192.168.137.200
    port: 5672
    username: admin
    password: admin
    virtual-host: /

配置类

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;


@Configuration
public class QueueConfig {

    // 配置延迟交换机
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> arguments = new HashMap<>();
        //延迟参数
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, arguments);
    }

    // 延迟队列
    @Bean
    public Queue delayedQueue() {
        return QueueBuilder
                .durable("delayed_queue") //持久化一个叫simple_queue的队列
                .build();
    }

    // 绑定延迟队列和交换机
    @Bean
    public Binding delayedBinding() {

        return BindingBuilder.bind(delayedQueue())
                .to(delayedExchange())
                .with("delayed_key")
                .noargs();

// 或使用如下名称注册方式
//        return new Binding(
//                "delayed_queue", //队列名
//                Binding.DestinationType.QUEUE, //指定绑定的目标类型为队列
//                "delayed_exchange", //交换机名
//                "delayed_key",  //路由键,广播模式不需要路由键
//                null    //额外参数
//        );

    }
}

生产者

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;


@RestController
@RequestMapping("send")
public class DelayedMessageProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("delayed")
    public void sendDelayedMessage(@RequestParam String data) {

        //MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
        MessageProperties messageProperties = new MessageProperties();
        
        //设置延迟时间(毫秒),这里为10秒的发送延迟
        messageProperties.setDelay(10000);
        //也可以使用这种方式设置
        //messageProperties.setHeader("x-delay", 10000); // 设置延迟时间(毫秒)
        
        //设置消息类型为文本: text/plain
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);

        //消息对象
        Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);

        rabbitTemplate.convertAndSend("delayed_exchange", "delayed_key", message);

        System.out.println("时间点"+LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
                +",生产者发送延迟消息:" + data);
    }
}

消费者

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;


@Component
public class DelayedMessageConsumer {

    @RabbitListener(queues = "delayed_queue")
    public void receiveMessage(String message) {
        System.out.println("时间点"+LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
                +",消费者接收到延迟消息:" + message);
    }
}

使用测试

运行代码,访问mq管理页面ip:15672,看到多出了一个带有DM标识的交换机,即为新加的延迟交换机

延迟交换机

然后调用地址

localhost:8084/send/delayed?data=这是一条延迟队列测试消息

控制台输出如下,注意要等待10秒才会输出消费者消息,因为生产者代码中设置了10秒的发送延迟:

时间点2024-11-21 14:38:27,生产者发送延迟消息:这是一条延迟队列测试消息
时间点2024-11-21 14:38:37,消费者接收到延迟消息:这是一条延迟队列测试消息

死信队列实现延迟

yaml

配置基本信息即可

server:
  port: 8084

spring:
  application:
    name: RabbitMQ-Demo
  rabbitmq:
    host: 192.168.137.200
    port: 5672
    username: admin
    password: admin
    virtual-host: /

配置类

注册普通队列、死信队列、死信交换机及绑定关系,注意为普通队列指定死信队列是要设置超时时间,以实现延迟

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class QueueConfig {

    /**
     * 用于存放超时消息的死信队列
     * */
    @Bean("deadDelayQueue")
    public Queue DeadQueue() {

        return QueueBuilder.durable("dead_delay_queue").build();
    }

    /**
     * 死信交换机
     * */
    @Bean("deadDelayExchange")
    public Exchange deadDelayExchange() {
        return ExchangeBuilder.directExchange("dead_delay_exchange").durable(true).build();
    }

    /**
     * 绑定死信交换机与死信队列
     * */
    @Bean
    public Binding deadLetterBinding() {
        return new Binding(
                "dead_delay_queue",
                Binding.DestinationType.QUEUE,
                "dead_delay_exchange",
                "dead-delay-key",
                null
        );
    }


    /**
     * 创建普通队列,并绑定死信交换机,用于消息过期后把消息放入死信队列
     *
     * */
    @Bean("normalDelayQueue")
    public Queue normalDelayQueue() {
        return QueueBuilder
                .durable("normal_delay_queue")
                //设置队列内所有消息的超时时间为5秒
                .ttl(5000)
            	//指定此队列的死信交换机及路由键
                .deadLetterExchange("dead_delay_exchange").deadLetterRoutingKey("dead-delay-key")
                .build();
    }
}

生产者

直接向普通队列中发布消息

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;


@RestController
@RequestMapping("send")
public class DeadDelayedProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("deadDelayed")
    public void sendDelayedMessage(@RequestParam String data) {

        //MessageProperties: 消息属性。MessageProperties提供了很多方法来对消息的属性进行设置
        MessageProperties messageProperties = new MessageProperties();

        //设置消息类型为文本: text/plain
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);

        //消息对象
        Message message = new Message(data.getBytes(StandardCharsets.UTF_8), messageProperties);

        //向普通队列投放消息
        rabbitTemplate.convertAndSend("", "normal_delay_queue", message);

        System.out.println("时间点"+LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
                +",死信生产者发送延迟消息:" + data);
    }
}

死信队列消费者

监听死信队列来消费延迟消息,之所以不创建普通队列消费者,是为了让投递到普通队列的消息无法被消费,进而达到过期时间被放入死信队列,来让死信消费者消费。这样就以间接方式实现了延迟队列(5秒延迟)

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;


@Component
public class DeadDelayedConsumer {

    @RabbitListener(queues = "dead_delay_queue")
    public void receiveMessage(String message) {
        System.out.println("时间点"+LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
                +",死信队列消费者接收到延迟消息:" + message);
    }
}

使用测试

运行代码后,请求地址:

localhost:8084/send/deadDelayed?data=这是一条死信队列实现方式的延迟队列测试消息

控制台输出如下,需等待5秒显示完毕:

时间点2024-11-21 15:37:59,死信生产者发送延迟消息:这是一条死信队列实现方式的延迟队列测试消息
时间点2024-11-21 15:38:04,死信队列消费者接收到延迟消息:这是一条死信队列实现方式的延迟队列测试消息

可见两者输出间隔5秒,实现了延迟发送。



消息唯一性

在 RabbitMQ 中,确保消息唯一性是为了防止重复消息的处理,RabbitMQ自身没有去重机制,但通常可使用消息的唯一标识来实现。下面列举一个简单的例子:

RabbitMQ 的消息属性中包含一个 Message ID 字段,可以为每条消息指定一个全局唯一的 ID,用于标识消息。

生产者设置 Message ID

在消息发送时,生产者为每条消息生成一个唯一的 ID(如 UUID)。

MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
Message message = new Message("message content".getBytes(), messageProperties);
rabbitTemplate.send("exchange_name", "routing_key", message);

消费者校验唯一性

消费者处理消息时,检查 Message ID 是否已处理过。可使用 Redis、数据库等存储已处理的 Message ID,并在处理前进行去重校验。

if (!redis.contains(message.getMessageProperties().getMessageId())) {
    // 处理消息
    processMessage(message);
    // 将消息ID存入Redis
    redis.save(message.getMessageProperties().getMessageId());
} else {
    // 忽略重复消息
    System.out.println("Duplicate message ignored");
}

优点

  • 消息的唯一性完全由 Message ID 确保,与 RabbitMQ 的内部机制无关。
  • 适合高并发场景。

注意事项

  • 消息的 Message ID 必须全局唯一,可以使用 UUID 或数据库的唯一键生成。
  • 消费者的去重存储系统(如 Redis)的容量和性能要能够满足业务需求。


发布与消费自定义类型消息

在使用 RabbitTemplate 发布消息时,可以将自定义类型的对象进行序列化并发送,而不是直接发送字符串。以下是实现的步骤和关键点:

yaml

配置基本连接信息即可

server:
  port: 8084

spring:
  application:
    name: RabbitMQ-Demo
  rabbitmq:
    host: 192.168.137.200
    port: 5672
    username: admin
    password: admin
    virtual-host: /

配置类

配置消息的序列化机制

RabbitTemplate 默认使用 SimpleMessageConverter 将消息转为字符串。为了支持自定义对象类型,可以配置其他 MessageConverter,例如 Jackson2JsonMessageConverter,将对象序列化为 JSON 格式:

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class QueueConfig {

    @Autowired
    private ConnectionFactory connectionFactory;

    //声明队列
    @Bean("serializeQueue")
    public Queue serializeQueue() {
        return QueueBuilder.durable("serialize_queue").build();
    }

    //配置序列化器
    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    //配置模板,指定序列化器
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        return rabbitTemplate;
    }


}

定义自定义对象

定义要发送的自定义对象,确保其可被序列化(通过 Jackson 或其他方式)。

import com.fasterxml.jackson.annotation.JsonProperty;


public class CustomMessage {

    @JsonProperty("id")
    private String id;
    
    @JsonProperty("content")
    private String content;


    public CustomMessage(String id, String content) {
        this.id = id;
        this.content = content;
    }

    public CustomMessage() {}

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}

生产者

发布自定义对象,通过 RabbitTemplate 发送消息时,直接传递自定义对象即可。

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
@RequestMapping("send")
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("serialize")
    public void sendMessage() {
        CustomMessage message = new CustomMessage("12345", "Hello RabbitMQ!");
        rabbitTemplate.convertAndSend("", "serialize_queue", message);
        System.out.println("生产者发送消息: " + message);
    }
}

消费者

接收自定义对象,确保消费者可以正确反序列化消息。

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @RabbitListener(queues = "serialize_queue")
    public void receiveMessage(CustomMessage message) {
        System.out.println("消费者接受消息,消息id: " + message.getId() + ", 消息content: " + message.getContent());
    }
}

注意事项

  1. 类型匹配

    • 消费者方法的参数类型必须与发送的对象类型一致,否则会抛出序列化/反序列化错误。
  2. 依赖配置

    • 如果使用 Jackson2JsonMessageConverter,请确保 Jackson的相关依赖已引入:

      <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-databind</artifactId>
          <version>2.x.x</version>
      </dependency>
      
  3. 序列化格式

    • 默认情况下,Jackson2JsonMessageConverter 会将对象序列化为 JSON 格式并设置 content-typeapplication/json
  4. 其他序列化方式

    • 如果需要使用其他序列化机制,可以自定义实现 MessageConverter

使用测试

启动代码后调用地址:

localhost:8084/send/serialize

控制台输出:

生产者发送消息: com.Serializable.CustomMessage@4c9be371
消费者接受消息,消息id: 12345, 消息content: Hello RabbitMQ!

通过上述配置和代码,自定义类型的对象可以轻松在 RabbitTemplate 中传递并消费。


http://www.kler.cn/a/411184.html

相关文章:

  • Spring |(五)IoC/DI的注解开发
  • CSS —— 子绝父相
  • 永磁同步电机末端振动抑制(输入整形)
  • qt+opengl 三维物体加入摄像机
  • leetcode 面试150之 156.LUR 缓存
  • 【强化学习的数学原理】第04课-值迭代与策略迭代-笔记
  • 避坑ffmpeg直接获取视频fps不准确
  • CBK7运营安全
  • C语言学习 12(指针学习1)
  • 图像小波去噪与总变分去噪详解与Python实现
  • C++ 优先算法 —— 无重复字符的最长子串(滑动窗口)
  • 【pyspark学习从入门到精通19】机器学习库_2
  • 第六届国际科技创新学术交流大会暨新能源科学与电力工程国际(NESEE 2024)
  • Scala文件读写——成绩分析
  • 【AI绘画】Midjourney进阶:色调详解(上)
  • pyshark安装使用,ubuntu:20.04
  • QT6学习第四天
  • PAT甲级-1145 Average Search Time
  • C# 结构体
  • C#基础练习61-65
  • MMCM DRP动态配置方法(超详细讲解)
  • Spring Boot 3.4 正式发布,结构化日志!
  • 【Redis篇】String类型命令详讲以及它的使用场景
  • 互联网直播/点播EasyDSS视频推拉流平台视频点播有哪些技术特点?
  • 实战项目负载均衡式在线 OJ
  • Notepad++ 替换所有数字给数字加单引号