当前位置: 首页 > article >正文

【RabbitMQ 项目】服务端:信道模块

文章目录

  • 一.概念辨析
    • 1.什么是信道
    • 2.为什么要有信道
    • 3.怎么实现信道
  • 二.实现思路
    • 1.定义信道
    • 2.定义连接信道管理类
  • 三.代码实践

一.概念辨析

1.什么是信道

信道是在用户层的一个逻辑概念,是比 TCP 连接更加细粒度的通信通道,客户端想要和服务端通信,必须先创建一个信道,通过信道来请求各种各样的服务。服务端在收到客户端创建信道的请求后,也要在本地创建一个相应的信道,来给客户端提供对应的服务。
即客户端通过信道给用户提供服务,服务端通过信道给客户端提供服务,信道就是提供服务的窗口。

2.为什么要有信道

信道只是一个逻辑的概念,只是在用户层管理的一些数据结构,真正网络发送数据的还是 TCP 连接,一个信道必然要和一个 TCP 连接关联,一个 TCP 连接可以被多个信道使用。做这种更加细粒度的划分,是为了复用 TCP 连接,实现 TCP 长连接。
因为用户有时候想要创建多个通信的句柄,如果直接创建 TCP 连接,资源不能得到充分利用,频繁创建和关闭 TCP 连接也影响性能,用一个逻辑上的信道代替,多个信道复用一个 TCP 连接,使资源得到充分利用

3.怎么实现信道

信道是用户层的一个概念,怎么实现?

  1. 客户端和服务端都要有描述信道的数据结构并管理起来。
  2. 网络协议定义的所有请求和响应,都带有信道 id 这个字段,指明我要和你服务端上的哪个信道通信
    例如:
    在客户端打开信道的 Request,含义是我客户端创建一个信道,你服务端也要创建一个相应的信道
    在客户端关闭信道的 Request,含义是我客户端关闭了信道,你服务端也要关闭对应的
    在创建交换机,队列等的 Request,含义是我的请求来自客户端的这个信道,你服务端也要用对应的信道给我提供服务
    这样就实现了信道和信道之间的独立性,互不干扰

二.实现思路

1.定义信道

成员变量:

  1. 虚拟机句柄:用于本地业务处理
  2. 协议处理句柄:用于发送响应
  3. TCP 连接(Muduo 库中的 TCPConnection 的智能指针):给协议处理句柄发送响应
  4. 关联的多个消费者:因为信道关闭时,要把与之关联的消费者移除,防止内存泄漏。那么什么时候会新增关联的消费者?当本信道收到订阅队列的消息时候。说明一下,这个字段信道不一定会用到,因为这个信道也有可能是为生产客户端服务的,生产客户端没有消费者
  5. 消费者管理句柄:因为涉及到新增消费者和删除消费者,所以需要消费者管理句柄
  6. 线程池句柄:消费消息的任务属于支线任务,muduo 库中的工作线程不想自己来做,因为它主要工作是监控 IO 事件以及从接收缓冲区读数据,所以消费消息的任务让线程池来做
    成员方法:
  7. 交换机声明与删除:先使用虚拟机句柄业务处理,再构建响应用协议处理句柄发送
  8. 队列声明与删除:与交换机声明删除类似,但是要声明队列还要初始化队列消费者管理句柄,删除队列后还要移除订阅该队列的消费者
  9. 绑定与解绑:绑定前需要判断 bingKey 是否合法
  10. 发布消息:先用虚拟机句柄处理业务,然后把消费消息的任务丢进线程池。怎么消费?先用消费者管理句柄,选出订阅该队列的消费者,然后调用消费者的回调函数,构建响应并发送
  11. 订阅队列:当信道收到订阅队列的请求,间接说明这个信道是给消费客户端服务的,因为生产客户端不会发送这样的请求。要做的就是用消费者管理句柄新增一个消费者,同时添加信道关联的消费者
  12. 取消订阅:和订阅队列做相反的工作
  13. 确认应答:用虚拟机句柄确认应答,再构建响应并发送
  14. 析构函数:使用消费者管理句柄删除信道关联的所有消费者

2.定义连接信道管理类

前置说明:
信道我们以连接为单位进行管理,并不提供所有信道的管理类,没有意义,因为信道是依托于连接的,连接没了,其中的所有信道也就没了

成员变量:

  1. < 信道 id,信道智能指针 > 的哈希表
    成员方法:
  2. 添加信道
  3. 删除信道
  4. 根据 id 获取信道
    补充说明:
    不实现这个管理类也可以,直接在 Connection 中使用哈希表组织信道更直观。因为信道是只依附于 Connection 的,不像消费者,它既和队列关联,又和信道关联,我们不得不选择一个专门的类来管理消费者并且内部以队列为单元组织,方便推送消息时选择一个消费者

