当前位置: 首页 > article >正文

node == RabbitMQ入门教程

目录

RabbitMQ简介

为什么使用RabbitMQ?

安装并运行RabbitMQ

1.查找镜像

2.拉取镜像

3.查看镜像、启动并配置映射

4. 打开可视化页面

消息队列代码示例

生产者 

尝试使用面板 发送接收消息 

消费者 

消息持久化 

生产者

消费者

自动重连代码示例

消息错误处理

总结 

RabbitMQ方法

1. amqp.connect(url, callback)

2. connection.close(callback)

3.connection.createChannel(callback)

4.channel.assertQueue(queueName, options, callback)

5.channel.sendToQueue(queueName, content, options, callback)

6.channel.consume(queueName, callback, options, callback)

7.channel.ack(message, allUpTo)

8.channel.nack(message, allUpTo, requeue)

注意事项

为什么队列持久化和消息持久化都需要?

为什么单独消息持久化不够?

noAck: false 的含义


RabbitMQ简介

RabbitMQ是消息代理(Message Broker),它支持多种异步消息处理方式,最常见的有:

  • Work Queue:将消息缓存到一个队列,默认情况下,多个worker按照Round Robin的方式处理队列中的消息。每个消息只会分配给单个worker。
  • Publish/Subscribe:每个订阅消息的消费者都会收到消息,因此每个消息通常会分配给多个worker,每个worker对消息进行不同的处理。

RabbitMQ还支持Routing、Topics、以及Remote procedure calls (RPC)等方式。

对于不同的消息处理方式,有一点是相同的,RabbitMQ是介于消息的生产者和消费者的中间节点,负责缓存和分发消息。RabbitMQ接收来自生产者的消息,缓存到内存中,按照不同的方式分发给消费者。RabbitMQ还可以将消息写入磁盘,保证持久化,这样即使RabbitMQ意外崩溃了,消息数据不至于完全丢失。

为什么使用RabbitMQ?

最简单的一点在于,它支持Work Queue等不同的消息处理方式,可以用于不同的业务场景。

使用消息队列,可以将不算紧急、但是非常消耗资源的计算任务,以消息的方式插入到RabbitMQ的队列中,然后使用多个处理模块处理这些消息。

这样做最大的好处在于:提高了系统峰值处理能力。因为,来不及处理的消息缓存在RabbitMQ中,避免了同时进行大量计算导致系统因超负荷运行而崩溃。而那些来不及处理的消息,会在峰值过去之后慢慢处理掉。

另一个好处在于解耦。消息的生产者只需要将消息发送给RabbitMQ,这些消息什么时候处理完,不会影响生产者的响应性能。

安装并运行RabbitMQ

使用Docker运行RabbitMQ非常简单,只需要执行一条简单的命令:

1.查找镜像

docker search rabbitMq

2.拉取镜像

docker pull rabbitmq

3.查看镜像、启动并配置映射

 
docker run \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=guest \
--name mq \
--hostname localhost \
-p 15672:15672 \
-p 5672:5672 \
-d \
-v mq-data:/var/lib/rabbitmq \
rabbitmq

这里的 -v mq-data:/var/lib/rabbitmq 部分做了以下事情:

  • mq-data: 这是您为 Docker 卷指定的名称。您可以根据需要给它起任何名字。
  • /var/lib/rabbitmq: 这是 RabbitMQ 容器中的数据目录,您的数据将被存储在这里。

这个命令将启动一个名为 mq 的 RabbitMQ 容器,使用默认的 guest 用户和密码,并将容器的 15672 和 5672 端口映射到宿主机的相应端口。同时,它将创建一个名为 mq-data 的 Docker 卷,并将其挂载到容器的 /var/lib/rabbitmq 目录,以便持久化存储 RabbitMQ 的数据。

如果您想要在创建容器之前先创建卷,并查看卷的详细信息或者对其进行管理,您可以使用以下命令:

