RabbitMQ简单应用
概念
RabbitMQ 是一种流行的开源消息代理(Message Broker)软件,它实现了高级消息队列协议(AMQP - Advanced Message Queuing Protocol)。RabbitMQ 通过高效的消息传递机制,主要应用于分布式系统中解耦应用组件、异步消息发送、流量削峰等场景,可提高系统扩展性和稳定性。
RabbitMQ 的核心功能
-
消息队列
RabbitMQ 提供消息队列功能,用于存储和转发消息,确保生产者和消费者之间的解耦。 -
可靠性
RabbitMQ 支持消息持久化、确认机制和高可用性集群,保证消息在传递过程中的可靠性。 -
灵活的路由
借助交换机(Exchange)机制,RabbitMQ 支持灵活的消息路由规则,包括广播(fanout)、直连(direct)、主题(topic)等。 -
消息确认机制
消费者需要确认消息已被成功消费,未确认的消息可以重新投递(避免消息丢失)。 -
扩展性和高可用性
RabbitMQ 支持集群部署,可根据负载动态扩展,同时提供镜像队列功能,实现高可用性。 -
插件机制
RabbitMQ 支持丰富的插件,用于监控、身份验证、消息追踪等扩展功能。
RabbitMQ 的核心概念
-
消息(Message): RabbitMQ 处理的最小单位,包含消息头和消息体。 消息头描述消息的属性(如优先级、过期时间等),消息体是实际的数据内容。
-
生产者(Producer): 发送消息到 RabbitMQ 的应用程序。
-
消费者(Consumer): 从 RabbitMQ 中接收并处理消息的应用程序。
-
队列(Queue): 存储消息的容器,遵循先进先出(FIFO)规则。消息只能存储到队列中,消费者从队列中取出消息进行处理。
-
交换机(Exchange): 用于接收生产者发送的消息,并根据绑定规则将消息路由到队列。 常见交换机类型:
direct
:根据精确匹配的路由键转发消息。fanout
:将消息广播到所有绑定的队列。topic
:按模式匹配的路由键转发消息。headers
:根据消息头的属性匹配路由。 -
绑定(Binding): 交换机与队列之间的关系,指定消息如何从交换机路由到队列。
-
路由键(Routing Key): 用于匹配交换机和队列的绑定规则。
-
虚拟主机(Virtual Host,vhost): 类似于一个命名空间,用于隔离队列、交换机等资源。
-
连接和通道(Connection & Channel): 生产者和消费者通过连接与 RabbitMQ 交互,每个连接可包含多个通道,通道是实际读写消息的通信路径。
-
ACK 确认机制: 生产者和消费者可确认消息是否成功投递或处理。确认机制分为:
生产者确认
:确保消息发送到队列。消费者确认
:确保消息成功处理。
RabbitMQ 的工作流程
-
生产者发送消息到交换机
生产者通过指定交换机和路由键发送消息。 -
交换机将消息路由到队列
根据绑定规则,交换机将消息路由到一个或多个队列。 -
消费者从队列接收消息
消费者监听队列,并从队列中取出消息进行处理。 -
消息确认
消费者处理完成后,向 RabbitMQ 发送确认,RabbitMQ 删除该消息。若消费者未确认,RabbitMQ 可重新投递消息。
RabbitMQ 的应用场景
-
解耦微服务
RabbitMQ 在分布式架构中充当消息桥梁,避免服务之间的直接依赖。 -
异步任务处理
将耗时的任务放入队列,消费者后台处理,提升系统响应速度。 -
日志收集
使用 RabbitMQ 作为日志消息的中间件,集中处理和分析日志数据。 -
分布式系统的负载均衡
RabbitMQ 可将消息分发给多个消费者,实现任务的均衡处理。 -
实时消息推送
支持高并发的实时消息推送场景,如在线聊天、通知系统。
RabbitMQ 的优缺点
优点:
- 支持多种协议(AMQP、STOMP、MQTT 等)。
- 功能强大,支持复杂的消息路由。
- 高可靠性,支持持久化和集群模式。
- 插件机制丰富,便于扩展。
- 广泛支持多种编程语言(Java、Python、Go 等)。
缺点:
- 性能较 ActiveMQ、Kafka 稍低,不适合大数据流场景。
- 配置复杂,需要一定的学习成本。
- 占用资源较高,尤其是大量队列和消息积压时。
RabbitMQ 是一个功能强大、易用的消息中间件,适合需要可靠消息传递、灵活路由和高可用性的场景。通过其简单直观的架构,开发者可以轻松实现消息解耦、异步处理和分布式通信功能,从而大大提高系统的可扩展性和可靠性。
接下来通过java代码展示其简单应用。关于rabbitmq服务的安装,请参考linux安装rabbitmq
创建连接
创建一个maven项目,在
pom
添加如下依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.21.0</version>
</dependency>
<!--
导入slf4j相关,为解决控制台出现如下信息:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version> <!-- 或使用其他版本 -->
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.6</version> <!-- 或使用其他版本 -->
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.6</version>
</dependency>
使用rabbitmq的连接工厂,来创建对
rabbitmq-server
的连接:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 连接工具,建立与RabbitMQ服务的连接
*
*/
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址,也就是安装rabbitmq的服务器ip
factory.setHost("192.168.137.200");
//端口
factory.setPort(5672);
//设置虚拟机名称,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq,这里使用默认虚拟机
factory.setVirtualHost("/");
//设置用户名
factory.setUsername("admin");
//设置密码
factory.setPassword("admin");
// 通过工厂获取连接
Connection connection = factory.newConnection();
return connection;
}
}
虚拟机信息在管理控制台页面中如下
在Admin
页签的Virtual Host
一栏中,Name
即为虚拟机名字(‘’/" 为rabbitmq的默认虚拟机),Users
为该虚拟机中的用户:
想要添加新的虚拟机,可以通过上图中的Add virtual host
按钮进行添加
用户信息在管理控制台页面中如下
在Admin
页签的Users
一栏中,想要添加新的用户,可以通过下图中的Add user
按钮进行添加,用户可分配的权限为Admin、Monitoring、Policymaker、Management、Impersonator、None:
Admin (管理员权限)
赋予用户完全的管理权限,可以执行几乎所有操作,适用于需要对 RabbitMQ 系统进行全面管理和配置的用户。
包括的权限:
- 创建、删除、管理队列、交换机、绑定和虚拟主机。
- 配置和管理用户权限和角色。
- 配置 RabbitMQ 集群、插件和策略。
- 查看和修改 RabbitMQ 的所有设置。
Monitoring (监控权限)
允许用户查看 RabbitMQ 的监控信息,但不能进行任何修改操作。适用于需要查看 RabbitMQ 系统运行状况、但不需要做出修改的用户(如运维人员、监控人员)
包括的权限:
- 查看队列、交换机、连接、通道的状态信息。
- 查看消息流、消息队列的深度和消费者等监控数据。
- 查看系统的资源使用情况(如内存、磁盘、CPU 使用等)。
Policymaker (策略管理权限)
允许用户管理 RabbitMQ 中的策略,适用于负责 RabbitMQ 策略配置(如队列策略、镜像策略等)的用户。
包括的权限:
- 创建、删除和修改虚拟主机的策略。
- 配置消息队列的生命周期、镜像策略、磁盘空间限制等。
- 不允许进行其他管理操作,如修改队列、交换机、绑定等。
Management (管理界面权限)
允许用户访问和使用 RabbitMQ 的管理控制台,查看系统状态和配置信息,但不包括修改操作。适用于需要监控和查看 RabbitMQ 系统状态,但不需要对系统做修改的用户。
包括的权限:
- 访问 RabbitMQ 的管理界面。
- 查看管理控制台的所有信息(如队列、交换机、连接、用户等)。
- 不允许执行创建、删除、修改等操作。
Impersonator (伪装权限)
允许用户以其他用户的身份执行操作,但不具有实际的权限修改能力。适用于需要代替其他用户执行操作或进行调试的用户。
包括的权限:
- 可以 "伪装" 成为其他用户,从而以该用户的权限来执行操作。通常,用于临时授予某些操作权限。
- 这种权限通常用于管理审计或系统调试。
None (无权限)
此权限不授予任何权限,适用于不需要访问 RabbitMQ 系统的用户,或者是仅用作某些临时操作的用户。
包括的权限:
- 不允许用户访问管理页面,执行任何操作。
- 该用户几乎不具有任何权限,不能进行查看或修改操作。
简单模式
RabbitMQ 的简单模式(Simple模式) 是消息队列的一种基本模式,该模式对应一个生产者与一个消费者。在简单模式下,消息生产者将消息发送到队列中,然后由消费者从队列中取出消息进行处理。
简单模式的基本概念
- 生产者(Producer):负责发送消息的应用程序或服务。生产者将消息发送到指定的队列中。
- 队列(Queue):消息的存储区域。队列在 RabbitMQ 服务器上,生产者将消息发送到队列,消费者从队列中获取消息。
- 消费者(Consumer):负责接收和处理消息的应用程序或服务。消费者从队列中获取消息进行处理。
- RabbitMQ 服务器:负责管理队列并协调消息的发送和接收。
简单模式的工作流程如下:
- 生产者连接到 RabbitMQ 服务器。
- 创建通道:生产者通过连接创建一个通道,用于声明队列及发布消息。
- 声明队列:生产者在发送消息之前,会利用通道声明一个队列。声明队列的作用是确保队列存在,如果队列不存在会创建队列;如果队列已存在,则跳过创建。
- 生产者发送消息到队列:生产者使用
basicPublish
方法将消息发送到指定的队列中。 - 消费者连接到 RabbitMQ 服务器。
- 消费者从队列中获取消息:消费者从指定队列中接收消息,进行消费处理。
- 确认消息(ACK):在消费完成后,消费者会发送确认信号(ACK),告知 RabbitMQ 消息已经处理完毕。这样 RabbitMQ 可以将消息从队列中删除。ACK可设置自动回复或手动回复。
生产者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;
public class SimpleProducer {
//队列名称
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] args){
//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channel
try (
//使用上面创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel()
) {
/**
* 声明队列,参数明细如下:
* 1、queue 队列名称
* 2、durable 是否持久化,如果持久化,mq重启后队列还在
* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 消息内容
String message = "Hello World";
/**
* 使用通道将消息发布到队列中,参数说明如下:
* 1.第一个参数是exchange,即交换机名称,简单模式不需要设置交换机,此处为空即可(为空时,使用的是rabbitmq的默认交换机AMQP default)
* 2.第二个参数是routingKey,即路由键,简单模式下不使用交换机时,此处参数与队列名相同,会自动匹配目标队列
* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等
* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文
* */
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
}
}
}
消费者代码
消费者连接rabbitmq后,依然需要声明队列,因为需要确保队列的存在
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
public class SimpleConsumer {
//要与生产者的队列名保持一致
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] args){
//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channel
try (
//使用之前创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel()
) {
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息体
String msg = new String(body,"utf-8");
System.out.println("接收消息: " + msg + "!");
}
};
/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
}catch (Exception ex) {
ex.printStackTrace();
System.out.println("接收消息出现异常...");
}
}
}
测试
先运行生产者主方法,运行后查看管理页面的
Queues and Streams
页签:
红框Messages
部分即代表队列中的消息, Ready
、Unacked
和 Total
的含义如下:
-
Ready:表示队列中已经准备好可以被消费者消费的消息数量。即这些消息还没有被任何消费者接收。
-
Unacked(未确认的消息):表示已经被消费者接收但还没有被确认(acknowledged)的消息数量。这意味着这些消息被消费者消费后,还未发送确认,因此 RabbitMQ 会等待消费者确认消息处理完成。如果消费者断开连接或未确认,RabbitMQ 会将这些消息重新放回队列中,以便被其他消费者重新消费。
-
Total:表示队列中的消息总数,是
Ready
和Unacked
两者的总和。
图中:
Ready
为 1,表示有 1 条消息在队列中等待被消费。Unacked
为 0,表示没有消息被消费者接收且未确认。Total
为 1,表示队列中的消息总数为 1。
再运行消费者主方法,运行后查看管理页面的
Queues and Streams
页签:
红框Messages
部分都为0,代表消息都已被消费
消费者代码控制台输出:
接收消息: Hello World!
工作模式
在 RabbitMQ 的工作队列模式(Work Queue / Task Queue)中,一个生产者会对应多个消费者。
消息分发给多个消费者的方式主要有两种:轮询分配 和 公平分配。
轮询分配(Round-robin Dispatching)
原理:
- 默认情况下,RabbitMQ 将消息以 轮询的方式 均匀地分发给所有消费者,消息的分配模式是一个消费者分配一条,直至消息消费完成。
- 每个消费者都会轮流收到消息,而不会考虑消费者当前的工作负载。
特点:
- 无视消费者处理能力:
RabbitMQ 不会关心某个消费者是否已经忙碌或是否处理得更快,而是严格地轮流发送消息。 - 简单高效:
实现方式简单,但在消费者性能不均的情况下,可能导致某些消费者负载过高或过低。
公平分配(Fair Dispatching)
原理:
- 公平分配遵循能者多劳的原则,核心是基于 消费者的繁忙程度 分发消息。
- RabbitMQ 通过 消息确认(ACK)机制 来检测消费者是否空闲。
- 如果消费者在当前未完成上一个任务,则不会分配新的任务给该消费者。
特点:
-
消费者负载感知:
RabbitMQ 根据消费者的负载情况分发消息,而不是简单地轮流发送,这种方式确保消息只发送到空闲的消费者,避免让忙碌的消费者承担额外的负担。 -
消息确认机制(ACK):
消费者需要显式地向 RabbitMQ 确认(ACK)已成功处理一条消息。
未确认的消息(比如因消费者挂掉或处理时间过长)会重新投递到其他消费者,确保消息不会丢失。
-
基于 QoS 的限流控制:
使用
basicQos
参数(如basicQos(1)
)限制 RabbitMQ 在未收到消费者确认时不发送新的消息。
轮询发送
创建一个生产者,两个消费者来演示轮询发送效果:
- 每个消费者都添加
channel.basicQos(1)
,来保证每次只接收一条消息,用于演示轮询效果- 消费者一添加线程阻塞1秒,消费者二不添加,观察二者得到的消息数量是否一致
生产者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;
public class WorkProducer {
//队列名称
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args){
//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channel
try (
//使用上面创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel()
) {
/**
* 声明队列,参数明细如下:
* 1、queue 队列名称
* 2、durable 是否持久化,如果持久化,mq重启后队列还在
* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//循环发送10个消息
for (int i = 0; i < 10; i++) {
// 消息内容
String message = "工作模式消息-Hello World-" + i;
/**
* 使用通道将消息发布到队列中,参数说明如下:
* 1.第一个参数是exchange,即交换机名称,工作模式不需要设置交换机,此处为空即可(为空时,使用的是rabbitmq的默认交换机AMQP default)
* 2.第二个参数是routingKey,即路由键,工作模式下不使用交换机时,此处参数与队列名相同,会自动匹配目标队列
* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等
* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文
* */
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
}
}
}
消费者一
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
public class WorkConsumerOne {
//要与生产者的队列名保持一致
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) {
//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可
try {
//使用之前创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息体
String msg = new String(body,"utf-8");
System.out.println("工作模式消费者1接收消息: " + msg + "!");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
}catch (Exception ex) {
ex.printStackTrace();
System.out.println("接收消息出现异常...");
}
}
}
消费者二
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
public class WorkConsumerTwo {
//要与生产者的队列名保持一致
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) {
//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可
try {
//使用之前创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息体
String msg = new String(body,"utf-8");
System.out.println("工作模式消费者2接收消息: " + msg + "!");
}
};
/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
}catch (Exception ex) {
ex.printStackTrace();
System.out.println("接收消息出现异常...");
}
}
}
测试
先启动两个消费者,最后启动生产者
消费者一控制台输出
工作模式消费者1接收消息: 工作模式消息-Hello World-0!
工作模式消费者1接收消息: 工作模式消息-Hello World-2!
工作模式消费者1接收消息: 工作模式消息-Hello World-4!
工作模式消费者1接收消息: 工作模式消息-Hello World-6!
工作模式消费者1接收消息: 工作模式消息-Hello World-8!
消费者二控制台输出
工作模式消费者2接收消息: 工作模式消息-Hello World-1!
工作模式消费者2接收消息: 工作模式消息-Hello World-3!
工作模式消费者2接收消息: 工作模式消息-Hello World-5!
工作模式消费者2接收消息: 工作模式消息-Hello World-7!
工作模式消费者2接收消息: 工作模式消息-Hello World-9!
这里看到,生产者一共发送了10条消息到队列中,即便消费者一添加了线程阻塞方法来延缓执行,两个消费者接收到的消息数量依然相同。
公平分发
启用公平分配的设置:
要实现公平分配,需要修改以下两个参数:
-
basicQos
参数:- 用于限制 RabbitMQ 在消费者未确认消息时,不会发送新的消息。
- 设置为
basicQos(1)
表示每次只分发一条消息,消费者处理并确认(ACK)后,才会继续分发下一条消息。
-
消息确认(Manual ACK):
- 需要将消费者改为 手动确认模式。
- 当消费者处理完消息后,手动发送一个 ACK 来告诉 RabbitMQ,消息已经处理完成。
- 如果消息没有确认(如消费者挂掉),RabbitMQ 会重新将消息发送给其他消费者。
创建一个生产者,两个消费者来演示轮询发送效果:
- 每个消费者都添加
channel.basicQos(1)
,来保证每次只接收一条消息,用于演示轮询效果- 每个消费者都添加
channel.basicAck(envelope.getDeliveryTag(),false)
实现手动确认- 消费者一添加线程阻塞1秒,消费者二不添加,观察二者得到的消息数量
生产者代码
生产者代码与轮询模式的生产者相同
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;
public class WorkProducer {
//队列名称
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args){
//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channel
try (
//使用上面创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel()
) {
/**
* 声明队列,参数明细如下:
* 1、queue 队列名称
* 2、durable 是否持久化,如果持久化,mq重启后队列还在
* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//循环发送10个消息
for (int i = 0; i < 10; i++) {
// 消息内容
String message = "工作模式消息-Hello World-" + i;
/**
* 使用通道将消息发布到队列中,参数说明如下:
* 1.第一个参数是exchange,即交换机名称,工作模式不需要设置交换机,此处为空即可(为空时,使用的是rabbitmq的默认交换机AMQP default)
* 2.第二个参数是routingKey,即路由键,工作模式下不使用交换机时,此处参数与队列名相同,会自动匹配目标队列
* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等
* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文
* */
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
}
}
}
消费者一
在消费方法中加入Thread.sleep(1000)
,让消费者一的消息处理变慢。
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
public class PubWorkConsumerOne {
//要与生产者的队列名保持一致
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) {
//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可
try {
//使用之前创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息体
String msg = new String(body,"utf-8");
System.out.println("工作模式消费者1接收消息: " + msg + "!");
//手动返回ack
channel.basicAck(envelope.getDeliveryTag(),false);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(QUEUE_NAME, false, consumer);
}catch (Exception ex) {
ex.printStackTrace();
System.out.println("接收消息出现异常...");
}
}
}
消费者二
无线程阻塞,正常处理消息,以观察处理结果
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
public class PubWorkConsumerTwo {
//要与生产者的队列名保持一致
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) {
//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可
try {
//使用之前创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息体
String msg = new String(body,"utf-8");
System.out.println("工作模式消费者2接收消息: " + msg + "!");
//手动返回ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(QUEUE_NAME, false, consumer);
}catch (Exception ex) {
ex.printStackTrace();
System.out.println("接收消息出现异常...");
}
}
}
测试
先启动两个消费者,最后启动生产者
消费者一
工作模式消费者1接收消息: 工作模式消息-Hello World-1!
工作模式消费者1接收消息: 工作模式消息-Hello World-2!
消费者二
工作模式消费者2接收消息: 工作模式消息-Hello World-0!
工作模式消费者2接收消息: 工作模式消息-Hello World-3!
工作模式消费者2接收消息: 工作模式消息-Hello World-4!
工作模式消费者2接收消息: 工作模式消息-Hello World-5!
工作模式消费者2接收消息: 工作模式消息-Hello World-6!
工作模式消费者2接收消息: 工作模式消息-Hello World-7!
工作模式消费者2接收消息: 工作模式消息-Hello World-8!
工作模式消费者2接收消息: 工作模式消息-Hello World-9!
由于消费者一中存在线程阻塞,消费者二没有,消费者二处理更快。所以根据能者多劳原则,消费者二会处理更多的消息。
两者对比
轮询与公平分发对比
特性 | 轮询分配 | 公平分配 |
---|---|---|
分配机制 | 严格轮流,无视消费者负载 | 根据消费者工作量分配 |
消息确认机制 | 可选(默认自动 ACK) | 必须手动确认 |
适用场景 | 消费者处理能力相当的场景 | 消费者处理能力差异大的场景 |
优点 | 实现简单,消息均匀分配 | 分配更智能,避免过载 |
缺点 | 消费者容易过载或空闲 | 稍微复杂,需要手动 ACK |
- 轮询分配 适用于简单任务,且消费者负载相近的场景。
- 公平分配 适用于任务复杂度不同、消费者能力差异较大的场景,是生产中更常见的做法,因为它可以更好地利用系统资源并避免消息堆积。
广播模式
RabbitMQ 的 广播模式 是一种特殊的消息分发模式,使用 Fanout Exchange(扇形交换机) 实现。它可以将消息广播到所有绑定到该交换机的队列中,所有消费者都会接收到消息。
注意:广播模式下是一个消费者对应一个队列(如上图),并通过一个交换机将消息分发给多个绑定的队列来实现广播
工作机制
- 交换机类型:
fanout
(扇形交换机)。 - 消息分发规则:
- 扇形交换机忽略路由键(Routing Key),不关心消息的具体内容。
- 绑定到交换机的所有队列都能接收到消息,进而将消息分发给队列的消费者,无论绑定时是否指定了路由键。
- 消息流程:
- 生产者:发送消息到 Fanout Exchange。
- 交换机:将消息复制并广播到绑定的所有队列中。
- 消费者:从对应队列中获取消息进行消费。
特性
- 无条件广播:所有绑定到交换机的队列都能接收消息,队列对应的消费者都会消费消息。
- 路由键无效:Fanout Exchange 不会检查或使用路由键。
- 动态绑定:队列可以在交换机创建后动态绑定或解绑。
场景举例
- 群发通知:多个消费者需要同时收到一条通知,比如发布新闻、推送更新等。
- 日志处理:多个系统模块需要接收相同的日志信息以进行分析或处理。
- 实时监控:比如系统状态的实时监控,需要广播到多个模块进行处理。
优缺点
优点
- 高效广播:生产者只需发送一次消息,交换机负责广播,降低了生产者的复杂性。
- 解耦设计:生产者无需知道消费者的具体信息,消费者动态绑定队列即可。
- 支持多消费者:一个消息可以被多个消费者消费。
缺点
- 所有绑定队列都接收消息:无选择性,可能导致某些消费者接收到不需要的消息。
- 消息积压风险:如果某个队列的消费者处理速度较慢,可能导致队列堆积。
特点总结
- Fanout Exchange 忽略路由键,直接广播消息。
- 消息广播给所有绑定队列,支持多个消费者消费相同消息。
- 常用于群发通知、日志处理等场景。
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;
public class FanoutProducer {
//交换机名称
private final static String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) {
//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channel
try (
//使用上面创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel()
) {
/**
* 从通道声明指定交换机
* 参数1: 交换机名称,没有自动创建
* 参数2: 交换机类型 fanout-广播类型
*
* */
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
// 消息内容
String message = "广播模式消息-Hello World";
/**
* 使用通道将消息发布到队列中,参数说明如下:
* 1.第一个参数是exchange,即交换机名称,广播模式下需要指定一个交换机来进行消息广播
* 2.第二个参数是routingKey,即路由键,广播模式下不使用路由键,会把消息发布给所有绑定交换机的队列
* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等
* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文
* */
channel.basicPublish(EXCHANGE_NAME,"",null, message.getBytes());
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
}
}
}
消费者一
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
public class FanoutConsumerOne {
//交换机名称
private final static String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) {
//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可
try {
//使用之前创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
/**
* 从通道声明指定交换机
* 参数1: 交换机名称,没有自动创建
* 参数2: 交换机类型 fanout-广播类型
*
* */
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//声明临时队列
String queueName = channel.queueDeclare().getQueue();
/**
* 绑定交换机和队列,以实现消息路由,通过交换机发布的消息只会分配给其绑定的队列
* 从左到右的参数分别是:队列名、交换机名、路由键
*
* */
channel.queueBind(queueName,EXCHANGE_NAME,"");
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息体
String msg = new String(body,"utf-8");
System.out.println("广播模式消费者1接收消息: " + msg + "!");
}
};
/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(queueName, true, consumer);
}catch (Exception ex) {
ex.printStackTrace();
System.out.println("接收消息出现异常...");
}
}
}
运行消费者一后查看管理界面,会多一个临时队列:
消费者二
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
public class FanoutConsumerTwo {
//交换机名称
private final static String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) {
//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可
try {
//使用之前创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
/**
* 从通道声明指定交换机
* 参数1: 交换机名称,没有自动创建
* 参数2: 交换机类型 fanout-广播类型
*
* */
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//声明临时队列临时队列
String queueName = channel.queueDeclare().getQueue();
/**
* 绑定交换机和队列,以实现消息路由,通过交换机发布的消息只会分配给其绑定的队列
* 从左到右的参数分别是:队列名、交换机名、路由键
*
* */
channel.queueBind(queueName,EXCHANGE_NAME,"");
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息体
String msg = new String(body,"utf-8");
System.out.println("广播模式消费者2接收消息: " + msg + "!");
}
};
/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(queueName, true, consumer);
}catch (Exception ex) {
ex.printStackTrace();
System.out.println("接收消息出现异常...");
}
}
}
运行消费者二后查看管理界面,也会多一个临时队列:
测试
先启动两个消费者,再启动生产者,可以看到两个消费者都得到了消息
消费者一控制台输出:
广播模式消费者1接收消息: 广播模式消息-Hello World!
消费者二控制台输出:
广播模式消费者2接收消息: 广播模式消息-Hello World!
此时去管理界面查看交换机,多了新创建的fanout_exchange
:
结束两个消费者的运行后,临时队列消失:
Direct模式
RabbitMQ 的 Direct 模式 是最常用的消息路由模式之一,适用于精确匹配路由键的场景。在 Direct 模式下,队列会通过路由键与交换机进行绑定。发布消息时,需要指定路由键进行发布,交换机会将消息发送到与该路由键精确匹配的队列。
注意: Direct 模式中一个队列对应一个消费者,交换机通过路由键将消息发布到不同的队列中由消费者消费
Direct 模式的特点
-
精确匹配:
消息发布的路由键必须与队列绑定的路由键完全一致,消息才能被路由到该队列。
不支持模糊匹配。 -
消息定向投递:
用于发送消息到特定的队列,实现消息的点对点投递。
如果消息发布使用的路由键没有任何对应绑定的队列,消息会被丢弃(除非使用备用交换机)。 -
支持多个队列绑定:
多个队列可以使用相同的路由键绑定到同一个交换机,消息会同时发送到所有匹配的队列。
Direct 模式的核心概念
-
路由键(Routing Key):消息发送时指定的字符串,用于指示消息的目标。是 Direct 模式中消息路由的唯一依据。
-
交换机(Exchange):Direct 模式使用
direct
类型的交换机。 -
队列(Queue):消息最终被路由到的存储位置,消费者从队列中获取消息进行处理。
Direct 模式的工作原理
-
创建交换机和队列:生产者创建一个类型为
direct
的交换机,并创建需要的队列。 -
绑定队列:队列通过路由键绑定到交换机。
-
发送消息:生产者发送消息时指定路由键。
-
路由消息:交换机会根据消息的路由键将消息路由到对应绑定的队列。
Direct 模式的应用场景
-
任务分发:将不同类型的任务发送到不同的队列,由专门的消费者处理。
-
日志系统:按日志级别(如
info
、error
、debug
)发送消息到不同的队列。 -
定向通知:给特定用户或特定服务发送消息。
Direct 模式的优缺点
优点:
- 简单易用,逻辑清晰。
- 精确匹配路由键,适合点对点场景。
- 高效且易于维护。
缺点:
- 灵活性相对较低,仅支持精确匹配。
- 无法实现广播或模糊匹配场景(需要结合其他模式)。
生产者
声明交换机后,通过两个路由键来发布不同的消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;
public class DirectProducer {
//交换机名称
private final static String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) {
//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channel
try (
//使用上面创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel()
) {
/**
* 从通道声明指定交换机
* 参数1: 交换机名称,没有自动创建
* 参数2: 交换机类型 direct-路由模式
*
* */
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//创建两个路由键
String routingkey1 = "info";
String routingkey2 = "error";
/**
* 使用通道将消息发布到队列中,参数说明如下:
* 1.第一个参数是exchange,即交换机名称,广播模式下需要指定一个交换机来进行消息广播
* 2.第二个参数是routingKey,即路由键,direct模式下交换机会通过路由键发布消息,只有通过该路由键绑定到交换机的队列才会接收到消息
* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等
* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文
*
* 这里使用两个路由键来发布两个消息
* */
channel.basicPublish(EXCHANGE_NAME,routingkey1,null, "路由模式消息-info".getBytes());
channel.basicPublish(EXCHANGE_NAME,routingkey2,null, "路由模式消息-error".getBytes());
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
}
}
}
消费者一
声明两个队列,通过与生产者相同的路由键来绑定生产者的交换机,来接收生产者消息
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
public class DirectConsumerOne {
//交换机名称
private final static String EXCHANGE_NAME = "direct_exchange";
//用于接收info路由键所发布消息的队列
private final static String QUEUE_INFO = "queue_info";
//用于接收error路由键所发布消息的队列
private final static String QUEUE_ERROR = "queue_error";
public static void main(String[] args) {
//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可
try {
//使用之前创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
/**
* 从通道声明指定交换机
* 参数1: 交换机名称,没有自动创建
* 参数2: 交换机类型 direct-路由模式
*
* */
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 声明队列
channel.queueDeclare(QUEUE_INFO, true, false, false, null);
channel.queueDeclare(QUEUE_ERROR, true, false, false, null);
/**
* 绑定交换机和队列,以实现消息路由,通过交换机发布的消息只会分配给其绑定的队列
* 从左到右的参数分别是:队列名、交换机名、绑定的路由键
*
* 这里使用不同的路由键,同时将两个队列绑定到交换机
* */
channel.queueBind(QUEUE_INFO, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_ERROR, EXCHANGE_NAME, "error");
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
*
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息体
String msg = new String(body, "utf-8");
System.out.println("路由模式消费者1接收消息: " + msg + "!");
}
};
/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*
* 这里同时消费两个队列的消息
*/
channel.basicConsume(QUEUE_INFO, true, consumer);
channel.basicConsume(QUEUE_ERROR, true, consumer);
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("接收消息出现异常...");
}
}
}
消费者二
声明一个队列,使用生产者不存在的路由键来绑定生产者的交换机,观察是否会收到生产者发布的消息
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
public class DirectConsumerTwo {
//交换机名称
private final static String EXCHANGE_NAME = "direct_exchange";
//队列
private final static String QUEUE_OTHER = "queue_other";
public static void main(String[] args) {
//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可
try {
//使用之前创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
/**
* 从通道声明指定交换机
* 参数1: 交换机名称,没有自动创建
* 参数2: 交换机类型 direct-路由模式
*
* */
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//声明队列
channel.queueDeclare(QUEUE_OTHER, true, false, false, null);
/**
* 绑定交换机和队列,以实现消息路由,通过交换机发布的消息只会分配给其绑定的队列
* 从左到右的参数分别是:队列名、交换机名、绑定的路由键
*
* */
channel.queueBind(QUEUE_OTHER, EXCHANGE_NAME, "other");
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
*
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息体
String msg = new String(body, "utf-8");
System.out.println("路由模式消费者2接收消息: " + msg + "!");
}
};
/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(QUEUE_OTHER, true, consumer);
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("接收消息出现异常...");
}
}
}
测试
先启动两个消费者,再启动生产者,效果为消费者一控制台有输出,消费者二控制台无输出。
消费者一控制台输出如下:
路由模式消费者1接收消息: 路由模式消息-info!
路由模式消费者1接收消息: 路由模式消息-error!
因为direct路由模式根据路由键来进行精确匹配,生产者并没有用与消费者二相同的路由键发布消息,所以消费者二没有收到任何消息。
下图是路由模式创建的交换机及队列
Topic模式
RabbitMQ 的 Topic 模式 是一种基于主题路由的消息模式,允许使用路由键进行模糊匹配来发布消息。相比 Direct 模式,Topic 模式提供了更灵活的消息路由机制,适用于更复杂的场景。
Topic 模式的核心特点
-
模糊匹配:
- 消息的路由键可以是一个点分隔的字符串(如
order.created.us
),通过绑定键中的通配符来实现模糊匹配。
- 消息的路由键可以是一个点分隔的字符串(如
-
支持通配符:
*
:匹配一个单词(由点.
分隔)。#
:匹配零个或多个单词。
-
灵活性高:
- 消息可以根据多级主题(如区域、服务类型、操作类型等)进行分类和路由。
-
广播与定向的结合:
- 可以实现精确匹配(类似 Direct 模式)或主题广播(类似 Fanout 模式)。
Topic 模式的核心概念
-
交换机(Exchange):Topic 模式使用
topic
类型的交换机。 -
路由键(Routing Key):消息发送、绑定交换机与队列时指定的键,通常是点分隔的多级字符串(如
log.info
、order.created.us
)。 -
队列(Queue):接收和存储消息。
Topic 模式的工作原理
-
生产者发送消息:生产者向
topic
类型的交换机发送消息,并指定路由键。 -
队列绑定规则:队列通过路由键与交换机绑定,路由键可以使用通配符来定义匹配规则。
-
交换机路由消息:交换机会根据消息的路由键与队列进行匹配,将符合条件的消息发送到对应队列。
Topic 模式的通配符规则
-
*
(星号):- 匹配一个单词(由
.
分隔)。 - 例如:
- 消息路由键:
log.info
- 队列绑定路由键:
log.*
- 匹配成功。
- 消息路由键:
- 匹配一个单词(由
-
#
(井号):- 匹配零个或多个单词。
- 例如:
- 路由键:
order.created.us
- 队列绑定路由键:
order.#
- 匹配成功。
- 路由键:
Topic 模式的应用场景
-
日志系统:按照日志的类别(如
info
、error
、warning
)或模块(如auth
、order
)路由消息。 -
分布式任务:根据任务的类型或区域分发任务(如
order.created.us
)。 -
通知系统:按照不同主题(如用户通知、系统警报)发送消息。
Topic 模式的优缺点
优点:
- 支持复杂的消息路由规则。
- 灵活性高,适合动态场景。
- 支持广播和定向投递的结合。
缺点:
- 配置复杂度略高。
- 对通配符匹配的性能要求高,可能影响路由效率。
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;
public class TopicProducer {
//交换机名称
private final static String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) {
//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channel
try (
//使用上面创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel()
) {
/**
* 从通道声明指定交换机
* 参数1: 交换机名称,没有自动创建
* 参数2: 交换机类型 topic-模糊匹配路由模式
*
* */
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//创建两个路由键
String routingkey1 = "message.info.one";
String routingkey2 = "message.error.one";
/**
* 使用通道将消息发布到队列中,参数说明如下:
* 1.第一个参数是exchange,即交换机名称,广播模式下需要指定一个交换机来进行消息广播
* 2.第二个参数是routingKey,即路由键,topic模式下交换机会通过路由键发布消息,队列绑定时可通过模糊匹配路由键来接收消息
* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等
* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文
*
* 这里使用两个路由键来发布两个消息
* */
channel.basicPublish(EXCHANGE_NAME,routingkey1,null, "topic模式消息-info".getBytes());
channel.basicPublish(EXCHANGE_NAME,routingkey2,null, "topic模式消息-error".getBytes());
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
}
}
}
消费者一
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
public class TopicConsumerOne {
//交换机名称
private final static String EXCHANGE_NAME = "topic_exchange";
//用于接收info路由键所发布消息的队列
private final static String QUEUE_INFO = "topic_queue_info";
//用于接收error路由键所发布消息的队列
private final static String QUEUE_ERROR = "topic_queue_error";
public static void main(String[] args) {
//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可
try {
//使用之前创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
/**
* 从通道声明指定交换机
* 参数1: 交换机名称,没有自动创建
* 参数2: 交换机类型 topic-模糊匹配路由模式
*
* */
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 声明队列
channel.queueDeclare(QUEUE_INFO, true, false, false, null);
channel.queueDeclare(QUEUE_ERROR, true, false, false, null);
/**
* 绑定交换机和队列,以实现消息路由,通过交换机发布的消息只会分配给其绑定的队列
* 从左到右的参数分别是:队列名、交换机名、绑定的路由键
*
* 这里使用不同的路由键,同时将两个队列绑定到交换机
* */
/**
* 使用通配符路由键来绑定交换机与队列:
*
* 通配符
* * (star) can substitute for exactly one word. 匹配不多不少恰好1个词
* # (hash) can substitute for zero or more words. 匹配零个、一个或多个词
* 如:
* audit.# 匹配audit、audit.irs 、或者audit.irs.corporate等
* audit.* 只能匹配 audit.irs
* */
channel.queueBind(QUEUE_INFO, EXCHANGE_NAME, "*.info.#");
channel.queueBind(QUEUE_ERROR, EXCHANGE_NAME, "*.error.#");
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
*
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息体
String msg = new String(body, "utf-8");
System.out.println("topic模式消费者1接收消息: " + msg + "!");
}
};
/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*
* 这里同时消费两个队列的消息
*/
channel.basicConsume(QUEUE_INFO, true, consumer);
channel.basicConsume(QUEUE_ERROR, true, consumer);
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("接收消息出现异常...");
}
}
}
消费者二
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
public class TopicConsumerTwo {
//交换机名称
private final static String EXCHANGE_NAME = "topic_exchange";
//用于接收info路由键所发布消息的队列
private final static String QUEUE_INFO = "topic_queue_info";
//用于接收error路由键所发布消息的队列
private final static String QUEUE_ERROR = "topic_queue_error";
public static void main(String[] args) {
//这里不关闭连接,否则接收不到消息,测试完成后在IDEA中手动结束主方法即可
try {
//使用之前创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
/**
* 从通道声明指定交换机
* 参数1: 交换机名称,没有自动创建
* 参数2: 交换机类型 topic-模糊匹配路由模式
*
* */
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 声明队列
channel.queueDeclare(QUEUE_INFO, true, false, false, null);
channel.queueDeclare(QUEUE_ERROR, true, false, false, null);
/**
* 绑定交换机和队列,以实现消息路由,通过交换机发布的消息只会分配给其绑定的队列
* 从左到右的参数分别是:队列名、交换机名、绑定的路由键
*
* 这里使用不同的路由键,同时将两个队列绑定到交换机
* */
/**
* 使用通配符路由键来绑定交换机与队列:
*
* 通配符
* * (star) can substitute for exactly one word. 匹配不多不少恰好1个词
* # (hash) can substitute for zero or more words. 匹配零个、一个或多个词
* 如:
* audit.# 匹配audit、audit.irs 、或者audit.irs.corporate等
* audit.* 只能匹配 audit.irs
* */
channel.queueBind(QUEUE_INFO, EXCHANGE_NAME, "*.info");
channel.queueBind(QUEUE_ERROR, EXCHANGE_NAME, "*.error");
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
*
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息体
String msg = new String(body, "utf-8");
System.out.println("topic模式消费者2接收消息: " + msg + "!");
}
};
/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*
* 这里同时消费两个队列的消息
*/
channel.basicConsume(QUEUE_INFO, true, consumer);
channel.basicConsume(QUEUE_ERROR, true, consumer);
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("接收消息出现异常...");
}
}
}
测试
先启动两个消费者,再启动生产者,效果为消费者一控制台有输出,消费者二控制台无输出。
生产者发布消息的路由键为
message.info.one
message.error.one
消费者一的路由键为
*.info.#
*.error.#
*
匹配不多不少恰好1个词,#
匹配零个、一个或多个词,所以消费者一可以匹配成功。而消费者二的路由键:
*.info
*.error
只能匹配类似于message.info与message.error格式的路由键,故无法接收到消息。
消费者一控制台输出:
topic模式消费者1接收消息: topic模式消息-info!
topic模式消费者1接收消息: topic模式消息-error!
下图为管理界面topic模式创建的交换机及队列
Rpc模式
RabbitMQ的RPC(Remote Procedure Call) 模式允许生产者发布消息后,接收消费者的回调信息(类似http的请求与响应),就像调用本地方法一样接收返回值。RabbitMQ 提供了一个简单但强大的机制来实现 RPC 功能。
RPC 模式的核心概念
-
客户端(Client):即生产者,发起 RPC 请求,发送消息到队列并等待服务端返回结果。
-
服务端(Server):即消费者,接收 RPC 请求,对请求进行处理并将结果返回给客户端。
-
队列:客户端(生产者)将请求发送到队列中,服务端监听该队列以接收请求。
-
回调队列(Callback Queue):客户端为接收服务端(消费者)返回的结果而设置的专用队列。
-
Correlation ID(相关 ID):用于标识每个 RPC 请求和其对应的响应,使客户端能正确处理返回结果。
RPC 模式的工作流程
-
客户端发送请求:
- 创建一个唯一的回调队列。
- 生成一个唯一的
Correlation ID
,用于标识请求。 - 将消息发送到指定的请求队列,并设置消息的
replyTo
属性为回调队列。
-
服务端处理请求:
- 从请求队列中获取消息。
- 执行处理逻辑并生成结果。
- 将结果发送到客户端指定的回调队列,带上原始消息的
Correlation ID
。
-
客户端接收响应:
- 监听回调队列。
- 检查返回消息的
Correlation ID
是否与请求的Correlation ID
匹配。 - 返回结果给调用方。
RPC 模式的优缺点
优点:
-
实现简单:通过 RabbitMQ 提供的基本功能可以实现完整的 RPC 流程。
-
松耦合:客户端和服务端无需直接通信,降低了依赖。
-
支持并发:多个服务端可以监听同一请求队列,实现任务负载均衡。
缺点:
-
性能限制:消息的发送和接收增加了额外的延迟,不适合高实时性要求的场景。
-
资源开销:每个请求需要单独的回调队列,消耗更多的资源。
-
缺乏强一致性保证:消息的丢失或服务端失败可能导致请求无响应。
应用场景
-
任务分发和结果收集:将复杂的任务分发到多个服务处理,收集处理结果。
-
远程调用:在微服务架构中,调用其他服务的接口。
-
计算密集型任务:将大规模计算任务分发到多个节点执行。
生产者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class RpcProducer {
//rpc队列
private final static String REQUEST_QUEUE = "rpc_queue";
public static void main(String[] args) {
//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channel
try (
//使用上面创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel()
) {
//定义临时队列,用于消息回调
String replyQueueName = channel.queueDeclare().getQueue();
final String corrId = UUID.randomUUID().toString(); // 生成唯一的 Correlation ID
// 设置请求属性
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName) // 设置回调队列
.build();
//发送请求到请求队列
channel.basicPublish("", REQUEST_QUEUE, props, "Rpc模式消息-Hello World".getBytes("UTF-8"));
//创建阻塞队列用于接收响应
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
//消费回调队列中的消息
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response.offer(new String(delivery.getBody(), "UTF-8")); // 放入响应队列
}
}, consumerTag -> {});
//等待响应并返回
String result = response.take();
//取消订阅回调队列
channel.basicCancel(ctag);
System.out.println("回调消息:"+result);
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
}
}
}
消费者
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
public class RpcConsumer {
//rpc队列
private final static String REQUEST_QUEUE = "rpc_queue";
public static void main(String[] args) {
//这里不关闭连接,否则接收不到消息并无法回调,测试完成后在IDEA中手动结束主方法即可
try {
//使用上面创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);
// 设置每次只处理一个消息
channel.basicQos(1);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
*
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息体
String msg = new String(body, "utf-8");
System.out.println("rpc模式消费者接收消息: " + msg + "!");
// 发送响应到回调队列
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish("", properties.getReplyTo(), replyProps, "Rpc消费者接收成功".getBytes("UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*
* 这里同时消费两个队列的消息
*/
channel.basicConsume(REQUEST_QUEUE, false, consumer);
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("接收消息出现异常...");
}
}
}
测试
先启动生产者,再启动消费者
消费者控制台输出:
rpc模式消费者接收消息: Rpc模式消息-Hello World!
生产者控制台输出:
回调消息:Rpc消费者接收成功
生产者启动并发布消息后,会等待消费者的回调消息,当消费者成功消费后,生产者接收到回调消息并打印控制台
过期时间设置
过期时间TTL
表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。
- 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
- 第二种方法是对消息进行单独设置,每条消息TTL可以不同。
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL
较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL
值,就成为dead message被投递到死信队列, 消费者将无法再收到该消息。
设置队列过期时间
这里以简单模式为例演示
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;
import java.util.HashMap;
import java.util.Map;
public class QueueTtlProducer {
//队列名称
private final static String QUEUE_NAME = "ttl_queue";
public static void main(String[] args){
//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channel
try (
//使用上面创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel()
) {
/**
* channel.queueDeclare()方法为声明队列,参数明细如下:
* 1、queue 队列名称
* 2、durable 是否持久化,如果持久化,mq重启后队列还在
* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
*/
Map<String,Object> arguments = new HashMap<>();
arguments.put("x-message-ttl",10000);//x-message-ttl为队列的超时属性,这里设置为10秒过期
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
// 消息内容
String message = "Hello World";
/**
* 使用通道将消息发布到队列中,参数说明如下:
* 1.第一个参数是exchange,即交换机名称,简单模式不需要设置交换机,此处为空即可(为空时,使用的是rabbitmq的默认交换机AMQP default)
* 2.第二个参数是routingKey,即路由键,简单模式下不使用交换机时,此处参数与队列名相同,会自动匹配目标队列
* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等
* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文
* */
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
}
}
}
消费者
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class QueueTtlConsumer {
//要与生产者的队列名保持一致
private final static String QUEUE_NAME = "ttl_queue";
public static void main(String[] args){
//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channel
try (
//使用之前创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel()
) {
// 声明队列
Map<String,Object> arguments = new HashMap<>();
arguments.put("x-message-ttl",10000);//x-message-ttl为队列的超时属性,这里设置为10秒过期
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息体
String msg = new String(body,"utf-8");
System.out.println("超时队列接收消息: " + msg + "!");
}
};
/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
}catch (Exception ex) {
ex.printStackTrace();
System.out.println("接收消息出现异常...");
}
}
}
进行测试
先启动生产者,查看管理页面,多出了声明的ttl_queue
队列,拥有超时属性,并且队列内存在一个未消费的消息:
不启动消费者,等待10秒后页面自动刷新,消息已消失:
生产者启动10秒后再启动消费者,其控制台无任何输出信息,如果在10秒内启动消费者,则消费者会收到消息:
超时队列接收消息: Hello World!
设置消息过期时间
消息的过期时间;只需要在发送消息(可以发送到任何队列,不管该队列是否属于某个交换机)的时候设置过期时间即可。
生产者
依旧以简单模式为例,这里使用BasicProperties
来为消息设置超时时间:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;
public class MessageTtlProducer {
//队列名称
private final static String QUEUE_NAME = "messageTtl_queue";
public static void main(String[] args){
//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channel
try (
//使用上面创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel()
) {
//声明队列时不设置超时
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 消息内容
String message = "Hello World";
//设置消息的过期时间,此处相当于为每一个消息单独设置属性
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.contentEncoding("UTF-8") // 编码方式
.expiration("10000")// 过期时间
.build();
/**
* 使用通道将消息发布到队列中,参数说明如下:
* 1.第一个参数是exchange,即交换机名称,简单模式不需要设置交换机,此处为空即可(为空时,使用的是rabbitmq的默认交换机AMQP default)
* 2.第二个参数是routingKey,即路由键,简单模式下不使用交换机时,此处参数与队列名相同,会自动匹配目标队列
* 3.第三个参数为BasicProperties,即消息属性,可以为要发送的消息设置一些条件,比如消息类型、过期时间等等
* 4.第四个参数是消息本身,即消息的内容,这里是一个字节数组,表示消息的正文
* */
channel.basicPublish("", QUEUE_NAME, basicProperties, message.getBytes());
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
}
}
}
消费者
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
public class MessageTtlConsumer {
//要与生产者的队列名保持一致
private final static String QUEUE_NAME = "messageTtl_queue";
public static void main(String[] args){
//使用try-with-resources模式,确保无论是否发生异常,最后都会关闭Connection与Channel
try (
//使用之前创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel()
) {
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
/**
* 获取消息,并且处理,这个方法类似事件监听,当接收到消息后此方法将被自动调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息体
String msg = new String(body,"utf-8");
System.out.println("超时队列接收消息: " + msg + "!");
}
};
/**
* 通过通道监听队列消息并实现消费,参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现手动回复
* 3、callback,消费方法,当消费者接收到消息要执行的方法,将上面的consumer传入其中
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
}catch (Exception ex) {
ex.printStackTrace();
System.out.println("接收消息出现异常...");
}
}
}
测试
先启动生产者,查看管理页面看到新创建的队列,由于只是为消息设置超时,队列没有了蓝色的TTL标识:
如果在生产者设置的超时时间内启动消费者,则其控制台输出如下,否则消息超时消费者接收不到:
超时队列接收消息: Hello World!
其他属性设置
上面列举了队列与消息的超时属性设置,除此之外还有很多其他可设置项
队列扩展属性
常见队列属性
在 RabbitMQ 中,队列可以通过 queueDeclare
的 arguments
参数来配置扩展属性。以下是常见的队列扩展属性及其含义:
x-message-ttl
- 含义:设置队列中消息的存活时间(以毫秒为单位)。
- 用法:
arguments.put("x-message-ttl", 10000);
消息超过这个时间未被消费就会过期并被移除队列。
x-expires
- 含义:设置队列的存活时间(以毫秒为单位)。如果队列在指定时间内未被使用(没有消费者连接、没有消息存储等),队列将被自动删除。
- 用法:
arguments.put("x-expires", 60000);
队列将在 60 秒后自动删除。
x-max-length
- 含义:限制队列中最大消息数量。如果队列中的消息数达到限制,新发布的消息将被丢弃或替换最早的消息(与
x-overflow
配合使用)。 - 用法:
arguments.put("x-max-length", 1000);
队列最多存储 1000 条消息。
x-max-length-bytes
- 含义:限制队列中消息总大小(以字节为单位)。如果总大小超过限制,新发布的消息将被丢弃或替换最早的消息。
- 用法:
arguments.put("x-max-length-bytes", 10485760);
队列的消息总大小限制为 10 MB。
x-overflow
- 含义:设置队列的溢出行为,当队列达到
x-max-length
或x-max-length-bytes
时的处理方式。 - 取值:
"drop-head"
:丢弃最早的消息(FIFO 式移除)。"reject-publish"
:拒绝新发布的消息。
- 用法:
arguments.put("x-overflow", "drop-head");
x-dead-letter-exchange
- 含义:设置队列的死信交换机。队列中的死信消息(如过期、被拒绝、队列满等)将被转发到指定的交换机。
- 用法:
arguments.put("x-dead-letter-exchange", "dead_exchange");
当消息变成死信时,它们将路由到dead_exchange
。
x-dead-letter-routing-key
- 含义:设置死信消息的路由键(配合
x-dead-letter-exchange
使用)。 - 用法:
arguments.put("x-dead-letter-routing-key", "dead_key");
死信消息将使用dead_key
进行路由。
x-max-priority
- 含义:设置队列的最大优先级,启用消息优先级队列。
- 用法:
arguments.put("x-max-priority", 10);
队列支持消息优先级,优先级范围为 0 到 10。
x-queue-mode
- 含义:设置队列模式。
- 取值:
"default"
:默认模式,所有消息存储在内存和磁盘上。"lazy"
:惰性模式,尽可能将消息存储到磁盘以减少内存消耗。
- 用法:
arguments.put("x-queue-mode", "lazy");
队列切换到惰性模式。
x-queue-master-locator
- 含义:在 RabbitMQ 集群环境中,指定队列主副本的位置策略。
- 取值:
"min-masters"
:选择负载最低的节点作为主副本。"client-local"
:选择与客户端最近的节点作为主副本。
- 用法:
arguments.put("x-queue-master-locator", "min-masters");
自定义插件属性
- RabbitMQ 支持通过插件扩展的属性,例如延迟消息插件(
rabbitmq-delayed-message-exchange
)会引入新的属性:x-delayed-type
:指定延迟队列的交换机类型(如direct
、fanout
、topic
)。- 用法:
arguments.put("x-delayed-type", "direct");
示例代码
以下代码示例展示了如何为队列配置多种属性:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
public class QueueDeclareExample {
private final static String QUEUE_NAME = "example_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 配置队列属性
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 60000); // 消息过期时间
arguments.put("x-max-length", 100); // 最大消息数
arguments.put("x-dead-letter-exchange", "dead_exchange"); // 死信交换机
arguments.put("x-queue-mode", "lazy"); // 惰性队列
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
System.out.println("Queue declared with custom properties.");
}
}
}
注意事项
- 属性顺序依赖:某些属性依赖其他属性,例如
x-dead-letter-routing-key
需要同时设置x-dead-letter-exchange
。 - 持久化:队列的持久化属性与扩展属性分开设置,队列扩展属性不会影响持久化行为。
- 集群环境:某些属性(如
x-queue-master-locator
)仅在集群环境中有效。
通过合理配置队列属性,可以更好地满足业务需求并提升 RabbitMQ 的性能和可靠性。
消息扩展属性
在 RabbitMQ 中,可以通过 AMQP.BasicProperties
配置发布消息时的多种属性。除了示例代码中设置的消息过期时间 (expiration
) 外,还可以为消息配置其他重要属性。
常见消息属性
contentType
(内容类型)
-
描述:指定消息的 MIME 类型,例如
text/plain
、application/json
等。 -
示例:
basicProperties = new AMQP.BasicProperties.Builder() .contentType("application/json") .build();
contentEncoding
(内容编码)
-
描述:指定消息内容的编码方式,例如
UTF-8
、gzip
等。 -
示例:
basicProperties = new AMQP.BasicProperties.Builder() .contentEncoding("UTF-8") .build();
expiration
(过期时间)
-
描述:设置消息的生存时间(以毫秒为单位)。消息在队列中超过指定时间后会变为死信。
-
示例:
basicProperties = new AMQP.BasicProperties.Builder() .expiration("60000") // 60 秒 .build();
priority
(优先级)
-
描述:指定消息的优先级,配合队列的
x-max-priority
属性使用。值范围是 0(最低优先级)到队列配置的最大优先级。 -
示例:
basicProperties = new AMQP.BasicProperties.Builder() .priority(5) .build();
correlationId
(关联 ID)
-
描述:用于将请求和响应进行关联,通常在 RPC 模式中使用。
-
示例:
basicProperties = new AMQP.BasicProperties.Builder() .correlationId("12345") .build();
replyTo
(回调队列名称)
-
描述:指定响应消息的回调队列,用于 RPC 模式。
-
示例:
basicProperties = new AMQP.BasicProperties.Builder() .replyTo("response_queue") .build();
messageId
(消息 ID)
-
描述:消息的唯一标识符,用于幂等性校验或追踪。
-
示例:
basicProperties = new AMQP.BasicProperties.Builder() .messageId("msg-001") .build();
timestamp
(时间戳)
-
描述:消息的创建时间,通常由生产者设置。
-
示例:
basicProperties = new AMQP.BasicProperties.Builder() .timestamp(new Date()) .build();
type
(消息类型)
-
描述:指定消息的类型,用于消费者区分不同消息的处理逻辑。
-
示例:
basicProperties = new AMQP.BasicProperties.Builder() .type("order_created") .build();
userId
(用户 ID)
-
描述:用于验证发布消息的用户。RabbitMQ 会检查
userId
是否与连接的用户一致。 -
示例:
basicProperties = new AMQP.BasicProperties.Builder() .userId("guest") .build();
appId
(应用 ID)
-
描述:标识发布消息的应用程序。
-
示例:
basicProperties = new AMQP.BasicProperties.Builder() .appId("my_app") .build();
headers
(自定义头部信息)
-
描述:消息的元数据,存储为键值对格式,可以传递扩展信息。
-
示例:
Map<String, Object> headers = new HashMap<>(); headers.put("source", "web"); headers.put("destination", "api"); basicProperties = new AMQP.BasicProperties.Builder() .headers(headers) .build();
完整示例代码
以下代码展示了如何为消息设置多种属性并发布到队列:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class MessagePropertiesProducer {
private final static String QUEUE_NAME = "properties_queue";
public static void main(String[] args) {
try (
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel()
) {
// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 消息内容
String message = "Hello RabbitMQ with properties!";
// 配置消息属性
Map<String, Object> headers = new HashMap<>();
headers.put("format", "json");
headers.put("source", "application");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentType("application/json")
.contentEncoding("UTF-8")
.expiration("60000")
.priority(5)
.correlationId("12345")
.replyTo("response_queue")
.messageId("msg-001")
.timestamp(new Date())
.type("custom_message")
.userId("guest")
.appId("my_app")
.headers(headers)
.build();
// 发布消息
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
System.out.println("Message sent with properties!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
注意事项
- 属性优先级:队列的属性可能会覆盖消息的属性。例如,队列的
x-message-ttl
会优先于消息的expiration
。 - 类型匹配:某些属性需要特定格式,如
priority
必须是整数,headers
是键值对。 - 用户权限:使用
userId
属性时,RabbitMQ 会严格校验用户身份,需确保设置正确。
通过设置这些属性,可以增强消息的功能性和可靠性,满足更多业务需求。
死信队列
概念
死信队列是 RabbitMQ 中的一种特殊队列,用于存储被拒绝或无法被正常处理的消息。消息变成“死信”并被转发到死信队列,方便后续分析和处理。
死信队列是消息处理失败时的补救措施。可以使用死信队列进行日志分析、故障排查或重新投递机制。
消息变成死信的三种情况
-
消息被消费者拒绝且
requeue=false
:- 消费者显式拒绝消息(使用
channel.basicReject
或channel.basicNack
),并指定不重新入队。
- 消费者显式拒绝消息(使用
-
消息在队列中TTL(Time-To-Live)过期:
- 设置消息或队列的 TTL,当消息超时未被消费时,进入死信队列。
-
队列达到最大长度限制:
- 队列中消息数量超过最大限制,最早的消息被丢弃,转发到死信队列。
死信队列配置
配置死信队列需要为队列设置以下参数:
x-dead-letter-exchange
:
指定死信队列对应的交换机。x-dead-letter-routing-key
(可选):
指定死信消息在死信交换机上的路由键。
生产者示例
声明一个正常的交换机,用于投递消息,使用direct模式。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtil;
public class DLXProducer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) {
try (
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel()
) {
// 声明正常交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, "direct");
String routingKey = "normal_key";
String message = "测试死信消息";
// 发送消息
channel.basicPublish(NORMAL_EXCHANGE, routingKey, null, message.getBytes());
System.out.println("发送消息: " + message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者一
声明正常的交换机与队列,再声明死信交换机与队列,然后进行绑定
私信队列使用步骤:
- 声明正常的交换机与正常队列,通过一个路由键将二者绑定。
- 声明死信交换机与死信队列,通过一个路由键将二者绑定。
- 为正常的队列配置扩展参数:
x-dead-letter-exchange
与x-dead-letter-routing-key
,分别为所声明的死信交换机与死信路由键。
代码展示
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class DLXConsumer {
//普通队列
private static final String NORMAL_QUEUE = "normal_queue";
//普通交换机
private static final String NORMAL_EXCHANGE = "normal_exchange";
//普通路由键
private static final String NORMAL_KEY= "normal_key";
//死信队列
private static final String DLX_QUEUE = "dlx_queue";
//死信交换机
private static final String DLX_EXCHANGE = "dlx_exchange";
//死信路由键
private static final String DLX_KEY= "dlx_key";
public static void main(String[] args) {
try{
//创建连接与通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明普通交换机与死信交换机
channel.exchangeDeclare(DLX_EXCHANGE, "direct");
channel.exchangeDeclare(NORMAL_EXCHANGE, "direct");
// 声明死信队列
channel.queueDeclare(DLX_QUEUE, true, false, false, null);
// 声明普通队列并为其配置死信交换机及死信路由键,该队列的消息变为'死信'后会被投入死信交换机
Map<String,Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange",DLX_EXCHANGE);// 指定死信交换机
arguments.put("x-dead-letter-routing-key",DLX_KEY);// 指定死信路由键
channel.queueDeclare(NORMAL_QUEUE, true, false, false,arguments);
//绑定死信交换机与死信队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_KEY);
//绑定普通交换机与普通队列
channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_KEY);
// 消费普通队列消息,并模拟拒绝消息,autoAck设置为false来进行手动消息确认
channel.basicConsume(NORMAL_QUEUE, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("接收到正常队列消息: " + message);
// 拒绝消息并将其发送到死信队列
channel.basicReject(envelope.getDeliveryTag(), false);
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者二
监听死信队列,消费其中的消息
import com.rabbitmq.client.*;
import util.ConnectionUtil;
import java.io.IOException;
/**
* 用于消费死信队列中的消息
* */
public class DQConsumer {
//前面声明的死信队列
private static final String DLX_QUEUE = "dlx_queue";
public static void main(String[] args){
try {
//使用之前创建的连接工具,来获取到连接
Connection connection = ConnectionUtil.getConnection();
//创建会话通道,生产者和mq服务所有通信都在channel通道中完成
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(DLX_QUEUE, true, false, false, null);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 即消息体
String msg = new String(body,"utf-8");
System.out.println("接收消息: " + msg + "!");
}
};
//消费消息
channel.basicConsume(DLX_QUEUE, true, consumer);
}catch (Exception ex) {
ex.printStackTrace();
System.out.println("接收消息出现异常...");
}
}
}
测试
先启动消费者DLXConsumer,然后启动生产者DLXProducer,生产者启动后会将消息进行发布,然后DLXConsumer中会拒绝消息,此时消息被放入声明的死信队列中,下图的Ready
为1,表示死信队列中存有1条未消费的消息:
然后启动用于消费死信队列的消费者DQConsumer,其控制台输出:
接收消息: 测试死信消息!
然后在管理页面等待5秒自动刷新,结果如下: