当前位置: 首页 > article >正文

在nodejs中使用RabbitMQ(五)死信队列,延迟队列

死信队列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一种机制,用于处理无法成功消费或不能按预期处理的消息。简单来说,死信队列用于存储那些不能被正常消费或处理的消息,以便后续审查或重新处理。

死信队列用于处理以下几种情况的消息:

1、消息过期:如果消息的 TTL(存活时间)到期,消息会被认为是死信,自动转发到死信队列。

2、消息被拒绝(nack:消费者拒绝消息时,如果消息被标记为不重新排队(requeue: false),消息会被送往死信队列。

3、队列满:如果队列已经达到最大长度,且还有新消息进入,RabbitMQ 会丢弃老的消息并将其送到死信队列。

4、消息无法路由:如果消息没有匹配到任何队列的路由规则,它会被送往死信队列。

配置死信队列

RabbitMQ 提供了两种主要机制来设置死信队列:

1、死信交换机(Dead Letter Exchange,DLX): 

  • 设置一个专门的交换机(Dead Letter Exchange,DLX),并将死信消息转发到该交换机。
  • 可以为队列设置 x-dead-letter-exchange 属性来指定死信交换机。

    2、死信路由键(Dead Letter Routing Key): 

    • 可以设置死信消息的路由键,以便将消息路由到适当的死信队列。
    • 通过 x-dead-letter-routing-key 属性来配置。

    3、 消息过期时间(TTL):

    • 你可以设置队列的消息存活时间(TTL)。消息在过期后会变为死信并转发到死信队列。
    • 通过 x-message-ttl 属性来设置消息的生存时间。

    死信队列创建

    1、一个生产者1,一个exchange交换机1,一个消费者1(主队列)。

    2、一个exchange交换机2,一个消费者2(死信队列)。

    3、消费者1要配置队列参数'x-dead-letter-exchange','x-dead-letter-routing-key', 'x-message-ttl'。

    生产者1通过exchange1发送消息给消费者1,如果消息不能正常处理,会通过exchange2转发给消费者2. 

    示例

    一个生产者,将消息发送给两个消费者,消费者消息如果处理失败,会将消息转发给死信队列。

    producer.ts

    import RabbitMQ from 'amqplib';
    
    
    async function start() {
      try {
        const conn = await RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60");
    
        conn.on("error", function (err1) {
          if (err1.message !== "Connection closing") {
            console.error("[AMQP] conn error", err1.message);
          }
        });
    
        conn.on("close", function () {
          console.error("[AMQP] reconnecting");
          return setTimeout(start, 1000);
        });
    
        console.log("[AMQP] connected");
    
        let channel = null;
        try {
          channel = await conn.createChannel();
        } catch (err) {
          console.error("[AMQP]", err);
          return setTimeout(start, 1000);
        }
    
        const exchangeName = 'exchange8';
        await channel.assertExchange(
          exchangeName,
          'direct',
          {
            durable: true,
          },
        );
    
        const deadExchangeName = 'exchange8_dead_letter';
        await channel.assertExchange(
          deadExchangeName,
          'direct',
          {
            durable: true,
          },
        );
    
        let routeKey = 'route.key';
        for (let i = 0; i < 4; ++i) {
          // console.log('message send!', channel.sendToQueue(
          //   queueName,
          //   Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),
          //   { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在
          //   // (err: any, ok: Replies.Empty)=>{}
          // ));
    
          let num = Math.ceil(Math.random() * 100000);
          console.log('消息发送是否成功', num, routeKey, channel.publish(
            exchangeName,
            routeKey,
            Buffer.from(`"发送消息, index:${i}, number:${num}, routeKey:${JSON.stringify(routeKey)}"`),
            {
              persistent: true,
            },
          ));
        }
    
        setTimeout(() => {
          conn.close();
          process.exit(0);
        }, 1000);
      } catch (err) {
        console.error("[AMQP]", err);
        return setTimeout(start, 1000);
      }
    }
    
    start();
    

    dead_letter.ts

    import RabbitMQ, { type Replies } from 'amqplib/callback_api';
    
    
    RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
      if (err0) {
        console.error(err0);
        return;
      }
    
      conn.createChannel(function (err1, channel) {
        console.log('[*] waiting...');
    
        const deadExchangeName = 'exchange8_dead_letter';
        const deadRouteKey = 'route.key.dead.letter';
    
        // 一次只有一个未确认消息,防止消费者过载
        channel.prefetch(1);
    
        // 死信队列
        const deadQueueName = 'queue8';
        channel.assertQueue(deadQueueName, {
          durable: true,  // 保证死信队列持久化
        });
    
        channel.bindQueue(deadQueueName, deadExchangeName, deadRouteKey);
    
        channel.consume(deadQueueName, (msg) => {
          console.log(`死信队列'${deadQueueName}'接收到的消息:`, msg?.content.toString());
          if (msg) {
            channel.ack(msg);
          }
        }, {
          // noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);
          noAck: false,
          arguments: {}
        });
      });
    
      conn.on("error", function (err1) {
        if (err1.message !== "Connection closing") {
          console.error("[AMQP] conn error", err1.message);
        }
      });
    
      conn.on("close", function () {
        console.error("[AMQP] reconnecting");
      });
    });
    

    consumer.ts

    import RabbitMQ, { type Replies } from 'amqplib/callback_api';
    
    
    RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
      if (err0) {
        console.error(err0);
        return;
      }
    
      conn.createChannel(function (err1, channel) {
        console.log('[*] waiting...');
    
        const exchangeName = 'exchange8';
        const routeKey = 'route.key';
        const deadExchangeName = 'exchange8_dead_letter';
        const deadRouteKey = 'route.key.dead.letter';
    
        // 一次只有一个未确认消息,防止消费者过载
        channel.prefetch(1);
    
        // 主队列
        const queueName = 'queue7';
        channel.assertQueue(queueName, {
          durable: true,  // 队列是否持久化,确保队列在 RabbitMQ 重启后仍然存在
          arguments: {
            // 对应的死信交换机(空字符串表示默认交换机)
            'x-dead-letter-exchange': deadExchangeName,
            // 死信队列的路由键
            'x-dead-letter-routing-key': deadRouteKey,
            // 队列消息过期时间1分钟
            'x-message-ttl': 60000,
          },
        });
    
        channel.bindQueue(
          queueName,
          exchangeName,
          routeKey,
          {},
          (err, ok) => {
            console.log(queueName, '队列绑定结果', err, ok);
          },
        );
    
        channel.consume(queueName, function (msg) {
    
          if (Math.ceil(Math.random() * 100) > 50 && msg) {
            console.log(`队列'${queueName}'接收到的消息`, msg?.content.toString());
            channel.ack(msg);
          } else if (msg) {
            console.log(`队列'${queueName}'拒绝接收消息`, msg?.content.toString());
            // 第二个参数,false拒绝当前消息
            // 第二个参数,true拒绝小于等于当前消息
            // 第三个参数,3false从队列中清除
            // 第三个参数,4true从新在队列中排队
            channel.nack(msg, false, false);
          } else {
    
          }
        }, {
          // noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);
          noAck: false,
          arguments: {}
        }, (err: any, ok: Replies.Empty) => {
          console.log(err, ok);
        });
    
      });
    
      conn.on("error", function (err1) {
        if (err1.message !== "Connection closing") {
          console.error("[AMQP] conn error", err1.message);
        }
      });
    
      conn.on("close", function () {
        console.error("[AMQP] reconnecting");
      });
    });
    

    consumer2.ts

    import RabbitMQ, { type Replies } from 'amqplib/callback_api';
    
    
    RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
      if (err0) {
        console.error(err0);
        return;
      }
    
      conn.createChannel(function (err1, channel) {
        console.log('[*] waiting...');
    
        const exchangeName = 'exchange8';
        const routeKey = 'route.key';
        const deadExchangeName = 'exchange8_dead_letter';
        const deadRouteKey = 'route.key.dead.letter';
    
        // 一次只有一个未确认消息,防止消费者过载
        channel.prefetch(1);
    
        // 主队列
        const queueName = 'queue9';
        channel.assertQueue(queueName, {
          durable: true,  // 队列是否持久化,确保队列在 RabbitMQ 重启后仍然存在
          arguments: {
            // 对应的死信交换机(空字符串表示默认交换机)
            'x-dead-letter-exchange': deadExchangeName,
            // 死信队列的路由键
            'x-dead-letter-routing-key': deadRouteKey,
            // 队列消息过期时间1分钟
            'x-message-ttl': 60000,
          },
        });
    
        channel.bindQueue(
          queueName,
          exchangeName,
          routeKey,
          {},
          (err, ok) => {
            console.log(queueName, '队列绑定结果', err, ok);
          },
        );
    
        channel.consume(queueName, function (msg) {
    
          if (Math.ceil(Math.random() * 100) > 50 && msg) {
            console.log(`队列'${queueName}'接收到的消息`, msg?.content.toString());
            channel.ack(msg);
          } else if (msg) {
            console.log(`队列'${queueName}'拒绝接收消息`, msg?.content.toString());
            // 第二个参数,false拒绝当前消息
            // 第二个参数,true拒绝小于等于当前消息
            // 第三个参数,3false从队列中清除
            // 第三个参数,4true从新在队列中排队
            channel.nack(msg, false, false);
          } else {
    
          }
        }, {
          // noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);
          noAck: false,
          arguments: {}
        }, (err: any, ok: Replies.Empty) => {
          console.log(err, ok);
        });
    
      });
    
      conn.on("error", function (err1) {
        if (err1.message !== "Connection closing") {
          console.error("[AMQP] conn error", err1.message);
        }
      });
    
      conn.on("close", function () {
        console.error("[AMQP] reconnecting");
      });
    });
    

     延迟队列

    方法一

    rabbitmq本身没有直接提供延迟队列,可以通过安装插件实现(注:目前支持到4.0.2)。

    rabbitmq_delayed_message_exchange

     文档:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

    下载:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

     方法二

     通过死信队列实现消息的延迟处理。主队列并不会被直接消费,而是通过设置 x-message-ttl(即消息的过期时间)来控制消息的存活时间。消息在主队列中的 TTL 到期后,它会被转发到死信队列(Dead Letter Queue)中,在死信队列中处理消息。

    相比于上述死信队列的实现,在consumer.ts中删除了channel.consume消息接收相关代码。

     producer.ts

    import RabbitMQ from 'amqplib';
    
    
    async function start() {
      try {
        const conn = await RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60");
    
        conn.on("error", function (err1) {
          if (err1.message !== "Connection closing") {
            console.error("[AMQP] conn error", err1.message);
          }
        });
    
        conn.on("close", function () {
          console.error("[AMQP] reconnecting");
          return setTimeout(start, 1000);
        });
    
        console.log("[AMQP] connected");
    
        let channel = null;
        try {
          channel = await conn.createChannel();
        } catch (err) {
          console.error("[AMQP]", err);
          return setTimeout(start, 1000);
        }
    
        const exchangeName = 'exchange9';
        await channel.assertExchange(
          exchangeName,
          'direct',
          {
            durable: true,
          },
        );
    
        const deadExchangeName = 'exchange9_dead_letter';
        await channel.assertExchange(
          deadExchangeName,
          'direct',
          {
            durable: true,
          },
        );
    
        let routeKey = 'route.key';
        for (let i = 0; i < 4; ++i) {
          // console.log('message send!', channel.sendToQueue(
          //   queueName,
          //   Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),
          //   { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在
          //   // (err: any, ok: Replies.Empty)=>{}
          // ));
    
          let num = Math.ceil(Math.random() * 100000);
          console.log('消息发送是否成功', num, routeKey, channel.publish(
            exchangeName,
            routeKey,
            Buffer.from(`"发送消息, index:${i}, number:${num}, routeKey:${JSON.stringify(routeKey)}"`),
            {
              persistent: true,
            },
          ));
        }
    
        setTimeout(() => {
          conn.close();
          process.exit(0);
        }, 1000);
      } catch (err) {
        console.error("[AMQP]", err);
        return setTimeout(start, 1000);
      }
    }
    
    start();
    

    consumer.ts

    import RabbitMQ, { type Replies } from 'amqplib/callback_api';
    
    
    RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
      if (err0) {
        console.error(err0);
        return;
      }
    
      conn.createChannel(function (err1, channel) {
        console.log('[*] waiting...');
    
        const exchangeName = 'exchange9';
        const routeKey = 'route.key';
        const deadExchangeName = 'exchange9_dead_letter';
        const deadRouteKey = 'route.key.dead.letter';
    
        // 一次只有一个未确认消息,防止消费者过载
        channel.prefetch(1);
    
        // 主队列
        const queueName = 'queue10';
        channel.assertQueue(queueName, {
          durable: true,  // 队列是否持久化,确保队列在 RabbitMQ 重启后仍然存在
          arguments: {
            // 对应的死信交换机(空字符串表示默认交换机)
            'x-dead-letter-exchange': deadExchangeName,
            // 死信队列的路由键
            'x-dead-letter-routing-key': deadRouteKey,
            // 队列消息过期时间10s
            'x-message-ttl': 10000,
          },
        });
    
        channel.bindQueue(
          queueName,
          exchangeName,
          routeKey,
          {},
          (err, ok) => {
            console.log(queueName, '队列绑定结果', err, ok);
          },
        );
    
        // channel.consume(queueName, function (msg) {
    
        //   if (Math.ceil(Math.random() * 100) > 50 && msg) {
        //     console.log(`队列'${queueName}'接收到的消息`, msg?.content.toString());
        //     channel.ack(msg);
        //   } else if (msg) {
        //     console.log(`队列'${queueName}'拒绝接收消息`, msg?.content.toString());
        //     // 第二个参数,false拒绝当前消息
        //     // 第二个参数,true拒绝小于等于当前消息
        //     // 第三个参数,3false从队列中清除
        //     // 第三个参数,4true从新在队列中排队
        //     channel.nack(msg, false, false);
        //   } else {
    
        //   }
        // }, {
        //   // noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);
        //   noAck: false,
        //   arguments: {}
        // }, (err: any, ok: Replies.Empty) => {
        //   console.log(err, ok);
        // });
    
      });
    
      conn.on("error", function (err1) {
        if (err1.message !== "Connection closing") {
          console.error("[AMQP] conn error", err1.message);
        }
      });
    
      conn.on("close", function () {
        console.error("[AMQP] reconnecting");
      });
    });
    

    dead_letter.ts

    import RabbitMQ, { type Replies } from 'amqplib/callback_api';
    
    
    RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
      if (err0) {
        console.error(err0);
        return;
      }
    
      conn.createChannel(function (err1, channel) {
        console.log('[*] waiting...');
    
        const deadExchangeName = 'exchange9_dead_letter';
        const deadRouteKey = 'route.key.dead.letter';
    
        // 一次只有一个未确认消息,防止消费者过载
        channel.prefetch(1);
    
        // 死信队列
        const deadQueueName = 'queue11';
        channel.assertQueue(deadQueueName, {
          durable: true,  // 保证死信队列持久化
        });
    
        channel.bindQueue(deadQueueName, deadExchangeName, deadRouteKey);
    
        channel.consume(deadQueueName, (msg) => {
          console.log(`死信队列'${deadQueueName}'接收到的消息:`, msg?.content.toString());
          if (msg) {
            channel.ack(msg);
          }
        }, {
          // noAck: true, // 是否自动确认消息,为true不需要调用channel.ack(msg);
          noAck: false,
          arguments: {}
        });
      });
    
      conn.on("error", function (err1) {
        if (err1.message !== "Connection closing") {
          console.error("[AMQP] conn error", err1.message);
        }
      });
    
      conn.on("close", function () {
        console.error("[AMQP] reconnecting");
      });
    });
    

     

     


    http://www.kler.cn/a/547730.html

    相关文章:

  • 【DeepSeek】Ollama部署本地大模型DeepSeek-R1,交互界面Open-WebUI,RagFlow构建私有知识库
  • 类型通配符上限
  • Brian Kernighan 算法
  • HTML,API,RestFul API基础
  • 科普:“表格式 ”与“ 存储格式”
  • 【环境配置】Ubuntu 22.04 C++ header file not found using Vim with YouCompleteMe
  • Redis 设置密码无效问题解决
  • c++--变量内存分配
  • 【开源项目】Excalidraw手绘风格白板(保姆级)教程
  • 如何学BI大数据
  • Html、Markdown的信息提取
  • LabVIEW 中 dotnet.llb 库功能
  • 05-服务保护和分布式事务(Sentinel、Seata)
  • Linux文件管理:硬链接与软链接
  • 图论 - 一些经典小算法思想(无题目例子)
  • 《open3d qt 网格泊松采样成点云》
  • 关于Dest1ny:我的创作纪念日
  • JavaScript原型和原型链
  • 代码随想录二刷|动态规划3
  • 鸿蒙Harmony-应用状态-AppStorage详细介绍