# 创建一个名为 mq-data 的 Docker 卷
docker volume create mq-data

# 查看所有 Docker 卷
docker volume ls

# 查看特定卷的详细信息
docker volume inspect mq-data

使用 Docker 卷的好处是,您可以更容易地管理数据,并且可以在不同的容器之间共享数据,或者在容器重新创建时保留数据。

4. 打开可视化页面

rabbitmq-plugins enable rabbitmq_management: 这个命令用于启用 RabbitMQ 的管理插件。

  • rabbitmq-plugins: 是 RabbitMQ 提供的一个命令行工具,用于管理 RabbitMQ 插件。
  • enable: 是 rabbitmq-plugins 工具的一个子命令,用于启用插件。
  • rabbitmq_management: 是 RabbitMQ 的管理插件,它提供了一个 Web UI 和 REST API,用于监控和管理 RabbitMQ 服务器。

接下来就可以访问页面端了

http://localhost:15672

username:guest

password:guest

消息队列代码示例

下面,我们使用Node.js实现一个简单消息队列。


 

生产者 

消息的生产者:send.js

  • 创建连接
  • 创建通道 发送消息必须有通道
  • 创建队列
  • 发送消息
  • 关闭通道
// 引入amqplib模块,用于与RabbitMQ服务器建立连接
var amqp = require("amqplib/callback_api");

// 尝试连接到RabbitMQ服务器(用户名 guest 密码 guest)
amqp.connect("amqp://guest:guest@localhost", function (error0, connection) {
  // 检查连接过程中是否有错误
  if (error0) {
    throw error0;
  }

  // 创建一个通道,通过通道可以 发送和接受消息
  connection.createChannel(function (error1, channel) {
    // 检查通道创建过程中是否有错误
    if (error1) {
      throw error1;
    }

    // 定义队列名称和消息内容
    var queue = "hello";
    var msg = "Hello World!";

    // 声明一个队列,如果队列不存在则创建它,如果队列存在,则不会创建新的队列
    channel.assertQueue(queue, {
      durable: false, // 队列不持久化 (如果设置为true则重启后不会丢失)
    });

    // 向队列发送消息
    channel.sendToQueue(queue, Buffer.from(msg));

    // 打印消息到控制台,确认消息已发送
    console.log(" [x] Sent %s", msg);
  });

  // 在500毫秒后关闭连接并退出进程
  setTimeout(function () {
    connection.close();
    process.exit(0);
  }, 500);
});

查看队列 

点击获取消息 可以看到刚才发送的Hello World消息

尝试使用面板 发送接收消息 

  • Publish message 发送消息
  • Payload: 发送内容
  • 点击发送,发送成功

  • get  Message 获取消息
  • Message获取的数量改为2

消费者 

  • 创建连接
  • 创建通道 接收消息必须有通道
  • 创建队列
  • 接收消息(自动确认)
// 引入amqplib模块,用于与RabbitMQ服务器建立连接
var amqp = require("amqplib/callback_api");

// 尝试连接到RabbitMQ服务器,携带用户名和密码
amqp.connect("amqp://guest:guest@localhost", function (error0, connection) {
  // 检查连接过程中是否有错误
  if (error0) {
    throw error0;
  }

  // 创建一个通道
  connection.createChannel(function (error1, channel) {
    // 检查通道创建过程中是否有错误
    if (error1) {
      throw error1;
    }

    // 定义队列名称
    var queue = "hello";

    // 声明一个队列,如果队列不存在则创建它
    channel.assertQueue(queue, {
      durable: false, // 队列不持久化
    });

    // 打印等待消息的日志
    console.log(" [*] 等待接收 %s 队列中的消息。要退出请按 CTRL+C", queue);

    // 从队列中消费消息
    channel.consume(
      queue,
      function (msg) {
        // 打印接收到的消息内容
        console.log(" [x] Received %s", msg.content.toString());
      },
      {
        // 表示自动确认消息,true为自动确认,false为手动确认
        //(如果设置为false,则需要手动确认消息,否则消息会被重复消费 例如channel.ack(msg))
        //   channel.ack(msg) 表示确认消息
        noAck: true,
      }
    );
  });
});

