当前位置: 首页 > article >正文

C++使用开源ConcurrentQueue库处理自定义业务数据类

ConcurrentQueue开源库介绍

ConcurrentQueue是一个高性能的、线程安全的并发队列库。它旨在提供高效、无锁的数据结构,适用于多线程环境中的数据交换。concurrentqueue 支持多个生产者和多个消费者,并且提供了多种配置选项来优化性能和内存使用。

ConcurrentQueue使用

0x01 使用场景说明

我的数据平台在接收到四种不同的业务数据时,需要按数据分类写进RocketMQ。

0x02 自定义类用来存放和区分数据流

  • 设计BusinessFlowMsg
  1. 该类有定义消息的类型
  2. 该类中设计ST_BusinessSign结构体消息头,用来区分消息和获取消息体的长度
  3. BusinessFlowMsg类可以存放的数据长度为512KB
#ifndef BUSINESSFLOWMSG_HPP
#define BUSINESSFLOWMSG_HPP

#include <string.h>

#define MSG_ROCKETMQ_PNG 0x01
#define MSG_ROCKETMQ_AIS 0x02
#define MSG_ROCKETMQ_ROUTE 0x03
#define MSG_ROCKETMQ_VOYAGE 0x04

#pragma pack(push)
#pragma pack(1)

// 消息头
typedef struct s_BusinessSign
{
    int sign; // 业务标识
    unsigned int length; // 消息体的长度
}ST_BusinessSign;

#pragma pack(pop)

class BusinessFlowMsg
{
public:
    BusinessFlowMsg() = default;
    ~BusinessFlowMsg() = default;

    char* get_data()
    {
        return _data;
    }

    int data_size()
    {
        return businessSign.length;
    }

    char* get_body()
    {
        return _data + sizeof(ST_BusinessSign);
    }

    int body_size()
    {
        return data_size() - sizeof(ST_BusinessSign);
    }

    ST_BusinessSign* header()
    {
        return &businessSign;
    }

    bool set_data(const char* data, int length, int sign)
    {
        if(length > (max_body_len + sizeof(ST_BusinessSign)))
        {
            return false;
        }

        businessSign.sign = sign;
        businessSign.length = length;
        memcpy(_data + sizeof(ST_BusinessSign), data, length);

        return true;
    }

private:
    enum
    {
        max_body_len = 512 * 1024 // 512KB
    };

    ST_BusinessSign businessSign;
    char _data[max_body_len];
};

#endif // BUSINESSFLOWMSG_HPP

0x03 创建PngUnit类模拟接到不同的业务数据

  • PngUnit类型创建了四个线程来模拟不同的数据流。
  • PngUnit类多线程中并未使用互斥锁,因为ConcurrentQueue是一个线程安全的并发队列库,事实证明确实如此。
#ifndef PNGUNIT_H
#define PNGUNIT_H

#include <thread>
#include <mutex>

class PngUnit
{
public:
    PngUnit();
    ~PngUnit() = default;
    void start();
    void sendPNG(int sign);
    void sendAIS(int sign);
    void sendRoute(int sign);
    void sendVoyage(int sign);

private:
    std::thread m_th_png;
    std::thread m_th_ais;
    std::thread m_th_route;
    std::thread m_th_voyage;

    // std::mutex queue_mutex;  // 互斥锁
};

#endif // PNGUNIT_H

#include "pngunit.h"
#include <unistd.h>
#include "rocketmqutils.h"
#include "BusinessFlowMsg.hpp"
#include "json11/json11.hpp"

PngUnit::PngUnit()
{

}

void PngUnit::start()
{
    // m_th = std::thread([this](){
    //     sendPNG(100);
    // });

    if(m_th_png.joinable())
    {
        printf("[%s:%d] %s\n", __FILE__, __LINE__, "m_th_png is running");
        return;
    }

    m_th_png = std::thread(std::bind(&PngUnit::sendPNG, this, MSG_ROCKETMQ_PNG));
    m_th_ais= std::thread(std::bind(&PngUnit::sendAIS, this, MSG_ROCKETMQ_AIS));
    m_th_route= std::thread(std::bind(&PngUnit::sendRoute, this, MSG_ROCKETMQ_ROUTE));
    m_th_voyage= std::thread(std::bind(&PngUnit::sendVoyage, this, MSG_ROCKETMQ_VOYAGE));
}

void PngUnit::sendPNG(int sign)
{
    while (true)
    {
        BusinessFlowMsg pngMsg;
        const char* pngFile = "1234567890ABCDEF";
        int fileLen = strlen(pngFile) + 1;

        pngMsg.set_data(pngFile, fileLen, sign);

        {
            // std::lock_guard<std::mutex> lock(queue_mutex);
            if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(pngMsg))
            {
                printf("Failed to set PNG message data");
            }
        }

        sleep(1);
    }
}

void PngUnit::sendAIS(int sign)
{
    json11::Json::object obj = {
        {"message", "AIS"},
        {"response", "success"}
    };
    std::string jsonStr = json11::Json(obj).dump();

    while (true)
    {
        BusinessFlowMsg aisMsg;
        int jsonStrLen = jsonStr.size();
        aisMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);

        {
            // std::lock_guard<std::mutex> lock(queue_mutex);
            if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(aisMsg))
            {
                printf("Failed to set AIS message data");
            }
        }

        sleep(2);
    }
}

void PngUnit::sendRoute(int sign)
{
    json11::Json::object obj = {
        {"message", "Route"},
        {"response", "success"}
    };
    std::string jsonStr = json11::Json(obj).dump();

    while (true)
    {
        BusinessFlowMsg routeMsg;
        int jsonStrLen = jsonStr.size();
        routeMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);

        {
            // std::lock_guard<std::mutex> lock(queue_mutex);
            if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(routeMsg))
            {
                printf("Failed to set ROUTE message data");
            }
        }

        sleep(3);
    }
}

