当前位置: 首页 > article >正文

MQ基础:RabbitMQ真面目

同步调用方式,指的是发送方直接发送给接收方的形式。而这种方式在某些情况下可能出现问题,比如当业务逻辑变得复杂,同步的方式需要等待上一条指令被接收后才会继续,对性能的影响很大。

异步的方式,增加了一个消息代理的角色,可以管理、暂存、转发消息。这样消息提供者只需要把消息发送给消息代理,就能够继续进行下一个业务。而消息代理只需要把消息发送给消息接受者,就完成了它的任务。

异步的方式,有很多优点:

  • 能够解除消息发送者和消息接受者之间的耦合关系,拓展性强
  • 消息发送者无需等待消息是否发送给消息接受者,性能好
  • 当消息接受者出现故障时,只需要消息代理把消息发送给修复好的消息接受者就代表完成
  • 当消息短时间突然增多时,消息代理可以缓存消息,削峰填谷,减少服务器的压力

MQ(MessageQueue),中文是消息队列,就是存放消息的队列,是异步调用中的Broke。

目录

安装RabbitMQ

Java客户端

Work模型

交换机

Fanout

Direct

Topic

声明队列和交换机

消息转换器


安装RabbitMQ

基于docker安装:

把mq.tar镜像拷贝到linux中,再读取:

docker load -i mq.tar 

通过一串命令就可以把RabbitMQ启动起来:

docker run \
  -e RABBITMQ_DEFAULT_USER=root \
  -e RABBITMQ_DEFAULT_PASS=123 \
  -v mq-plugins:/plugins \
  --name mq \
  --hostname mq \
  -p 15672:15672 \
  -p 5672:5672 \
  --network demo_network \
  -d \
  rabbitmq:3.8-management

RabbitMQ整体架构:

  • publisher:消息发送者
  • consumer:消息消费者
  • queue:队列,存储消息
  • exchange:交换机,负责路由消息

在这其中,交换机必须要和队列绑定,才能够把消息路由给队列。

数据隔离

在Rabbitmq的网页中,有Virtual host这一栏。这代表着虚拟主机,不同的虚拟主机,数据是不同的,队列什么的也不通用。

想要创建虚拟主机,只需要在Virtual中新建一个就可以了。这样两个虚拟主机数据是分离开的。

Java客户端

由于Rabbitmq官方提供的API、比较复杂,于是我们使用AMQP来处理MQ。

Spring AMQP是基于AMQP协议定义的一套API规范,提供了板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。我们就通过Spring AMQP来完成对于Rabbitmq的应用。

引入AMQP的依赖:

SpringAMQP如何收发消息?

  1. 引入spring-boot-starter-amqp依赖
  2. 配置rabbitmq服务端信息
  3. 利用RabbitTemplate发送消息
  4. 利用@RabbitListener注解声明要监听的队列,监听消息

利用控制台创建队列simole.queue:

在publisher服务中,利用SpringAMQP直接向simple.queue发送消息:

配置好配置文件,

为了方便测试,我们在test中定义好方法:

在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列

Work模型

实现一个队列绑定多个消费者:

  1. 在RabbitMQ的控制台创建一个队列,名为work.queue
  2. 在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue
  3. 在consumer服务中定义两个消息监听者,都监听work.queue队列
  4. 消费者1每秒处理50条消息,消费者2每秒处理5条消息

先新建一个队列作为work.queue

此时再在代码中修改两个消息监听者:

默认情况下,多个RabbitListener接收消息是通过轮询的方式,并且分配很均匀。当50条消息分配给两个消费者,会被均匀的分配给这两个消费者。

但是要是消费者1和消费者2的消息处理能力不同,会发生什么呢?

我们让消费者1每隔20ms才接受一次消息,消费者2每隔200ms才接收一次消息:

这显然不符合我们的要求。既然消费者1的性能要好一些,就应该让它多接受一些消息,而不是像现在这样仍然是每个消费者接收25条消息。

消费者消息推送限制
默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。

因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1

这样的分配机制,就充分利用了消费者1的性能,实现“能者多劳”。

交换机

生产环境中都会经过exchange交换机,通过交换机路由到队列。交换机的类型有三种:

  • Fanout  广播
  • Direct    定向
  • Topic     话题

Fanout