注意

因为没有关闭所以通道还在
处理完毕后在启动就不会接受了 因为自动确认了就不会在接受

消息持久化 

我们用到了amqplib模块,用于与RabbitMQ进行通信,对于具体接口的细节,可以查看文档。

在调用sendToQueue时,将persistent属性设为true,这样RabbitMQ关闭时,消息会被保存到磁盘。测试这一点很简单:

  • 关闭receiver
  • 启动sender,发送消息给RabbitMQ
  • 重启RabbitMQ(sudo docker restart rabbitmq)
  • 启动receiver,会发现它可以接收sender在RabbitMQ重启之前发送的消息

由于RabbitMQ容器将保存数据的目录以数据卷的形式保存在本地主机,因此即使将RabbitMQ容器删除(sudo docker rm -f rabbitmq)后重新运行,效果也是一样的。

另外,这段代码采用了Node.js最新的异步代码编写方式:Async/Await,因此非常简洁,感兴趣的同学可以了解一下。

生产者

const amqplib = require("amqplib");

(async () => {
  try {
    //   队列的名称
    const queue = "hello";
    // 消息的内容
    const msg = "Hello World!";

    // 连接到RabbitMQ服务器
    const conn = await amqplib.connect("amqp://guest:guest@localhost");

    //创建一个通道,通道是进行通信的基本单位,通道可以发送消息&&接收消息
    const ch1 = await conn.createChannel();

    // 声明一个队列,如果队列不存在则创建它,如果队列存在,则不会创建新的队列
    // (开启队列持久化,如果为true 重启后不会消失)
    await ch1.assertQueue(queue, { durable: true });

    // 发送消息 (队列名字,消息内容,消息持久化)
    ch1.sendToQueue(queue, Buffer.from(msg), { persistent: true });

    // 打印发送的消息
    console.log(" [x] Sent %s", msg);
  } catch (error) {
    console.log(error);
  }
})();

消费者

noAck 设置为false 需要手动确认消息(ch1.ack(msg))

如果不手动确认消息会还在会重复 

const amqplib = require("amqplib");

(async () => {
  try {
    //   队列的名称
    const queue = "hello";

    // 连接到RabbitMQ服务器
    const conn = await amqplib.connect("amqp://guest:guest@localhost");

    //创建一个通道,通道是进行通信的基本单位,通道可以发送消息&&接收消息
    const ch1 = await conn.createChannel();

    // 声明一个队列,如果队列不存在则创建它,如果队列存在,则不会创建新的队列
    // (开启队列持久化,如果为true 重启后不会消失)
    await ch1.assertQueue(queue, { durable: true });

    // 打印等待接收的消息
    console.log(" [*] 等待接收 %s 队列中的消息。要退出请按 CTRL+C", queue);

    // 从队列中消费消息
    ch1.consume(
      queue,
      function (msg) {
        console.log(" [x] Received %s", msg.content.toString());
        //   手动确认消息
        // ch1.ack(msg);
      }
      //   { noAck: true }
    );
  } catch (error) {
    console.log(error);
  }
})();

自动重连代码示例

在生产环境中,RabbitMQ难免会出现重启的情况,比如更换磁盘或者服务器、负载过高导致崩溃。因为RabbitMQ可以将消息写入磁盘,所以数据是”安全”的。但是,代码中必须实现自动重连机制,否则RabbitMQ停止时会导致Node.js应用崩溃。这里提供一个自动重连的代码示例,给大家参考:

消息生产者

const amqp = require("amqplib");

const queue = "demo";

var connection;

