当前位置: 首页 > article >正文

【RabbitMQ 项目】服务端:消费者管理模块

文章目录

  • 一.概念辨析
    • 1.什么是消费者?
    • 2.服务端为什么要管理消费者?
    • 3.怎么管理消费者?
    • 4.需要管理生产者吗?
  • 二.编写思路
    • 1.定义消费者
    • 2.定义队列消费者管理管理类
    • 3.定义消费者管理类
  • 三.代码实践

一.概念辨析

1.什么是消费者?

在服务端的视角,每收到一个订阅队列的请求,就多了一个消费者。在消费客户端同样如此,每订阅一个队列,就多了一个消费者。所以这里的消费者指的不是消费客户端,而是可以从指定队列中取走消息的角色。

一个消费客户端中可以有多个消费者,因为可以同时订阅多个队列。消费者和队列是绑定在一起的,不过要注意,一个消费者对应一个队列,但一个队列可以对应多个消费者,这样就可以负载均衡式选择消费者消费消息

2.服务端为什么要管理消费者?

服务端为什么要有消费者?
消费客户端通过信道订阅了一个队列,你需要把这个操作记录下来,以便后续知道消息要推送给哪个客户端的哪个信道。怎么记录呢?新增一个消费者。所以消费者就是一个表征订阅队列的角色
为什么要管理消费者?
当一个队列中新增消息后,就要选择一个订阅该队列的消费者进行消费,服务端肯定不止一个消费者,所以自然要管理起来

3.怎么管理消费者?

消费者与信道,队列都有关,信道关闭,信道内的消费者也要销毁。队列中有新消息,需要选择一个与关联的消费者推送消息。
所以既可以以队列为单元管理消费者,也可以连接为单元管理,选择哪个呢?
我选择的是以队列为单元管理,因为选择消费者推送消息这个动作很频繁,以队列为单元管理消费者提高消费者查找效率。至于连接,只需在其中记录关联的消费者,信道关闭使用消费者管理句柄删除消费者即可

4.需要管理生产者吗?

不需要,因为服务端不会像对消费者那样主动发送数据,只会对生产者发送响应,所以生产者从哪里给我发来请求,我再从哪里响应回去就行。

二.编写思路

1.定义消费者

成员变量:

  1. 唯一标识 id
  2. 订阅的队列名称
  3. 消息处理的回调函数:服务端的“消费”,指的是把消息从队列中取出来,然后构建响应,发送给消费者所在的消费客户端,这个构建响应并发送的过程由消费者提供的回调函数来完成,这个回调函数,哪个模块创建的消费者,哪个模块负责设置(其实就是后面讲的信道模块)
  4. 自动应答标志:所谓自动应答就是不需要消费客户端发送 ACK,而是服务端自己把消息发出去后就删除本地消息

2.定义队列消费者管理管理类

成员变量:

  1. 队列名称
  2. 消费者的 vector 数组
    为什么要选数组,我们说需要为队列负载均衡地选择一个消费者进行消费,采用的方法就是下标轮转,所以要支持随机访问,数组就很合适
  3. 轮转下标
    成员方法:
  4. 新增消费者
  5. 删除消费者
  6. 获取一个消费者

3.定义消费者管理类

成员变量:

  1. 队列消费者管理句柄数组
    成员方法:
  2. 构造函数:根据传入的队列名数组,初始化队列消费者管理句柄
  3. 初始化队列消费者管理句柄:新增队列时使用
  4. 向指定队列新增消费者
  5. 从指定队列删除消费者:
  6. 从指定队列获取一个消费者:负载均衡式消费
  7. 删除队列消费者管理句柄:删除队列时使用

三.代码实践

#pragma once
#include "../common/Log.hpp"
#include "../common/message.pb.h"
#include <functional>
#include <memory>
#include <atomic>
#include <mutex>
#include <vector>
#include <unordered_map>
namespace ns_consumer
{
    using namespace ns_log;

    class Consumer;
    class QueueConsumerManager;

    using ConsumerPtr = std::shared_ptr<Consumer>;
    using QueueConsumerManagerPtr = std::shared_ptr<QueueConsumerManager>;
    using MessagePtr = std::shared_ptr<ns_data::Message>;
    using ConsumerCallback_t = std::function<void(const std::string& qname, const std::string& consumerId, MessagePtr msgPtr)>;
    struct Consumer
    {
        std::string _id;
        std::string _qname;
        ConsumerCallback_t _callback;
        bool _autoAck;

        Consumer(const std::string id, const std::string &qname, ConsumerCallback_t callback, bool autoAck)
            : _id(id),
              _qname(qname),
              _callback(callback),
              _autoAck(autoAck)
        {
            LOG(DEBUG) << "创建消费者: " << _id << endl;
        }

        ~Consumer()
        {
            LOG(DEBUG) << "析构消费者: " << _id << endl;
        }
    };

    class QueueConsumerManager
    {
    private:
        const std::string _qname;
        std::vector<ConsumerPtr> _consumers;
        size_t _rotateOrder;
        std::mutex _mtx;

    public:
        QueueConsumerManager(const std::string &qname)
            : _qname(qname),
              _consumers(),
              _rotateOrder(0),
              _mtx()
        {
        }