这个交换机会把消息广播到每一个跟其绑定的queue。

实现思路:

  1. 在RabbitMQ控制台中,声明队列fanout.queue1和fanout.queue2
  2. 在RabbitMQ控制台中,声明交换机fanout,将两个队列与其绑定
  3. 在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
  4. 在publisher中编写测试方法,向fanout发送消息

测试结果是两个消费者都可以接收到消息

Direct

实际生产环境中,交换机和消费者可能会存在复杂的关系,如果按照Fanout的方式来路由,可能会出现精细度不够的情况。Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

跟Fanout大体上一样,需要在Routing key中绑定key:

代码中,在生产者方只需要指定routingkey就完成了。

Topic

TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以 . 分割。

并且Topic还支持通配符, # 代表0个或者多个单词, * 代表一个单词。这就意味着Topic可以完成Fanout和Direct的功能,更加精细化的控制交换机。

代码中,也只需要更改routingkey即可完成匹配。

声明队列和交换机

在之前演示的过程中,就已经手动创建了很多个交换机和队列。实际生产环境中,只会比这个更多,所以用代码来创建队列和交换机就显得尤为重要。

SpringAMOP提供了几个类,用来声明队列、交换机及其绑定关系:

  • Queue:用于声明队列,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

fanout是最简单的,如果是direct和topic,都需要在绑定关系中加上routingkey参数:

public Binding directQueue1Binding(Queue directQueue1, DirectExchange directExchange) {
    return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}

除了这种方式,还可以用注解的方式。基于@RabbitListener注解来声明队列和交换机的方式:

注意,都是在消费者中创建。

消息转换器

当我们发送的消息是一个Map类型时

接收端接收到的是这样子的:

Payload采用的是JDK的序列化方式,明明msg中只有简单的jack,但是却占用了这么大的空间,并且这样的序列化方式还不安全,容易被篡改。

解决方式就是,我们采用Json序列化。

在publisher和consumer中都引入jackson依赖,并且配置MessageConverter:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
@Bean
public MessageConverter jacksonMessageConverter() {
    return new Jackson2JsonMessageConverter();
}
//消费者生产者都需要配置这个,配置到启动类即可

此处就可以看到消息转换器发挥了作用。


http://www.kler.cn/a/329416.html

相关文章:

  • Spring Boot 3.3.4 升级导致 Logback 之前回滚策略配置不兼容问题解决
  • ARP 表、MAC 表、路由表、跨网段 ARP
  • kubernetes学习-Service(七)
  • Ubuntu 24.04 LTS 安装 tailscale 并访问 SMB共享文件夹
  • Flowable 管理各业务流程:流程设计器 (获取流程模型 XML)、流程部署、启动流程、流程审批、流程挂起和激活、任务分配
  • 25/1/15 嵌入式笔记 初学STM32F108
  • 学习记录:js算法(四十九):二叉树的层序遍历
  • 【AI大模型】深入Transformer架构:编码器部分的实现与解析(上)
  • JavaScript爬虫:数据抓取的艺术与实践
  • 【北京迅为】《STM32MP157开发板嵌入式开发指南》- 第十三章 Linux连接档概念
  • 数学建模运筹优化——规划问题Python版(线性、非线性、整数、0/1)
  • 中九无科研无竞赛保研经验帖——上交软院、中科大计算机、复旦工程硕、南大工程硕、浙大软件
  • 【MySQL】逐一更新数据(字段唯一)-存储过程
  • 《安富莱嵌入式周报》第343期:雷电USB4开源示波器正式发布,卓越的模拟前端低噪便携示波器,自带100W电源的便携智能烙铁,NASA航空航天锂电池设计
  • 西电25考研 VS 24考研专业课大纲变动汇总
  • Oracle EBS中 预算编制与计划 模块的财务流程概览
  • golang web笔记-2.请求request
  • 大表性能优化的关键技术
  • 【Vue】从后端返回数据如何保留文本的格式,包括换行
  • 数据库查询
  • 注册安全分析报告:科研诚信查询平台无验证方式导致安全隐患
  • buuctf [ACTF2020 新生赛]Include
  • 面试题05.08绘制直线问题详解(考察点为位运算符)
  • 软件设计模式概述
  • 面试题:MySQL你用过WITH吗?领免费激活码
  • PHP安装后Apache无法运行的问题