当前位置: 首页 > article >正文

第十一章:服务器信道管理模块

目录

第一节:模块介绍

第二节:通信协议

第三节:信道模块实现

        3-1.类型别名定义

        3-2.Channel类

        3-3.ChannelManager类

下期预告:


        该模块在mqserver目录下实现。

第一节:模块介绍

        服务器信道的作用是处理来自于客户端的各种请求,然后返回一个响应,那么客户端都有哪些请求呢?比如:交换机的声明与创建、队列的声明与创建、绑定与解绑等。

        请求的种类如此多,信道要怎么识别这些请求,执行对应的任务呢?这就需要用到muduo库的协议分发器了:只要将每种请求各自封装成proto的结构体,然后实现每个请求的业务函数,并将对应请求的结构体和业务函数进行注册绑定,那么服务器在收到请求之后就会自动识别请求的结构体类型,去调用绑定的业务函数了。

        为了服务器收到的请求都是有效的,客户端发送的请求也必须是请求封装的结构体类型,所以先封装这些结构体,统一通讯的格式。

第二节:通信协议

        打开mqcommon目录,创建mq_proto.proto文件,并添加以下内容:

syntax = "proto3";
package zd;

import "mq_msg.proto";

// 网络协议不同的请求/应答格式

// 创建信道请求
message ChannelOpenRequest
{
    string rid = 1; // 请求唯一id
    string channel_id = 2;
};

// 关闭信道请求
message ChannelCloseRequest
{
    string rid = 1;
    string channel_id = 2;
};

// 声明交换机请求
message ExchangeDeclareRequest
{
    string rid = 1;
    string channel_id = 2; // 信道id--表示请求交给哪个信道处理

    // -----交换机属性-----

    string exchange_name = 3;
    ExchangeType exchange_type = 4; // 交换机类型
    bool durable = 5;
    bool autodelete = 6; // 自动删除标志
    map<string,string> args = 7; // 扩展参数args
};

// 移除交换机请求
message ExchangeDeleteRequest
{
    string rid = 1;
    string channel_id = 2;
    string exchange_name = 3;
};

// 声明队列请求
message MsgQueueDeclareRequest
{
    string rid = 1;
    string channel_id = 2; // 信道id--表示请求交给哪个信道处理

    string msgqueue_name = 3;
    bool durable = 4;
    bool exclusive = 5;
    bool autodelete = 6;
    map<string,string> args = 7;
};

// 删除队列请求
message MsgQueueDeleteRequest
{
    string rid = 1;
    string channel_id = 2;
    string msgqueue_name = 3;
};

// 绑定请求
message QueueBindRequest
{
    string rid = 1;
    string channel_id = 2;

    string exchange_name = 3;
    string msgqueue_name = 4;
    string binding_key = 5;
};

// 解绑请求
message QueueUnbindRequest
{
    string rid = 1;
    string channel_id = 2;

    string exchange_name = 3;
    string msgqueue_name = 4;
};

// 生产消息请求
message BasicPublishRequest
{
    string rid = 1;
    string channel_id = 2;

    string exchange_name = 3;
    BasicProperties properties = 4;
    string body = 5;
};

// 订阅消息请求
message BasicConsumerRequest
{
    string rid = 1;
    string channel_id = 2;

    string msgqueue_name = 3;
    string consumer_tag = 4;
    bool auto_ack = 5;
};

// 取消订阅请求
message BasicCancelRequest
{
    string rid = 1;
    string channel_id = 2;

    string msgqueue_name = 3;
    string consumer_tag = 4;
};

// 消息确认请求
message BasicAckRequest
{
    string rid = 1;
    string channel_id = 2;

    string msgqueue_name = 3;
    string message_id = 4;
};

// 服务器向客户端推送消息的格式
message BasicConsumerResponse
{
    string rid = 1;
    string channel_id = 2;

    string consumer_tag = 3;
    BasicProperties properties = 4;
    string body = 5;
};

// 基础应答:其他的请求的响应
message BaseResponse
{
    string rid = 1;        // 针对哪个请求的应答
    string channel_id = 2; // 请求被哪个信道处理
    bool ok = 3;           // 请求是否处理成功
};

        可以看到,每一个请求/响应都给予了对方需要的数据,例如声明交换机的请求有交换机名称、持久化标志、自动删除标志、模式、其他参数。这个请求由客户端发送到服务器后,服务器就可以使用这些信息声明一个交换机了。

        然后将.proto文件变成c++文件:

protoc --cpp_out=./ mq_proto.proto

        成功之后会出现两个新的文件:

                        ​​​​​​​        ​​​​​​​        ​​​​​​​        

        

第三节:信道模块实现

        打开mqserver目录,创建mq_channel.hpp文件,打开并将"老三套"写入:

#ifndef __M_CHANNEL_H__
#define __M_CHANNEL_H__
#include <memory>
#include <mutex>

#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_msg.pb.h"
#include "../mqcommon/mq_proto.pb.h"
#include "../mqcommon/mq_threadpool.hpp"

#include "mq_consumer.hpp"
#include "mq_virtualhost.hpp"
#include "mq_route.hpp"

#include "muduo/net/TcpConnection.h"
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"

namespace zd
{};

#endif

        3-1.类型别名定义

        使用using定义一些别名,方便使用:

    using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;

    using ChannelOpenRequestPtr = std::shared_ptr<ChannelOpenRequest>;
    using ChannelCloseRequestPtr = std::shared_ptr<ChannelCloseRequest>;
    using ExchangeDeclareRequestPtr = std::shared_ptr<ExchangeDeclareRequest>;
    using ExchangeDeleteRequestPtr = std::shared_ptr<ExchangeDeleteRequest>;
    using MsgQueueDeclareRequestPtr = std::shared_ptr<MsgQueueDeclareRequest>;
    using MsgQueueDeleteRequestPtr = std::shared_ptr<MsgQueueDeleteRequest>;
    using QueueBindRequestPtr = std::shared_ptr<QueueBindRequest>;
    using QueueUnbindRequestPtr = std::shared_ptr<QueueUnbindRequest>;
    using BasicPublishRequestPtr = std::shared_ptr<BasicPublishRequest>;
    using BasicAckRequestPtr = std::shared_ptr<BasicAckRequest>;
    using BasicConsumerRequestPtr = std::shared_ptr<BasicConsumerRequest>;
    using BasicCancelRequestPtr = std::shared_ptr<BasicCancelRequest>;

        ProtobufCodec是proto的协议处理器,使用它send数据时,会自动将数据序列化并加上应用层报头。

        下面就是第二节定义的各种请求类型的智能指针。

        3-2.Channel类

        要管理信道首先得要有信道,class Channel要包含信道的信息:

    class Channel
    {
        public:
            using ptr = std::shared_ptr<Channel>;

            Channel(const std::string& id,
                    const VirtualHost::ptr& vhp,
                    const QueueConsumerManger::ptr& cmp,
                    const ProtobufCodecPtr& codec,
                    const muduo::net::TcpConnectionPtr& conn,
                    const threadpool::ptr& pool):
            _channel_id(id),_vhp(vhp),_cmp(cmp),
            _codec(codec),_conn(conn),_pool(pool)
            {}
        private:
            std::string _channel_id;       // 信道唯一id
            Consumer::ptr _consumer;       // 信道关联的消费者 
            muduo::net::TcpConnectionPtr _conn; // 信道使用的连接  
            ProtobufCodecPtr _codec;       // protobuf协议处理器
            QueueConsumerManger::ptr _cmp; // 消费者管理句柄
            VirtualHost::ptr _vhp;         // 虚拟机管理句柄
            threadpool::ptr _pool;         // 线程池
    };  

        信道还需要提供各种业务,使用请求的给予的信息调用管理句柄的对应接口即可。

        每个请求完成后都需要给客户端发送响应,所以先把响应业务实现了:

            void basicResponse(bool ret,const std::string& rid,const std::string& cid)
            {
                BaseResponse br;
                br.set_rid(rid);
                br.set_channel_id(cid);
                br.set_ok(ret);
                _codec->send(_conn,br);
            }

        交换机管理:

            // 声明交换机
            bool declareExchange(const ExchangeDeclareRequestPtr& req)
            {
                // 使用虚拟机声明交换机
                bool ret = _vhp->declareExchange(req->exchange_name(),
                                                 req->exchange_type(),
                                                 req->durable(),
                                                 req->autodelete(),
                                                 req->args());
                // 发送响应
                basicResponse(ret,req->rid(),req->channel_id());
                return ret;
            }
            // 移除交换机
            void destoryExchange(const ExchangeDeleteRequestPtr& req)
            {
                _vhp->deleteExchange(req->exchange_name());
                basicResponse(true,req->rid(),req->channel_id());
            }

        队列管理:

            // 声明队列
            void declareMsgQueue(const MsgQueueDeclareRequestPtr& req)
            {
                // 先声明队列
                bool ret = _vhp->declareMsgQueue(req->msgqueue_name(),
                                                 req->durable(),
                                                 req->exclusive(),
                                                 req->autodelete(),
                                                 req->args());
                if(ret == false) basicResponse(ret,req->rid(),req->channel_id());
                // 再初始化队列的消息管理句柄
                _cmp->initQueueConsumer(req->msgqueue_name());
                // 发送响应
                basicResponse(ret,req->rid(),req->channel_id());
            }
            // 移除队列
            void destoryMsgQueue(const MsgQueueDeleteRequestPtr& req)
            {
                // 移除队列消息管理句柄
                _cmp->destoryQueueConsumer(req->msgqueue_name());
                // 移除队列
                _vhp->deleteMsgQueue(req->msgqueue_name());
                basicResponse(true,req->rid(),req->channel_id());
            }

        绑定管理:

            // 绑定
            void queueBind(const QueueBindRequestPtr& req)
            {
                bool ret = _vhp->bind(req->exchange_name(),req->msgqueue_name(),req->binding_key());
                basicResponse(ret,req->rid(),req->channel_id());
            }
            // 解绑
            void queueUnBind(const QueueUnbindRequestPtr& req)
            {
                _vhp->unBind(req->exchange_name(),req->msgqueue_name());
                basicResponse(true,req->rid(),req->channel_id());

            }

        消息管理:

            // 发布消息
            void basicPublish(const BasicPublishRequestPtr& req)
            {
                // 0.验证routing_key的合法性
                bool ret = Router::isLegalRoutingkey(req->properties().routing_key());
                if(ret == false) return basicResponse(false,req->rid(),req->channel_id());;
                // 1.先检查交换机存在
                ret = _vhp->exchangeExists(req->exchange_name());
                if(ret == false) return basicResponse(false,req->rid(),req->channel_id());;
                // 2.获取交换机信息
                Exchange::ptr ep = _vhp->getOneExchange(req->exchange_name());
                // 3.获取交换机绑定信息 
                MsgQueueBindingMap  epb = _vhp->exchangeBindings(req->exchange_name());

                std::string routing_key;
                if(req->has_properties()) // properties 不为空
                    routing_key = req->properties().routing_key();

                // 4.交换路由,给匹配成功的队列添加消息
                for(const auto& it:epb)
                {
                    ret = Router::route(ep->type,routing_key,it.second->binding_key);
                    if(ret == true)
                    {
                        _vhp->basicPublish(it.first,req->mutable_properties(),req->body());
                        // 向线程池添加一个消费任务(向队列的订阅者推送消息)
                        auto task = std::bind(&Channel::consume,this,it.first);
                        _pool->push(task,this,it.first);
                    } 
                } 
                basicResponse(true,req->rid(),req->channel_id());
            }

            // 确认/删除消息
            void basicAck(const BasicAckRequestPtr& req)
            {
               _vhp->basicAck(req->msgqueue_name(),req->message_id());
               basicResponse(true,req->rid(),req->channel_id());
            }

        发布消息后,信道要把消息发布给队列的订阅者(就是服务器消费者的回调函数),这个任务交给线程池完成,信道继续它的执行:

            // 线程池的执行任务
            void consume(const std::string& qname)
            {
                // 1.从队列中取出一条消息
                MessagePtr msgp = _vhp->basicConsume(qname);
                if(msgp.get() == nullptr)
                {
                    LOG("队列 %s 推送消息失败,没有对应的消息",qname.c_str());
                    return;
                }
                // 2.从队列消费者中取出一个消费者
                Consumer::ptr consumer = _cmp->choose(qname);
                if(consumer.get() == nullptr)
                {
                    LOG("队列 %s 推送消息失败,队列没有消费者",qname.c_str());
                    return;
                }
                // 3.调用该消费者的消息处理回调函数(就是把消息给客户端)
                consumer->callback(consumer->tag,msgp->mutable_payload()->mutable_properties(),msgp->payload().body());
                // 4.如果消费者是自动确认,直接删除消息
                if(consumer->auto_ack == true)
                {
                    _vhp->basicAck(qname,msgp->payload().properties().id());
                }
            }

        订阅处理:

            // 订阅队列
            void basicConsume(const BasicConsumerRequestPtr& req)
            {
                bool ret = _vhp->msgqueueExists(req->msgqueue_name());
                if(ret == false) return basicResponse(false,req->rid(),req->channel_id());
                // 创建队列的消费者
                auto cb = std::bind(&Channel::consumerCallback,this,req->rid(),std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);
                // 订阅之后,这个channel就是消费者
                _consumer = _cmp->create(req->msgqueue_name(),req->consumer_tag(),req->auto_ack(),cb);
                basicResponse(true,req->rid(),req->channel_id());
            }
            // 取消订阅
            void basicCancel(const BasicCancelRequestPtr& req)
            {
                _cmp->remove(req->msgqueue_name(),req->consumer_tag());
            }

        订阅之后,这个信道对于服务器来说就变成消费者了,之前说过这个消费者的消息处理回调函数是固定的——把消息推送给客户端,所以再实现一下它的回调函数:

            // 消费者channel的消息处理回调函数
            void consumerCallback(const std::string& rid,const std::string& tag,const BasicProperties* bp,const std::string& body)
            {
                // 将消息组织成响应发送给对应的客户端--客户端才是真正的消费者
                BasicConsumerResponse resp;
                resp.set_rid(rid);
                resp.set_channel_id(_channel_id);
                resp.set_body(body);
                resp.set_consumer_tag(tag);
                if(bp)
                {
                    resp.mutable_properties()->set_id(bp->id());
                    resp.mutable_properties()->set_delivery_mode(bp->delivery_mode());
                    resp.mutable_properties()->set_routing_key(bp->routing_key());
                }
                _codec->send(_conn,resp);
            }

       析构函数,如果信道是一个消费者,那么在信道关闭时把关联的消费者从消费者模块中移除:

            ~Channel()
            {
                if(_consumer.get() != nullptr)
                {
                    _cmp->remove(_consumer->qname,_consumer->tag);
                }
            }

        3-3.ChannelManager类

        class ChannelManager是信道的管理类,对于每个连接都私有自己的信道管理,这样服务器收到某个连接的请求后,就可以直接在这个连接的信道找具体是哪个信道的请求。

        先定义它需要的成员变量:

    class ChannelManager
    {
        public:
            using ptr = std::shared_ptr<ChannelManager>;
        private:
            std::mutex _mtx;
            std::unordered_map<std::string,Channel::ptr> _channels; // 信道管理模块
            threadpool::ptr _pool;
    };

        它只对外提供3个接口。

        创建信道:

        当服务器收到创建信道的请求后就会调用它。

            // 创建/打开一个信道
            bool ChannelOpen(
                const std::string& id,
                const VirtualHost::ptr& vhp,
                const QueueConsumerManger::ptr& cmp,
                const ProtobufCodecPtr& codec,
                const muduo::net::TcpConnectionPtr& conn,
                const threadpool::ptr& pool
                )
            {
                std::unique_lock<std::mutex> lock(_mtx);
                auto it = _channels.find(id);
                if(it != _channels.end())
                {
                    return false;
                }
                Channel::ptr channel = std::make_shared<Channel>(id,vhp,cmp,codec,conn,pool);
                _channels.insert(std::make_pair(id,channel));
                return true;
            }

        关闭信道:

            // 删除/关闭一个信道
            void ChannelClose(const std::string& id)
            {
                std::unique_lock<std::mutex> lock(_mtx);
                _channels.erase(id);
            }

        获取信道:

            // 获取一个信道
            Channel::ptr getOneChannel(const std::string& id)
            {
                std::unique_lock<std::mutex> lock(_mtx);
                auto it = _channels.find(id);
                if(it == _channels.end())
                    return nullptr;
                return it->second;
            }

         至此服务器的信道管理模块就完成了,这个模块也不好进行测试,只能测试一下信道的创建和关闭这种简单的接口。

