在nodejs中使用RabbitMQ(五)死信队列,延迟队列
死信队列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一种机制,用于处理无法成功消费或不能按预期处理的消息。简单来说,死信队列用于存储那些不能被正常消费或处理的消息,以便后续审查或重新处理。
死信队列用于处理以下几种情况的消息:
1、消息过期:如果消息的 TTL(存活时间)到期,消息会被认为是死信,自动转发到死信队列。
2、消息被拒绝(nack
):消费者拒绝消息时,如果消息被标记为不重新排队(requeue: false
),消息会被送往死信队列。
3、队列满:如果队列已经达到最大长度,且还有新消息进入,RabbitMQ 会丢弃老的消息并将其送到死信队列。
4、消息无法路由:如果消息没有匹配到任何队列的路由规则,它会被送往死信队列。
配置死信队列
RabbitMQ 提供了两种主要机制来设置死信队列:
1、死信交换机(Dead Letter Exchange,DLX):
- 设置一个专门的交换机(Dead Letter Exchange,DLX),并将死信消息转发到该交换机。
- 可以为队列设置
x-dead-letter-exchange
属性来指定死信交换机。
2、死信路由键(Dead Letter Routing Key):
- 可以设置死信消息的路由键,以便将消息路由到适当的死信队列。
- 通过
x-dead-letter-routing-key
属性来配置。
3、 消息过期时间(TTL):
- 你可以设置队列的消息存活时间(TTL)。消息在过期后会变为死信并转发到死信队列。
- 通过
x-message-ttl
属性来设置消息的生存时间。
死信队列创建
1、一个生产者1,一个exchange交换机1,一个消费者1(主队列)。
2、一个exchange交换机2,一个消费者2(死信队列)。
3、消费者1要配置队列参数'x-dead-letter-exchange','x-dead-letter-routing-key', 'x-message-ttl'。
生产者1通过exchange1发送消息给消费者1,如果消息不能正常处理,会通过exchange2转发给消费者2.
示例
一个生产者,将消息发送给两个消费者,消费者消息如果处理失败,会将消息转发给死信队列。
producer.ts
import RabbitMQ from 'amqplib';
async function start() {
try {
const conn = await RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60");
conn.on("error", function (err1) {
if (err1.message !== "Connection closing") {
console.error("[AMQP] conn error", err1.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
return setTimeout(start, 1000);
});
console.log("[AMQP] connected");
let channel = null;
try {
channel = await conn.createChannel();
} catch (err) {
console.error("[AMQP]", err);
return setTimeout(start, 1000);
}
const exchangeName = 'exchange8';
await channel.assertExchange(
exchangeName,
'direct',
{
durable: true,
},
);
const deadExchangeName = 'exchange8_dead_letter';
await channel.assertExchange(
deadExchangeName,
'direct',
{
durable: true,
},
);
let routeKey = 'route.key';
for (let i = 0; i < 4; ++i) {
// console.log('message send!', channel.sendToQueue(
// queueName,
// Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),
// { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在
// // (err: any, ok: Replies.Empty)=>{}
// ));
let num = Math.ceil(Math.random() * 100000);
console.log('消息发送是否成功', num, routeKey, channel.publish(
exchangeName,
routeKey,
Buffer.from(`"发送消息, index:${i}, number:${num}, routeKey:${JSON.stringify(routeKey)}"`),
{
persistent: true,
},
));
}
setTimeout(() => {
conn.close();
process.exit(0);
}, 1000);
} catch (err) {
console.error("[AMQP]", err);
return setTimeout(start, 1000);
}
}
start();
dead_letter.ts
import RabbitMQ, { type Replies } from 'amqplib/callback_api';
RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
if (err0) {
console.error(err0);
return;
}
conn.createChannel(function (err1, channel) {
console.log('[*] waiting...');
const deadExchangeName = 'exchange8_dead_letter';
const deadRouteKey = 'route.key.dead.letter';
// 一次只有一个未确认消息,防止消费者过载
channel.prefetch(1);
// 死信队列
const deadQueueName = 'queue8';
channel.assertQueue(deadQueueName, {
durable: true, // 保证死信队列持久化
});
channel.bindQueue(deadQueueName, deadExchangeName, deadRouteKey);
channel.consume(deadQueueName, (msg) => {
console.log(`死信队列'${deadQueueName}'接收到的消息:`, msg?.content.toString());
if (msg) {
channel.ack(msg);
}
}, {
// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);
noAck: false,
arguments: {}
});
});
conn.on("error", function (err1) {
if (err1.message !== "Connection closing") {
console.error("[AMQP] conn error", err1.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
});
});
consumer.ts
import RabbitMQ, { type Replies } from 'amqplib/callback_api';
RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
if (err0) {
console.error(err0);
return;
}
conn.createChannel(function (err1, channel) {
console.log('[*] waiting...');
const exchangeName = 'exchange8';
const routeKey = 'route.key';
const deadExchangeName = 'exchange8_dead_letter';
const deadRouteKey = 'route.key.dead.letter';
// 一次只有一个未确认消息,防止消费者过载
channel.prefetch(1);
// 主队列
const queueName = 'queue7';
channel.assertQueue(queueName, {
durable: true, // 队列是否持久化,确保队列在 RabbitMQ 重启后仍然存在
arguments: {
// 对应的死信交换机(空字符串表示默认交换机)
'x-dead-letter-exchange': deadExchangeName,
// 死信队列的路由键
'x-dead-letter-routing-key': deadRouteKey,
// 队列消息过期时间1分钟
'x-message-ttl': 60000,
},
});
channel.bindQueue(
queueName,
exchangeName,
routeKey,
{},
(err, ok) => {
console.log(queueName, '队列绑定结果', err, ok);
},
);
channel.consume(queueName, function (msg) {
if (Math.ceil(Math.random() * 100) > 50 && msg) {
console.log(`队列'${queueName}'接收到的消息`, msg?.content.toString());
channel.ack(msg);
} else if (msg) {
console.log(`队列'${queueName}'拒绝接收消息`, msg?.content.toString());
// 第二个参数,false拒绝当前消息
// 第二个参数,true拒绝小于等于当前消息
// 第三个参数,3false从队列中清除
// 第三个参数,4true从新在队列中排队
channel.nack(msg, false, false);
} else {
}
}, {
// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);
noAck: false,
arguments: {}
}, (err: any, ok: Replies.Empty) => {
console.log(err, ok);
});
});
conn.on("error", function (err1) {
if (err1.message !== "Connection closing") {
console.error("[AMQP] conn error", err1.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
});
});
consumer2.ts
import RabbitMQ, { type Replies } from 'amqplib/callback_api';
RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
if (err0) {
console.error(err0);
return;
}
conn.createChannel(function (err1, channel) {
console.log('[*] waiting...');
const exchangeName = 'exchange8';
const routeKey = 'route.key';
const deadExchangeName = 'exchange8_dead_letter';
const deadRouteKey = 'route.key.dead.letter';
// 一次只有一个未确认消息,防止消费者过载
channel.prefetch(1);
// 主队列
const queueName = 'queue9';
channel.assertQueue(queueName, {
durable: true, // 队列是否持久化,确保队列在 RabbitMQ 重启后仍然存在
arguments: {
// 对应的死信交换机(空字符串表示默认交换机)
'x-dead-letter-exchange': deadExchangeName,
// 死信队列的路由键
'x-dead-letter-routing-key': deadRouteKey,
// 队列消息过期时间1分钟
'x-message-ttl': 60000,
},
});
channel.bindQueue(
queueName,
exchangeName,
routeKey,
{},
(err, ok) => {
console.log(queueName, '队列绑定结果', err, ok);
},
);
channel.consume(queueName, function (msg) {
if (Math.ceil(Math.random() * 100) > 50 && msg) {
console.log(`队列'${queueName}'接收到的消息`, msg?.content.toString());
channel.ack(msg);
} else if (msg) {
console.log(`队列'${queueName}'拒绝接收消息`, msg?.content.toString());
// 第二个参数,false拒绝当前消息
// 第二个参数,true拒绝小于等于当前消息
// 第三个参数,3false从队列中清除
// 第三个参数,4true从新在队列中排队
channel.nack(msg, false, false);
} else {
}
}, {
// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);
noAck: false,
arguments: {}
}, (err: any, ok: Replies.Empty) => {
console.log(err, ok);
});
});
conn.on("error", function (err1) {
if (err1.message !== "Connection closing") {
console.error("[AMQP] conn error", err1.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
});
});
延迟队列
方法一
rabbitmq本身没有直接提供延迟队列,可以通过安装插件实现(注:目前支持到4.0.2)。
rabbitmq_delayed_message_exchange
文档:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
下载:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
方法二
通过死信队列实现消息的延迟处理。主队列并不会被直接消费,而是通过设置 x-message-ttl
(即消息的过期时间)来控制消息的存活时间。消息在主队列中的 TTL 到期后,它会被转发到死信队列(Dead Letter Queue)中,在死信队列中处理消息。
相比于上述死信队列的实现,在consumer.ts中删除了channel.consume消息接收相关代码。
producer.ts
import RabbitMQ from 'amqplib';
async function start() {
try {
const conn = await RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60");
conn.on("error", function (err1) {
if (err1.message !== "Connection closing") {
console.error("[AMQP] conn error", err1.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
return setTimeout(start, 1000);
});
console.log("[AMQP] connected");
let channel = null;
try {
channel = await conn.createChannel();
} catch (err) {
console.error("[AMQP]", err);
return setTimeout(start, 1000);
}
const exchangeName = 'exchange9';
await channel.assertExchange(
exchangeName,
'direct',
{
durable: true,
},
);
const deadExchangeName = 'exchange9_dead_letter';
await channel.assertExchange(
deadExchangeName,
'direct',
{
durable: true,
},
);
let routeKey = 'route.key';
for (let i = 0; i < 4; ++i) {
// console.log('message send!', channel.sendToQueue(
// queueName,
// Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),
// { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在
// // (err: any, ok: Replies.Empty)=>{}
// ));
let num = Math.ceil(Math.random() * 100000);
console.log('消息发送是否成功', num, routeKey, channel.publish(
exchangeName,
routeKey,
Buffer.from(`"发送消息, index:${i}, number:${num}, routeKey:${JSON.stringify(routeKey)}"`),
{
persistent: true,
},
));
}
setTimeout(() => {
conn.close();
process.exit(0);
}, 1000);
} catch (err) {
console.error("[AMQP]", err);
return setTimeout(start, 1000);
}
}
start();
consumer.ts
import RabbitMQ, { type Replies } from 'amqplib/callback_api';
RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
if (err0) {
console.error(err0);
return;
}
conn.createChannel(function (err1, channel) {
console.log('[*] waiting...');
const exchangeName = 'exchange9';
const routeKey = 'route.key';
const deadExchangeName = 'exchange9_dead_letter';
const deadRouteKey = 'route.key.dead.letter';
// 一次只有一个未确认消息,防止消费者过载
channel.prefetch(1);
// 主队列
const queueName = 'queue10';
channel.assertQueue(queueName, {
durable: true, // 队列是否持久化,确保队列在 RabbitMQ 重启后仍然存在
arguments: {
// 对应的死信交换机(空字符串表示默认交换机)
'x-dead-letter-exchange': deadExchangeName,
// 死信队列的路由键
'x-dead-letter-routing-key': deadRouteKey,
// 队列消息过期时间10s
'x-message-ttl': 10000,
},
});
channel.bindQueue(
queueName,
exchangeName,
routeKey,
{},
(err, ok) => {
console.log(queueName, '队列绑定结果', err, ok);
},
);
// channel.consume(queueName, function (msg) {
// if (Math.ceil(Math.random() * 100) > 50 && msg) {
// console.log(`队列'${queueName}'接收到的消息`, msg?.content.toString());
// channel.ack(msg);
// } else if (msg) {
// console.log(`队列'${queueName}'拒绝接收消息`, msg?.content.toString());
// // 第二个参数,false拒绝当前消息
// // 第二个参数,true拒绝小于等于当前消息
// // 第三个参数,3false从队列中清除
// // 第三个参数,4true从新在队列中排队
// channel.nack(msg, false, false);
// } else {
// }
// }, {
// // noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);
// noAck: false,
// arguments: {}
// }, (err: any, ok: Replies.Empty) => {
// console.log(err, ok);
// });
});
conn.on("error", function (err1) {
if (err1.message !== "Connection closing") {
console.error("[AMQP] conn error", err1.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
});
});
dead_letter.ts
import RabbitMQ, { type Replies } from 'amqplib/callback_api';
RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
if (err0) {
console.error(err0);
return;
}
conn.createChannel(function (err1, channel) {
console.log('[*] waiting...');
const deadExchangeName = 'exchange9_dead_letter';
const deadRouteKey = 'route.key.dead.letter';
// 一次只有一个未确认消息,防止消费者过载
channel.prefetch(1);
// 死信队列
const deadQueueName = 'queue11';
channel.assertQueue(deadQueueName, {
durable: true, // 保证死信队列持久化
});
channel.bindQueue(deadQueueName, deadExchangeName, deadRouteKey);
channel.consume(deadQueueName, (msg) => {
console.log(`死信队列'${deadQueueName}'接收到的消息:`, msg?.content.toString());
if (msg) {
channel.ack(msg);
}
}, {
// noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);
noAck: false,
arguments: {}
});
});
conn.on("error", function (err1) {
if (err1.message !== "Connection closing") {
console.error("[AMQP] conn error", err1.message);
}
});
conn.on("close", function () {
console.error("[AMQP] reconnecting");
});
});