当前位置: 首页 > article >正文

【02基础】- RabbitMQ基础

目录

  • 2- RabbitMQ
    • 2-1 介绍和安装
      • 安装
    • 2-2 RabbitMQ 快速入门
    • 2-3 RabbitMQ 数据隔离
  • 3- Java客户端
    • 3-1 快速入门
      • AMQP
      • 快速入门
      • 📑小结:SpringAMQP如何收发消息?
    • 3-2 WorkQueues 任务模型
      • 案例-使用 WorkQueue 单队列绑定多消费者
      • 📑小结:Work模型的使用
    • 3-3 Fanout 交换机
      • 案例- FanouotExchange 使用
      • 📑小结:交换机的作用是什么?
    • 3-4 Direct 交换机
      • 案例-利用 SpringAMQP 演示 DirectExchange 的使用
      • 📑小结:DirectExchange 的作用是什么?
    • 3-5 Topic 交换机
      • 案例-利用 SpringAMQP 演示 DirectExchange 的适用
      • 📑小结:描述下 Direct 交换机和 Topic 交换机的差异
    • 3-6 声明队列和交换机
      • 声明队列和交换机的方式(Java代码实现)
        • 代码实现
        • 使用注解的方式解决 Direct 交换机问题
      • 📑小结
    • 3-7 消息转换器
      • 案例-利用 SpringAMQP 发送对象类型的消息
      • 问题
  • 4- 使用 MQ 改造支付业务代码
    • 案例

2- RabbitMQ

2-1 介绍和安装

RabbitMQ 的整体架构以及核心概念

  • publisher:消息发送者
  • cunsumer:消息消费者
  • queue:队列
  • exchange:交换机,负责路由消息
  • virtual-host :虚拟主机,起到数据隔离的作用;一个 MQ 中可以创建多个 virtual-host

数据流转的模型①生产者将数据发送给交换机 ——> ②交换机将消息路由给队列 ——> ③消费者监听队列拿到消息

安装

  1. 上传镜像文件 mq.tar 到 Linux 系统中
  2. 执行命令
docker load -i mq.tar
  1. 复制以下代码执行
  • 其中 15672 ,是控制台端口
  • 其中 5672 ,是收发消息的端口
docker run \
-e RABBITMQ_DEFAULT_USER=itheima \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network hmall \
-d \
rabbitmq:3.8-management

2-2 RabbitMQ 快速入门

需求

  • rabbitmq 的控制台完成下列操作:
  • 新建队列 hello.queue1hello.queue2
  • 向默认的 amp.fanout 交换机发送一条消息
  • 查看消息是否到达 hello.queue1hello.queue2

实现

  • 需要创建队列,同时在交换机中需要绑定队列才能实现消息的路由。

总结规律

①如果交换机和队列没有绑定能否收到消息?

  • 交换机:是负责路由和转发消息的。交换机通过绑定队列,之后可以将消息转发到队列中。

②如果绑定了所有队列是不是所有队列都可以收到消息?

  • 是的,如果一个交换机绑定了多个队列,那类似于广播的效果所有队列都能收到消息。

2-3 RabbitMQ 数据隔离

  • 在 RabbitMQ 中有虚拟主机的概念,对于交换机和队列而言都有二者自己的虚拟主机。

需求

  • 在 RabbitMQ 的控制台下完成下列操作
    • 新建一个用户 hmall
    • 为 hmall 用户创建一个 vitual host
    • 测试不同的 vitual host 之间的数据隔离现象

实现

  • ① 首先在 Admin ——> Users 中创建用户
  • ② 在 Admin ——> Users 中创建虚拟主机

总结

  • 各个虚拟主机下都是相互隔离的。

3- Java客户端

3-1 快速入门

AMQP

什么是 AMQP?

  • Advanced Message Queuing Protocol,是一种高级的消息队列协议,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

Spring AMQP

  • Spring AMQP是基于 AMQP 协议定义的一套 API 规范,提供了模板来发送和接收消息。包含两部分,其中Spring AMQP 是基础抽象,spring-rabbit 是底层的默认实现

快速入门

需求

  • 利用控制台创建队列 simple.queue
  • 在 publisher 服务中,利用 SpringAMQP 直接向 simple.queue 发送消息
  • 在 consumer 服务中,利用 SpringAMQP 编写消费者监听 simple.queue 队列

实现

  • ① 引入 spring-amqp依赖,在父工程中引入 spring-amqp 依赖,这样 publisherconsumer 服务都可以使用
