当前位置: 首页 > article >正文

【RabbitMQ】Spring Boot 结合 RabbitMQ 完成应用间的通信

   🔥个人主页: 中草药

🔥专栏:【中间件】企业级中间件剖析


 Spring 框架与 RabbitMQ 的整合主要通过 Spring AMQP(Advanced Message Queuing Protocol)模块实现,提供了便捷的消息队列开发能力。

引入依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

配置

#配置RabbitMQ的基本信息
spring:
 rabbitmq:
 host: 110.41.51.65
 port: 15673 #默认为5672
 username: study
 password: study
 virtual-host: bite #默认值为

#或者以下这种方式
 rabbitmq:
   addresses: #amqp://username:password@Ip:port/virtual-host

一、工作队列模式(Work Queue)

场景:多个消费者共享一个队列,消息被轮询分发(Round-Robin),用于任务分发和负载均衡(如耗时任务处理)。

producer

@RestController
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/work")
    public String work(){
        //使用内置交换机 RoutingKey 和队列名称一致
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE,"Hello World"+i);
        }
        return "OK";
    }
}

@RabbitListener 是 Spring 框架中用于监听 RabbitMQ 队列的注解,通过使用这个注解,可以定义一个方法,以便从 RabbitMQ 队列中接收消息(相当于消费者)。该注解支持多种参数类型,这些参数类型代表了从 RabbitMQ 接收到的消息和相关信息。

以下是一些常用的参数类型:

  1. String:返回消息的内容。
  2. Messageorg.springframework.amqp.core.Message):Spring AMQP 的 Message 类,返回原始的消息体以及消息的属性,如消息 ID、内容、队列信息等。
  3. Channelcom.rabbitmq.client.Channel):RabbitMQ 的通道对象,可以用于进行更高级的操作,如手动确认消息 。
@Component
public class WorkListener {

    @RabbitListener(queues = Constants.WORK_QUEUE)
    public void queueListener1(Message message, Channel channel) {
        System.out.println("Listener1:["+Constants.WORK_QUEUE+"]"+message+",channel:"+channel);
    }

    @RabbitListener(queues = Constants.WORK_QUEUE)
    public void queueListener2(String message) {
        System.out.println("Listener2:["+Constants.WORK_QUEUE+"]"+message);
    }
}

观察控制台

轮询分发:默认按顺序将消息分发给不同消费者。

二、发布订阅模式(Publish/Subscribe)

场景:消息广播到所有绑定的队列(如日志广播、事件通知),使用 Fanout 交换机

声明使用交换机和队列 并绑定

@Configuration
public class RabbitMQConfig {
    @Bean("fanoutQueue1")
    public Queue funoutQueue1() {
        return QueueBuilder.durable(Constants.FUNOUT_QUEUE1).build();
    }

    @Bean("fanoutQueue2")
    public Queue funoutQueue2() {
        return QueueBuilder.durable(Constants.FUNOUT_QUEUE2).build();
    }

    @Bean("funoutExchange")
    public FanoutExchange funoutExchange(){
        return ExchangeBuilder.fanoutExchange(Constants.FUNOUT_EXCHANGE).durable(true).build();
    }

    @Bean("funoutQueueBinding1")
    public Binding funoutQueueBinding1(@Qualifier("fanoutQueue1") Queue queue,@Qualifier("funoutExchange") FanoutExchange funoutExchange){
        return BindingBuilder.bind(queue).to(funoutExchange);
    }

    @Bean("funoutQueueBinding2")
    public Binding funoutQueueBinding2(@Qualifier("fanoutQueue2") Queue queue,@Qualifier("funoutExchange") FanoutExchange funoutExchange){
        return BindingBuilder.bind(queue).to(funoutExchange);
    }
}

producer


@RestController
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/funout")
    public String funout(){
        rabbitTemplate.convertAndSend(Constants.FUNOUT_EXCHANGE,"","Hello Spring funout");
        return "OK";
    }
}

listener

@Component
public class FunoutListener {

    @RabbitListener(queues = Constants.FUNOUT_QUEUE1)
    public void queueListener1(String message) {
        System.out.println("Listener1:["+Constants.FUNOUT_QUEUE1+"]"+message);
    }

