在nodejs中使用RabbitMQ(一)安装,使用
安装
1、安装RabbitMQ,推荐直接使用docker安装。
docker container run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v ./data:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin1234 rabbitmq:4.0-management
5672端口,rabbitmq服务
15672端口,rabbitmq可视化管理界面
2、windows,linux不用容器,可根据官网教程按步骤进行安装。Installing on RPM-based Linux | RabbitMQ
RabbitMQ 介绍
RabbitMQ 是一个开源的消息代理软件(也称为消息队列),它实现了高级消息队列协议(AMQP)。RabbitMQ 提供了可靠的消息传递机制,适用于构建分布式系统、微服务架构以及需要解耦组件的应用程序。它支持多种消息传递模式,并且具有高度的可扩展性和灵活性。官方还提供了多语言支持:Python,Java,Ruby,PHP,C#,JavaScript,Go。
RabbitMQ 的基本概念
1.生产者(Producer):左侧绿色方框代表消息的生产者,生产者将消息发送到 RabbitMQ 服务器。
2.连接(Connection)和通道(Channel):消息首先通过一个连接进入 RabbitMQ,连接内部包含多个通道。每个通道是一个轻量级的连接,用于减少开销并进行通信。
3.RabbitMQ 服务(RabbitMQ Server)和虚拟主机(Virtual Host):中央部分展示了 RabbitMQ 服务器及其内部结构。RabbitMQ 服务器中,可以创建多个虚拟主机(Virtual Host),每个虚拟主机是一个独立的消息命名空间。
4.交换器(Exchange):在每个虚拟主机内,有多个交换器。交换器负责接收来自生产者的消息,并根据预定的路由规则将消息分发到不同的队列。常见的交换机类型包括:
Direct:基于精确匹配的路由键进行消息路由。
Fanout:广播消息到所有绑定的队列。
Topic:基于通配符匹配的路由键进行消息路由。
Headers:基于消息头中的键值对进行消息路由。
5.队列(Queue):消息根据路由规则被分发到对应的队列中。队列用于存储和管理消息,等待消费者来获取消息。
6.消费者(Consumer):右侧黄色方框代表消费者。通过连接和通道,消费者从 RabbitMQ 服务器的队列中获取和处理消息。
示例
一个生产者对应一个消费者。
一个生产者对应多个消费者。
如何防止数据丢失
1、优先处理每一步错误,如,队列创建,exchange路由创建,消息是否发送成功。
2、持久化队列数据 durable: true,持久化队列中消息 persistent: true。
3、手动确认消息是否接收,在数据处理完后确认,channel.ack(msg)。 (注:要防止数据重复处理)
producer.ts 在代码中没有创建exchange,会使用默认exchange。
import RabbitMQ from 'amqplib/callback_api';
function start() {
RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60", function (err0, conn) {
if (err0) {
console.error("[AMQP]", err0.message);
return setTimeout(start, 1000);
}
conn.on("error", function (err1) {
if (err1.message !== "Connection closing") {
console.error("[AMQP] conn error", err1.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
return setTimeout(start, 1000);
});
console.log("[AMQP] connected");
conn.createChannel(async (err2, channel) => {
if (err2) {
console.error("[AMQP]", err2.message);
return setTimeout(start, 1000);
}
const queueName = 'queue1';
// 创建一个队列
channel.assertQueue(queueName, {
durable: true, //队列持久化
}, (err, ok) => {
if (err) {
console.log('队列创建失败!');
}
console.log(err, ok);
});
for (let i = 0; i < 30; ++i) {
console.log('message send!', channel.sendToQueue(
queueName,
Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),
{ persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在
// (err: any, ok: Replies.Empty)=>{}
));
}
});
setTimeout(() => {
conn.close();
process.exit(0);
}, 1000);
});
}
start();
consumer.ts 消费者可以启动零到多个,因为设置了持久化如果没有消费者,数据会保存等待消费。
import RabbitMQ, { type Replies } from 'amqplib/callback_api';
RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
if (err0) {
console.error(err0);
return;
}
conn.createChannel(function (err1, channel) {
const queueName = 'queue1';
channel.assertQueue(queueName, { durable: true });
console.log('[*] waiting...');
// 一次只有一个未确认消息,防止消费者过载
channel.prefetch(1);
channel.consume(
queueName,
function (msg) {
console.log('接收到的消息', msg, msg?.content.toString());
/*
// 手动确认取消channel.ack(msg); noAck:false,
// 自动确认消息
// if (msg) {
// channel.ack(msg);
// }
*/
},
{
noAck: true, // 是否自动确认消息
// noAck: false
},
(err: any, ok: Replies.Empty) => {
console.log(err, ok);
},
);
});
conn.on("error", function (err1) {
if (err1.message !== "Connection closing") {
console.error("[AMQP] conn error", err1.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
});
});