第十一章:服务器信道管理模块
目录
第一节:模块介绍
第二节:通信协议
第三节:信道模块实现
3-1.类型别名定义
3-2.Channel类
3-3.ChannelManager类
下期预告:
该模块在mqserver目录下实现。
第一节:模块介绍
服务器信道的作用是处理来自于客户端的各种请求,然后返回一个响应,那么客户端都有哪些请求呢?比如:交换机的声明与创建、队列的声明与创建、绑定与解绑等。
请求的种类如此多,信道要怎么识别这些请求,执行对应的任务呢?这就需要用到muduo库的协议分发器了:只要将每种请求各自封装成proto的结构体,然后实现每个请求的业务函数,并将对应请求的结构体和业务函数进行注册绑定,那么服务器在收到请求之后就会自动识别请求的结构体类型,去调用绑定的业务函数了。
为了服务器收到的请求都是有效的,客户端发送的请求也必须是请求封装的结构体类型,所以先封装这些结构体,统一通讯的格式。
第二节:通信协议
打开mqcommon目录,创建mq_proto.proto文件,并添加以下内容:
syntax = "proto3"; package zd; import "mq_msg.proto"; // 网络协议不同的请求/应答格式 // 创建信道请求 message ChannelOpenRequest { string rid = 1; // 请求唯一id string channel_id = 2; }; // 关闭信道请求 message ChannelCloseRequest { string rid = 1; string channel_id = 2; }; // 声明交换机请求 message ExchangeDeclareRequest { string rid = 1; string channel_id = 2; // 信道id--表示请求交给哪个信道处理 // -----交换机属性----- string exchange_name = 3; ExchangeType exchange_type = 4; // 交换机类型 bool durable = 5; bool autodelete = 6; // 自动删除标志 map<string,string> args = 7; // 扩展参数args }; // 移除交换机请求 message ExchangeDeleteRequest { string rid = 1; string channel_id = 2; string exchange_name = 3; }; // 声明队列请求 message MsgQueueDeclareRequest { string rid = 1; string channel_id = 2; // 信道id--表示请求交给哪个信道处理 string msgqueue_name = 3; bool durable = 4; bool exclusive = 5; bool autodelete = 6; map<string,string> args = 7; }; // 删除队列请求 message MsgQueueDeleteRequest { string rid = 1; string channel_id = 2; string msgqueue_name = 3; }; // 绑定请求 message QueueBindRequest { string rid = 1; string channel_id = 2; string exchange_name = 3; string msgqueue_name = 4; string binding_key = 5; }; // 解绑请求 message QueueUnbindRequest { string rid = 1; string channel_id = 2; string exchange_name = 3; string msgqueue_name = 4; }; // 生产消息请求 message BasicPublishRequest { string rid = 1; string channel_id = 2; string exchange_name = 3; BasicProperties properties = 4; string body = 5; }; // 订阅消息请求 message BasicConsumerRequest { string rid = 1; string channel_id = 2; string msgqueue_name = 3; string consumer_tag = 4; bool auto_ack = 5; }; // 取消订阅请求 message BasicCancelRequest { string rid = 1; string channel_id = 2; string msgqueue_name = 3; string consumer_tag = 4; }; // 消息确认请求 message BasicAckRequest { string rid = 1; string channel_id = 2; string msgqueue_name = 3; string message_id = 4; }; // 服务器向客户端推送消息的格式 message BasicConsumerResponse { string rid = 1; string channel_id = 2; string consumer_tag = 3; BasicProperties properties = 4; string body = 5; }; // 基础应答:其他的请求的响应 message BaseResponse { string rid = 1; // 针对哪个请求的应答 string channel_id = 2; // 请求被哪个信道处理 bool ok = 3; // 请求是否处理成功 };
可以看到,每一个请求/响应都给予了对方需要的数据,例如声明交换机的请求有交换机名称、持久化标志、自动删除标志、模式、其他参数。这个请求由客户端发送到服务器后,服务器就可以使用这些信息声明一个交换机了。
然后将.proto文件变成c++文件:
protoc --cpp_out=./ mq_proto.proto
成功之后会出现两个新的文件:
第三节:信道模块实现
打开mqserver目录,创建mq_channel.hpp文件,打开并将"老三套"写入:
#ifndef __M_CHANNEL_H__
#define __M_CHANNEL_H__
#include <memory>
#include <mutex>
#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_msg.pb.h"
#include "../mqcommon/mq_proto.pb.h"
#include "../mqcommon/mq_threadpool.hpp"
#include "mq_consumer.hpp"
#include "mq_virtualhost.hpp"
#include "mq_route.hpp"
#include "muduo/net/TcpConnection.h"
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
namespace zd
{};
#endif
3-1.类型别名定义
使用using定义一些别名,方便使用:
using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>; using ChannelOpenRequestPtr = std::shared_ptr<ChannelOpenRequest>; using ChannelCloseRequestPtr = std::shared_ptr<ChannelCloseRequest>; using ExchangeDeclareRequestPtr = std::shared_ptr<ExchangeDeclareRequest>; using ExchangeDeleteRequestPtr = std::shared_ptr<ExchangeDeleteRequest>; using MsgQueueDeclareRequestPtr = std::shared_ptr<MsgQueueDeclareRequest>; using MsgQueueDeleteRequestPtr = std::shared_ptr<MsgQueueDeleteRequest>; using QueueBindRequestPtr = std::shared_ptr<QueueBindRequest>; using QueueUnbindRequestPtr = std::shared_ptr<QueueUnbindRequest>; using BasicPublishRequestPtr = std::shared_ptr<BasicPublishRequest>; using BasicAckRequestPtr = std::shared_ptr<BasicAckRequest>; using BasicConsumerRequestPtr = std::shared_ptr<BasicConsumerRequest>; using BasicCancelRequestPtr = std::shared_ptr<BasicCancelRequest>;
ProtobufCodec是proto的协议处理器,使用它send数据时,会自动将数据序列化并加上应用层报头。
下面就是第二节定义的各种请求类型的智能指针。
3-2.Channel类
要管理信道首先得要有信道,class Channel要包含信道的信息:
class Channel { public: using ptr = std::shared_ptr<Channel>; Channel(const std::string& id, const VirtualHost::ptr& vhp, const QueueConsumerManger::ptr& cmp, const ProtobufCodecPtr& codec, const muduo::net::TcpConnectionPtr& conn, const threadpool::ptr& pool): _channel_id(id),_vhp(vhp),_cmp(cmp), _codec(codec),_conn(conn),_pool(pool) {} private: std::string _channel_id; // 信道唯一id Consumer::ptr _consumer; // 信道关联的消费者 muduo::net::TcpConnectionPtr _conn; // 信道使用的连接 ProtobufCodecPtr _codec; // protobuf协议处理器 QueueConsumerManger::ptr _cmp; // 消费者管理句柄 VirtualHost::ptr _vhp; // 虚拟机管理句柄 threadpool::ptr _pool; // 线程池 };
信道还需要提供各种业务,使用请求的给予的信息调用管理句柄的对应接口即可。
每个请求完成后都需要给客户端发送响应,所以先把响应业务实现了:
void basicResponse(bool ret,const std::string& rid,const std::string& cid) { BaseResponse br; br.set_rid(rid); br.set_channel_id(cid); br.set_ok(ret); _codec->send(_conn,br); }
交换机管理:
// 声明交换机 bool declareExchange(const ExchangeDeclareRequestPtr& req) { // 使用虚拟机声明交换机 bool ret = _vhp->declareExchange(req->exchange_name(), req->exchange_type(), req->durable(), req->autodelete(), req->args()); // 发送响应 basicResponse(ret,req->rid(),req->channel_id()); return ret; } // 移除交换机 void destoryExchange(const ExchangeDeleteRequestPtr& req) { _vhp->deleteExchange(req->exchange_name()); basicResponse(true,req->rid(),req->channel_id()); }
队列管理:
// 声明队列 void declareMsgQueue(const MsgQueueDeclareRequestPtr& req) { // 先声明队列 bool ret = _vhp->declareMsgQueue(req->msgqueue_name(), req->durable(), req->exclusive(), req->autodelete(), req->args()); if(ret == false) basicResponse(ret,req->rid(),req->channel_id()); // 再初始化队列的消息管理句柄 _cmp->initQueueConsumer(req->msgqueue_name()); // 发送响应 basicResponse(ret,req->rid(),req->channel_id()); } // 移除队列 void destoryMsgQueue(const MsgQueueDeleteRequestPtr& req) { // 移除队列消息管理句柄 _cmp->destoryQueueConsumer(req->msgqueue_name()); // 移除队列 _vhp->deleteMsgQueue(req->msgqueue_name()); basicResponse(true,req->rid(),req->channel_id()); }
绑定管理:
// 绑定 void queueBind(const QueueBindRequestPtr& req) { bool ret = _vhp->bind(req->exchange_name(),req->msgqueue_name(),req->binding_key()); basicResponse(ret,req->rid(),req->channel_id()); } // 解绑 void queueUnBind(const QueueUnbindRequestPtr& req) { _vhp->unBind(req->exchange_name(),req->msgqueue_name()); basicResponse(true,req->rid(),req->channel_id()); }
消息管理:
// 发布消息 void basicPublish(const BasicPublishRequestPtr& req) { // 0.验证routing_key的合法性 bool ret = Router::isLegalRoutingkey(req->properties().routing_key()); if(ret == false) return basicResponse(false,req->rid(),req->channel_id());; // 1.先检查交换机存在 ret = _vhp->exchangeExists(req->exchange_name()); if(ret == false) return basicResponse(false,req->rid(),req->channel_id());; // 2.获取交换机信息 Exchange::ptr ep = _vhp->getOneExchange(req->exchange_name()); // 3.获取交换机绑定信息 MsgQueueBindingMap epb = _vhp->exchangeBindings(req->exchange_name()); std::string routing_key; if(req->has_properties()) // properties 不为空 routing_key = req->properties().routing_key(); // 4.交换路由,给匹配成功的队列添加消息 for(const auto& it:epb) { ret = Router::route(ep->type,routing_key,it.second->binding_key); if(ret == true) { _vhp->basicPublish(it.first,req->mutable_properties(),req->body()); // 向线程池添加一个消费任务(向队列的订阅者推送消息) auto task = std::bind(&Channel::consume,this,it.first); _pool->push(task,this,it.first); } } basicResponse(true,req->rid(),req->channel_id()); } // 确认/删除消息 void basicAck(const BasicAckRequestPtr& req) { _vhp->basicAck(req->msgqueue_name(),req->message_id()); basicResponse(true,req->rid(),req->channel_id()); }
发布消息后,信道要把消息发布给队列的订阅者(就是服务器消费者的回调函数),这个任务交给线程池完成,信道继续它的执行:
// 线程池的执行任务 void consume(const std::string& qname) { // 1.从队列中取出一条消息 MessagePtr msgp = _vhp->basicConsume(qname); if(msgp.get() == nullptr) { LOG("队列 %s 推送消息失败,没有对应的消息",qname.c_str()); return; } // 2.从队列消费者中取出一个消费者 Consumer::ptr consumer = _cmp->choose(qname); if(consumer.get() == nullptr) { LOG("队列 %s 推送消息失败,队列没有消费者",qname.c_str()); return; } // 3.调用该消费者的消息处理回调函数(就是把消息给客户端) consumer->callback(consumer->tag,msgp->mutable_payload()->mutable_properties(),msgp->payload().body()); // 4.如果消费者是自动确认,直接删除消息 if(consumer->auto_ack == true) { _vhp->basicAck(qname,msgp->payload().properties().id()); } }
订阅处理:
// 订阅队列 void basicConsume(const BasicConsumerRequestPtr& req) { bool ret = _vhp->msgqueueExists(req->msgqueue_name()); if(ret == false) return basicResponse(false,req->rid(),req->channel_id()); // 创建队列的消费者 auto cb = std::bind(&Channel::consumerCallback,this,req->rid(),std::placeholders::_1,std::placeholders::_2,std::placeholders::_3); // 订阅之后,这个channel就是消费者 _consumer = _cmp->create(req->msgqueue_name(),req->consumer_tag(),req->auto_ack(),cb); basicResponse(true,req->rid(),req->channel_id()); } // 取消订阅 void basicCancel(const BasicCancelRequestPtr& req) { _cmp->remove(req->msgqueue_name(),req->consumer_tag()); }
订阅之后,这个信道对于服务器来说就变成消费者了,之前说过这个消费者的消息处理回调函数是固定的——把消息推送给客户端,所以再实现一下它的回调函数:
// 消费者channel的消息处理回调函数 void consumerCallback(const std::string& rid,const std::string& tag,const BasicProperties* bp,const std::string& body) { // 将消息组织成响应发送给对应的客户端--客户端才是真正的消费者 BasicConsumerResponse resp; resp.set_rid(rid); resp.set_channel_id(_channel_id); resp.set_body(body); resp.set_consumer_tag(tag); if(bp) { resp.mutable_properties()->set_id(bp->id()); resp.mutable_properties()->set_delivery_mode(bp->delivery_mode()); resp.mutable_properties()->set_routing_key(bp->routing_key()); } _codec->send(_conn,resp); }
析构函数,如果信道是一个消费者,那么在信道关闭时把关联的消费者从消费者模块中移除:
~Channel() { if(_consumer.get() != nullptr) { _cmp->remove(_consumer->qname,_consumer->tag); } }
3-3.ChannelManager类
class ChannelManager是信道的管理类,对于每个连接都私有自己的信道管理,这样服务器收到某个连接的请求后,就可以直接在这个连接的信道找具体是哪个信道的请求。
先定义它需要的成员变量:
class ChannelManager { public: using ptr = std::shared_ptr<ChannelManager>; private: std::mutex _mtx; std::unordered_map<std::string,Channel::ptr> _channels; // 信道管理模块 threadpool::ptr _pool; };
它只对外提供3个接口。
创建信道:
当服务器收到创建信道的请求后就会调用它。
// 创建/打开一个信道 bool ChannelOpen( const std::string& id, const VirtualHost::ptr& vhp, const QueueConsumerManger::ptr& cmp, const ProtobufCodecPtr& codec, const muduo::net::TcpConnectionPtr& conn, const threadpool::ptr& pool ) { std::unique_lock<std::mutex> lock(_mtx); auto it = _channels.find(id); if(it != _channels.end()) { return false; } Channel::ptr channel = std::make_shared<Channel>(id,vhp,cmp,codec,conn,pool); _channels.insert(std::make_pair(id,channel)); return true; }
关闭信道:
// 删除/关闭一个信道 void ChannelClose(const std::string& id) { std::unique_lock<std::mutex> lock(_mtx); _channels.erase(id); }
获取信道:
// 获取一个信道 Channel::ptr getOneChannel(const std::string& id) { std::unique_lock<std::mutex> lock(_mtx); auto it = _channels.find(id); if(it == _channels.end()) return nullptr; return it->second; }
至此服务器的信道管理模块就完成了,这个模块也不好进行测试,只能测试一下信道的创建和关闭这种简单的接口。
下期预告:
之后是连接管理模块的实现,这个模块很简单。