    @RabbitListener(queues = Constants.FUNOUT_QUEUE2)
    public void queueListener2(String message) {
        System.out.println("Listener2:["+Constants.FUNOUT_QUEUE2+"]"+message);
    }
}

在实际开发之中,一个 listener 监听一个 queue 

三、路由模式(Routing)

场景:根据 路由键(Routing Key) 精准匹配,将消息发送到指定队列,使用 Direct 交换机

声明使用交换机和队列 并绑定

@Configuration
public class RabbitMQConfig {
     //direct
    @Bean("directQueue1")
    public Queue directQueue1() {
        return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
    }

    @Bean("directQueue2")
    public Queue directQueue2() {
        return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
    }

    @Bean("directExchange")
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();
    }

    @Bean("directQueueBinding1")
    public Binding directQueueBinding1(@Qualifier("directQueue1") Queue queue,@Qualifier("directExchange") DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("orange");
    }

    @Bean("directQueueBinding2")
    public Binding directQueueBinding2(@Qualifier("directQueue2") Queue queue,@Qualifier("directExchange") DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("black");
    }

    @Bean("directQueueBinding3")
    public Binding directQueueBinding3(@Qualifier("directQueue2") Queue queue,@Qualifier("directExchange") DirectExchange directExchange){
        return BindingBuilder.bind(queue).to(directExchange).with("orange");
    }
}

producer


@RestController
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

      @RequestMapping("/direct/{routingKey}")
    public String direct(@PathVariable String routingKey){
        rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE,routingKey,"Hello Spring direct "+routingKey);
        return "direct OK";
    }
}

listener

@Component
public class DirectListener {

    @RabbitListener(queues = Constants.DIRECT_QUEUE1)
    public void queueListener1(String message) {
        System.out.println("Listener1:["+Constants.DIRECT_QUEUE1+"]"+message);
    }

    @RabbitListener(queues = Constants.DIRECT_QUEUE2)
    public void queueListener2(String message) {
        System.out.println("Listener2:["+Constants.DIRECT_QUEUE2+"]"+message);
    }
}

如果某一个队列即绑定了black和orange,将会分别发送到队列

四、通配符模式(Topics)

场景:根据路由键的 通配符规则 匹配队列,使用 Topic 交换机,支持 *(匹配一个单词)和 #(匹配多个单词)。

Topics 和 Routing 模式的区别是:

  1. topics 模式使用的交换机类型为 topic (Routing 模式使用的交换机类型为 direct)
  2. topic 类型的交换机在匹配规则上进行了扩展,Binding Key 支持通配符匹配

声明使用交换机和队列 并绑定

@Configuration
public class RabbitMQConfig {
      //TOPIC
    @Bean("topicExchange")
    public TopicExchange topicExchange(){
        return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();
    }

    @Bean("topicQueue1")
    public Queue topicQueue1(){
        return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
    }

    @Bean("topicQueue2")
    public Queue topicQueue2(){
        return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
    }

    @Bean("topicQueueBinding1")
    public Binding topicQueueBinding1(@Qualifier("topicExchange")TopicExchange topicExchange,@Qualifier("topicQueue1") Queue topicQueue){
        return BindingBuilder.bind(topicQueue).to(topicExchange()).with("*.orange.*");
    }

    @Bean("topicQueueBinding2")
    public Binding topicQueueBinding2(@Qualifier("topicExchange")TopicExchange topicExchange,@Qualifier("topicQueue2") Queue topicQueue){
        return BindingBuilder.bind(topicQueue).to(topicExchange).with("*.*.rabbit");
    }

    @Bean("topicQueueBinding3")
    public Binding topicQueueBinding3(@Qualifier("topicExchange")TopicExchange topicExchange,@Qualifier("topicQueue2") Queue topicQueue){
        return BindingBuilder.bind(topicQueue).to(topicExchange).with("lazy.#");
    }
}

producer


@RestController
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

      @RequestMapping("/topic/{routingKey}")
    public String topic(@PathVariable String routingKey){
        rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE,routingKey,"Hello Spring topic "+routingKey);
        return "topic OK";
    }
}

listener

@Component
public class TopicListener {