三.代码实践

#pragma once
#include "VirtualHost.hpp"
#include "Consumer.hpp"
#include "muduo/net/TcpConnection.h"
#include "muduo/protobuf/codec.h"
#include "../common/ThreadPool.hpp"
#include "../common/protocol.pb.h"
#include "Route.hpp"
#include <mutex>

namespace ns_channel
{
    class Channel;
    using ChannelPtr = std::shared_ptr<Channel>;

    using VirtualHostPtr = std::shared_ptr<ns_data::VirtualHost>;
    using ConsumerManagerPtr = std::shared_ptr<ns_consumer::ConsumerManager>;
    using ThreadPoolPtr = std::shared_ptr<ns_tp::ThreadPool>;
    using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;

    class Channel
    {
    private:
        std::string _id;
        std::unordered_map<std::string, ns_consumer::ConsumerPtr> _consumers; // 一个信道内可能关联多个消费者,信道关闭时需要把与之关联的消费者都删除
        muduo::net::TcpConnectionPtr _connPtr;                                // 本地业务处理完后,要构建响应发送
        ProtobufCodecPtr _codecPtr;                                           // 构建响应后要添加协议数据
        VirtualHostPtr _vhPtr;                                                // 本地业务处理
        ConsumerManagerPtr _consumerManagerPtr;                               // 信道关闭时要删除消费者,队列有新消息时要选择消费者
        ThreadPoolPtr _threadPoolPtr;                                         // 处理推送消息的任务
        std::mutex _mtx;                                                      // 用来保护_consumers
    public:
        Channel(const std::string &id, const muduo::net::TcpConnectionPtr &connPtr, const ProtobufCodecPtr &codecPtr,
                const VirtualHostPtr &vhPtr, const ConsumerManagerPtr &consumerManagerPtr, const ThreadPoolPtr &threadPoolPtr)
            : _id(id),
              _consumers(),
              _connPtr(connPtr),
              _codecPtr(codecPtr),
              _vhPtr(vhPtr),
              _consumerManagerPtr(consumerManagerPtr),
              _threadPoolPtr(threadPoolPtr),
              _mtx()
        {
        }

        ~Channel()
        {
            LOG(DEBUG) << "channel析构" << endl;
            for (auto &kv : _consumers)
            {
                _consumerManagerPtr->removeConsumer(kv.second->_qname, kv.second->_id);
                LOG(DEBUG) << "由于信道关闭,关联的消费者也被移除,consumerId: " << kv.second->_id << endl;
            }
        }

        /************
         * 以下用于处理生产客户端的请求
         * ***********/
        void declareExchange(const ns_protocol::DeclareExchangeRequest &req)
        {
            // 业务处理
            bool ret = _vhPtr->declareExchange(req.exchange_name(), req.exchange_type(), req.is_durable());
            // 响应
            sendCommonResponse(req.request_id(), ret);
        }

        void deleteExchange(const ns_protocol::DeleteExchangeRequest &req)
        {
            _vhPtr->deleteExchange(req.exchange_name());
            sendCommonResponse(req.request_id(), true);

        }

        /*************
         * 声明队列
         * 记得要初始化队列消费者管理句柄
         * ***********/
        void declareMsgQueue(const ns_protocol::DeclareMsgQueueRequest &req)
        {
            // 业务处理
            bool ret = _vhPtr->declareMsgQueue(req.queue_name(), req.is_durable());
            if (ret)
            {
                _consumerManagerPtr->initQueueConsumerManager(req.queue_name());
            }
            // 响应
            sendCommonResponse(req.request_id(), ret);
        }

        /***************
         * 删除队列
         * 记得要删除队列关联的消费者
         * *************/
        void deleteMsgQueue(const ns_protocol::DeleteMsgQueueRequest &req)
        {
            _vhPtr->deleteMsgQueue(req.queue_name());
            _consumerManagerPtr->removeQueueConsumerManager(req.queue_name());
            sendCommonResponse(req.request_id(), true);
        }

        /**********
         * 绑定与解绑
         * ************/
        void bind(const ns_protocol::BindRequest &req)
        {
            if (!ns_route::Router::isLegalBindingKey(req.binding_key()))
            {
                LOG(INFO) << "Binding的bindingKey非法, bindingKey: " << req.binding_key() << endl;
                sendCommonResponse(req.request_id(), false);
                return;
            }
            bool ret = _vhPtr->bind(req.ename(), req.qname(), req.binding_key());
            sendCommonResponse(req.request_id(), ret);
        }

