当前位置: 首页 > article >正文

从0开始搭建一个生产级SpringBoot2.0.X项目(十)SpringBoot 集成RabbitMQ

前言

最近有个想法想整理一个内容比较完整springboot项目初始化Demo。

SpringBoot集成RabbitMQ

 RabbitMQ中的一些角色:

  • publisher:生产者

  • consumer:消费者

  • exchange个:交换机,负责消息路由

  • queue:队列,存储消息

  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。 

junit用于测试。

一、pom引入依赖amqp

        <!--rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <!--rabbitmq-->

二、application-dev.yaml 增加RabbitMQ相关配置

spring:
  #RabbitMQ服务器配置,地址账号密码,virtualhost等配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: murg
    password: 123456
    virtual-host: murg-host
    #队列中没有消息,阻塞等待时间
    template:
      receive-timeout: 2000
logging:
  level:
    org.springframework.security: debug


三、发布/订阅

RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型。此处只列举发布/订阅模式。

此模式下根据交换机类型又分为三种。

1.Fanout类型:   广播模式    把消息交给所有绑定到交换机的队列

2.Direct类型:    路由模式     把消息交给符合指定routing key 的队列

3Topic类型:     主题模式      把消息交给符合主题通配符的队列

3.1 Fanout类型

在广播模式下,消息发送流程是这样的:

1) 可以有多个队列

2) 每个队列都要绑定到Exchange(交换机)

3) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定

4) 交换机把消息发送给绑定过的所有队列

5) 订阅队列的消费者都能拿到消

3.1.1 声明Fanout类型交换机和队列 将交换机和队列绑定在一起

创建配置类FanoutConfig 

声明一个Fanout类型交换机命名为murg.fanout

声明两个Queue队列分别为fanout.queue1和fanout.queue2

分别将两个队列和交换机绑定,后续用于消费消息。

package com.murg.bootdemo.rabbitmq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Fanout 广播模式下
 *声明交换机和队列
 */
@Configuration
public class FanoutConfig {
    /**
     * 声明交换机和队列 将交换机和队列绑定在一起
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("murg.fanout");
    }

    /**
     * 第1个队列
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 第2个队列
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

3.1.2创建消息生产服务

创建消息生产服务MessageProducerService ,注入RabbitTemplate 用于发送消息。

 注意一点是rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列

新增测试广播模式下发送消息方法

package com.murg.bootdemo.rabbitmq;

import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

/**
 * Rabbitmq消息生产者
 */
@Service
@RequiredArgsConstructor
public class MessageProducerService {
    private final RabbitTemplate rabbitTemplate;

    /**
     * 测试广播模式下发送消息
     * @param msg
     */
    public void testFanoutExchange(String msg) {
        // 发送消息
        //murg.fanout交换机名字 已经在 FanoutConfig中配置并且初始化了
        //rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列
        rabbitTemplate.convertAndSend("murg.fanout","",msg);
    }

   
}

3.13创建消息消费服务

创建消息生产服务MessageConsumerService 

@RabbitListener是 SpringAMQP AMQP提供的注解,用于简化 RabbitMQ 消息监听器的创建。通过在方法上添加 @RabbitListener 注解,可以将方法注册为消息监听器用于处理从 RabbitMQ 中接收到的消息。queues 参数定义队列名字 ,此处创建两个监听 在上述FanoutConfig 中已经将这两个队列和交换机murg.fanout绑定,所有可同时消费murg.fanout交换机的消息。

package com.murg.bootdemo.rabbitmq;

import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;

/**
 * 消息消费者
 *
 */
@Service
@RequiredArgsConstructor
public class MessageConsumerService {

    /**
     * 下面是Fanout,广播模式的监听
     * 通过RabbitListener监听队列名字FanoutConfig 中定义的 fanout.queue1 和fanout.queue2
     * @param msg
     */
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) {
        System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) {
        System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
    }


  
}

3.14修改测试类进行测试

修改创建项目时生成的 BootdemoApplicationTests.class

增加以下注解

@ActiveProfiles("dev") 

指定运行环境为开发环境
@RunWith(SpringRunner.class)

指定测试类的运行器(Runner)。其主要作用是将Spring的测试支持集成到JUnit测试中,使得在运行JUnit测试时,Spring的上下文可以被正确地加载和配置。

@SpringBootTest(classes={BootdemoApplication.class})

指定启动类

package com.murg.bootdemo;


import com.murg.bootdemo.rabbitmq.MessageProducerService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Profile;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.junit4.SpringRunner;

@ActiveProfiles("dev")
@RunWith(SpringRunner.class)
@SpringBootTest(classes={BootdemoApplication.class})// 指定启动类
public class BootdemoApplicationTests {
    @Autowired
    MessageProducerService messageProducerService;
    @Test
    public void contextLoads() {
        System.out.printf("aaaaaaaaaaaaa");
    }
    //测试
    @Test
    public void testFanoutExchange(){
        String msg = "遍身罗绮者,不是养蚕人";
        for (int i=0;i<10;i++){
            messageProducerService.testFanoutExchange(msg);
        }
    }

}

3.15 增加一个测试方法调用消息生产服务,发送 Fanout类型的消息。运行测试控制台输出结果

两个队列都可消费。 

3.2 Direct类型

3.2.1MessageProducerService生产服务增加方法testDirectExchange

在Direct模型下:
 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
 Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

