Json-RPC项目框架(二)
目录
1. 项目实现;
1. 项目实现:
1.1 通信抽象实现:
(1) BaseMessage: 主要实现对消息处理;
主要包含设置和获取ID, 设置类型和获取类型, 消息检查, 以及序列化和反序列化操作.
class BaseMessage
{
public:
//大家需要的功能先实现;
using ptr = std::shared_ptr<BaseMessage>;//定义一个指针;
virtual ~BaseMessage(){}
virtual void setId(const std::string& id)
{
_rid = id;
}
virtual std::string rid()
{
return _rid;
}
virtual void setMType(MType mtype)
{
_mtype = mtype;
}
virtual MType mtype()
{
return _mtype;
}
//序列化;
virtual std::string serialize() = 0;
//反序列化;
virtual bool unserialize(const std::string& msg) = 0;
//对消息进行检验;
virtual bool check() = 0;
private:
MType _mtype;//消息类型
std::string _rid;//id号
};
(2) BaseBuffer缓冲区抽象:
主要是查看缓冲区剩余大小, 读取数据, 删除数据. 读取全部字符串的操作.
//缓冲区的抽象, 缓冲区大小, 读取数据, 删除数据, 读取并删除数据;
class BaseBuffer
{
public:
using ptr = std::shared_ptr<BaseBuffer>;//定义一个指针;
virtual size_t readableSize() = 0;//缓冲区可使用大小
virtual int32_t peekInt32() = 0;//读取前4字节数据;
virtual void retriveInt32() = 0;//删除前4字节数据;
virtual int32_t readInt32() = 0;//读取并且删除前4字节数据.
virtual std::string retrieveAsString(size_t len) = 0;
};
(3) BaseProtocol: 主要实现判断缓冲区是否可以参数具体数据大小, 对接收数据处理, 以及序列化接收到的数据.
class BaseProtocal
{
public:
using ptr = std::shared_ptr<BaseProtocal>;
virtual bool canProcessed(const BaseBuffer::ptr& buf) = 0;//可以进行传输数据的大小s;
virtual bool onMessage(const BaseBuffer::ptr& buf, BaseMessage::ptr& msg) = 0;//接收数据进行处理.
virtual std::string serialize(const BaseMessage::ptr& msg) = 0;//发送数据前进行序列化
};
(4) BaseConnection: 连接状态, 连接终止, 连接发送数据操作.
//连接, 发送数据, 关闭, 连接状态.
class BaseConnection
{
public:
using ptr = std::shared_ptr<BaseConnection>;
virtual void send(const BaseMessage::ptr& msg) = 0;
virtual void shutdown() = 0;
virtual bool connected() = 0;
};
(5) Server: 服务器端, 使用到包装器将Connenction和Close以及Message这三个回调进行封装. 服务器直接调用这些包装器即可.
using ConnectionCallback = std::function<void(const BaseConnection::ptr&)>;
using CloseCallback = std::function<void(const BaseConnection::ptr&)>;
using MessageCallback = std::function<void(const BaseConnection::ptr&, BaseMessage::ptr&)>;
//服务器; 连接回调, 关闭回调, 信息回调.
class BaseServer
{
public:
using ptr = std::shared_ptr<BaseServer>;
virtual void setConnectionCallback(const ConnectionCallback& cb)
{
_cb_connection = cb;
}
virtual void setCloseCallback(const CloseCallback& cb)
{
_cb_close = cb;
}
virtual void setMessageCallback(const MessageCallback& cb)
{
_cb_message = cb;
}
virtual void start() = 0;
protected:
ConnectionCallback _cb_connection;
CloseCallback _cb_close;
MessageCallback _cb_message;
};
(6) client: 客户端也是封装使用包装器将Connenction和Close以及Message这三个回调, 类就是连接处理, 连接关闭, 发送消息, 连接状态的实现.(这部分在继承类中纯虚函数里面实现.
//客户端, 连接回调, 关闭回调, 信息回调, 连接, 关闭, 发送, 连接状态.
class BaseClient
{
public:
using ptr = std::shared_ptr<BaseClient>;
virtual void setConnectionCallback(const ConnectionCallback& cb)
{
_cb_connection = cb;
}
virtual void setCloseCallback(const CloseCallback& cb)
{
_cb_close = cb;
}
virtual void setMessageCallback(const MessageCallback& cb)
{
_cb_message = cb;
}
virtual void connect() = 0;
virtual void shutdown() = 0;
virtual bool send(const BaseMessage::ptr& msg) = 0;
virtual BaseConnection::ptr connection() = 0;
virtual bool connected() = 0;
protected:
ConnectionCallback _cb_connection;
CloseCallback _cb_close;
MessageCallback _cb_message;
};
1.2 消息抽象实现:
(1) JsonMessage: 进行序列化和反序列化操作. 继承上面BaseMessage进行多态对序列化和反序列化操作..
//序列化信息, 继承BaseMessage再进行重写;
class JsonMessage : public BaseMessage
{
public:
using ptr = std::shared_ptr<JsonMessage>;
//序列化;
virtual std::string serialize() override
{
std::string body;
bool ret = JSON::serialize(_body, body);
if(ret == false)
return std::string();
return body;
}
//反序列化;
virtual bool unserialize(const std::string& msg) override
{
return JSON::unserialize(msg, _body);
}
protected:
Json::Value _body;
};
(2) JsonRequest: 对上面进行处理Json数据进行请求,
//Json请求, 继承JsonMessage;
class JsonRequest : public JsonMessage
{
public:
using ptr = std::shared_ptr<JsonRequest>;
};
(3) JsonResponse: 处理的数据检查一下响应码以及响应码类型是否正确, 然后就是设置响应码或者返回响应码的操作.
//Json响应, 返回响应码, 设置响应码;
class JsonResponse : public JsonMessage
{
public:
using ptr = std::shared_ptr<JsonResponse>;
//对消息进行检验;
virtual bool check() override
{
//大部分响应只有响应状态码; 只需要判断响应状态码存在和正确即可.
if(_body[KEY_RCODE].isNull() == true)
{
ELOG("响应中没有响应状态码! ");
return false;
}
if(_body[KEY_RCODE].isIntegral() == false)
{
ELOG("响应状态码类型错误! ");
return false;
}
return true;
}
virtual RCode rcode()
{
return (RCode)_body[KEY_RCODE].asInt();
}
virtual void setRCode(RCode rcode)
{
_body[KEY_RCODE] = (int)rcode;
}
};
(4) RpcRequest: 继承JsonRequest, 主要是检查请求方法名称是否存在以及是否类型正确,
其次就是请求参数进行检查, 接着就是对请求方法以及请求参数的返回或者处理.
//rpc请求, 返回和设置请求方法, 返回和设置请求参数,
class RpcRequest : public JsonRequest
{
public:
using ptr = std::shared_ptr<RpcRequest>;
virtual bool check() override
{
//rpc请求中, 包含请求方法名称, 请求参数 对象;
if(_body[KEY_METHOD].isNull() == true ||
_body[KEY_METHOD].isString() == false)
{
ELOG("RPC请求中没有请求方法名称或请求类型错误 ");
return false;
}
if(_body[KEY_PARAMS].isNull() == true ||
_body[KEY_PARAMS].isObject() == false)
{
ELOG("RPC中没有请求参数或请求参数类型错误! ");
return false;
}
return true;
}
std::string method()
{
return _body[KEY_METHOD].asString();
}
void setMethod(const std::string& method_name)
{
_body[KEY_METHOD] = method_name;
}
Json::Value params()
{
return _body[KEY_PARAMS];
}
void setparams(const Json::Value& params)
{
_body[KEY_PARAMS] = params;
}
};
(5) TopicRequest: 主题的请求处理, 继承JsonRequest, 里面check主要对主题名称, 主题操作类型进行处理, 以及在主题操作类型是TOPIC_PUBLISH时候但是主题信息不存在或错误的处理, 其次就是对主题名称, 主题操作类型以及主题消息进行返回或者设置.
//主题请求, 返回和设置主题请求名称, 返回和设置主题操作类型;
// 返回主题信息;
class TopicRequest : public JsonRequest
{
public:
using ptr = std::shared_ptr<TopicRequest>;
virtual bool check() override
{
//rpc请求中, 包含请求方法名称, 请求参数 对象;
if(_body[KEY_TOPIC_KEY].isNull() == true ||
_body[KEY_TOPIC_KEY].isString() == false)
{
ELOG("主题请求中没有主题名称或主题名称类型错误! ");
return false;
}
if(_body[KEY_OPTYPE].isNull() == true ||
_body[KEY_OPTYPE].isIntegral() == false)
{
ELOG("主题请求中没有操作类型或操作类型错误! ");
return false;
}
//判断主题为发布主题.
if(_body[KEY_OPTYPE].asInt() == (int)TopicOpType::TOPIC_PUBLISH
&& (_body[KEY_TOPIC_MSG].isNull() == true ||
_body[KEY_TOPIC_MSG].isString() == false))
{
ELOG("主题消息发布请求中没有消息内容字段或者消息内容字段错误");
return false;
}
return true;
}
std::string topicKey()
{
return _body[KEY_TOPIC_KEY].asString();
}
void setTopicKey(const std::string& key)
{
_body[KEY_TOPIC_KEY] = key;
}
TopicOpType Optype()
{
return (TopicOpType)_body[KEY_OPTYPE].asInt();
}
void setOptype(TopicOpType optype)
{
_body[KEY_OPTYPE] = (int)optype;
}
std::string topicMsg()
{
return _body[KEY_TOPIC_MSG].asString();
}
void setTopicMsg(const std::string& msg)
{
_body[KEY_TOPIC_MSG] = msg;
}
};
(6) ServiceRequest: 服务器请求处理, 检查请求方法和请求类型是否正确, 确定不是请求发现的前提下进行判断服务的主机信息. 对请求方法以及请求类型以及主机信息进行返回以及设置.
//服务器请求, 请求方法, 请求类型,
typedef std::pair<std::string, int> Address;
class ServiceRequest : public JsonRequest
{
public:
using ptr = std::shared_ptr<ServiceRequest>;
virtual bool check() override
{
if(_body[KEY_METHOD].isNull() == true ||
_body[KEY_METHOD].isString() == false)
{
ELOG("服务器请求中没有方法名称或方法名称类型错误!! ");
return false;
}
if(_body[KEY_OPTYPE].isNull() == true ||
_body[KEY_OPTYPE].isIntegral() == false)
{
ELOG("服务器请求中没有操作类型或操作类型错误! ");
return false;
}
//请求类型不是服务发现, 服务主机信息.
if(_body[KEY_OPTYPE].asInt() != (int)(ServiceOpType::SERVICE_DISCOVERY) &&
(_body[KEY_HOST].isNull() == true ||
_body[KEY_HOST].isObject() == false ||
(_body[KEY_HOST][KEY_HOST_IP].isNull() == true ||
_body[KEY_HOST][KEY_HOST_IP].isString() == false) ||
(_body[KEY_HOST][KEY_HOST_PORT].isNull() == true ||
_body[KEY_HOST][KEY_HOST_PORT].isIntegral()) == false))
{
ELOG("服务请求中主机地址错误! ");
return false;
}
return true;
}
std::string method()
{
return _body[KEY_METHOD].asString();
}
void setMethod(const std::string& name)
{
_body[KEY_METHOD] = name;
}
ServiceOpType Optype()
{
return (ServiceOpType)_body[KEY_OPTYPE].asInt();
}
void setOptype(ServiceOpType optype)
{
_body[KEY_OPTYPE] = (int)optype;
}
//返回主机信息;
Address host()
{
Address addr;
addr.first = _body[KEY_HOST][KEY_HOST_IP].asString();
addr.second = _body[KEY_HOST][KEY_HOST_PORT].asInt();
return addr;
}
//设置主机信息;
void setHost(const Address& host)
{
Json::Value val;
val[KEY_HOST_IP] = host.first;
val[KEY_HOST_PORT] = host.second;
_body[KEY_HOST] = val;
}
};
(7) Rpc响应: 继承JosnResponse; 检查响应码和响应结果, 以及返回响应结果.
//RPC响应:
class RpcResponse : public JsonResponse
{
public:
using ptr = std::shared_ptr<RpcResponse>;
virtual bool check() override
{
//大部分响应只有响应状态码; 只需要判断响应状态码存在和正确即可.
if(_body[KEY_RCODE].isNull() == true ||
_body[KEY_RCODE].isIntegral() == false)
{
ELOG("响应中没有响应状态码或响应状态码类型错误! ");
return false;
}
if(_body[KEY_RESULT].isNull() == true)
{
ELOG("响应中没有响应结果或者结果类型错误! ");
return false;
}
return true;
}
Json::Value result()
{
return _body[KEY_RESULT];
}
void setResult(const Json::Value& result)
{
_body[KEY_RESULT] = result;
}
};
(8) TopicResponse主题响应:
//主题响应,
class TopicResponse : public JsonResponse
{
public:
using ptr = std::shared_ptr<TopicResponse>;
};
(9) ServiceResponse服务器响应: 检查响应码以及操作类型, 如果操作类型是服务查询还要检查请求方法以及主机信息是否正确. 以及对方法还有操作类型以及主机信息进行返回以及设置.
//服务器响应,
class ServiceResponse : public JsonResponse
{
public:
using ptr = std::shared_ptr<ServiceResponse>;
virtual bool check() override
{
//大部分响应只有响应状态码; 只需要判断响应状态码存在和正确即可.
if(_body[KEY_RCODE].isNull() == true ||
_body[KEY_RCODE].isIntegral() == false)
{
ELOG("响应中没有响应状态码或响应状态码类型错误! ");
return false;
}
if(_body[KEY_OPTYPE].isNull() == true ||
_body[KEY_OPTYPE].isIntegral() == false)
{
ELOG("响应中没有操作类型或者操作类型错误! ");
return false;
}
if(_body[KEY_OPTYPE].asInt() == (int)(ServiceOpType::SERVICE_DISCOVERY) &&
(_body[KEY_METHOD].isNull() == true ||
_body[KEY_METHOD].isString() == false ||
_body[KEY_HOST].isNull() == true ||
_body[KEY_HOST].isArray() == false))
{
ELOG("服务响应中响应消息字段错误! ");
return false;
}
return true;
}
std::string method()
{
return _body[KEY_METHOD].asString();
}
void setMethod(const std::string& method)
{
_body[KEY_METHOD] = method;
}
ServiceOpType Optype()
{
return (ServiceOpType)_body[KEY_OPTYPE].asInt();
}
void setOptype(ServiceOpType optype)
{
_body[KEY_OPTYPE] = (int)optype;
}
void setHost(std::vector<Address> addrs)
{
for(auto& addr : addrs)
{
Json::Value val;
val[KEY_HOST_IP] = addr.first;
val[KEY_HOST_PORT] = addr.second;
_body[KEY_HOST].append(val);
}
}
std::vector<Address> Hosts()
{
std::vector<Address> addrs;
int sz = _body[KEY_HOST].size();
for(int i = 0; i < sz; i++)
{
Address addr;
addr.first = _body[KEY_HOST][i][KEY_HOST_IP].asString();
addr.second = _body[KEY_HOST][i][KEY_HOST_PORT].asInt();
addrs.push_back(addr);
}
return addrs;
}
};
(10) MessageFactory: 对消息对象进行封装, 最上面的RpcRequest/RpcResponse以及TopicRequest/TopicResponse以及ServiceRequest以及ServiceResponse进行封装.
//实现消息对象的生产工厂;
class MessageFactory {
public:
static BaseMessage::ptr create(MType mtype)
{
switch(mtype) {
case MType::REQ_RPC : return std::make_shared<RpcRequest>();
case MType::RSP_RPC : return std::make_shared<RpcResponse>();
case MType::REQ_TOPIC : return std::make_shared<TopicRequest>();
case MType::RSP_TOPIC : return std::make_shared<TopicResponse>();
case MType::REQ_SERVICE : return std::make_shared<ServiceRequest>();
case MType::RSP_SERVICE : return std::make_shared<ServiceResponse>();
}
return BaseMessage::ptr();
}
//不定参数模板, forward是进行万能引用, 保持对象的性质(左值或右值).
template<typename T, typename ...Args>
static std::shared_ptr<T> create(Args&& ...args) {
return std::make_shared<T>(std::forward(args)...);
}
};
1.3 通信-Muduo封装实现:
(1) MuduoBuffer: 继承BaseBuffer, 纯虚函数实现父类的接口, 直接调用Muduo库里面的接口即可完成.
#pragma once
#include <functional>
#include "detail.hpp"
#include "fields.hpp"
#include "abstract.hpp"
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/Buffer.h>
#include <iostream>
#include <string>
#include <unordered_map>
#include <muduo/net/TcpServer.h>
#include <muduo/net/TcpClient.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/Buffer.h>
#include <iostream>
#include <string>
#include <unordered_map>
#include <muduo/base/CountDownLatch.h>
#include <muduo/net/EventLoopThread.h>
#include <utility>
#include <mutex>
#include <unordered_map>
namespace RPC
{
class MuduoBuffer : public BaseBuffer
{
public:
using ptr = std::shared_ptr<MuduoBuffer>; // 定义一个指针;
MuduoBuffer(muduo::net::Buffer *buf)
: _buf(buf)
{
}
virtual size_t readableSize() override
{
return _buf->readableBytes();
}
virtual int32_t peekInt32() override
{
// muduo库是一个网络库, 从缓冲区取出4字节整形,
// 会进行网络字节序转换.
return _buf->peekInt32();
}
virtual void retriveInt32() override
{
return _buf->retrieveInt32();
}
virtual int32_t readInt32() override
{
return _buf->readInt32();
}
virtual std::string retrieveAsString(size_t len) override
{
return _buf->retrieveAsString(len);
}
private:
muduo::net::Buffer *_buf;
};
}
(2) BufferFactory: 将MuduoBuffer进行工厂类的封装.
class BufferFactory
{
public:
template <typename ...Args>
static BaseBuffer::ptr create(Args&& ...args)
{
return std::make_shared<MuduoBuffer>(std::forward<Args>(args)...);
}
};
(3) LVProtocol: 继承实现BaseProtocall里面的接口, lenFieldsLength是数据长度, mtypeFieldsLength是类型长度, idlenFieldsLength是ID字段长度.
对数据是否可以处理, 足不足够进行处理, 算出总长度, 类型, id长度, 正文长度, 找到类型以及id, 接着就是对消息进行处理, 调用对应的消息函数, 进行反序列化, 以及设置id和类型.
其中也提供序列化操作:
根据LV规则, 读取数据以及长度, 包装成LV格式.
// 判断缓存区数据是否可以进行一条数据的处理;
class LVProtocol : public BaseProtocal
{
public:
//|--len--|--VALUE--|
//|--len--|--mtype--|--idLen--|--id--|--body--|
using ptr = std::shared_ptr<LVProtocol>;
virtual bool canProcessed(const BaseBuffer::ptr &buf) override
{
if(buf->readableSize() < lenFieldsLength)
return false;
int32_t total_len = buf->peekInt32();
DLOG("total_len:%d", total_len);
if (buf->readableSize() < (total_len + lenFieldsLength))
{
return false;
}
return true;
}
virtual bool onMessage(const BaseBuffer::ptr &buf, BaseMessage::ptr &msg) override
{
// 调用onmessage时候, 默认可以进行处理一条数据;
int32_t total_len = buf->readInt32();
MType mtype = (MType)buf->readInt32();
int32_t idlen = buf->readInt32();
int32_t body_len = total_len - idlenFieldsLength - idlen - mtypeFieldsLength;
std::string id = buf->retrieveAsString(idlen);
std::string body = buf->retrieveAsString(body_len);
msg = MessageFactory::create(mtype);
if (msg.get() == nullptr)
{
ELOG("消息类型错误, 构造消息对象失败!");
return false;
}
bool ret = msg->unserialize(body);
if (ret == false)
{
ELOG("消息正文反序列失败!");
return false;
}
msg->setId(id);
msg->setMType(mtype);
return true;
}
virtual std::string serialize(const BaseMessage::ptr& msg) override
{
//|--len--|--mtype--|--idLen--|--id--|--body--|
std::string body = msg->serialize();
std::string id = msg->rid();
int32_t idlen = htonl(id.size());
auto mtype = htonl((int32_t)msg->mtype());
int32_t h_total_len = mtypeFieldsLength + mtypeFieldsLength + id.size() + body.size();
int32_t n_total_len = htonl(h_total_len);
DLOG("h_total_len:%d", h_total_len);
std::string result;
result.reserve(h_total_len);
result.append((char *)&n_total_len, lenFieldsLength);
result.append((char *)&mtype, mtypeFieldsLength);
result.append((char *)&idlen, idlenFieldsLength);
result.append(id);
result.append(body);
return result;
}
private:
const size_t lenFieldsLength = 4;
const size_t mtypeFieldsLength = 4;
const size_t idlenFieldsLength = 4;
};
(4) ProtocolFactory, 进行封装BaseProtocol工厂类模式.
class ProtocolFactory
{
public:
template <typename ...Args>
static BaseProtocal::ptr create(Args&& ...args)
{
return std::make_shared<LVProtocol>(std::forward<Args>(args)...);
}
};
(5) MuduoConnection: 继承BaseConnection, 直接调用Muduo库里里面的接口即可实现构造函数, 发送数据, 关闭连接, 连接状态的实现.
class MuduoConnection : public BaseConnection
{
public:
using ptr = std::shared_ptr<MuduoConnection>;
MuduoConnection(const muduo::net::TcpConnectionPtr &conn,
BaseProtocal::ptr &protocol)
: _protocol(protocol), _conn(conn)
{
}
virtual void send(const BaseMessage::ptr &msg) override
{
std::string body = _protocol->serialize(msg);
_conn->send(body);
}
virtual void shutdown() override
{
_conn->shutdown();
}
virtual bool connected() override
{
_conn->connected();
}
private:
BaseProtocal::ptr _protocol;
muduo::net::TcpConnectionPtr _conn;
};
class ConnectionFactory
{
public:
template <typename ...Args>
static BaseConnection::ptr create(Args&& ...args)
{
return std::make_shared<MuduoConnection>(std::forward<Args>(args)...);
}
};
(6) MuduoServer: 继承BaseServer, onConnection进行连接管理, 简历连接成功需要标记一下连接的消息, 使用unordered_map解决的, 连接断开是需要从unordered_map里面移除的.
onMessage: 判断数据是否足够处理, 可能数据过大也要判断, 判断完发送到缓冲区, 接着进行连接管理.
class MuduoServer : public BaseServer
{
public:
using ptr = std::shared_ptr<MuduoServer>;
MuduoServer(int port)
: _server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port),
"MuduoServer", muduo::net::TcpServer::kReusePort),
_protocol(ProtocolFactory::create())
{
}
virtual void start() override
{
_server.setConnectionCallback(std::bind(&MuduoServer::onConnection, this, std::placeholders::_1));
_server.setMessageCallback(std::bind(&MuduoServer::onMessage, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_server.start(); // 打开服务器机械能监听;
_baseloop.loop(); // 开始死循环事务监听;
}
void onConnection(const muduo::net::TcpConnectionPtr &conn)
{
if (conn->connected())
{
std::cout << "链接建立成功!" << std::endl;
auto muduo_conn = ConnectionFactory::create(conn, _protocol);
{
std::unique_lock<std::mutex> lock(_mutex);
_conns.insert(std::make_pair(conn, muduo_conn));
}
if (_cb_connection)
_cb_connection(muduo_conn);
}
else
{
std::cout << "链接断开" << std::endl;
BaseConnection::ptr muduo_conn;
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _conns.find(conn);
if (it == _conns.end())
{
return;
}
muduo_conn = it->second;
_conns.erase(conn);
}
if (_cb_close)
_cb_close(muduo_conn);
}
}
void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp)
{
DLOG("连接有数据量到来, 开始处理! ")
auto base_buf = BufferFactory::create(buf);
while (1)
{
if (_protocol->canProcessed(base_buf) == false)
{
// 数据不足处理;
if (base_buf->readableSize() > maxDatasize)
{
conn->shutdown();
ELOG("缓冲区中数据过大! ");
return;
}
DLOG("数据量不足! ");
break;
}
DLOG("缓冲区中数据可处理! ")
BaseMessage::ptr msg;
bool ret = _protocol->onMessage(base_buf, msg);
if (ret == false)
{
conn->shutdown();
ELOG("缓冲区数据错误! ");
return;
}
DLOG("消息反序列化成功! ")
BaseConnection::ptr base_conn;
{
std::unique_lock<std::mutex> lock(_mutex);
auto it = _conns.find(conn);
if (it == _conns.end())
{
conn->shutdown();
return;
}
base_conn = it->second;
}
DLOG("调用回调函数进行消息处理! ");
if (_cb_message)
_cb_message(base_conn, msg);
}
}
private:
const size_t maxDatasize = (1 << 16);
BaseProtocal::ptr _protocol;
muduo::net::EventLoop _baseloop;
muduo::net::TcpServer _server;
std::mutex _mutex;
std::unordered_map<muduo::net::TcpConnectionPtr, BaseConnection::ptr> _conns;
};
class ServerFactory
{
public:
template <typename ...Args>
static BaseServer::ptr create(Args&& ...args)
{
return std::make_shared<MuduoServer>(std::forward<Args>(args)...);
}
};
(7) MuduoClient: 继承BaseClient, onConnection连接管理调用上面实现的ConnectionFactory, onMessage和服务器一样.
class MuduoClient : public BaseClient
{
public:
using ptr = std::shared_ptr<MuduoClient>;
MuduoClient(const std::string &sip, int sport)
: _downlatch(1), // 初始化计数为1.
_protocol(ProtocolFactory::create()),
_baseloop(_loopthread.startLoop()),
_client(_baseloop, muduo::net::InetAddress(sip, sport), "MuduoClient")
{
}
virtual void connect() override
{
DLOG("设置回调函数, 连接服务器! ");
_client.setConnectionCallback(std::bind(&MuduoClient::onConnection, this, std::placeholders::_1));
_client.setMessageCallback(std::bind(&MuduoClient::onMessage, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
_client.connect();
_downlatch.wait();
DLOG("连接服务器成功! ");
}
virtual void shutdown() override
{
return _client.disconnect();
}
virtual bool send(const BaseMessage::ptr & msg) override
{
if(connected() == false)
{
ELOG("连接已断开! ");
return false;
}
_conn->send(msg);
return true;
}
virtual BaseConnection::ptr connection() override
{
return _conn;
}
virtual bool connected() override
{
return (_conn && _conn->connected());
}
private:
void onConnection(const muduo::net::TcpConnectionPtr &conn)
{
if (conn->connected())
{
std::cout << "链接建立成功!" << std::endl;
_downlatch.countDown(); // 计数为0被唤醒.
_conn = ConnectionFactory::create(conn, _protocol);
}
else
{
std::cout << "链接断开" << std::endl;
_conn.reset();
}
}
void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp)
{
DLOG("连接有数据到来, 开始处理! ");
auto base_buf = BufferFactory::create(buf);
while (1)
{
if (_protocol->canProcessed(base_buf) == false)
{
// 数据不足处理;
if (base_buf->readableSize() > maxDatasize)
{
conn->shutdown();
ELOG("缓冲区中数据过大! ");
return;
}
DLOG("数据量不足! ");
break;
}
DLOG("缓冲区中数据可处理! ")
BaseMessage::ptr msg;
bool ret = _protocol->onMessage(base_buf, msg);
if (ret == false)
{
conn->shutdown();
ELOG("缓冲区数据错误! ");
return;
}
DLOG("缓冲区中数据解析完毕, 调用回调函数进行处理! ")
if (_cb_message)
_cb_message(_conn, msg);
}
}
protected:
const size_t maxDatasize = (1 << 16);
BaseProtocal::ptr _protocol;
BaseConnection::ptr _conn;
muduo::CountDownLatch _downlatch;
muduo::net::EventLoopThread _loopthread;
muduo::net::EventLoop *_baseloop;
muduo::net::TcpClient _client;
};
class ClientFactory
{
public:
template <typename ...Args>
static BaseClient::ptr create(Args&& ...args)
{
return std::make_shared<MuduoClient>(std::forward<Args>(args)...);
}
};