RabbitMQ介绍与使用
RabbitMQ官网
RabbitMQ 介绍
RabbitMQ 是一个开源的消息代理和队列服务器,基于 AMQP(高级消息队列协议)标准,使用 Erlang 编程语言构建。它是消息队列(MQ)的一种,广泛应用于分布式系统中,用于实现应用程序之间的异步消息传递。RabbitMQ 具有高可靠性、易扩展、高可用和功能丰富的特点,支持多种编程语言客户端,如 Java、Python、Ruby、C# 等。
RabbitMQ 的核心概念
- Producer(生产者):消息的生产者,负责将消息发送到 RabbitMQ 中的 Exchange。
- Consumer(消费者):消息的消费者,负责从队列中获取并处理消息。
- Connection:生产者/消费者和 Broker 之间的 TCP 连接。
- Channel:在 Connection 内部建立的逻辑连接,用于减少操作系统建立 TCP 连接的开销。
- Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker。
- Virtual Host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中。
- Exchange:消息到达 Broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到队列中去。常用的类型有 direct、topic 和 fanout。
- Queue:消息最终被送到这里等待消费者取走。
RabbitMq的交换机类型
1. Direct Exchange
- 描述:Direct 交换机是最简单的交换机类型。它根据消息的 routing key 将消息路由到一个特定的队列。如果队列的 binding key 与消息的 routing key 完全匹配,则消息会被路由到该队列。
- 特点:
- 一对一匹配:消息的 routing key 必须与队列的 binding key 完全相同。
- 简单直接:适用于一对一的消息传递场景。
- 示例:
- 生产者发送消息时指定 routing key 为
info
。 - 队列 A 绑定到交换机时,binding key 也为
info
。 - 消息将被路由到队列 A。
- 生产者发送消息时指定 routing key 为
2. Topic Exchange
- 描述:Topic 交换机允许更复杂的路由模式。消息的 routing key 和队列的 binding key 可以包含通配符,从而实现更灵活的路由规则。
- 特点:
- 模式匹配:支持通配符
*
(匹配一个单词)和#
(匹配多个单词)。 - 灵活多变:适用于多对多的消息传递场景,可以实现复杂的路由逻辑。
- 模式匹配:支持通配符
- 示例:
- 生产者发送消息时指定 routing key 为
user.info
。 - 队列 A 绑定到交换机时,binding key 为
user.*
。 - 队列 B 绑定到交换机时,binding key 为
*.info
。 - 消息将被路由到队列 A 和队列 B。
- 生产者发送消息时指定 routing key 为
3. Fanout Exchange
- 描述:Fanout 交换机是最简单的广播交换机。它不关心消息的 routing key,将消息广播到所有绑定到该交换机的队列。
- 特点:
- 广播消息:消息会被发送到所有绑定的队列,无论队列的 binding key 是什么。
- 简单高效:适用于需要将消息广播到多个消费者的情况。
- 示例:
- 生产者发送消息时,不指定 routing key。
- 队列 A、队列 B 和队列 C 都绑定到该交换机。
- 消息将被路由到队列 A、队列 B 和队列 C。
RabbitMQ 的主要特点
- 可靠性:使用消息确认机制,确保消息的可靠传递。生产者在发送消息后会收到一个确认,消费者在处理完消息后会发送一个确认。如果消息发送或处理失败,RabbitMQ 会重新发送消息,直到确认为止。
- 灵活性:支持多种消息传递模式,包括点对点、发布/订阅和消息路由等。
- 可扩展性:可以通过添加更多的节点来实现水平扩展,以处理更大的消息负载。它还支持集群和镜像队列,提供高可用性和负载均衡。
- 多语言支持:提供了多种编程语言的客户端库,包括 Java、Python、Ruby、C# 等。
MQ选型对比
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
可用性 | 高,基于主从架构实现高可用 | 高,基于主从架构实现高可用 | 非常高,分布式架构 | 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
单机吞吐量 | 万级 | 万级 | 十万级 | 十万级以上 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒级以内 |
消息可靠性 | 较高,基本不丢 | 较低,有丢大概率 | 经过参数优化配置,可以做到 0 丢失 | 经过参数优化配置,可以做到 0 丢失 |
RabbitMQ安装
环境:Centos7.9,基于docker安装
1.使用docker run命令创建容器并安装mq
docker run \
-e RABBITMQ_DEFAULT_USER=mqadmin \
-e RABBITMQ_DEFAULT_PASS=mqadmin \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network mq-net\
-d \
rabbitmq:3.8-management
2.开放端口,或关闭防火墙(如果访问不了)
方法1:开放端口
#1.开放mq端口
firewall-cmd --zone=public --add-port=15672/tcp --add-port=5672/tcp --permanent
#2.重新加载防火墙配置
firewall-cmd --reload
方法2:临时关闭防火墙
systemctl stop firewalld
3.访问RabbitMQ控制台并登录
账号密码就是创建mq容器时指定的RABBITMQ_DEFAULT_USER和RABBITMQ_DEFAULT_PASS
访问IP地址:主机ip:15672
RabbitMQ控制台使用
1.收发消息
1.1创建消息队列
1.2创建一个交换机
1.3讲交换机与队列绑定
1.4发送消息
1.5查看消息
2.数据隔离
当我们只部署了一个mq的话,当多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用virtual host的隔离特性,将不同项目隔离。
实现步骤:
1.在我们的用户管理创建一个新用户
可以看到我们的用户创建成功,但是刚创建的用户是没有虚拟主机的
2.登录新创建的用户,配置虚拟主机
我们可以通过右上角选择自己的虚拟主机
可以看到,在我们选择我们当前用户的虚拟机主机之后,就看不到我们之前用/创建的队列了
SpringAMQP使用
将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ
采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ
交互。并且RabbitMQ
官方也提供了各种不同语言的客户端。
但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。
Spring AMQPSpringAmqp的官方地址:Spring AMQP
SpringAMQP提供了三个功能:
-
自动声明队列、交换机及其绑定关系
-
基于注解的监听器模式,异步接收消息
-
封装了RabbitTemplate工具,用于发送消息
1.创建一个Maven的mqDemo项目
2.创建两个子模块publisher(消息的发送者)、consumer(消息的消费者)
3.在父模块的pom.xml中导入以下配置:
<groupId>cn.mq.demo</groupId>
<artifactId>mq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<modules>
<module>publisher</module>
<module>consumer</module>
</modules>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.12</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
4.在子模块pom.xml中分别导入以下配置:
publisher
<parent>
<artifactId>mq-demo</artifactId>
<groupId>cn.mq.demo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>publisher</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
consumer
<parent>
<artifactId>mq-demo</artifactId>
<groupId>cn.mq.demo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>consumer</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
5.在两个子模块的application.yaml中加入以下配置:
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
spring:
rabbitmq:
host: 192.168.181.32 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /test # 虚拟主机
username: testuser # 用户名
password: testuser # 密码
6.创建交换机、队列,并监听消息
方式1基于配置类创建交换机、队列并绑定:
在consumer下创建一个configuration类
@Configuration
public class FanoutConfiguration {
@Bean
public FanoutExchange fanoutExchange() {
//创建交换机
return new FanoutExchange("test.fanout");
}
@Bean
public Queue fanoutQueue1() {
//创建队列
return new Queue("fanout.queue1");
}
@Bean
public Queue fanoutQueue2() {
//创建队列
return new Queue("fanout.queue2");
}
@Bean
public Binding binDingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
//将队列与交换机进行绑定
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
public Binding binDingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
//将队列与交换机进行绑定
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
然后创建一个监听类,监听消息
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "fanout.queue1")//监听的队列
public void listenerFanoutQueue1(String message) throws InterruptedException {
System.out.println("消费者1接收到test.fanout消息:" + message + "," + LocalTime.now());
}
@RabbitListener(queues = "fanout.queue2")//监听的队列
public void listenerFanoutQueue2(String message) throws InterruptedException {
System.out.println("消费者2接收到test.fanout消息:" + message + "," + LocalTime.now());
}
}
方式2基于注解创建交换机、队列并绑定:
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "fanout.quque1"),
exchange = @Exchange(name = "test.fanout", type = ExchangeTypes.FANOUT)
))
public void listenerFanoutQueue1(String message) throws InterruptedException {
System.err.println("消费者1接收到test.fanout消息:" + message + "," + LocalTime.now());
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "fanout.queue1"),
exchange = @Exchange(name = "test.fanout", type = ExchangeTypes.FANOUT)
))
public void listenerFanoutQueue2(String message) throws InterruptedException {
System.err.println("消费者2接收到test.fanout消息:" + message + "," + LocalTime.now());
}
}
启动ConsumerApplication类,查看rabbitmq控制台查看是否已经创建交换机和队列成功,并且正确绑定(上面的方式实现一种即可)
6.发送消息
在publisher创建测试类发送消息
@Slf4j
@SpringBootTest
class SpringAmqpTest {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void testFanoutQueue() {
String exchangeName = "test.fanout";//交换机名称
String message = "hello,everyone!";//发送的消息
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
}
8.测试
1.运行ConsumerApplication启动类,保持运行状态
2.运行testFanoutQueue测试类的方法
3.查看控制台输出
正确接收到消息!