<!-- AMQP依赖,包含RabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • ② 在每个微服务中引入 MQ 服务端信息,这样微服务才能连接到 RabbitMQ

  • ③ 发送消息: SpringAMQP 提供了 RabbitTemplate 工具类,方便我们发送消息,发送消息的代码如下
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSimpleQueue() {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, spring amqp!";
    // 发送消息
    rabbitTemplate.convertAndSend(queueName, message);
}

  • ④ 接收消息
  • SpringAMQP 提供声明式的消息监听,我们只需要通过 注解 在方法上声明要监听的队列名称,将来 SpringAMQP 就会把消息传递给当前方法。
@Slf4j
@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        log.info("spring 消息接收到消息: [" + msg + "]");
        if (true) {
            throw new MessageConversionException("故意的");
        }
        log.info("消息处理完毕");
    }
}

📑小结:SpringAMQP如何收发消息?

在这里插入图片描述


3-2 WorkQueues 任务模型

  • Work queues,任务模型。简单来说就是 让多个消费者绑定到一个队列,共同消费队列中的消息。

案例-使用 WorkQueue 单队列绑定多消费者

模拟 WorkQueue 实现一个队列绑定多个消费者,基本思路如下

  • ① 在 RabbitMQ 的控制台创建一个队列,名为 work.queue
  • ② 在 publisher 服务中定义测试方法,在 1 秒内产生 50 条消息,发送到 work.queue
  • ③ 在 consumer 服务中顶级两个消息监听者,都监听 work.queue 队列
  • ④ 消费者每 1 秒处理 50 条消息,消费者每 2 秒处理 5 条消息

实现

  • ① 实现消费者
@Slf4j
@Component
public class MqListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg) {
        System.out.println("消费者收到了simple.queue的消息: [" + msg + "]");
    }

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue1(String msg) {
        System.out.println("消费者1 收到了 work.queue的消息: [" + msg + "]");
    }

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue2(String msg) {
        System.err.println("消费者2 收到了 work.queue的消息: [" + msg + "]");
    }
}

  • ② 实现生产者
@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void testSendMessage2Queue() {
        String queueName = "simple.queue";
        String msg = "hello, amqp!";
        rabbitTemplate.convertAndSend(queueName, msg);
    }

    @Test
    void testWorkQueue() throws InterruptedException {
        String queueName = "work.queue";
        for (int i = 1; i <= 50; i++) {
            String msg = "hello, worker, message_" + i;
            rabbitTemplate.convertAndSend(queueName, msg);
            Thread.sleep(20);
        }
    }
}

结果

  • 发现在消费的过程中,两个消费者并没有都消费 50 条消息。
  • 二者消费的过程是采用轮询的方式进行消费。

通过改变消费速度

  • 即便改变了消费的速度,消费的过程中消费者1 和消费者2,也是按照轮询的方式消费任务。
    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue1(String msg) {
        System.out.println("消费者1 收到了 work.queue的消息: [" + msg + "]");
    }

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue2(String msg) {
        System.err.println("消费者2 收到了 work.queue的消息: [" + msg + "]");
        Thread.sleep(20);
    }

📑小结:Work模型的使用

在这里插入图片描述


3-3 Fanout 交换机

Fanout Exchange 交换机会将收到的消息广播到每一个跟其绑定的 queue,所以也叫 广播模式

案例- FanouotExchange 使用

实现思路如下

  • ① 在 RabbitMQ 控制台中,声明队列 fanout.queue1fanout.queue2
  • ② 在 RabbitMQ 控制台中,声明交换机 hmall.fanout 将两个队列与其绑定
  • ③ 在 consumer 服务中,编写两个消费者方法,分别监听 fanout.queue1**fanout.queue2
  • ④ 在 publisher 中编写测试方法,向 hmall.fanout 发送消息。

Fanout 交换机

  • 消费者
@RabbitListener(queues = "fanout.queue1")
public void listenWorkQueue1(String msg) {
    System.out.println("消费者1 收到了 fanout.queue1的消息: [" + msg + "]");
}

@RabbitListener(queues = "fanout.queue2")
public void listenWorkQueue2(String msg) {
    System.err.println("消费者2 收到了 fanout.queue2的消息: [" + msg + "]");
}
  • 生产者
@Test
void testSendFanout()  {
    String exchangeName = "hmall.fanout";
    String msg = "hello,everyone";
    rebbitTemplate.converAndSend(exchangeName,numm,msg);
}
  • 运行结果

📑小结:交换机的作用是什么?

在这里插入图片描述


3-4 Direct 交换机

