当前位置: 首页 > article >正文

C++开发基础之使用librabbitmq库实现RabbitMQ消息队列通信

在这里插入图片描述

1. 前言

RabbitMQ是一个流行的开源消息队列系统,支持多种消息协议,广泛用于构建分布式系统和微服务架构。可以在不同应用程序之间实现异步消息传递。在本文中,我们将熟悉如何使用C++与RabbitMQ进行消息通信。

2. 准备工作

在 Windows 平台上通过 vcpkg 安装 librabbitmq,并在 C++ 中使用该库实现 RabbitMQ 消息的发布和接收。librabbitmq 是官方的 C 客户端库,支持与 RabbitMQ 服务器的通信。在这里插入图片描述

2.1 安装 vcpkg

如果还没有安装 vcpkg,请按照以下步骤安装:

  1. 克隆 vcpkg 仓库:

    git clone https://github.com/microsoft/vcpkg.git
    
  2. 进入 vcpkg 目录并运行安装脚本:

    cd vcpkg
    .\bootstrap-vcpkg.bat
    
  3. 使用 vcpkg 安装 RabbitMQ C 库(librabbitmq):

    vcpkg install librabbitmq
    

在这里插入图片描述

安装完成后,将 vcpkg 集成到项目中:

vcpkg integrate install

这样,librabbitmq 库会被自动链接到 Visual Studio 中的 C++ 项目。
在这里插入图片描述

2.2 配置 Visual Studio 项目

在 Visual Studio 中创建新的 C++ 项目,确保项目中包含了 vcpkg 的安装路径。vcpkg 会自动配置项目,使其能够找到并链接到 librabbitmq 库。并且链接器附加依赖项添加rabbitmq.4.lib便于程序查找rabbitmq.4.dll库引用。
在这里插入图片描述

3. 实现消息发送和接收程序

我们将编写两个程序,一个用于发送消息,一个用于接收消息。这些程序将演示如何使用 librabbitmq 库连接到 RabbitMQ 服务器、声明交换机、绑定队列并发送或接收消息。

3.1 启动rabbitmq Server

在这里插入图片描述

3.2 发送消息的程序(Producer)

以下是消息发送者的完整代码,它会循环发送多条消息到指定的 RabbitMQ 交换机和队列。

#include <iostream>
#include <string>
#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <thread>
#include <chrono>

// 用于处理 AMQP 错误并输出错误信息
void die_on_error(amqp_rpc_reply_t x, const char* context) {
    if (x.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Error in " << context << ": "
                  << amqp_error_string2(x.library_error) << std::endl;
        exit(1);
    }
}

int main() {
    const std::string hostname = "localhost";  // RabbitMQ 服务器地址
    const int port = 5672;  // RabbitMQ 默认端口
    const std::string exchange = "example_exchange";  // 交换机名称
    const std::string routing_key = "example_key";  // 路由键,用于绑定队列

    // 初始化连接
    amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t* socket = amqp_tcp_socket_new(conn);
    if (!socket) {
        std::cerr << "Creating TCP socket failed" << std::endl;
        return 1;
    }

    // 打开 TCP 连接
    int status = amqp_socket_open(socket, hostname.c_str(), port);
    if (status) {
        std::cerr << "Opening TCP socket failed" << std::endl;
        return 1;
    }

    // 登录 RabbitMQ
    die_on_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in");
    amqp_channel_open(conn, 1);  // 打开信道
    die_on_error(amqp_get_rpc_reply(conn), "Opening channel");

    // 声明交换机(类型为 direct)
    amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes("direct"),
                          0, 0, 0, 0, amqp_empty_table);
    die_on_error(amqp_get_rpc_reply(conn), "Declaring exchange");

    // 循环发送多条消息
    for (int i = 1; i <= 1000; ++i) {  // 发送 1000 条消息
        std::string message = "Hello, RabbitMQ! Message number: " + std::to_string(i);
        amqp_bytes_t message_bytes = amqp_cstring_bytes(message.c_str());

        // 设置消息属性
        amqp_basic_properties_t props;
        props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG;
        props.content_type = amqp_cstring_bytes("text/plain");
        props.delivery_mode = 2;  // 持久化模式

        // 发送消息到交换机
        int result = amqp_basic_publish(conn, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes(routing_key.c_str()),
                                        0, 0, &props, message_bytes);
        if (result < 0) {
            std::cerr << "Error publishing message " << i << std::endl;
        } else {
            std::cout << "Message " << i << " published: " << message << std::endl;
        }

        // 每次发送后等待 1 秒
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    // 清理连接
    amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS);
    amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
    amqp_destroy_connection(conn);

    return 0;
}

执行结果
在这里插入图片描述

3.3 接收消息的程序(Consumer)

以下是接收消息的完整代码,使用 amqp_consume_message 接收并打印消息内容。

#include <iostream>
#include <string>
#include <amqp.h>
#include <amqp_tcp_socket.h>