/**
     * 测试Direct模式下发送消息
     * 在Direct模型下:
     *
     * 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
     *
     * 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
     *
     * Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
     * murg.Direct
     * @param msg
     */
    public void testDirectExchange(String msg,String rountingkey) {
        // 发送消息
        //murg.fanout交换机名字 已经在 FanoutConfig中配置并且初始化了
        //rabbittemplate.convertAndSend不会自己创建队列,要先在控制台手动创建一个队列或者再消费者配置中声明一个队列
        rabbitTemplate.convertAndSend("murg.direct",rountingkey,msg);
    }

3.2.2MessageConsumerService消费服务增加方法testDirectExchange

Direct模式的消费监听修改队列和交换机的方式改为注解方式。
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:@Exchange(name = "murg.direct",type = ExchangeTypes.DIRECT)声明交换机名字及类型
通过key的值声明接收不同路由的消息

direct.queue1 消费 rountingkey为“蚕妇”的消息

direct.queue2 消费 rountingkey为“自京赴奉先县咏怀五百字”的消息

    /**
     *
     * 下面是rountingKey 路由key 模式的消费监听
     * 基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
     *
     * 在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:
     *
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),//定义队列名字 direct.queue1
            exchange = @Exchange(name = "murg.direct", type = ExchangeTypes.DIRECT),//指定交换机和交换机类型
            key = {"蚕妇"} //指定消费rountingkey
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),//定义队列名字 direct.queue2
            exchange = @Exchange(name = "murg.direct", type = ExchangeTypes.DIRECT),//指定交换机和交换机类型
            key = {"自京赴奉先县咏怀五百字"}//指定消费rountingkey
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
    }

3.2.3测试类增加测试方法

    @Test
    public void testDirectExchange() throws InterruptedException {
        String msg1 = "遍身罗绮者,不是养蚕人";
        String key1 ="蚕妇";
        String msg2 = "朱门酒肉臭,路有冻死骨";
        String key2 ="自京赴奉先县咏怀五百字";

        for (int i=0;i<10;i++){
            if (i % 2 == 0){
                messageProducerService.testDirectExchange(msg2,key2);
                Thread.sleep(1000);
            }else {
                messageProducerService.testDirectExchange(msg1,key1);
                Thread.sleep(1000);

            }

        }
    }

调用消息生产服务,发送 Direct类型的消息。运行测试控制台输出结果

3.3 Topic类型

3.3.1MessageProducerService生产服务增加方法testTopicExchange

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
举例:
demo.#:能够匹配demo.spu.insert 或者 demo.spu    #.demo写法也可以
demo.*:只能匹配demo.spu

  public void testTopicExchange(String msg, String key) {
        rabbitTemplate.convertAndSend("murg.topic",key,msg);
    }

3.3.2MessageConsumerService消费服务增加方法testDirectExchange

@Exchange(name = "murg.topic",type = ExchangeTypes.TOPIC)声明交换机名字及类型
通过key的值声明接收不同路由的消息

topic.queue1 消费 rountingkey为“罗隐.*”的消息

topic.queue2 消费 rountingkey为“#.贫女”的消息

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),//定义队列名字 topic.queue1
            exchange = @Exchange(name = "murg.topic", type = ExchangeTypes.TOPIC),//指定交换机和交换机类型
            key = {"罗隐.*"} //指定消费rountingkey
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),//定义队列名字 topic.queue2
            exchange = @Exchange(name = "murg.topic", type = ExchangeTypes.TOPIC),//指定交换机和交换机类型
            key = {"#.贫女"}//指定消费rountingkey
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    }

3.3.3测试类增加测试方法 

    @Test
    public void testTopicExchange() throws InterruptedException {
        String msg = "采得百花成蜜后,为谁辛苦为谁甜";
        String key ="罗隐.蜂";
        String msg2 = "苦恨年年压金线,为他人作嫁衣裳";
        String key2 ="秦韬玉.贫女";
        for (int i=0;i<10;i++){
            if (i % 2 == 0){
                messageProducerService.testTopicExchange(msg,key);
                Thread.sleep(1000);
            }else {
                messageProducerService.testTopicExchange(msg2,key2);
                Thread.sleep(1000);

            }
        }
    }

调用消息生产服务,发送 Topic类型的消息。运行测试控制台输出结果


http://www.kler.cn/a/386549.html

相关文章:

  • VSCode可以安装最新版,并且可以对应Node 12和npm 6
  • 【RabbitMQ】08-延迟消息
  • SpringBoot(十)SpringBoot使用QQ邮箱stmp发送邮件
  • 招聘app开发,人才招聘、求职首要方式
  • Linux系统的网络设置
  • Selenium+Pytest自动化测试框架 ------ 禅道实战
  • Vue Element-UI 选择隐藏表格中的局部字段信息
  • 智慧交通系统:构建高效安全的城市交通网络
  • 2024 CSS保姆级教程四
  • 深度学习:自注意力机制(Self-attention)详解
  • 工作流初始错误 泛微提交流程提示_泛微协同办公平台E-cology8.0版本后台维护手册(11)–系统参数设置
  • 三菱QD77MS定位模块速度限制功能
  • yolov11-cpp-opencv-dnn推理onnx模型
  • LLM - 使用 LLaMA-Factory 微调大模型 环境配置与训练推理 教程 (1)
  • java多线程sleep() 和 wait() 有什么区别?
  • Shiro权限刷新
  • 「C/C++」C/C++标准库 之 #include<cstdlib> 通用工具函数库
  • hive表批量造数据
  • 08、Java学习-面向对象中级:
  • MySQL约束管理
  • 微服务day04
  • 技术速递|.NET 9 中 System.Text.Json 的新增功能
  • Linux命令 - linux索引节点、硬链接、软链接的介绍与使用
  • 一个简单ASP.NET购物车设计
  • 从0开始linux(24)——文件(5)磁盘文件系统
  • SpringBoot3中swagger无法使用