Direct Exchange 会将接收到的消息根据路由规则路由到指定的 Queue,因此可以称为定向路由。

  • 每一个 Queue 都与 Exchange 设置一个 Bindingkey
  • 发布者发送消息时,制定消息的 RoutingKey
  • Exchange 将消息路由到 BindingKey 与消息 RoutingKey 一致的队列

案例-利用 SpringAMQP 演示 DirectExchange 的使用

需求如下

  • ① 在 RabbitMQ 控制台中,声明队列 direct.queue1 和 direct.queue2
  • ② 在 RabbitMQ 控制台中,声明交换机 hmall.direct,将两个队列与其绑定
  • ③ 在 consumer 服务中,编写两个消费方法,分别监听 direct.queue1 和 direct.queue2
  • ④ 在 publisher 中编写测试方法,利用不同的 RoutingKey 向 hmall.direct 发送消息

📑小结:DirectExchange 的作用是什么?

:::info

  • DirectExchange 交换机可以通过 bindingKey 来设置,将消息通过 bindingKey 发送到指定的队列中。通过设置合适的绑定键,您可以确保特定的消息被发送到特定的微服务进行处理。这样可以实现消息的精确路由,确保消息只被需要的消费者接收和处理。

:::


3-5 Topic 交换机

TopicExchange 与 DirectExchange 类似,区别在于 routingKey 可以是多个单词的列表,并且以 . 分割。

  • Queue 与 Exchange 指定 BindingKey 时可以使用通配符
    • #代表 0 个 或多个单词。
    • * :代指一个单词

  • 类似上述实现,如果一个 bindingKey 定义为了 china.# 那么,对于其而言只会接收与 china.#开头的消息。

案例-利用 SpringAMQP 演示 DirectExchange 的适用

需求如下

  • ① 在 RabbitMQ 控制台中,声明队列 topic.queue1 和 topic.queue2
  • ② 在 RabbitMQ 控制台中,声明交换机 hmall.topic 将两个队列与其绑定
  • ③ 在 consumer 服务中,编写两个消费者方法,分别监听 topic.queue1 和 topic.queue2
  • ④ 在 publisher 中编写测试方法,利用不同的 RoutingKey 向 hmall.topic 发送消息

  • 消费者
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1 收到了 topic.queue1的消息: [" + msg + "] ");
}

@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg) throws InterruptedException {
    System.out.println("消费者2 收到了 topic.queue2的消息: [" + msg + "] ");
}
  • 发送者
    • 以下情况消息会被路由到 **#.news** 的队列中。
@Test
void testSendTopic() {
    String exchangeName = "hmall.topic";
    String msg = "蓝色通知,警报解除,哥斯拉是放的气球";
    rabbitTemplate.convertAndSend(exchangeName, "japan.news", msg);
}
  • 发送者2
    • 以下情况两个队列都会收到消息。
@Test
void testSendTopic() {
    String exchangeName = "hmall.topic";
    String msg = "蓝色通知,警报解除,哥斯拉是放的气球";
    rabbitTemplate.convertAndSend(exchangeName, "china.news", msg);
}
  • 发送者3
    • 以下情况只有队列1 会受到消息。
@Test
void testSendTopic() {
    String exchangeName = "hmall.topic";
    String msg = "蓝色通知,警报解除,哥斯拉是放的气球";
    rabbitTemplate.convertAndSend(exchangeName, "china.weather", msg);
}

📑小结:描述下 Direct 交换机和 Topic 交换机的差异

在这里插入图片描述

3-6 声明队列和交换机

  • 使用 Java 代码声明队列交换机才是最靠谱的方式

声明队列和交换机的方式(Java代码实现)

SpringAMQP 提供了几个类,用来声明队列、交换机及其绑定关系

  • Queue:用于声明队列,可用工厂类 QueueBuilder 创建
  • Exchange:用于声明交换机,可以用工厂类 ExchangeBuilder 构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类 BindingBuilder 构建

代码实现
  • 例如,声明一个 Fanout 类型的交换机,并且创建队列与其绑定
public class FanoutConfiguration {

    @Bean
    public FanoutExchange fanoutExchange(){
        // ExchangeBuilder.fanoutExchange("").build();
        return new FanoutExchange("hmall.fanout2");
    }

    @Bean
    public Queue fanoutQueue3(){
        // QueueBuilder.durable("ff").build();
        return new Queue("fanout.queue3");
    }