void PngUnit::sendVoyage(int sign)
{
    json11::Json::object obj = {
        {"message", "Voyage"},
        {"response", "success"}
    };
    std::string jsonStr = json11::Json(obj).dump();

    while (true)
    {
        BusinessFlowMsg voyageMsg;
        int jsonStrLen = jsonStr.size();
        voyageMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);

        {
            // std::lock_guard<std::mutex> lock(queue_mutex);
            if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(voyageMsg))
            {
                printf("Failed to set VOYAGE message data");
            }
        }

        sleep(4);
    }
}

0x04 创建RocketMQUtils类,在ConcurrentQueue队列中获取数据写进RocketMQ

  • RocketMQUtils类是一个单例类
#ifndef ROCKETMQUTILS_H
#define ROCKETMQUTILS_H

#include <thread>
#include <concurrentqueue/moodycamel/concurrentqueue.h>
#include "BusinessFlowMsg.hpp"

class RocketMQUtils
{
public:
    static RocketMQUtils* Instance();

private:
    RocketMQUtils();
    ~RocketMQUtils()=default;
    RocketMQUtils(const RocketMQUtils &) = delete;
    RocketMQUtils& operator=(const RocketMQUtils &) = delete;

    RocketMQUtils(RocketMQUtils &&) = delete;
    RocketMQUtils& operator=(RocketMQUtils &&) = delete;

public:
    void start();
    void push();
    void poll();
    bool write(char *data, int len, int sign);

public:
    moodycamel::ConcurrentQueue<BusinessFlowMsg> g_businessQueue;

private:
    static RocketMQUtils* _instance;

    std::thread _pushThread;
};

#endif // ROCKETMQUTILS_H

#include "rocketmqutils.h"
#include <unistd.h>

RocketMQUtils * RocketMQUtils::_instance = nullptr;

RocketMQUtils *RocketMQUtils::Instance()
{
    if(_instance == nullptr)
    {
        _instance = new RocketMQUtils();
    }
    return _instance;
}

RocketMQUtils::RocketMQUtils()
{

}

void RocketMQUtils::start()
{
    if(_pushThread.joinable())
    {
        return;
    }
    _pushThread = std::thread(&RocketMQUtils::push, this);
}

void RocketMQUtils::push()
{
    while (true)
    {
        BusinessFlowMsg busiMsg;

        if(g_businessQueue.try_dequeue(busiMsg))
        {
            write(busiMsg.get_body(), busiMsg.header()->length, busiMsg.header()->sign);
        }
        else
        {
            printf("[%s:%d] %s\n", __FILE__, __LINE__, "g_businessQueue is empty");
            sleep(2);
        }
    }
}

void RocketMQUtils::poll()
{

}

bool RocketMQUtils::write(char *data, int len, int sign)
{
    std::string msg(data, len);
    if (sign == MSG_ROCKETMQ_PNG)
    {
        printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());
    }
    else if (sign == MSG_ROCKETMQ_AIS)
    {
        printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());
    }
    else if (sign == MSG_ROCKETMQ_ROUTE)
    {
        printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());
    }
    else if (sign == MSG_ROCKETMQ_VOYAGE)
    {
        printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());
    }
    else
    {
        printf("[%s:%d] data sign error\n", __FILE__, __LINE__);
    }
}

0x05 使用演示

#include <iostream>
#include <memory>
#include "rocketmqutils.h"
#include "pngunit.h"

using namespace std;

int main()
{
    cout << "==Start==" << endl;
    RocketMQUtils* rocketmq = RocketMQUtils::Instance();
    rocketmq->start();

    std::shared_ptr<PngUnit> ptrPngUnit = std::make_shared<PngUnit>();
    ptrPngUnit->start();

    getchar();

    cout << "==Over==" << endl;

    return 0;
}

在这里插入图片描述


http://www.kler.cn/a/388175.html

相关文章:

  • 在 PhpStorm 中配置命令行直接运行 PHP 的步骤
  • GaussDB SQL调优之改写SQL消除子查询
  • Android NDK开发实战之环境搭建篇(so库,Gemini ai)
  • 数据库回滚:大祸临头时
  • 怎么管理电脑usb接口,分享四种USB端口管理方法
  • 科普CMOS传感器的工作原理及特点
  • MySQL的其他函数
  • Oracle简介、环境搭建和基础DML语句
  • 网络安全从入门到精通(特别篇IIl):应急响应之病毒蠕虫处置流程
  • 深度学习-张量相关
  • 解决 IntelliJ IDEA Maven 项目 JDK 版本自动变为 1.5 的问题
  • 硬件设备网络安全问题与潜在漏洞分析及渗透测试应用
  • 开源竞争-利用kimi编程助手搭建小程序(11)
  • 解决编译 fast-lio-lc 算法时遇到的error方法
  • uniapp uni-calendar日历实现考勤统计功能
  • 大数据机器学习算法与计算机视觉应用04:多项式
  • macOS开发环境配置与应用开发(详细讲解)
  • 蔚来Android面试题及参考答案(3万字长文)
  • Python小白学习教程从入门到入坑------第二十九课 访问模式(语法进阶)
  • .NET WPF CommunityToolkit.Mvvm框架
  • Vue数据响应式原理
  • Cent OS-7的Apache服务配置
  • 【Python进阶】Python网络协议与套接字编程:构建客户端和服务器
  • IntelliJ IDEA快速接入LLMs大模型API
  • GISBox VS ArcGIS:分别适用于大型和小型项目的两款GIS软件
  • Gen-RecSys——一个通过生成和大规模语言模型发展起来的推荐系统