当前位置: 首页 > article >正文

RabbitMQ (Java)学习笔记

目录

一、概述

①核心组件

②工作原理

③优势

④应用场景

二、入门

1、docker 安装 MQ

2、Spring AMQP

3、代码实现

pom 依赖

配置RabbitMQ服务端信息

发送消息

接收消息

三、基础

work Queue

案例

消费者消息推送限制(解决消息堆积方案之一)

Work模型的使用

Fanout交换机

Fanout 广播模式

案例

Fanout Exchange交换机的作用是什么?

Direct 交换机


 

一、概述

        RabbitMQ是一种开源的消息代理软件,基于AMQP(高级消息队列协议)实现。它充当消息中间件的角色,允许应用程序通过消息队列进行异步通信。RabbitMQ的主要功能是接收、存储和转发消息,从而解耦应用程序组件,提高系统的可扩展性和可靠性。

①核心组件

  1. 生产者(Producer):负责创建消息并将其发送到RabbitMQ服务器。生产者可以是任何应用程序,它们将消息封装成特定的格式,然后通过网络发送到消息代理。

  2. 交换器(Exchange):交换器是消息传递的枢纽,它接收生产者发送的消息,并根据一定的规则将消息路由到一个或多个队列中。RabbitMQ提供了多种类型的交换器,如直接交换器(direct)、扇形交换器(fanout)、主题交换器(topic)等,每种交换器的路由规则不同。

    • 直接交换器:根据消息的路由键(routing key)直接将消息发送到与该路由键绑定的队列。

    • 扇形交换器:将消息广播到所有绑定的队列,不考虑路由键。

    • 主题交换器:根据路由键的模式匹配规则将消息发送到匹配的队列。

  3. 队列(Queue):队列是存储消息的容器,它是一个先进先出(FIFO)的数据结构。队列中的消息会被消费者(Consumer)依次消费。队列可以持久化,即使RabbitMQ服务器重启,队列中的消息也不会丢失。

  4. 消费者(Consumer):消费者从队列中获取消息并进行处理。消费者可以是应用程序的另一个组件,也可以是独立的进程。消费者通过订阅队列来接收消息,一旦队列中有消息到达,消费者就会按照一定的策略(如轮询、负载均衡等)进行消费。

②工作原理

  1. 消息发送过程

    • 生产者创建消息,并指定交换器和路由键。

    • 生产者将消息发送到RabbitMQ服务器。

    • 交换器根据路由键将消息路由到一个或多个队列。

    • 如果队列不存在,交换器会根据配置决定是否丢弃消息或返回错误。

  2. 消息接收过程

    • 消费者向RabbitMQ服务器发送订阅请求,指定要消费的队列。

    • 当队列中有消息到达时,RabbitMQ服务器将消息推送给消费者。

    • 消费者接收到消息后进行处理,处理完成后向服务器发送确认消息(ack),告知服务器该消息已被成功消费。如果消费者在处理消息过程中失败或崩溃,RabbitMQ服务器会将消息重新放入队列,等待其他消费者消费。

③优势

  1. 高可靠性:RabbitMQ支持消息持久化,即使服务器出现故障,消息也不会丢失。同时,它还支持镜像队列,可以在多个节点之间复制队列,提高系统的可用性。

  2. 高可用性:RabbitMQ可以部署在多个节点上,形成集群。集群中的节点可以相互备份,当某个节点出现故障时,其他节点可以接管其工作,保证系统的正常运行。

  3. 灵活的路由策略:通过不同类型的交换器和路由键,可以实现复杂的消息路由逻辑,满足各种业务场景的需求。

  4. 易于扩展:RabbitMQ支持水平扩展,可以通过增加节点来提高系统的处理能力。同时,它还支持多种编程语言的客户端库,方便开发者进行集成。

④应用场景

  1. 异步任务处理:当应用程序需要执行耗时的任务时,可以将任务封装成消息发送到RabbitMQ,然后由消费者在后台进行处理,从而提高系统的响应速度。

  2. 服务间通信:在微服务架构中,不同的服务可以通过RabbitMQ进行通信,实现服务的解耦和异步交互。

  3. 事件驱动架构:RabbitMQ可以作为事件总线,将事件消息传递给不同的消费者,实现事件驱动的业务逻辑。

  4. 日志收集:将日志消息发送到RabbitMQ,然后由专门的日志处理程序进行消费和分析,实现日志的集中管理和分析。

二、入门

        MQ技术选型参考图如下:

1、docker 安装 MQ

docker run \
 -e RABBITMQ_DEFAULT_USER=itheima \     # MQ 默认登录用户
 -e RABBITMQ_DEFAULT_PASS=123321 \      # MQ 默认登录用户的密码
 -v mq-plugins:/plugins \               # 默认挂载
 --name mq \                            # 容器名
 --hostname mq \                        # 主机名
 -p 15672:15672 \                       # MQ 访问图像化页面端口
 -p 5672:5672 \                         # MQ 通信端口
 -d \                        
 rabbitmq:3.8-management

2、Spring AMQP

SpringAmqp的官方地址 :SpringAMQP官方网址

        

3、代码实现

pom 依赖

        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置RabbitMQ服务端信息

在每个微服务中引入MQ服务端信息,这样微服务才能连接到RabbitMQ

spring:
  rabbitmq:
    host: localhost # 主机名
    port: 5672          # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall  # 用户名
    password: 123   # 密码

发送消息

SpringAMQP提供了RabbitTemplate工具类,方便我们发送消息。样例发送消息代码如下