    @RabbitListener(queues = Constants.TOPIC_QUEUE1)
    public void queueListener1(String message) {
        System.out.println("Listener1:["+Constants.TOPIC_QUEUE1+"]"+message);
    }

    @RabbitListener(queues = Constants.TOPIC_QUEUE2)
    public void queueListener2(String message) {
        System.out.println("Listener2:["+Constants.TOPIC_QUEUE2+"]"+message);
    }
}

五、完成应用通信

SpringBoot 整合 RabbitMQ 实现应用通信是微服务/分布式系统中常见的异步解耦方案。

以此图为实例

订单系统

@Configuration
public class RabbitMQConfig {
    @Bean("orderQueue")
    public Queue orderQueue() {
        return QueueBuilder.durable("order.create").build();
    }
}


@RestController
@RequestMapping("/order")
public class OrderController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/create")
    public String create(){
        rabbitTemplate.convertAndSend("","order.create","订单信息,订单ID:"+ UUID.randomUUID());
        //这里仅仅是模拟演示,实际的下单操作比较复杂,包括参数的校验,数据库存储等等 业务代码省略
        return "下单成功";
    }
}

 物流系统

@Component
@RabbitListener(queues = "order.create")
public class OrderListener {
    @RabbitHandler
    //该注解根据所识别的数据类型不同自动分配不同的方法
    public void handleOrder(String orderInfo) {
        System.out.println("接收到新的订单消息:"+orderInfo);
        //接收到订单消息后,进行相应的业务出路 代码省略
    }

    @RabbitHandler
    //在此处为方便演示我们将order-service 让此项目应用
    //正确的做法是将OrderInfo抽取出来单独作为一个包,两个service都引用这个包
    public void handleOrder2(OrderInfo orderInfo) {
        System.out.println("接收到新的订单消息:"+orderInfo);
    }
}

@RabbitHandler 注解用于标记方法,这些方法会根据消息的类型来处理接收到的消息。当一个消息监听器容器接收到消息时,它会根据消息的类型选择合适的 @RabbitHandler 注解的方法来处理该消息。 

测试结果

当在发送的是一个对象时,为保证对象的可读性,我们要保证对象可被序列化,且通过 Jackson2JsonMessageConverter 将其从原生序列化改为Json格式

@Configuration
public class RabbitMQConfig {
    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }
}

对于Listener,为保证同样具有解读json的能力,也应该去加上相同的配置 

@Data
public class OrderInfo implements Serializable {//实现序列化接口
    private String orderId;
    private String name;
}

 可观察结果


对时间的慷慨,就等于慢性自杀。——奥斯特洛夫斯基

🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀🍀

以上,就是本期的全部内容啦,若有错误疏忽希望各位大佬及时指出💐

  制作不易,希望能对各位提供微小的帮助,可否留下你免费的赞呢🌸 

 


http://www.kler.cn/a/574417.html

相关文章:

  • HCIA-IP路由动态-RIP
  • 计算机网络笔记(一)——1.1计算机网络在信息时代中的作用
  • SpringBoot集成Sentry日志收集-1 (Sentry安装)
  • 论文粗读——Isometric 3D Adversarial Examples in the Physical World
  • Emulate Docker CLI using podman. Create /etc/containers/nodocker to quiet msg.
  • Linux网络环境配置及常用命令
  • JavaScript 基础语法
  • 【区块链 + 绿色低碳】东方易电城市微电网智能平台 | FISCO BCOS 应用案例
  • C++ 单词识别_牛客题霸_牛客网
  • flink重启策略
  • JMeter 断言最佳实践
  • plt和cv2有不同的图像表示方式和颜色通道顺序
  • pytorch3d学习(一)——开始(架构概述、输入数据、相机坐标系、纹理渲染)
  • Golang的网络流量控制
  • 【每日八股】Redis篇(三):持久化(上)
  • 自律linux 第 34 天
  • 怎么做数据冷热分离?怎么做分库分表?为什么要用ES?
  • 大模型——模型上下文协议 (MCP)
  • 配电柜/环网柜温湿度控制装置 功能参数介绍
  • Android MVC、MVP、MVVM三种架构的介绍和使用。