下期预告:

        之后是连接管理模块的实现,这个模块很简单。


http://www.kler.cn/a/564150.html

相关文章:

  • 验证环境中为什么要用virtual interface
  • 【R包】pathlinkR转录组数据分析和可视化利器
  • 常用空间数据结构对比
  • visual studio 2022 C++ OpenCV开发环境配置(详细教程)
  • 通过AI大模型 下达指令控制物理设备实现完全自动化
  • JavaScript 深浅拷贝全面解析
  • 《模拟器过检测教程:Nox、雷电、Mumu、逍遥模拟器 Magisk、LSposed 框架安装与隐藏应用配置》
  • JAVA多商户家政同城上门服务预约服务抢单派单+自营商城系统支持小程序+APP+公众号+h5
  • 如何通过JS实现关闭网页时清空该页面在本地电脑的缓存存储?
  • C/C++易错点:函数指针与指针函数的核心区别与避坑指南
  • nandflash坏块管理
  • 算法系列之动态规划
  • 计算机毕业设计SpringBoot+Vue.js大型商场应急预案管理系统(源码+文档+PPT+讲解)
  • 【Day47 LeetCode】图论问题 Ⅴ
  • 第七届信息科学、电气与自动化工程国际学术会议(ISEAE 2025)
  • 鸿蒙开发第4篇__关于在鸿蒙应用中使用Java语言进行设计
  • 多线程3:MFC中用户界面线程的相关操作
  • 【Javascript】js精度丢失
  • 检索增强生成(RAG)技术详解
  • Vulnhub靶场 Kioptrix: Level 1.3 (#4) 练习