// 错误处理函数,用于输出错误信息
void die_on_error(amqp_rpc_reply_t x, const char* context) {
    if (x.reply_type != AMQP_RESPONSE_NORMAL) {
        std::cerr << "Error in " << context << ": "
                  << amqp_error_string2(x.library_error) << std::endl;
        exit(1);
    }
}

int main() {
    const std::string hostname = "localhost";  // RabbitMQ 服务器地址
    const int port = 5672;  // 端口
    const std::string queue = "example_queue";  // 队列名称
    const std::string exchange = "example_exchange";  // 交换机名称
    const std::string routing_key = "example_key";  // 路由键

    // 初始化连接
    amqp_connection_state_t conn = amqp_new_connection();
    amqp_socket_t* socket = amqp_tcp_socket_new(conn);

    if (!socket) {
        std::cerr << "Creating TCP socket failed" << std::endl;
        return 1;
    }

    // 打开 TCP 连接
    int status = amqp_socket_open(socket, hostname.c_str(), port);
    if (status) {
        std::cerr << "Opening TCP socket failed" << std::endl;
        return 1;
    }

    // 登录 RabbitMQ
    die_on_error(amqp_login(conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest"), "Logging in");
    amqp_channel_open(conn, 1);
    die_on_error(amqp_get_rpc_reply(conn), "Opening channel");

    // 声明交换机和队列,并绑定队列到交换机
    amqp_exchange_declare(conn, 1, amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes("direct"),
                          0, 0, 0, 0, amqp_empty_table);
    die_on_error(amqp_get_rpc_reply(conn), "Declaring exchange");

    amqp_queue_declare_ok_t* q = amqp_queue_declare(conn, 1, amqp_cstring_bytes(queue.c_str()), 0, 0, 0, 1, amqp_empty_table);
    die_on_error(amqp_get_rpc_reply(conn), "Declaring queue");

    amqp_queue_bind(conn, 1, amqp_cstring_bytes(queue.c_str()), amqp_cstring_bytes(exchange.c_str()), amqp_cstring_bytes(routing_key.c_str()), amqp_empty_table);
    die_on_error(amqp_get_rpc_reply(conn), "Binding queue");

    // 开始消费消息
    amqp_basic_consume(conn, 1, amqp_cstring_bytes(queue.c_str()), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
    die_on_error(amqp_get_rpc_reply(conn), "Consuming");

    while (true) {
        amqp_rpc_reply_t res;
        amqp_envelope_t envelope;

        // 释放资源
        amqp_maybe_release_buffers(conn);
        res = amqp_consume_message(conn, &envelope, NULL, 0);

        // 检查并打印接收到的消息
        if (res.reply_type == AMQP_RESPONSE_NORMAL) {
            std::cout << "Received: " << std::string((char*)envelope.message.body.bytes, envelope.message.body.len) << std::endl;
            amqp_destroy_envelope(&envelope);
        } else {
            std::cerr << "Error consuming message" << std::endl;
            break;
        }
    }

执行结果
在这里插入图片描述
可以查看RabbitMQ的webUI,了解消息的投递和消费情况
在这里插入图片描述

4. 总结

我们已完成了在 Windows 平台上通过 vcpkg 安装 librabbitmq 并用 C++ 实现 RabbitMQ 消息发送和接收的完整教程。


http://www.kler.cn/a/388680.html

相关文章:

  • Nuxt 版本 2 和 版本 3 的区别
  • 微擎框架php7.4使用phpexcel导出数据报错修复
  • 【图像压缩感知】论文阅读:Self-supervised Scalable Deep Compressed Sensing
  • 项目集章程program charter
  • -1大于4?负数与无符号整数类型:size_t的比较问题(strlen)
  • FatLab:我的编程课程系列
  • ScheduledThreadPoolExecutor 定制化线程池任务调度及起底层原理
  • tcpdump 是一款功能强大的网络数据包分析工具
  • Centos安装Minio
  • Spring Boot中实现多数据源连接和切换的方案
  • QML-简单项目实战一
  • 【系统架构设计师(第2版)】五、软件工程基础知识
  • Chromium127编译指南 Mac篇(五)- 编译Chromium
  • SpringBoot连接多个redis
  • 基于SpringBoot和Vue的公司文档管理系统设计与开发(源码+定制+开发)
  • 如何使用Spring Cloud Gateway实现一个最简单的API网关示例
  • Docker入门系列——DockerFile的使用
  • 游戏引擎学习第四天
  • 杂七杂八之Swagger环境搭建(Java版本)
  • Spring Boot实现文件上传与OSS集成:从基础到应用
  • Go 使用 Redis 实现分布式锁
  • OpenAI CEO阿尔特曼预测AGI可能在五年内出现 对社会的影响远小于预期
  • ECharts 实现大屏地图功能
  • Kafka java 配置
  • Transformer-GRU、Transformer、CNN-GRU、GRU、CNN五模型多变量回归预测
  • Python的函数(补充浅拷贝和深拷贝)