// 连接RabbitMQ
async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.info("connect to RabbitMQ success");

        const channel = await connection.createChannel();
        await channel.assertQueue(queue);
        await channel.sendToQueue(queue, new Buffer("Hello,word!"),
        {
            // RabbitMQ重启时,消息会被保存到磁盘
            persistent: true
        });

        connection.on("error", function(err)
        {
            console.log(err);
            setTimeout(connectRabbitMQ, 10000);
        });

        connection.on("close", function()
        {
            console.error("connection to RabbitQM closed!");
            setTimeout(connectRabbitMQ, 10000);
        });

    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }
}


connectRabbitMQ();

消息消费者

const amqp = require("amqplib");

const queue = "demo";

var connection;

// 连接RabbitMQ
async function connectRabbitMQ()
{
    try
    {
        connection = await amqp.connect("amqp://localhost");
        console.info("connect to RabbitMQ success");

        const channel = await connection.createChannel();
        await channel.assertQueue(queue);
        await channel.consume(queue, async function(message)
        {
            console.log(message.content.toString());
            channel.ack(message);
        });

        connection.on("error", function(err)
        {
            console.log(err);
            setTimeout(connectRabbitMQ, 10000);
        });

        connection.on("close", function()
        {
            console.error("connection to RabbitQM closed!");
            setTimeout(connectRabbitMQ, 10000);
        });

    }
    catch (err)
    {
        console.error(err);
        setTimeout(connectRabbitMQ, 10000);
    }
}


connectRabbitMQ();

这样的话,即使RabbitMQ重启,sender和receiver也可以自动重新连接RabbitMQ。如果你希望监控RabbitMQ是否出错,

消息错误处理

nack 拒绝消息 

  • // msg: 消息对象,
  • // 第二个参数: true 小于等于当前消息的都会被拒绝,false 只拒绝当前消息
  • // 第三个参数: false 拒绝的消息是否重新入队,是true 则重新入队 false 则丢弃
    // 从队列中消费消息
    ch1.consume(
      queue,
      function (msg) {
        try {
          console.log(" [x] Received %s", msg.content.toString());
          //   手动确认消息
          ch1.ack(msg);
        } catch (error) {
          // 拒绝消息 (
          //     msg: 消息对象,
          //    第二个参数: true 小于等于当前消息的都会被拒绝,false 只拒绝当前消息
          //  第三个参数: false 拒绝的消息是否重新入队)
          ch1.nack(msg, false, false);
          console.log(error);
        }
      },
      { noAck: false } // 关闭自动确认消息
    );

总结 

RabbitMQ方法

1. amqp.connect(url, callback)

功能: 连接到 RabbitMQ 服务器。

  • url: 连接字符串,例如 amqp://guest:guest@localhost
  • callback(err, connection): 回调函数,err 为错误对象,connection 为连接对象。

返回: 连接对象。

2. connection.close(callback)

功能: 关闭连接。

参数callback(err) : 回调函数,err 为错误对象。

返回: 无。

3.connection.createChannel(callback)

功能: 创建一个通道。

参数callback(err, channel): 回调函数,err 为错误对象,channel 为通道对象。

返回: 通道对象。

4.channel.assertQueue(queueName, options, callback)

功能: 声明一个队列,如果队列不存在则创建它。

参数:

  • queueName: 队列名称。
  • options: 队列选项,例如 { durable: true } 表示队列持久化。
  • callback(err, ok): 回调函数,err 为错误对象,ok 为队列信息对象。

返回: 队列信息对象。

5.channel.sendToQueue(queueName, content, options, callback)

功能: 向队列发送消息。

参数:

  • queueName: 队列名称。
  • content: 消息内容,通常为 Buffer 对象。
  • options: 发送选项,例如 { persistent: true } 表示消息持久化。
  • callback(err): 回调函数,err 为错误对象。

返回: 无。

6.channel.consume(queueName, callback, options, callback)

功能: 从队列中消费消息。

参数:

  • queueName: 队列名称。
  • callback(msg): 消息处理回调函数,msg 为消息对象。
  • options: 消费选项,例如 { noAck: true } 表示不需要发送确认消息。
  • callback(err, ok): 回调函数,err 为错误对象,ok 为消费结果。