@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void testSendMessage2Queue() {

//        队列名称
        String queueName = "simple.queue";
//        消息
        String message = "Hello,Spring amqp !";
//        发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

接收消息

SpringAMQP提供声明式的消息监听,我们只需要通过注解在方法上声明要监听的队列名称,将来SpringAMQP就会把消息传递给当前方法:

@Slf4j
@Component
public class mqListener {

    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueue(String msg){     // 注意这里接受的参数类型,取决于队列中发送者发送的消息类型,因为我们发送者发送字符串,所以这里接受的参数类型也是字符串
        log.info("消费者收到了Simple.queue的消息:【{}】",msg);

    }
}

三、基础

work Queue

        Workqueues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

案例

        模拟WorkQueue,实现一个队列绑定多个消费者

        基本思路如下:

  1. 在RabbitMQ的控制台创建一个队列,名为work.queue
  2. 在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue
  3. 在consumer服务中定义两个消息监听者,都监听work.queue队列
  4. 消费者1每秒处理50条消息,消费者2每秒处理5条消息

通过Thread.sleep 方式 模拟性能不同的时候接受到的数据

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {     // 注意这里接受的参数类型,取决于队列中发送者发送的消息类型,因为我们发送者发送字符串,所以这里接受的参数类型也是字符串
        System.out.println("消费者1 收到了 work.queue 的消息:【" + msg + "】");
        Thread.sleep(20);
    }

    @RabbitListener(queues = "work.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {     // 注意这里接受的参数类型,取决于队列中发送者发送的消息类型,因为我们发送者发送字符串,所以这里接受的参数类型也是字符串
        System.err.println("消费者2 收到了 work.queue 的消息........:【" + msg + "】");
        Thread.sleep(200);
    }

最后发现,消费者接受到的消息是默认的一人一半效果,而处理消息方面可以显示出效率比较低下

消费者消息推送限制(解决消息堆积方案之一)

        默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积

因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1   # 每次只能获取一条消息,处理完成才能获取下一个消息

我们重新发送消息可以发现

这样我们可以充分消费每一条消息,能者多劳效果。

Work模型的使用

  • 多个消费者绑定到一个队列,可以加快消息处理速度
  • 同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

Fanout交换机

真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,交换机的类型有以下三种:

  • Fanout : 广播
  • Direct : 定向
  • Topic : 话题

Fanout 广播模式

Fanout Exchange 会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式

通过 exchange 广播消息给队列,多个消费者 处理绑定的一个队列中的不同订单消息,并行操作,提高效率。

案例

利用SpringAMQP演示FanoutExchange的使用
实现思路如下:

  1. 在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
  2. 在RabbitMO控制台中,声明交换机hmall.fanout,将两个队列与其绑定
  3. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
  4. 在publisher中编写测试方法,向hmall.fanout发送消息.

消费者方法

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg){     // 注意这里接受的参数类型,取决于队列中发送者发送的消息类型,因为我们发送者发送字符串,所以这里接受的参数类型也是字符串
        System.out.println("消费者1 收到了 fanout.queue1 的消息:【{"+ msg + "}】");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg){     // 注意这里接受的参数类型,取决于队列中发送者发送的消息类型,因为我们发送者发送字符串,所以这里接受的参数类型也是字符串
        System.err.println("消费者2 收到了 fanout.queue2 的消息:【{" + msg + "}】");
    }

发送消息方法

    @Test
    void testSendFanout() {

//        队列名称
        String exchangeName = "hmall.fanout";
//        消息
        String message = "Hello,everyone!!!";
//        发送消息
        rabbitTemplate.convertAndSend(exchangeName, "", message);
//      rabbitTemplate.convertAndSend(exchangeName, null, message);  // 或者 第二个参数为 null

    }

Fanout Exchange交换机的作用是什么?

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • Fanout Exchange的会将消息路由到每个绑定的队列

Direct 交换机

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

---------------------------------------------- 持续更新中----------------------------------------------------------


http://www.kler.cn/a/585614.html

相关文章:

  • 高安全可靠MCU芯片AS32X601应用解析
  • 【北上广深杭大厂AI算法面试题】计算机视觉篇...YOLOv4 相较于YOLOv3有哪些改进?速度更快还是更慢,为什么?(二)
  • 关于Django框架的面试题及其解析
  • 嵌入式2-按键
  • Python网络爬虫之BeautifulSoup库的使用流程和方法
  • 力扣hot100_二叉树(5)_python版本
  • 实验5 逻辑回归
  • 梯度下降法以及随机梯度下降法
  • 作业9 (2023-05-05 数组的定义和初始化)
  • 富文本编辑器(Rich Text Editor,RTE)
  • 矩阵交换行(信息学奥赛一本通-1119)
  • 基于NXP+FPGA永磁同步电机牵引控制单元(单板结构/机箱结构)
  • CMU15445(2023fall) Project #2 - Extendible Hash Index 匠心分析
  • 【搭建环境】windows环境redis\集群;rabbitmq
  • 在 Spring Boot 中实现基于 TraceId 的日志链路追踪
  • 编程自学指南:java程序设计开发,Java I/O流,为什么需要I/O流?,Java I/O体系结构,字节流,字符流,对象流与序列化
  • MATLAB 控制系统设计与仿真 - 25
  • 突破连接边界!O9201PM Wi-Fi 6 + 蓝牙 5.4 模块重新定义笔记本无线体验
  • 宇树与智元的崛起:机器人“灵魂”注入的技术密码
  • 电脑热点无法打开