        void unbind(const ns_protocol::UnbindRequest &req)
        {
            _vhPtr->unbind(req.ename(), req.qname());
            sendCommonResponse(req.request_id(), true);
        }

        void publishMessage(const ns_protocol::PublishMessageRequest &req)
        {

            const std::string ename = req.exchange_name();
            // 获取交换机相关的所有绑定
            std::unordered_map<std::string, ns_data::BindingPtr> bindings;
            bool isExchangeExists = _vhPtr->getExchangeBindings(ename, &bindings);
            if (!isExchangeExists)
            {
                LOG(INFO) << "用户将消息发布到一个不存在的交换机上, exchangeName: " << ename << endl;
                sendCommonResponse(req.request_id(), false);
                return;
            }

            // 判断routingKey是否合法
            const std::string &routingKey = req.msg().saved_info().routing_key();
            if (!ns_route::Router::isLegalRoutingKey(routingKey))
            {
                LOG(INFO) << "消息的routingKey不合法, routingKey: " << routingKey << endl;
                sendCommonResponse(req.request_id(), false);
                return;
            }

            // 交换路由,选出队列并发布
            auto exchangePtr = _vhPtr->getExchange(ename);
            assert(exchangePtr);
            ns_protocol::ExchangeType type = exchangePtr->_type;
            for (const auto &kv : bindings)
            {
                auto &bindingPtr = kv.second;
                if (ns_route::Router::isMatched(routingKey, bindingPtr->_bindingKey, type))
                {
                    LOG(DEBUG) << "消息路由到" << kv.first << "上, " << routingKey << "<=>" << bindingPtr->_bindingKey << endl;

                    const auto &msg = req.msg();
                    const std::string &qname = kv.first;
                    _vhPtr->publish(qname, msg.saved_info().id(), routingKey,
                                    msg.saved_info().body(), msg.saved_info().delivery_mode());

                    // 把消费的任务交给线程池
                    auto task = std::bind(&Channel::consume, this, qname);
                    _threadPoolPtr->push(task);
                }
            }
            sendCommonResponse(req.request_id(), true);

        }

        /***********
         * 以下用于处理消费客户端请求
         * **************/
        void subscribeQueue(ns_protocol::SubscribeQueueRequest &req)
        {
            std::unique_lock<std::mutex> lck(_mtx);

            if (_consumers.count(req.consumer_id()))
            {
                sendCommonResponse(req.request_id(), true);
            }
            auto consumerPtr = _consumerManagerPtr->addConsumer(req.consumer_id(),
                                                                req.qname(),
                                                                std::bind(&Channel::consumerCallback, this,
                                                                          std::placeholders::_1, std::placeholders::_2,
                                                                          std::placeholders::_3),
                                                                req.auto_ack());
            if (consumerPtr == nullptr)
            {
                sendCommonResponse(req.request_id(), false);
            }
            else
            {
                _consumers[consumerPtr->_id] = consumerPtr;
                LOG(DEBUG) << "信道新增关联消费者, consumerId: " << consumerPtr->_id << endl;
                sendCommonResponse(req.request_id(), true);
            }

            // 把消费的任务交给线程池
            auto task = std::bind(&Channel::consume, this, req.qname());
            _threadPoolPtr->push(task);
        }

        void cancelSubscribe(ns_protocol::CancelSubscribeRequest &req)
        {

            _consumerManagerPtr->removeConsumer(req.qname(), req.consumer_id());

            std::unique_lock<std::mutex> lck(_mtx);
            _consumers.erase(req.consumer_id());

            sendCommonResponse(req.request_id(), true);
        }

        void ackMessage(ns_protocol::AckRequest &req)
        {
            _vhPtr->ack(req.qname(), req.msg_id());
            sendCommonResponse(req.request_id(), true);
        }

    private:
        void sendCommonResponse(const std::string &responseId, bool ok)
        {
            ns_protocol::CommomResponse resp;
            resp.set_channel_id(_id);
            resp.set_response_id(responseId);
            resp.set_ok(ok);
            _codecPtr->send(_connPtr, resp);
        }