返回: 消费结果。

7.channel.ack(message, allUpTo)

功能: 确认消息已被处理。

参数:

  • message: 消息对象。
  • allUpTo: 是否确认所有未确认的消息。

返回: 无。

8.channel.nack(message, allUpTo, requeue)

功能: 拒绝消息。

参数:

  • message: 消息对象。
  • allUpTo: 是否拒绝所有未确认的消息。
  • requeue: 拒绝的消息是否重新入队。

返回: 无。

注意事项

直接发送到队列: 生产者和消费者必须使用同一个队列。

  • 队列 (Queue): 消息存储的地方。生产者将消息发送到队列,消费者从队列中获取消息
  • 生产者: 将消息发送到指定的队列。
  • 消费者: 从指定的队列中接收消息。

为什么队列持久化和消息持久化都需要?

队列持久化 (durable: true):

  • 确保队列存在: 即使 RabbitMQ 服务器重启,队列也不会丢失。
  • 一致性: 确保消息可以被正确地路由到预期的队列。

消息持久化 (persistent: true):

  • 确保消息存在: 即使 RabbitMQ 服务器重启,消息也不会丢失。

为什么单独消息持久化不够?

  • 队列不存在: 如果队列在服务器重启后不存在,即使消息被标记为持久化,这些消息也无法被存储在任何队列中,最终会被丢弃。
  • 路由问题: 如果队列在重启后被重新声明为非持久化队列,消息仍然会被丢弃,因为它们无法被正确路由到预期的队列。

noAck: false 的含义

  • 消息确认 (Acknowledgment): 当消费者成功处理完一条消息后,需要向 RabbitMQ 发送一个确认信号(acknowledgment)。这告诉 RabbitMQ 该消息已经被成功处理,可以安全地从队列中移除。
  • noAck: false: 设置为 false 表示消费者需要显式地发送确认信号。这意味着消费者必须手动调用 ack 方法来确认消息已被处理。
  • noAck: true: 设置为 true 表示消费者不需要发送确认信号。RabbitMQ 会在消息发送给消费者后立即认为该消息已被处理并从队列中移除,无论消费者是否真正处理了该消息。

http://www.kler.cn/a/419383.html

相关文章:

  • 解决jupyter notebook 新建或打开.ipynb 报500 : Internal Server Error(涉及jinja2兼容性问题)
  • phpmyadmin导出wordpress数据教程
  • 008静态路由-特定主机路由
  • vue 2 父组件根据注册事件,控制相关按钮显隐
  • 「Mac畅玩鸿蒙与硬件35」UI互动应用篇12 - 简易日历
  • 一体化数据安全平台uDSP 入选【年度创新安全产品 TOP10】榜单
  • 手机控制载货汽车一键启动无钥匙进入广泛应用
  • 综合实验——用户远程登陆并更改文件
  • 网络七层杀伤链
  • 网络安全-夜神模拟器如何通过虚拟机的Burp Suite代理应用程序接口
  • python学习笔记9-零散知识点
  • vue3 路由跳转携带参数以及其他页面接收参数
  • 数据库学习记录03
  • 鸿蒙开发:自定义一个任意位置弹出的Dialog
  • React第十节组件之间传值之context
  • 扩散模型赋能3D 视觉的综述报告
  • 通信原理实验:PCM编译码
  • CEF127 编译指南 Linux篇 - 安装Git和Python(三)
  • Supervisor使用教程
  • Ubuntu20.04离线安装全教程(包括DellR940重置Raid 5、安装Ubuntu、设置root、安装nvidia英伟达显卡驱动及设置防火墙白名单)
  • C#窗体小程序计算器
  • matlab2024a安装
  • PHP 去掉特殊不可见字符 “\u200e“
  • Electron + vue3 打包之后不能跳转路由
  • 【网络篇】HTTP知识
  • vue基础之2:搭建vue开发环境、Hello小案例