当前位置: 首页 > article >正文

在nodejs中使用RabbitMQ(一)安装,使用

安装 

1、安装RabbitMQ,推荐直接使用docker安装。

docker container run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v ./data:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin1234 rabbitmq:4.0-management

5672端口,rabbitmq服务

15672端口,rabbitmq可视化管理界面 

2、windows,linux不用容器,可根据官网教程按步骤进行安装。Installing on RPM-based Linux | RabbitMQ

RabbitMQ 介绍

RabbitMQ 是一个开源的消息代理软件(也称为消息队列),它实现了高级消息队列协议(AMQP)。RabbitMQ 提供了可靠的消息传递机制,适用于构建分布式系统、微服务架构以及需要解耦组件的应用程序。它支持多种消息传递模式,并且具有高度的可扩展性和灵活性。官方还提供了多语言支持:Python,Java,Ruby,PHP,C#,JavaScript,Go。

RabbitMQ 的基本概念

1.生产者(Producer):左侧绿色方框代表消息的生产者,生产者将消息发送到 RabbitMQ 服务器。

2.连接(Connection)和通道(Channel):消息首先通过一个连接进入 RabbitMQ,连接内部包含多个通道。每个通道是一个轻量级的连接,用于减少开销并进行通信。

3.RabbitMQ 服务(RabbitMQ Server)和虚拟主机(Virtual Host):中央部分展示了 RabbitMQ 服务器及其内部结构。RabbitMQ 服务器中,可以创建多个虚拟主机(Virtual Host),每个虚拟主机是一个独立的消息命名空间。

4.交换器(Exchange):在每个虚拟主机内,有多个交换器。交换器负责接收来自生产者的消息,并根据预定的路由规则将消息分发到不同的队列。常见的交换机类型包括:

Direct:基于精确匹配的路由键进行消息路由。
Fanout:广播消息到所有绑定的队列。
Topic:基于通配符匹配的路由键进行消息路由。
Headers:基于消息头中的键值对进行消息路由。

5.队列(Queue):消息根据路由规则被分发到对应的队列中。队列用于存储和管理消息,等待消费者来获取消息。

6.消费者(Consumer):右侧黄色方框代表消费者。通过连接和通道,消费者从 RabbitMQ 服务器的队列中获取和处理消息。

 示例

一个生产者对应一个消费者。

 一个生产者对应多个消费者。

如何防止数据丢失

1、优先处理每一步错误,如,队列创建,exchange路由创建,消息是否发送成功。

2、持久化队列数据 durable: true,持久化队列中消息 persistent: true。

3、手动确认消息是否接收,在数据处理完后确认,channel.ack(msg)。 (注:要防止数据重复处理)
 

producer.ts 在代码中没有创建exchange,会使用默认exchange。

import RabbitMQ from 'amqplib/callback_api';


function start() {
  RabbitMQ.connect("amqp://admin:admin1234@localhost:5672?heartbeat=60", function (err0, conn) {
    if (err0) {
      console.error("[AMQP]", err0.message);
      return setTimeout(start, 1000);
    }

    conn.on("error", function (err1) {
      if (err1.message !== "Connection closing") {
        console.error("[AMQP] conn error", err1.message);
      }
    });

    conn.on("close", function () {
      console.error("[AMQP] reconnecting");
      return setTimeout(start, 1000);
    });

    console.log("[AMQP] connected");

    conn.createChannel(async (err2, channel) => {
      if (err2) {
        console.error("[AMQP]", err2.message);
        return setTimeout(start, 1000);
      }

      const queueName = 'queue1';

      // 创建一个队列
      channel.assertQueue(queueName, {
        durable: true, //队列持久化
      }, (err, ok) => {
        if (err) {
          console.log('队列创建失败!');
        }
        console.log(err, ok);
      });

      for (let i = 0; i < 30; ++i) {
        console.log('message send!', channel.sendToQueue(
          queueName,
          Buffer.from(`发送消息,${i}${Math.ceil(Math.random() * 100000)}`),
          { persistent: true, correlationId: 'ooooooooooooooo' },// 消息持久化,重启后存在
          // (err: any, ok: Replies.Empty)=>{}
        ));
      }
    });

    setTimeout(() => {
      conn.close();
      process.exit(0);
    }, 1000);
  });
}

start();

consumer.ts 消费者可以启动零到多个,因为设置了持久化如果没有消费者,数据会保存等待消费。

import RabbitMQ, { type Replies } from 'amqplib/callback_api';

RabbitMQ.connect('amqp://admin:admin1234@localhost:5672', (err0, conn) => {
  if (err0) {
    console.error(err0);
    return;
  }

  conn.createChannel(function (err1, channel) {
    const queueName = 'queue1';

    channel.assertQueue(queueName, { durable: true });

    console.log('[*] waiting...');

    // 一次只有一个未确认消息,防止消费者过载
    channel.prefetch(1);

    channel.consume(
      queueName,
      function (msg) {
        console.log('接收到的消息', msg, msg?.content.toString());

        /*  
         // 手动确认取消channel.ack(msg); noAck:false,
   
         // 自动确认消息
         // if (msg) {
         //   channel.ack(msg);
         // } 
        */
      },
      {
        noAck: true, // 是否自动确认消息
        // noAck: false
      },
      (err: any, ok: Replies.Empty) => {
        console.log(err, ok);
      },
    );

  });

  conn.on("error", function (err1) {
    if (err1.message !== "Connection closing") {
      console.error("[AMQP] conn error", err1.message);
    }
  });

  conn.on("close", function () {
    console.error("[AMQP] reconnecting");
  });
});


http://www.kler.cn/a/543251.html

相关文章:

  • MATLAB 生成脉冲序列 pulstran函数使用详解
  • 如何使用CSS Grid实现两列布局?
  • 星动纪元ERA-42:端到端原生机器人大模型的里程碑式突破
  • 【图片合并转换PDF】如何将每个文件夹下的图片转化成PDF并合并成一个文件?下面基于C++的方式教你实现
  • 运用 LangChain 编排任务处理流水线,实现多轮对话场景
  • [特殊字符] 基于 FastAPI 和 React 构建车牌号识别网站
  • SPI为什么不需要加上拉电阻
  • DeepSeek-V3网络模型架构图解
  • kafka介绍,kafka集群环境搭建,kafka命令测试,C++实现kafka客户端
  • 如何选择合适的搜索关键词优化工具?
  • 按键可视化工具——Keyviz
  • 开源堡垒机 JumpServer 社区版实战教程:一步步构建企业安全运维环境
  • SQL Server:查看当前连接数和最大连接数
  • 【Vue3 入门到实战】13. 常用 API
  • 探索技术新边界:让 HTML 电子凭证与二维码、PDF 完美融合
  • 网络安全 理清 安全 边界
  • 计算机毕业设计制造业MES生产管理平台 MES 生产制造源码+文档+运行视频+讲解视频)
  • 【前端】ES6新特性汇总
  • AI直播的未来:智能化、自动化与个性化并存
  • AI时代的前端开发:效率、协作与ScriptEcho
  • C++设计模式 —— 单例模式
  • TiDB Vector 本地部署的亲身体验与心得
  • Druid GetConnectionTimeoutException解决方案之一
  • 基础连接已经关闭: 服务器关闭了本应保持活动状态的连接
  • Cursor无法安装插件解决方法
  • 操作系统|ARM和X86的区别,存储,指令集