    @Bean
    public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
    }

    @Bean
    public Queue fanoutQueue4(){
        // QueueBuilder.durable("ff").build();
        return new Queue("fanout.queue4");
    }

    @Bean
    public Binding fanoutBinding4(){
        return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());
    }
}
使用注解的方式解决 Direct 交换机问题
  • 由于 Direct 交换机可以配置对应的 key,但对于声明式方式来说,需要对每个 key 都写一个 binding 方法,这样效率很低,所以引入注解的方式实现
  • SpringAMQP 还提供了基于 @RabbitListener 注解来声明队列和交换机的方式,在 Listener 的部分通过注解实现
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1",durable = "true"),
        exchange = @Exchange(name = "hmall,direct",type = ExchangeTypes,DIRECT),
        key = {"red","blue"}
))
public void listenDirectQueue1(String msg)  throws InterruptedException{
    System.out.println("消费者 1 收到了 direct.queueq 的消息:【"+msg+"】");
}

📑小结

在这里插入图片描述


3-7 消息转换器

案例-利用 SpringAMQP 发送对象类型的消息

  • ① 声明一个队列,名为 object.queue
  • ② 编写单元测试,向队列中直接发送一条消息,消息类型为 Map
  • ③ 在控制台查看消息,总结你能发现的问题
// 准备消息
Map<String,Object> msg = new HashMap<>();
msg.put("name","Jack");
msg.put("age",21);
@Test
void testSendObject() {
    Map<String, Object> msg = new HashMap<>(2);
    msg.put("name", "jack");
    msg.put("age", 21);
    rabbitTemplate.convertAndSend("object.queue", msg);
}

问题

  • Spring 对消息对象的处理是由 org.springframework,amqp.support.converter.MessageConveerter 来处理的。而默认实现是 SimpleMessageConverter,基于 JDK 的 ObjectOutputStream 完成序列化。

存在以下问题

  • JDK 的序列化有安全风险
  • JDK 序列化的消息太大
  • JDK 序列化的消息可读性差

建议采用 JSON 薛丽华代替默认的的 JDK 序列化,要做两件事情:

  • 在 publisher 和 consumer 中都要引入 Jackson 依赖:
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
  • 在 publisher 和 consumer 中都要配置 MessageConverter
@Bean
public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
}

4- 使用 MQ 改造支付业务代码

案例

  • 需求:改造余额支付功能,不再同步调用交易服务的 OpenFeign 接口,而是采用异步的 MQ 通知交易服务更改订单状态

    1. 业务中引入 AMQP 依赖
<!-- amqp -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 2.配置 MQ 的地址
spring:
  rabbitmq:
    host: 192.168.150.101
    port: 5672
    virtual-host: /hmall
    username: hmall
    password: 123
    1. 配置 MQ 的 Configure
@Configuration
public class MqConfig {

    @Bean
    public MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
    1. 编写监听器
@Component
@RequiredArgsConstructor
public class PayStatusListener {

    private final IOrderService orderService;

    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "mark.order.pay.queue", durable = "true"),
        exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),
        key = "pay.success"
    ))
    public void listenOrderPay(Long orderId) {
        // 标记订单状态为已支付
        orderService.markOrderPaySuccess(orderId);
    }
}
    1. 业务异步调用
rabbitTemplate.convertAndSend("pay.topic", "pay.success", po.getBizOrderNo());

http://www.kler.cn/a/375864.html

相关文章:

  • 【漏洞预警】FortiOS 和 FortiProxy 身份认证绕过漏洞(CVE-2024-55591)
  • idea 如何安装 github copilot
  • 《汽车维修技师》是什么级别的期刊?是正规期刊吗?能评职称吗?
  • 联合体(Union)
  • 【PHP】双方接口通信校验服务
  • 设计模式03:行为型设计模式之策略模式的使用情景及其基础Demo
  • 基于51单片机的无线防盗报警器proteus仿真
  • element-plus校验单个form对象合法性
  • ctfshow(89,90)--PHP特性--intval函数
  • 履带式排爆演习训练机器人技术详解
  • 从0学习React(10)
  • opencv优秀文章集合
  • 【系统架构设计师】2024年上半年真题论文: 论大数据lambda架构(包括解题思路和素材)
  • react的antd-mobile使用Steps显示物流
  • YOLOv6-4.0部分代码阅读笔记-dbb_transforms.py
  • ESP-IDF 配置 SimpleFOC 项目
  • 企业如何通过架构蓝图实现数字化转型
  • Unity 游戏性能优化实践:内存管理与帧率提升技巧
  • Android和iOS有什么区别?
  • redis详细教程(4.GEO,bitfield,Stream)
  • 自动驾驶上市潮中,会诞生下一个“英伟达”吗?
  • 基于深度学习的机器人智能控制算法 笔记
  • 【Linux】编辑器vim 与 编译器gcc/g++
  • OpenCV Python 版使用教程(二)摄像头调用
  • 二叉树选择题
  • 11.01学习