        /*************
         * 这个任务是给线程池做的
         * 消费队列消息:从队列中取出一条消息,选择一个订阅队列的消费者,然后推动给它
         * 注意该接口会多次消费,直到队列消息被消费完
         * *******************/
        void consume(const std::string &qname)
        {
            while (true)
            {
                auto msgPtr = _vhPtr->consume(qname); // 线程安全的
                if (msgPtr == nullptr)
                {
                    return;
                }

                // 选择一个消费者
                auto consumerPtr = _consumerManagerPtr->chooseConsumer(qname);
                if (consumerPtr == nullptr)
                {
                    return;
                }

                // 让消费者去处理消息并发送
                consumerPtr->_callback(consumerPtr->_qname, consumerPtr->_id, msgPtr);

                // 如果该消费者是自动应答则立马ack
                if (consumerPtr->_autoAck)
                {
                    _vhPtr->ack(qname, msgPtr->saved_info().id());
                }
            }
        }

        void consumerCallback(const std::string &qname, const std::string &consumerId, ns_data::MessagePtr msgPtr)
        {
            ns_protocol::PushMessageResponse resp;
            resp.set_channel_id(_id);
            resp.set_consumer_id(consumerId);
            resp.set_qname(qname);
            resp.mutable_msg()->mutable_saved_info()->set_id(msgPtr->saved_info().id());
            resp.mutable_msg()->mutable_saved_info()->set_body(msgPtr->saved_info().body());
            resp.mutable_msg()->mutable_saved_info()->set_delivery_mode(msgPtr->saved_info().delivery_mode());
            resp.mutable_msg()->mutable_saved_info()->set_routing_key(msgPtr->saved_info().routing_key());

            _codecPtr->send(_connPtr, resp);
        }
    };

    /******************************
     * 信道管理句柄,注意以Connection为单元进行管理
     * ***************************/
    class ConnectionChannelManager
    {
    private:
        std::mutex _mtx;
        std::unordered_map<std::string, ChannelPtr> _channels;

    public:
        ConnectionChannelManager()
        {
        }

        bool openChannel(const std::string &id, const muduo::net::TcpConnectionPtr &connPtr,
                         const ProtobufCodecPtr &codecPtr, const VirtualHostPtr &vhPtr,
                         const ConsumerManagerPtr &consumerManagerPtr, const ThreadPoolPtr &threadPoolPtr)
        {
            std::unique_lock<std::mutex> lck(_mtx);
            if (_channels.count(id))
            {
                LOG(WARNING) << "信道已经打开, channelId: " << id << endl;
                return false;
            }
            auto channelPtr = std::make_shared<Channel>(id, connPtr, codecPtr, vhPtr, consumerManagerPtr, threadPoolPtr);
            _channels[id] = channelPtr;
            return true;
        }

        void closeChannel(const std::string &id)
        {
            std::unique_lock<std::mutex> lck(_mtx);
            _channels.erase(id);
        }

        ChannelPtr getChannel(const std::string &id)
        {
            std::unique_lock<std::mutex> lck(_mtx);
            if (_channels.count(id) == 0)
            {
                LOG(WARNING) << "信道不存在, channelId: " << id;
                return nullptr;
            }
            return _channels[id];
        }
    };
}

http://www.kler.cn/a/321677.html

相关文章:

  • HarmonyOS ArkTs 解决流式传输编码问题
  • RK3568平台(I2C篇)i2c_transfer接口详解
  • 【代码大模型】Is Your Code Generated by ChatGPT Really Correct?论文阅读
  • ubuntu下openssl签名证书制作流程及验证demo
  • Python教程笔记(3)
  • STM32芯片EXIT外部中断的配置与原理
  • Java调用第三方接口、http请求详解,一文学会
  • Sqlserver事务行版本控制指南
  • 面向pymupdf4llm与MinerU 面试题
  • OpenHarmony(鸿蒙南向)——平台驱动指南【HDMI】
  • 倾斜单体化重建异形和异形建筑思路整理
  • 力扣583-两个字符串的删除操作(Java详细题解)
  • Spring Boot的核心技术有哪些?
  • AIGC引领数智未来:企业架构演进的深度解析与实践路径,The Open Group 2024生态系统架构·可持续发展年度大会专题报道
  • 深入理解 CompletableFuture 的底层原理
  • 使用npm link 把一个本地项目变成依赖,引入到另一个项目中
  • xlsx库插件读取excel文件
  • 在使用 Docker 时,用户可能会遇到各种常见的错误和问题
  • 使用python进行自然语言处理的示例
  • jmeter-请求参数加密-MD5加密
  • 美食共享圈:Spring Boot校园周边美食平台
  • uniapp踩坑 tabbar页面数据刷新了但视图没有更新
  • 【1分钟学会】JSON
  • Sentinel-1 数据处理时如何手动下载高程数据
  • 形象解释暂停方法和旁路方法
  • 力扣30. 串联所有单词的子串