在nodejs中使用RabbitMQ(七)实现生产者确认
-
生产者:批量发送消息(每批10条),每条消息附带唯一
correlationId
,并监听确认队列(ackQueue
)。 -
消费者:处理消息后,通过
ackQueue
返回确认消息(携带原correlationId
)。 -
超时重试:若某批消息在指定时间内未全部确认,未确认的消息会重新加入待发送队列。
producer.ts
import amqp from 'amqplib';
async function start() {
const connection = await amqp.connect('amqp://admin:admin1234@localhost:5672//mirror?heartbeat=60');
const channel = await connection.createChannel();
const queue = 'queue11';
const ackQueue = 'queue11_ack';
await channel.assertQueue(queue, { durable: true });
await channel.assertQueue(ackQueue, { durable: true });
async function produce(limit: number, data: string[], timeout: number = 10000) {
let message = [...data];
if (message.length > limit) {
message = message.slice(0, limit);
} else if (message.length < limit) {
limit = message.length;
}
// 消息确认
let cache: Array<{
correlationId: string,
message: string,
isDelete: boolean,
}> = new Array(limit)
.fill(null)
.map((_, index) => {
return {
correlationId: Math.random().toString().slice(2, -1),
message: message[index],
isDelete: false,
};
});
for (let i = 0; i < limit; ++i) {
channel.sendToQueue(queue, Buffer.from(cache[i].message), {
correlationId: cache[i].correlationId,
replyTo: ackQueue
});
}
const consume = await channel.consume(ackQueue, (message) => {
if (!message) {
console.error('message is null', message);
return;
}
let index = cache.findIndex((item) => item.correlationId === message.properties.correlationId);
if (index !== -1) {
cache[index].isDelete = true;
console.log('confirmed success:', `"${message.content.toString()}"`, cache.every(item => item.isDelete));
} else {
console.log('confirmed fail:', `"${message.content.toString()}"`, cache, cache.every(item => item.isDelete), message.properties.correlationId);
}
channel.ack(message);
});
const sleep = (time: number) => {
return new Promise<void>(resolve => setTimeout(() => resolve(), time));
}
let stop = false;
const interval = async () => {
await sleep(0);
if (cache.every(item => item.isDelete) || stop) {
return;
} else {
await interval();
}
}
await Promise.race([
interval(), // 监听本批次消息是否已经处理完成
sleep(timeout), // 本批次消息最长处理时间
]);
stop = true;
await channel.cancel(consume.consumerTag);
// 没有收到确认的消息返回下一批处理继续处理
return cache.filter(item => !item.isDelete).map(item => item.message);
}
// 发送1000条数据,分100批,每批10个
let msg = new Array(100).fill(null).map((_, index) => `${index} message ${Math.random().toString().slice(2, -1)}`);
while (msg.length) {
let res = await produce(10, msg.slice(0, 10), 6000);
msg = [...res, ...msg.slice(10, msg.length)];
console.log('完成一批:', msg.length, '发送结果:', res.length, res);
}
}
start();
consumer.ts
import amqp from 'amqplib';
async function produce() {
const connection = await amqp.connect('amqp://admin:admin1234@localhost:5672//mirror?heartbeat=60');
const channel = await connection.createChannel();
const queue = 'queue11';
const ackQueue = 'queue11_ack';
await channel.assertQueue(queue, { durable: true });
await channel.assertQueue(ackQueue, { durable: true });
channel.consume(queue, (message) => {
if (message) {
console.log(message?.content.toString(), message?.properties?.replyTo, message?.properties?.correlationId);
// 消息处理完后,向 ackQueue 发送确认消息
channel.sendToQueue(ackQueue, message?.content, {
// 使用相同的 correlationId 来标识确认消息
correlationId: message?.properties?.correlationId,
// 将原 replyTo 信息传递回来
// replyTo: queue,
});
// 确认 queue11 中的消息
channel.ack(message);
} else {
console.error('message is null', message);
}
}, { noAck: false });
}
produce();