        /***********
         * 新增消费者
         * ****************/
        ConsumerPtr addConsumer(const std::string &id, const std::string &qname, ConsumerCallback_t callback, bool autoAck)
        {
            std::unique_lock<std::mutex> lck(_mtx);
            // 判断消费者是否重复
            for (auto &consumerPtr : _consumers)
            {
                if (consumerPtr->_id == id)
                {
                    return consumerPtr;
                }
            }
           ConsumerPtr ret = std::make_shared<Consumer>(id, qname, callback, autoAck);
           _consumers.push_back(ret);
           return ret;
        }

        /**************
         * 移除消费者
         * ***************/
        void removeConsumer(const std::string &cid)
        {
            std::unique_lock<std::mutex> lck(_mtx);

            for (auto it = _consumers.begin(); it != _consumers.end(); ++it)
            {
                if ((*it)->_id == cid)
                {
                    _consumers.erase(it);
                    break;
                }
            }
        }

        /***************
         * 负载均衡地获取一个消费者
         * *************/
        ConsumerPtr chooseConsumer()
        {
            std::unique_lock<std::mutex> lck(_mtx);
            if (_consumers.size() == 0)
            {
                return nullptr;
            }
            _rotateOrder %= _consumers.size();
            return _consumers[_rotateOrder++];
        }
    };

    class ConsumerManager
    {
    private:
        std::unordered_map<std::string, QueueConsumerManagerPtr> _qConsumerManagers;
        std::mutex _mtx;

    public:
        ConsumerManager(const std::vector<std::string> &qnames)
        {
            for (const auto &qname : qnames)
            {
                _qConsumerManagers[qname] = std::make_shared<QueueConsumerManager>(qname);
            }
        }

        /**************
         * 初始化队列消费者管理句柄--新增队列时调用
         * **************/
        void initQueueConsumerManager(const std::string &qname)
        {
            std::unique_lock<std::mutex> lck(_mtx);
            if (_qConsumerManagers.count(qname))
            {
                return;
            }
            _qConsumerManagers[qname] = std::make_shared<QueueConsumerManager>(qname);
        }

        /*******************
         * 销毁指定的队列消费者管理句柄--销毁队列时调用
         * ****************/
        void removeQueueConsumerManager(const std::string &qname)
        {
            std::unique_lock<std::mutex> lck(_mtx);
            _qConsumerManagers.erase(qname);
        }

        /*************
         * 给指定队列新增消费者
         * ************/
        ConsumerPtr addConsumer(const std::string &id, const std::string &qname, ConsumerCallback_t callback, bool autoAck)
        {
            std::unique_lock<std::mutex> lck(_mtx);

            if (_qConsumerManagers.count(qname) == 0)
            {
                LOG(WARNING) << "QueueConsumerManager not found, qname: " << qname << endl;
                return nullptr;
            }
            return _qConsumerManagers[qname]->addConsumer(id, qname, callback, autoAck);
            
        }

        /***********
         * 删除指定队列的消费者
         * ****************/
        void removeConsumer(const std::string &qname, const std::string &cid)
        {
            std::unique_lock<std::mutex> lck(_mtx);

            if (_qConsumerManagers.count(qname) == 0)
            {
                LOG(WARNING) << "QueueConsumerManager not found, qname: " << qname << endl;
                return;
            }
            _qConsumerManagers[qname]->removeConsumer(cid);
        }

        /*****************
         * 获取指定队列的一个消费者
         * **************/
        ConsumerPtr chooseConsumer(const std::string& qname)
        {
            std::unique_lock<std::mutex> lck(_mtx);
            if (_qConsumerManagers.count(qname) == 0)
            {
                LOG(WARNING) << "QueueConsumerManager not found, qname: " << qname << endl;
                return nullptr;
            }
            return _qConsumerManagers[qname]->chooseConsumer();
        }
    };
}

http://www.kler.cn/a/330669.html

相关文章:

  • 【ROS2】Qt事件循环和ROS2订阅机制一起使用有什么注意事项?
  • krpano 实现文字热点中的三角形和竖杆
  • 51单片机——串口通信(重点)
  • GO随记:不使用主键id 如何分表与mysql大表
  • G1原理—2.G1是如何提升分配对象效率
  • 【单片机】实现一个简单的ADC滤波器
  • c#增删改查 (数据操作的基础)
  • Python 从入门到实战32(数据库MySQL)
  • VMware中Ubuntu系统Docker正常运行但网络不通(已解决)
  • java 的三种IO模型(BIO、NIO、AIO)
  • 蓝桥杯备赛---2.新建工程
  • 论文不同写作风格下的ChatGPT提示词分享
  • Linux学习之路 -- 线程 -- 线程池
  • RabbitMq生产者可靠性
  • Python安装流程(Windows + MAC)
  • 【CocosCreator 3.x】实现物体按指定轨迹移动
  • python开源代码自学问题解决(requests+openpyxl+pymysql)
  • 云栖实录 | 开源大数据全面升级:Native 核心引擎、Serverless 化、湖仓架构引领云上大数据发展
  • 【笔记】1.4.4断裂判据
  • 分布式理论:拜占庭将军问题
  • 计算机视觉周边技术解析:从基础到前沿
  • 【算法】DFS 系列之 穷举/暴搜/深搜/回溯/剪枝(上篇)
  • (作业)第三期书生·浦语大模型实战营(十一卷王场)--书生入门岛通关第3关Git 基础知识
  • fish-speech语音大模型本地部署
  • python you-get下载视频
  • 【当当网】电子书城-02-验证码的实现