【RabbitMQ】RabbitMQ的下载安装及使用
安装RabbitMQ
下载网站:https://www.rabbitmq.com/docs/install-windows
点击后,会直接定位到依赖介绍位置,告诉你需要安装Erlang
下载Erlang
Erlang也是一种编程语言,只是比较小众,但其拥有极为出色的性能
这个网站是到GitHub上下载的,可能需要点魔法,也可以去Erlang官网下载(能下,但慢)
下载RabbitMQ
下载Erlang的同时,也顺便下载RabbitMQ吧
或者直接使用别人下载好的包,比如我这提供的包
安装Erlang
运行下载好的exe文件
- 点击Next即可
- 选择安装路径,点击Next继续
- 点击Install安装
- 安装完后,点击Close即可
安装RabbitMQ
运行下载好的exe文件
- 点击Next即可
- 选择安装路径,点击Install安装
- 安装成功后,点击Next继续
- 点击Finish完成安装
安装插件
找到RabbitMQ目录下的sbin目录,打开CMD控制台,输入
rabbitmq-plugins.bat enable rabbitmq_management
命令
重启RabbitMQ服务后访问http://localhost:15672
默认账号密码均为
guest
整合RabbitMQ
- 导入依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.24.0</version>
</dependency>
一对一队列模型
- 生产者发送消息
/**
* 一对一消息队列模型:生产者
*/
public class SingleProducer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// 创建连接工厂,用于创建到RabbitMQ服务器的连接
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ服务器地址
factory.setHost("localhost");
// 创建一个连接,用于和RabbitMQ服务器建立通信通道
try (Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel()) {
// 声明一个队列,队列名为hello
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
// 将消息发布到指定队列中
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
运行后,RabbitMQ管理后台会增加一个队列
- 消费者消费消息
/**
* 一对一消息队列模型:消费者
*/
public class SingleConsumer {
// 定义我们正在监听的队列名称
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// 创建连接工厂并配置连接信息
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 从工厂中获取一个新的连接
Connection connection = factory.newConnection();
// 创建一个新的通道
Channel channel = connection.createChannel();
// 声明一个队列,在该通道中声明我们要监听的队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 创建一个回调函数,用于处理从队列中接收到的消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 获取消息体并转换为字符串
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
// 在通道上开始消费队列中的消息,接收的消息会传递给deliverCallback进行处理,会持续阻塞
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
未运行代码前
运行代码后
此时在不中断消费者代码运行的情况下,再去运行生产者代码,会发现消费者会持续消费生产者增加的消息
一对多队列模型
- 生产者发送消息
/**
* 一对多消息队列:生产者
*/
public class MultiProducer {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// durable参数设置为 true,服务器重启后队列不丢失
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
// 此处使用scanner循环输入消息,模拟生产者不定时多次生产
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.nextLine();
// 指定 MessageProperties.PERSISTENT_TEXT_PLAIN,表示消息持久化
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
运行代码,可以模拟发送多条数据
- 消费者消费信息
/**
* 一对多消息队列:消费者
*/
public class MultiConsumer {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 设置持久化
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
try {
System.out.println(" [x] Received '" + message + "'");
doWork(message);
} finally {
System.out.println(" [x] Done");
// 手动确认消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 开始消费消息,传入队列名称。关闭自动确认,投递回调和消费者取消回调
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
// 模拟消息处理,消息中每有一个“.”就让线程暂停10s,模拟复杂的耗时工程
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(10000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
运行消费者代码
队列中的消息不是一次性全部接收的,而是需要等待消费者完成消费(处理完事务后)并主动确认完成后,才会继续发送下一条消息
这与一对多有什么关系呢?不急,我们停掉消费者代码运行先,然后让生产者进行生成消息
然后,直接拷贝一份消费者代码,命名为MultiConsumer2。此时运行两个消费者看效果
会发现,两个消费者会轮流接收队列消息,消费完(完成任务)后才会确认并接收新的队列消息,直至队列所有消息被消费完
下面交换机模型为初学者见解,可能存在理解错误,看看就好,我后面也会深入学习,但大概率不会再修改本文章,所以要专业的、准确的还是得看官方文档,这里会用就行,被误导了概不负责,谢谢
交换机模型
交换机是消息队列中的一个组件,有点类似于中转站,上面两个模型都是生产者创建消息队列,然后由消费者去接收指定消息队列中的消息,而交换机模型中,生产者不再创建指定的消息队列,而是创建一个交换机,由消费者去绑定交换机并创建消息队列,然后再接收生产者的消息。由直接变间接。
这就有点像网络路由一样,最初,两台电脑要互发消息,就必须各自开一个网口连接网线,三台电脑要交互就各开两个网口,随着电脑接入的越多,一台电脑上要的网口就越多,网线交错也就越复杂,这时为了更好梳理网线和减少网口,就有了集线器、交互器、路由器等,而RabbitMQ中的交换机也是同样道理,为了方便管理多个消息队列及其后续变动
交换机有direct
, topic
, headers
和 fanout
四种类型,因为headers交换机的性能较差,不太推荐使用,了解有该类型即可
Fanout交换机
fanout有扇出的意思,该类型交换机会把消息一次性扇出(发布)给所有与该交换机绑定的消息队列,适用于广播消息,如更新文章后,广播消息给所有订阅文章的用户
- 生产者发送消息
/**
* 交换机模型:生产者
*/
public class FanoutProducer {
// 定义交换机的名称
private static final String EXCHANGE_NAME = "exchange";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置工厂的主机地址
factory.setHost("localhost");
// 创建一个连接和通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明一个交换机,交换机名称为exchange,类型为fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 发布消息
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
// 获取用户输入
String message = scanner.nextLine();
// 将消息发送到指定的交换机上,交换机名称为exchange,路由键为空,消息属性为null,消息内容为用户输入的字符串
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
此时运行生产者代码,并输入内容,会发现队列表中没有新增队列
但可以再Exchanges中看到名为exchange、类型为fanout的交换机信息
- 消费者消费消息
/**
* 交换机模型:消费者
*/
public class FanoutConsumer {
// 声明交换机的名称
private static final String EXCHANGE_NAME = "exchange";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置工厂的主机地址
factory.setHost("localhost");
// 创建一个连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个交换机,交换机名称为exchange,类型为fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// String queueName = channel.queueDeclare().getQueue();
// 创建队列,名称为fanout_queue,并绑定到交换机上
String queueName = "yg1_queue";
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [员工1] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
运行消费者代码,会发现队列表中新增了yg1_queue队列
此时拷贝多一份消费者代码,并修改队列名为yg2_queue
运行后,队列表会新增yg2_queue队列
此时生产者发送消息后,会同时发送给两位消费者进行处理
需要注意的是,如果生产者先发送消息,再创建消费者,因为还没有创建存储的消息队列,所以是无法存储消息的,即消费者无法接收队列创建前的旧消息;但如果消费者已经启动过一次了(RabbitMQ中已有其消息队列),那么生产者发送消息后再启动消费者,还是能接收到消息的;
就比如你没有QQ号,那么别人发送不了消息给你,当你创建好QQ号后,无论你是否上线,别人都能发送消息给你
以一对多队列模型为例
- 我们删除队列表中所有队列
- 然后只运行生产者代码,会发现它直接创建好了一个消息队列
- 此时发送消息后再启动消费者,消费者是能接收到队列消息的
因为消息只在消息队列中传递,交换机只是中间件。这里的生产者只创建交换机,不创建队列,队列有消费者创建
为什么要用交换机呢? (个人理解)
打个不恰当的比喻:消息队列就是Q群,此时你有n个Q群,你要给每个群都发送一个拼手气红包,让群友去争抢;你自然可以手动一个一个去发,但更好的方式是选择采用脚本(交换机),通过脚本(交换机)去给该账号下的每个群(消息队列)都去发送一个拼手气红包。这样的好处在于,后面不论是有新群还是有群被解散,你都无需理会,你只需在意是否是自己q号上的群(是否绑定在交换机上)
Direct交换机
fanout就像AOE技能,无差别的范围攻击,而Direct就像是指定性单体技能,即使有多个消息队列绑定在其上,也能根据路由键给指定消息队列发送消息,适用于指派任务,通过路由键分发任务给指定消息队列
- 生产者发送消息
/**
* direct交换机模型:生产者
*/
public class DirectProducer {
// 定义交换机名称
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接工厂的主机地址为本地主机
factory.setHost("localhost");
// 建立连接并创建通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 使用通道声明交换机,类型为direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
// 读取用户输入内容,并以空格分隔
String userInput = scanner.nextLine();
String[] parts = userInput.split(" ");
if(parts.length < 1){
continue;
}
String message = parts[0];
// 路由键,用于确定消息被路由到哪个队列
String severity = parts[1];
// 发布消息到交换机
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [生产者] 发送 '" + severity + "':'" + message + "'");
}
}
}
}
运行生产者代码,同样不会生成消息队列而是创建类型为direct的交换机
- 消费者消费消息
/**
* direct交换机模型:消费者
*/
public class DirectConsumer {
// 定义我们正在监听的交换机名称
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接工厂的主机地址为本地主机
factory.setHost("localhost");
// 建立与 RabbitMQ 服务器的连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
// 声明一个 direct 类型的交换机
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 声明一个匿名队列,并获取队列名称
// String queueName = channel.queueDeclare().getQueue();
// 手动声明队列名称
String queueName = "cy_queue";
// 创建队列,并设置队列的持久化属性为 true
channel.queueDeclare(queueName, true, false, false, null);
// 绑定队列到交换机上,并指定绑定路由键为“cy”
channel.queueBind(queueName, EXCHANGE_NAME, "cy");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 创建一个 DeliverCallback 实例来处理接收到的消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [残炎] 收到了 '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
// 开始消费队列中的消息,设置自动确认消息已被消费
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
运行消费者代码,会生成指定名称的队列
此时,生产者发送消息并指定路由键后,对应的消费者会收到消息,而不属于它 的消息不会收到
同样的,拷贝一份消费者代码并启动
生产者每次发送给不同目标发送消息,都会精准无误地转发到指定目标
其实上面也同样演示了一个问题,那就是发送消息给不存在的路由键目标,也就是还没拷贝第二份消费者(fy)代码时生产者给fy发送的消息,是直接丢弃的。
direct就像是能输入具体发送目标类型的红包脚本,它允许我们自行选择要发送的目标群类型,而不是账号下的所有群,毕竟有些群可能是不相干的人拉你进的,群友与你没有任何瓜葛,你又何必给他们发呢?又或者你只想给自己所有的家族群发(只要你有标记哪些群是家族群)
direct能否给不同队列发送消息?
可以的,官网明确说了,不同消息队列允许绑定相同的路由键,而我们发送消息只关注路由键是否存在,并不在意有几个队列绑定在同一路由键上,所以我们可以将同类型的消息队列绑定在同一路由键上
Topic交换机
topic交换机与direct交换机类型,也是指定性,只不过它不再是单体指定,而是允许指定多个目标(注意,这里的目标指的是路由键而非具体的消息队列)。
- 生产者发送消息
/**
* topic交换机模型:生产者
*/
public class TopicProducer {
// 交换机名称
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] argv) throws Exception {
// 创建连接工厂并设置连接参数
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 建立连接并创建通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 使用通道声明交换机,类型为topic
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String userInput = scanner.nextLine();
String[] parts = userInput.split(" ");
if (parts.length < 1) {
continue;
}
String message = parts[0];
String routingKey = parts[1];
// 发布消息到指定的交换机和路由键
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [生产者] 发送 '" + routingKey + "':'" + message + "'");
}
}
}
}
运行生产者代码,同样不会生成消息队列而是创建类型为topic的交换机
- 消费者消费消息
/**
* topic交换机模型:消费者
*/
public class TopicConsumer {
// 定义监听的交换机名称
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 创建连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明使用的交换机,并指定类型为topic
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 创建cy队列
String queueName = "cy_queue";
// 声明队列,并设置队列未持久化、非排他、非自动删除
channel.queueDeclare(queueName, true, false, false, null);
// 绑定队列到交换机上,并指定路由键模式为“#.cy.#”
channel.queueBind(queueName, EXCHANGE_NAME, "#.cy.#");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [cy组] 收到 '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
同样拷贝一份消费者代码并运行
与direct对比,除了routing key采用格式不同外,表面上好像并没有太大的区别
但topic可以以组合的发送而direct并不能,如下,cy组和fy组可以以任意形式组合,并发送对应消息给他们
这样,我们就可以一次性指定多个具体目标去处理指定消息
值得注意的一点,topic允许一个消息队列绑定多个绑定键,然后只要匹配其中一个即可收到消息
我们再拷贝一份消费者代码,修改如下,这里我们绑定的路由键包含cy与fy
此时无论我们给cy发,还是fy发,cyfy组都能收到消息
且需要注意的是,即使我们同时给cy和fy发消息(项目3 cy.fy),cyfy组都只会收到一条消息,不同出现重复接收的情况
topic就像是比direct更高级的脚本(官方说的,topic是比direct更复杂),direct这个脚本只能指定一个群类型,假设我除了想给所有家族群发,还想给我创建的群发,或者我管理的群,很显然,这需要我重新设置目标类型并再一次启动脚本。
这自然没有任何问题,无非就是需要操作多次的问题,可事实上,可能有些家族群是由我创建的或者由我管理的,那么就会出现一个问题,脚本会在某些群里发送多次红包,这很显然不符合我给指定目标群发送一个拼手气红包的目的。topic便是能处理这个问题的脚本,我能一次性设置家族群、我是群主、我是管理员三个类型,然后一次性给我Q号下满足这些类型的群发送一个拼手气红包,且及时有些群满足多个条件,也只会发送一个。