C++ Json-Rpc框架-3项目实现(1)(1.其它函数实现2.消息类型字段定义3.消息Message/通信Muduo抽象具象实现)
项⽬实现
一:常⽤的零碎功能接⼝类实现
1.简单⽇志宏实现
意义:快速定位程序运⾏逻辑出错的位置。
项⽬在运⾏中可能会出现各种问题,出问题不可怕,关键的是要能找到问题,并解决问题。
解决问题的⽅式:
• gdb调试:逐步调试过于繁琐,缓慢。主要⽤于程序崩溃后的定位。
• 系统运⾏⽇志分析:在任何程序运⾏有可能逻辑错误的位置进⾏输出提⽰,快速定位逻辑问题的位置。
#include<stdio.h>
#include<time.h>
//日志等级
#define LDBG 0 //输出调试
#define LINF 1 //输出信息
#define LERR 2 //输出错误
//大于等于默认日志等级才能输出
#define LDEFAULT LINF
//1.time(NULL)获取时间戳 2.localtime()转化为当地时间 3.strftime()格式化时间戳
#define LOG(level, format, ...)\
do{ \
if(level>=LDEFAULT){\
time_t t=time(NULL);\
struct tm *lt =localtime(&t);\
char time_tmp[32]={0};\
strftime(time_tmp,sizeof(time_tmp)-1,"%m-%d %T",lt);\
fprintf(stdout,"[%s][%s:%d] " format "\n",time_tmp,__FILE__,__LINE__,##__VA_ARGS__);\
}\
} while (0);
#define DLOG(format,...) LOG(LDBG,format,##__VA_ARGS__);
#define TLOG(format,...) LOG(LINF,format,##__VA_ARGS__);
#define ELOG(format,...) LOG(LERR,format,##__VA_ARGS__);
int main()
{
TLOG("%d-%s",1,"dwad");
TLOG("dwad");
// [03-21 18:38:48][test.c:28] 1-dwad
// [03-21 18:38:48][test.c:29] dwad
}
#define LOG(level, format, ...)\
do{ \
1...)代表可变参数
2.\表示当前行和下一行是连一块的 (定义宏只能在同一行)
3.do{}while(0) 封装宏定义中的多行代码,确保宏的语法正确性
解释
##__VA_ARGS__
__VA_ARGS__
是 C 语言宏的可变参数,它允许宏接受不定数量的参数。##
用于处理 "参数为空" 的情况,它的作用是:
- 如果
__VA_ARGS__
为空,就去掉前面的,
,防止格式错误。- 如果
__VA_ARGS__
有内容,它会正常展开。所以,TLOG("dwad")不用写成TLOG("%s","dwad"),"dwad"直接就可以对应format.
因为##__VA_ARGS__为空会去掉,。
fprintf(stdout, "[%s][%s:%d] " format "\n", time_tmp, __FILE__, __LINE__, ##__VA_ARGS__);
1.fprintf 向文件中写入 stdout表示将日志信息输出到标准输出(终端)
2."[%s][%s:%d] " time_tmp, __FILE__, __LINE__ 打印格式化的时间戳+文件名+当前行号
2.Json序列化和反序列化工具
//2.Json序列化和反序列化工具
class JsonUtil
{
public:
// 实现序列化
static bool serialize(const Json::Value &val, std::string &body)
{
// 先实例化一个工厂类
Json::StreamWriterBuilder wbuilder;
// 来创建序列化器
std::unique_ptr<Json::StreamWriter> write(wbuilder.newStreamWriter());
// 将 JSON 数据输出到字符串
std::stringstream ss;
int re = write->write(val, &ss);
if (re < 0)
{
DLOG("serialize faile");
return false;
}
body = ss.str();
return true;
}
// 实现反序列化
static bool unserialize(std::string &body, Json::Value &val)
{
// body=R"({"姓名":"王伟","年龄":19,"成绩":[10,99.9,6]})";
// 先实例化一个工厂类
Json::CharReaderBuilder crb;
// 来创建序反列化器
std::unique_ptr<Json::CharReader> cr(crb.newCharReader());
std::string errs;
bool ret = cr->parse(body.c_str(), body.c_str() + body.size(), &val, &errs);
if (ret == false)
{
ELOG("json unserizlize faile: %s",errs.c_str());
return false;
}
return true;
}
};
序列化
步骤 | 代码 | 作用 |
---|---|---|
1️⃣ 创建 JSON 数据 | Json::Value root; | 生成 JSON 数据 |
2️⃣ 创建 StreamWriterBuilder (工厂类) | Json::StreamWriterBuilder wbuilder; | 准备 JSON 序列化 |
3️⃣ 通过工厂创建 StreamWriter (序列化器) | std::unique_ptr<Json::StreamWriter> writer(wbuilder.newStreamWriter()); | 生成 JSON 序列化对象 |
4️⃣ 将 JSON 数据写入 std::stringstream | writer->write(root, &ss); | 将 JSON 转换为字符串格式 |
反序列化
步骤 | 代码 | 作用 |
---|---|---|
创建解析器工厂 | Json::CharReaderBuilder crb; | 生成 JSON 解析器 |
创建 JSON 解析器 | std::unique_ptr<Json::CharReader> rc(crb.newCharReader()); | 解析 JSON 字符串 |
解析 JSON | rc->parse(body.c_str(), body.c_str() + body.size(), &val, &errs); | 解析 JSON 并存入 val |
3.UUID
UUID(Universally Unique Identifier), 也叫通⽤唯⼀识别码,通常由32位16进制数字字符组成。
UUID的标准型式包含32个16进制数字字符,以连字号分为五段,形式为8-4-4-4-12的32个字符,如:550e8400-e29b-41d4-a716-446655440000。
在这⾥,uuid⽣成,我们采⽤⽣成8个随机数字,加上8字节序号,共16字节数组⽣成32位16进制字符的组合形式来确保全局唯⼀的同时能够根据序号来分辨数据。
class UUID {
public:
static std::string uuid() {
std::stringstream ss;
//1. 构造一个机器随机数对象
std::random_device rd;
//2. 以机器随机数为种子构造伪随机数对象
std::mt19937 generator (rd());
//3. 构造限定数据范围的对象
std::uniform_int_distribution<int> distribution(0, 255);
//4. 生成8个随机数,按照特定格式组织成为16进制数字字符的字符串
for (int i = 0; i < 8; i++) {
if (i == 4 || i == 6) ss << "-";
ss << std::setw(2) << std::setfill('0') <<std::hex << distribution(generator);
}
ss << "-";
//5. 定义一个8字节序号,逐字节组织成为16进制数字字符的字符串
static std::atomic<size_t> seq(1); // 00 00 00 00 00 00 00 01
size_t cur = seq.fetch_add(1);
for (int i = 7; i >= 0; i--) {
if (i == 5) ss << "-";
ss << std::setw(2) << std::setfill('0') << std::hex << ((cur >> (i*8)) & 0xFF);
}
return ss.str();
}
};
17f707e9-6fdc-5c91-0000-000000000001
2edfa77c-c406-b65f-0000-000000000002
e7805f58-791c-c21a-0000-000000000003
3c36ed8c-4a1b-9c55-0000-000000000004
2204500a-26ec-3603-0000-000000000005
c78d1868-ca57-b474-0000-000000000006
0dad43c8-56a8-7264-0000-000000000007
7d7dcc1d-f81f-a2c0-0000-000000000008
ss << std::setw(2) << std::setfill('0') <<std::hex << distribution(generator);
1.std::setw(2) << std::setfill('0') setw(2)设置宽度为2 setfill('0')填充字符为0(默认空格)
2.std::hex 转化为16进制
3.distribution(generator); 生产一个随机数(范围可设置)
static std::atomic<size_t> seq(1);
static 只初始化一次,保证全局唯一,atomic 它是一个线程安全的类型,可以在多线程环境中安全地进行读取和修改。
size_t cur = seq.fetch_add(1);
获取当前值,然后 +1 (原子操作)
ss << std::setw(2) << std::setfill('0') << std::hex << ((cur >> (i*8)) & 0xFF);
cur是获取的8字节序号,eg. 00 00 00 00 00 00 00 01
&0xFF取最低的两个字节 cur >> (i*8) 右移i*8个bit位 即两个字节
<< ((cur >> (i*8)) & 0xFF) 因为i初始化7,所以是倒着把cur放入字符串中(从左向右)
二.项⽬消息类型字段信息定义
#pragma once
#include <unordered_map>
#include <iostream>
#include <string.h>
namespace wws
{
// RPC 相关:
#define KEY_METHOD "method" // JSON-RPC 请求的方法名称
#define KEY_PARAMS "parameters" // JSON-RPC 请求的参数
#define KEY_TOPIC_KEY "topic_key" // 发布-订阅模式下的主题键
#define KEY_TOPIC_MSG "topic_msg" // 发布-订阅模式下的主题消息内容
// 发布-订阅相关:
#define KEY_OPTYPE "optype" // 操作类型,例如订阅、取消订阅等
#define KEY_HOST "host" // 服务器或节点的主机信息
#define KEY_HOST_IP "ip" // 服务器或节点的 IP 地址
// 网络通信相关:
#define KEY_HOST_PORT "port" // 服务器或节点的端口号
#define KEY_RCODE "rcode" // 响应代码(可能表示成功或错误码)
#define KEY_RESULT "result" // JSON-RPC 响应的结果数据
// 消息类型(MType)
enum class MType
{
REQ_RPC = 0, // RPC 请求(调用远程方法)
RSP_RPC, // RPC 响应(返回远程方法的结果)
REQ_TOPIC, // 主题请求(订阅、取消订阅、发布消息等)
RSP_TOPIC, // 主题响应(主题操作的确认或错误返回)
REQ_SERVICE, // 服务请求(注册、发现、上线、下线)
RSP_SERVICE // 服务响应(服务操作的确认或错误返回)
};
// JSON-RPC 错误码(RCode)
enum class RCode
{
RCODE_OK = 0, // 成功处理
RCODE_PARSE_FAILED, // 消息解析失败
RCODE_ERROR_MSGTYPE, // 消息类型错误
RCODE_INVALID_MSG, // 无效消息
RCODE_DISCONNECTED, // 连接已断开
RCODE_INVALID_PARAMS, // 无效的 RPC 参数
RCODE_NOT_FOUND_SERVICE, // 未找到对应的服务
RCODE_INVALID_OPTYPE, // 无效的操作类型
RCODE_NOT_FOUND_TOPIC, // 未找到对应的主题
RCODE_INTERNAL_ERROR // 内部错误
};
// 根据错误码返回错误描述信息
static std::string errReason(RCode code)
{
static std::unordered_map<RCode, std::string> err_map = {
{RCode::RCODE_OK, "成功处理!"},
{RCode::RCODE_PARSE_FAILED, "消息解析失败!"},
{RCode::RCODE_ERROR_MSGTYPE, "消息类型错误!"},
{RCode::RCODE_INVALID_MSG, "无效消息"},
{RCode::RCODE_DISCONNECTED, "连接已断开!"},
{RCode::RCODE_INVALID_PARAMS, "无效的 Rpc 参数!"},
{RCode::RCODE_NOT_FOUND_SERVICE, "没有找到对应的服务!"},
{RCode::RCODE_INVALID_OPTYPE, "无效的操作类型"},
{RCode::RCODE_NOT_FOUND_TOPIC, "没有找到对应的主题!"},
{RCode::RCODE_INTERNAL_ERROR, "内部错误!"}};
auto it = err_map.find(code);
if (it == err_map.end())
{
return "未知错误!";
}
return it->second;
}
// RPC 请求类型(RType)
enum class RType
{
REQ_ASYNC = 0, // 异步请求(不需要返回值)
REQ_CALLBACK // 回调请求(执行后调用回调函数)
};
// 主题(Topic)操作类型(TopicOptype)
enum class TopicOptype
{
TOPIC_CREATE = 0, // 创建主题
TOPIC_REMOVE, // 删除主题
TOPIC_SUBSCRIBE, // 订阅主题
TOPIC_CANCEL, // 取消订阅
TOPIC_PUBLISH // 发布消息
};
// 服务(Service)操作类型(ServiceOptype)
enum class ServiceOptype
{
SERVICE_REGISTRY = 0, // 注册服务
SERVICE_DISCOVERY, // 发现服务
SERVICE_ONLINE, // 服务上线
SERVICE_OFFLINE, // 服务下线
SERVICE_UNKNOW // 未知服务操作
};
}
三.消息/通信抽象实现
1.BaseMessage消息抽象类
负责定义网络消息的通用结构。
方法列表:
mtype()
/setMType()
:消息类型(如 RPC、注册、发布订阅等)。
id()
/setId()
:消息唯一标识符(如 UUID)。
serialize()
/unserialize()
:序列化/反序列化消息体。
check()
:合法性检查。✅ 用途: 提供统一的消息接口,便于支持不同协议(如 JSON、Protobuf)。
#pragma once
#include "fields.hpp"
namespace wws
{
class BaseMessage
{
public:
virtual ~BaseMessage(){};
virtual void setId(const std::string &id)
{_rid=id;}
virtual std::string rid(){return _rid;}
virtual void setMType(MType mtype)
{_mtype=mtype;}
virtual MType mtype(){return _mtype;}
virtual std::string serialize()=0;//返回序列化字符串
virtual bool unserialize(const std::string &msg)=0;//对msg进行反序列化 并将对应字段设置到成员变量中
virtual bool check()=0;//检测字段是否完整
private:
std::string _rid;
MType _mtype;
};
}
2.BaseBuffer
缓冲区抽象类
负责封装底层缓冲区的读取操作。
方法列表:
readableBytes()
:返回当前缓冲区可读的字节数。
peekInt32()
:读取但不消费一个 int32 数据。
readInt32()
:读取并消费一个 int32。
retrieveInt32()
:从缓冲区中移除一个 int32。
retrieveAsString(size_t len)
:读取指定长度的字符串。✅ 用途: 提供统一的缓冲区读取接口。
class BaseBuffer
{
public:
virtual size_t readableSize()=0; //返回当前缓冲区有多少数据
virtual int32_t peekInt32()=0; //读int32大小的数据但不从缓冲区中取出
virtual void retrieveInt32()=0; //从缓冲区中取出int32大小数据
virtual int32_t readInt32()=0;//读取int32大小数据
virtual std::string retrieveAsString(size_t len) =0; //读取len字节大小的数据
};
3.BaseProtocol
协议抽象类
负责封装协议相关逻辑(如解码、编码、分包等)。
方法列表:
onMessage(BaseBufferPtr &, BaseMessagePtr &)
:从缓冲区中解析出完整消息。
serialize(const BaseMessagePtr &)
:将消息序列化为字节流。
canProcessed(const BaseBufferPtr &)
:判断缓冲区中是否存在完整数据包。✅ 用途: 定义协议格式,如我们项目中使用的 LV 协议(length + value)。
class BaseMessage
{
public:
using ptr=std::shared_ptr<BaseMessage>;
...
};
class BaseBuffer
{
public:
using ptr=std::shared_ptr<BaseBuffer>;
...
};
class BaseProtocol
{
public:
virtual bool canProcessed(const BaseBuffer::ptr &buf)=0;//判断缓冲区中是否有完整的报文
virtual bool oMessage(const BaseBuffer::ptr &buf,BaseMessage::ptr &msg)=0;//从缓冲区中解析完整的报文
virtual std::string serialize(const BaseMessage::ptr &msg)=0;//把数据进行序列化 准备应答
};
}
4.BaseConnection
连接抽象类
封装一个 TCP 连接(或其他传输通道)。
方法列表:
send(BaseMessagePtr)
:发送一条消息。
shutdown()
:关闭连接。
connected()
:返回连接状态。✅ 用途: 上层不感知底层 socket,用统一接口控制连接。
class BaseConnection
{
public:
using ptr=std::shared_ptr<BaseConnection>;
virtual void send(const BaseMessage::ptr &msg)=0;//发送消息
virtual void shutdown()=0;//关闭连接
virtual bool connected()=0;//判断连接是否正常
};
}
5.BaseServer
服务端抽象类
封装对外服务的 Server(如 RpcServer、TopicServer)。
方法列表:
setConnectionCallback(...)
:连接建立时的回调。
setCloseCallback(...)
:连接断开时的回调。
setMessageCallback(...)
:收到消息时的回调。
start()
:启动服务端监听。✅ 用途: 建立统一的服务端行为规范。
class BaseServer
{
public:
using ptr =std::shared_ptr<BaseServer>;
virtual void setConnectionCallback(const ConnectionCallback&cb){_cb_connection=cb;}//设置连接建立时的回调
virtual void setCloseCallback(const CloseCallback&cb){_cb_close=cb;}//设置连接关闭时的回调
virtual void setMessageCallback(const MessageCallback&cb){_cb_message=cb;}//设置信息发送时的回调
virtual void start()=0;//运行服务端
private:
ConnectionCallback _cb_connection;
CloseCallback _cb_close;
MessageCallback _cb_message;
};
6.BaseClient
客户端抽象类
封装客户端连接功能。
方法列表:
connect()
:连接到服务端。
send(...)
:发送消息。
shutdown()
:关闭连接。
connected()
:是否连接。
connection()
:获取当前连接。✅ 用途: 定义 RPC 客户端、订阅客户端的统一行为。
class BaseClient
{
public:
using ptr =std::shared_ptr<BaseClient>;
virtual void setConnectionCallback(const ConnectionCallback&cb){_cb_connection=cb;}//设置连接建立时的回调
virtual void setCloseCallback(const CloseCallback&cb){_cb_close=cb;}//设置连接关闭时的回调
virtual void setMessageCallback(const MessageCallback&cb){_cb_message=cb;}//设置信息发送时的回调
virtual void connect()=0;//连接服务端
virtual void shutdown()=0;//断开连接
virtual void send(const BaseMessage::ptr&)=0;//发送数据
virtual BaseConnection::ptr conntion()=0; //获取当前连接
virtual bool connected()=0; //判断连接是否断开
private:
ConnectionCallback _cb_connection;
CloseCallback _cb_close;
MessageCallback _cb_message;
};
四.消息/通信具象层实现
1.BaseMessage消息具体实现
Request类型的实现
BaseMessage
↓
JsonRequest(包含 body 字段)
↓
├── RpcRequest
├── TopicRequest
└── ServiceRequest
JsonRequest基类
class JsonRequest:public JsonMessage
{
public:
using ptr=std::shared_ptr<JsonRequest>;
};
class JsonMessage:public BaseMessage
{
public:
using ptr=std::shared_ptr<JsonMessage>;
virtual std::string serialize()override //返回序列化字符串
{
std::string body;
bool ret=JSON::serialize(_body,body);
if(ret==false)
return std::string();
return body;
}
virtual bool unserialize(const std::string &msg)override//对msg进行反序列化 并将对应字段设置到成员变量中
{
return JSON::unserialize(msg,_body);
}
protected:
Json::Value _body;
};
class JsonRequest:public JsonMessage
{
public:
using ptr=std::shared_ptr<JsonRequest>;
};
class JsonResponse:public JsonMessage
{
public:
using ptr=std::shared_ptr<JsonResponse>;
//以JsonResponse为基类的子类实现中都有rcode响应状态码
//可以在基类中实现check()
virtual bool check()override
{
//在响应中,大部分的响应都只要响应状态码
//因此只需要判断响应状态码字段是否存在,类型是否正确
if(_body[KEY_RCODE].isNull()==true)
{
ELOG("响应中没有响应状态码 ");
return false;
}
if(_body[KEY_RCODE].isIntegral()==false)
{
ELOG("响应状态码类型错误");
return false;
}
return true;
}
RCode rcode()
{
return (RCode)_body[KEY_RCODE].asInt();
}
void setRcode(RCode rcode)
{
_body[KEY_RCODE]=(int)rcode;
}
};
//1.RpcRequest RPC 函数调用请求
class RpcRequest:public JsonRequest
{
public:
using ptr=std::shared_ptr<RpcRequest>;
virtual bool check() override
{
//在rpc请求中 包含请求方法名称method(字符串) 参数params(对象)
if(_body[KEY_METHOD].isNull()==true||
_body[KEY_METHOD].isString()==false)
{
ELOG("RPC请求中没有方法名称或者方法类型类型错误");
return false;
}
if(_body[KEY_PARAMS].isNull()==true||
_body[KEY_PARAMS].isObject()==false)
{
ELOG("RPC请求中没有参数信息或者类型错误");
return false;
}
return true;
}
std::string method()
{
return _body[KEY_METHOD].asString();
}
void setMethod(const std::string &method_name)
{
_body[KEY_METHOD]=method_name;
}
Json::Value params()
{
return _body[KEY_PARAMS];
}
void setParams(const Json::Value¶ms)
{
_body[KEY_PARAMS]=params;
}
};
//2.TopicRequest 发布订阅机制
class TopicRequest:public JsonRequest
{
public:
using ptr=std::shared_ptr<TopicRequest>;
virtual bool check() override
{
// 在主题请求中 包含请求主题名称topic_key(字符串) 操作类型optype(int) 内容topic_msg(字符串)
if (_body[KEY_TOPIC_KEY].isNull() == true ||
_body[KEY_TOPIC_KEY].isString() == false)
{
ELOG("主题请求中没有主题名称或者主题类型类型错误");
return false;
}
if (_body[KEY_OPTYPE].isNull() == true ||
_body[KEY_OPTYPE].isIntegral() == false)
{
ELOG("主题请求中没有操作类型或者操作类型类型错误");
return false;
}
//如果是消息发布 才需要检测是否有消息字段
if (_body[KEY_OPTYPE].asInt() == (int)TopicOptype::TOPIC_PUBLISH &&
(_body[KEY_TOPIC_MSG].isNull() == true ||
_body[KEY_TOPIC_MSG].isString() == false))
{
ELOG("主题消息发布请求中没有消息内容字段或者消息内容类型错误");
return false;
}
return true;
}
//主题名称
std::string topicKey()
{
return _body[KEY_TOPIC_KEY].asString();
}
void setTopicKey(const std::string &Key)
{
_body[KEY_TOPIC_KEY]=Key;
}
//主题类型
TopicOptype optype()
{
return (TopicOptype)_body[KEY_OPTYPE].asInt();
}
void setOptype(TopicOptype optype)
{
_body[KEY_OPTYPE]=(int)optype;
}
//主题内容
std::string topicMsg()
{
return _body[KEY_TOPIC_MSG].asString();
}
void setTopicMsg(const std::string &Msg)
{
_body[KEY_TOPIC_MSG]=Msg;
}
};
//3.服务注册与发现机制
typedef std::pair<std::string,int> Address;
class ServiceRequest:public JsonRequest
{
public:
using ptr=std::shared_ptr<ServiceRequest>;
virtual bool check() override
{
// 在主题请求中 包含请求方法名称method(字符串) 操作类型optype(int) 主机信息host(ip+port)
if (_body[KEY_METHOD].isNull() == true ||
_body[KEY_METHOD].isString() == false)
{
ELOG("服务请求中没有方法名称或者方法类型类型错误");
return false;
}
if (_body[KEY_OPTYPE].isNull() == true ||
_body[KEY_OPTYPE].isIntegral() == false)
{
ELOG("服务请求中没有操作类型或者操作类型类型错误");
return false;
}
//服务发现和 服务注册/服务上线/服务下线 不同
//服务发现不需要host字段 只需要告诉服务器你要找哪个方法,无需提供自己的地址
//服务注册/上线/下线 必须告诉服务器你是谁
if (_body[KEY_OPTYPE].asInt()!=(int)(ServiceOptype::SERVICE_DISCOVERY)&&
_body[KEY_HOST].isNull()==true||
_body[KEY_HOST].isObject()==false||
_body[KEY_HOST_IP].isNull() == true ||
_body[KEY_HOST_IP].isString() == false||
_body[KEY_HOST_PORT].isNull() == true ||
_body[KEY_HOST_PORT].isIntegral() == false)
{
ELOG("服务请求中主机地址信息错误!");
return false;
}
return true;
}
//方法名称
std::string method()
{
return _body[KEY_METHOD].asString();
}
void setMethod(const std::string &name)
{
_body[KEY_METHOD] = name;
}
//服务类型
ServiceOptype optype()
{
return (ServiceOptype)_body[KEY_OPTYPE].asInt();
}
void setOptype(ServiceOptype optype)
{
_body[KEY_OPTYPE]=(int)optype;
}
//主机ip+port
Address host()
{
Address addr;
addr.first=_body[KEY_HOST][KEY_HOST_IP].asString();
addr.second=_body[KEY_HOST][KEY_HOST_PORT].asInt();
return addr;
}
void setHost(const Address &host)
{
Json::Value val;
val[KEY_HOST_IP]=host.first;
val[KEY_HOST_PORT]=host.second;
_body[KEY_HOST]=val;
}
};
Response类型实现
BaseMessage(基类)
↓
JsonMessage(扩展了 body 字段)
↓
JsonResponse(所有响应类的抽象基类,提供 check() 校验)
↓
├── RpcResponse
├── TopicResponse
└── ServiceResponse
JsonResponse基类:
class JsonResponse:public JsonMessage
{
public:
using ptr=std::shared_ptr<JsonResponse>;
//以JsonResponse为基类的子类实现中都有rcode响应状态码
//可以在基类中实现check()
virtual bool check()override
{
//在响应中,大部分的响应都只要响应状态码
//因此只需要判断响应状态码字段是否存在,类型是否正确
if(_body[KEY_RCODE].isNull()==true)
{
ELOG("响应中没有响应状态码 ");
return false;
}
if(_body[KEY_RCODE].isIntegral()==false)
{
ELOG("响应状态码类型错误");
return false;
}
return true;
}
RCode rcode()
{
return (RCode)_body[KEY_RCODE].asInt();
}
void setRcode(RCode rcode)
{
_body[KEY_RCODE]=(int)rcode;
}
};
//二.应答部分
//1.RpcResponse
class RpcResponse:public JsonResponse
{
public:
using ptr = std::shared_ptr<RpcResponse>;
virtual bool check()override
{
if(_body[KEY_RCODE].isNull()==true||
_body[KEY_RCODE].isIntegral()==false)
{
ELOG("响应中没有响应状态码或者状态码类型错误 ");
return false;
}
if(_body[KEY_RESULT].isNull()==true)
{
ELOG("响应中没有Rpc调用结果,或结果类型错误!");
return false;
}
return true;
}
Json::Value result()
{
return _body[KEY_RESULT];
}
void setResult(const Json::Value &result)
{
_body[KEY_RESULT]=result;
}
};
//2.TopicResponse
class TopicResponse:public JsonResponse
{
public:
using ptr = std::shared_ptr<TopicResponse>;
};
//3.ServiceResponse
class ServiceResponse :public JsonResponse
{
public:
using ptr = std::shared_ptr<ServiceResponse>;
virtual bool check()override
{
if(_body[KEY_RCODE].isNull()==true||
_body[KEY_RCODE].isIntegral()==false)
{
ELOG("响应中没有响应状态码或者状态码类型错误 ");
return false;
}
if(_body[KEY_OPTYPE].isNull()==true||
_body[KEY_OPTYPE].isIntegral()==false)
{
ELOG("响应中没有操作类型,或操作类型的类型错误!");
return false;
}
//如果当前响应是服务发现响应,就必须有方法名(string)和可用主机列表(host[]数组)
if(_body[KEY_OPTYPE].asInt()==(int)ServiceOptype::SERVICE_DISCOVERY&&
_body[KEY_METHOD].isNull()==true||
_body[KEY_METHOD].isIntegral()==false||
_body[KEY_HOST].isNull()==true||
_body[KEY_HOST].isArray()==false)
{
ELOG("服务发现响应中响应信息字段错误!");
return false;
}
return true;
}
ServiceOptype optype()
{
return (ServiceOptype)_body[KEY_OPTYPE].asInt();
}
void setOptype(ServiceOptype optype)
{
_body[KEY_OPTYPE]=(int)optype;
}
std::string method()
{
return _body[KEY_METHOD].asString();
}
void setMethod(const std::string &name)
{
_body[KEY_METHOD] = name;
}
void setHost(std::vector<Address> addrs)
{
for(auto&addr:addrs)
{
Json::Value val;
val[KEY_HOST_IP]=addr.first;
val[KEY_HOST_PORT]=addr.second;
_body[KEY_HOST].append(val);
}
}
std::vector<Address> hosts()
{
std::vector<Address> addrs;
int sz=_body[KEY_HOST].size();
for(int i=0;i<sz;i++)
{
Address addr;
addr.first=_body[KEY_HOST][i][KEY_HOST_IP].asString();
addr.second=_body[KEY_HOST][i][KEY_HOST_PORT].asInt();
addrs.push_back(addr);
}
return addrs;
}
};
1.关于3.ServiceResponse 中
//如果当前响应是服务发现响应,就必须有方法名(string)和可用主机列表(host[]数组)
if(_body[KEY_OPTYPE].asInt()==(int)ServiceOptype::SERVICE_DISCOVERY&&
响应时要先通过optype判断类型,因为服务发现和服务注册/上线/下线需要的字段不同,服务发现要返回服务提供方的信息,一个host数组 来保存所提供服务主机的ip+port,还有一个method来表示方法名称。
而对于服务注册/上线/下线,服务端不需要返回具体主机地址,只要告诉客户端操作是否成功即可。
操作类型(optype) method
字段hosts
字段说明 SERVICE_DISCOVERY
✅ 是 ✅ 是(数组,多个 host) 发现服务时,需要告诉你找的 method
,并返回提供该方法的所有服务主机地址SERVICE_ONLINE
/OFFLINE
/REGISTRY
❌ 否 ❌ 否 上线、下线、注册的响应只需状态码即可,无需告诉你其他服务信息
2.同理在Request类中3.ServiceRequest也有服务发现请求与其他服务操作请求的字段的差异性。
服务发现不需要host字段 只需要告诉服务器你要找哪个方法,无需提供自己的地址。
服务注册/上线/下线 必须告诉服务器你是谁。
操作类型(optype) 是否需要 host
字段说明 SERVICE_ONLINE
✅ 是 要告诉注册中心:我上线了,我在哪(ip + port) SERVICE_OFFLINE
✅ 是 要告知下线主机地址 SERVICE_REGISTRY
✅ 是 初次注册,主机信息必需 SERVICE_DISCOVERY
❌ 否 只需告诉“我要找某个方法”,不需要告诉“我是谁”
消息对象的生产工厂
为啥用工厂模式?
在你的 RPC 系统中,接收到网络数据后需要反序列化出消息对象,而这些消息类型不固定。
如果没有工厂,你可能要写很多类似的代码:
if (mtype == REQ_RPC) obj = std::make_shared<RpcRequest>(); else if (mtype == RSP_RPC) obj = std::make_shared<RpcResponse>(); ...
一多就难维护。用
MessageFactory::create(mtype)
一句搞定,结构更清晰、扩展性更强。
//实现一个消息对象的生产工厂
//方法一:
class MessageFactory
{
public:
static BaseMessage::ptr create(MType mtype)
{
switch(mtype)
{
case MType::REQ_RPC :return std::make_shared<RpcRequest>();
case MType::RSP_RPC :return std::make_shared<RpcResponse>();
case MType::REQ_TOPIC :return std::make_shared<TopicRequest>();
case MType::RSP_TOPIC :return std::make_shared<TopicResponse>();
case MType::REQ_SERVICE :return std::make_shared<ServiceRequest>();
case MType::RSP_SERVICE :return std::make_shared<ServiceResponse>();
}
return BaseMessage::ptr();
}
//方法二:模板函数
template<typename T,typename ...Args>//类型参数包
static std::shared_ptr<T> create(Args&& ...args)//Args&& 转发引用 ...args函数参数包
{
return std::make_shared<T>(std::forward<Args>(args)...);//...参数包展开运算符,是为了把 模板参数包展开
}
};
这个模板函数的作用是:
接受任意数量和类型的参数(通过参数包
Args&&...
)。使用 完美转发 把这些参数传给
std::make_shared<T>
。返回一个
shared_ptr<T>
智能指针。
关键词 含义 template<typename T, typename ...Args>
支持任意类型 T
和任意数量参数Args...
Args&& ...args
接收所有参数(完美转发支持左值/右值) std::forward<Args>(args)...
将参数原样转发给 T
的构造函数,避免不必要的拷贝,提高性能std::make_shared<T>
创建对象并返回 shared_ptr<T>
(智能指针)( 不使用
概念 含义 左值(lvalue) 有名字、可取地址 右值(rvalue) 临时、无名、不可取地址 右值引用(T&&) 如果 T 是具体类型,是右值引用,只能绑定右值 转发引用(T&&) 在模板中出现,可以绑定任意值 std::move
强制把左值转换成右值(用于触发移动) std::forward
根据类型自动转发左值/右值(用于完美转发) std::forward
的话,右值参数也会被当成左值来处理,从而导致性能下降( 比如不走移动构造,走了拷贝构造)。)
test.cpp 消息类型测试:序列化和反序列化 各字段的设置和获取
#include "message.hpp"
int main()
{
// //1.2.TopicRequest
// wws::TopicRequest::ptr trp= wws::MessageFactory::create<wws::TopicRequest>();
// trp->setTopicKey("new");
// trp->setOptype(wws::TopicOptype::TOPIC_PUBLISH);//发布
// trp->setTopicMsg("0721");
// std::string str=trp->serialize();
// std::cout<<str<<std::endl;
// wws::TopicRequest::ptr trp2= wws::MessageFactory::create<wws::TopicRequest>();
// bool ret=trp2->unserialize(str);
// if(ret==false) return -1;
// ret=trp2->check();
// if(ret==false) return -1;
// std::cout<<trp2->topicKey()<<std::endl;
// std::cout<<(int)trp2->optype()<<std::endl;
// std::cout<<trp2->topicMsg()<<std::endl;
// //1.3.ServiceRequest
// wws::ServiceRequest::ptr trp= wws::MessageFactory::create<wws::ServiceRequest>();
// trp->setMethod("Add");
// trp->setOptype(wws::ServiceOptype::SERVICE_REGISTRY);//发布
// trp->setHost(wws::Address("127.0.0.1",9090));
// std::string str=trp->serialize();
// std::cout<<str<<std::endl;
// wws::ServiceRequest::ptr trp2= wws::MessageFactory::create<wws::ServiceRequest>();
// bool ret=trp2->unserialize(str);
// if(ret==false) return -1;
// ret=trp2->check();
// if(ret==false) return -1;
// std::cout<<trp2->method()<<std::endl;
// std::cout<<(int)trp2->optype()<<std::endl;
// std::cout<<trp2->host().first<<std::endl;
// std::cout<<trp2->host().second<<std::endl;
// //2.1RpcResponse
// wws::RpcResponse::ptr trp= wws::MessageFactory::create<wws::RpcResponse>();
// trp->setRcode(wws::RCode::RCODE_OK);
// trp->setResult(33); //对result类型不做检测
// std::string str=trp->serialize();
// std::cout<<str<<std::endl;
// wws::RpcResponse::ptr trp2= wws::MessageFactory::create<wws::RpcResponse>();
// bool ret=trp2->unserialize(str);
// if(ret==false) return -1;
// ret=trp2->check();
// if(ret==false) return -1;
// std::cout<<(int)trp2->rcode()<<std::endl;
// std::cout<<trp2->result().asInt()<<std::endl;//双方约定好 result的类型 传什么类型就用什么方法取.as_
// //2.2RpcResponse
// wws::TopicResponse::ptr trp= wws::MessageFactory::create<wws::TopicResponse>();
// trp->setRcode(wws::RCode::RCODE_OK);
// std::string str=trp->serialize();
// std::cout<<str<<std::endl;
// wws::TopicResponse::ptr trp2= wws::MessageFactory::create<wws::TopicResponse>();
// bool ret=trp2->unserialize(str);
// if(ret==false) return -1;
// ret=trp2->check();
// if(ret==false) return -1;
// std::cout<<(int)trp2->rcode()<<std::endl;
//2.3ServiceResponse
wws::ServiceResponse::ptr trp= wws::MessageFactory::create<wws::ServiceResponse>();
trp->setRcode(wws::RCode::RCODE_OK);
trp->setMethod("Add");
trp->setOptype(wws::ServiceOptype::SERVICE_DISCOVERY);
std::vector<wws::Address> addrs;
addrs.push_back(wws::Address("127.0.0.1",9090));
addrs.push_back(wws::Address("127.0.0.2",9091));
trp->setHost(addrs);
std::string str=trp->serialize();
std::cout<<str<<std::endl;
wws::ServiceResponse::ptr trp2= wws::MessageFactory::create<wws::ServiceResponse>();
bool ret=trp2->unserialize(str);
if(ret==false) return -1;
ret=trp2->check();
if(ret==false) return -1;
std::cout<<(int)trp2->rcode()<<std::endl;
std::cout<<trp2->method()<<std::endl;
std::vector<wws::Address> addrs1=trp2->hosts();
for(auto&addr:addrs)
std::cout<<addr.first<<":"<<addr.second<<std::endl;
return 0;
}
2.MuduoBuffer缓冲区实现
class MuduoBuffer:public BaseBuffer
{
public:
using ptr=std::shared_ptr<MuduoBuffer>;
MuduoBuffer(muduo::net::Buffer *buf):_buf(buf){}
virtual size_t readableSize()override //返回当前缓冲区有多少数据
{
return _buf->readableBytes();
}
virtual int32_t peekInt32()override //读int32大小的数据但不从缓冲区中取出
{
//muduo库是一个网络库Muduo 在接收数据时会自动将“网络字节序(大端)”转换回“主机字节序(小端)”
//所以我们发送数据时必须先把主机字节序转换成网络字节序
return _buf->peekInt32();
}
virtual void retrieveInt32()override //从缓冲区中取出int32大小数据
{
return _buf->retrieveInt32();
}
virtual int32_t readInt32()override//读取int32大小数据
{
return _buf->readInt32();
}
virtual std::string retrieveAsString(size_t len)override //读取len字节大小的数据
{
return _buf->retrieveAsString(len);
}
private:
muduo::net::Buffer *_buf;
};
class BufferFactory
{
public:
template <typename... Args>
static BaseBuffer::ptr create(Args &&...args)
{
return std::make_shared<MuduoBuffer>(std::forward<Args>(args)...);
}
};
3.LVProtocol 协议实现
将一个消息对象
msg
转换成一个标准的字节流格式,准备进行网络传输。序列化格式遵循特定的顺序,并且通过htonl
等函数确保数据按网络字节序(大端)进行处理,避免在不同平台上出现字节序不一致的问题。
class LVProtocol :public BaseProtocol
{
public:
using ptr=std::shared_ptr<LVProtocol>;
//判断缓冲区中是否有完整的报文
// |--Len--|--VALUE--|
// |--Len--|--mtype--|--idlen--|--id--|--body--|
virtual bool canProcessed(const BaseBuffer::ptr &buf)override
{
//先判断能否读取4字节的Len 即VALUE大小
if(buf->readableSize()<lenFieldsLength)
return false;
int32_t total_len=buf->peekInt32();
//判断是否至少有一个完整报文
if(buf->readableSize()<(lenFieldsLength+total_len))
return false;
return true;
}
// |--Len--|--mtype--|--idlen--|--id--|--body--|
//从缓冲区中解析完整的报文
virtual bool oMessage(const BaseBuffer::ptr &buf,BaseMessage::ptr &msg)override
{
//当调用onMessage的时候,默认认为缓冲区中的数据足够一条完整的消息
int32_t total_len=buf->readInt32();//读取总长度
MType mtype=(MType)buf->readInt32();//读取数据类型
int32_t idlen=buf->readInt32();
int32_t body_len=total_len-mtypeFieldsLength-idlenFieldsLength-idlen;
std::string id=buf->retrieveAsString(idlen);
std::string body=buf->retrieveAsString(body_len);
//构建消息对象
msg=MessageFactory::create(mtype);
if(msg.get()==nullptr)//get()获取底层指针的值
{
ELOG("消息类型错误,构造消息对象失败");
return false;
}
//将body内容反序列化并填入msg中
bool ret=msg->unserialize(body);
if(ret==false)
{
ELOG("消息正文反序列化失败");
return false;
}
msg->setId(id);
msg->setMType(mtype);
return true;
}
//把数据进行序列化 准备应答
virtual std::string serialize(const BaseMessage::ptr &msg)override
{
// |--Len--|--mtype--|--idlen--|--id--|--body--|
//muduo库是一个网络库Muduo 在接收数据时会自动将“网络字节序(大端)”转换回“主机字节序(小端)”
//所以我们发送数据时必须先把主机字节序转换成网络字节序hton
std::string body=msg->serialize();
std::string id=msg->rid();
int32_t idlen=htonl(id.size());
int32_t mtype=htonl((int)msg->mtype());
int32_t h_total_len =body.size()+idlen+idlenFieldsLength+mtypeFieldsLength;
int32_t n_total_len=htonl(h_total_len);
std::string result;
result.reserve(h_total_len);//先扩容
// 手动把整数的地址转成字符指针
result.append((char*)&n_total_len,lenFieldsLength);
result.append((char*)&mtype,mtypeFieldsLength);
result.append((char*)&idlen,idlenFieldsLength);
result.append(id);
result.append(body);
return result;
}
private:
const size_t lenFieldsLength=4;
const size_t mtypeFieldsLength=4;
const size_t idlenFieldsLength=4;
};
class ProtocolFactory
{
public:
template<typename ...Args>
static BaseProtocol::ptr create(Args&& ...args)
{
return std::make_shared<LVProtocol>(std::forward<Args>(args)...);
}
};
result.append((char*)&xxx, len);
为什么要转为char*?
C++ 中 append() 的参数必须是字符指针(const char*),用于追加文本或原始字节流。
但整数(如 int32_t, uint16_t, 等)并不是字符类型,所以要 手动把整数的地址转成字符指针,表示「我要将这个整数的内存原始字节按顺序放入字符串中」。这样后面读取的时候读4字节就是一个整型数据。
这种写法是为了构造二进制协议格式,性能高效,常见于消息框架、自定义协议中。
std::string::append()
将字符串、字符、或二进制数据追加到当前字符串末尾。
形式 示例 说明 append(const std::string& str)
s.append("abc");
追加一个字符串 append(const char* s, size_t n)
s.append(buf, 4);
追加前 n
个字符(可用于二进制)append(size_t n, char c)
s.append(3, '.');
追加 n
个相同字符
函数名 含义 用于方向 标志记忆 htonl
Host → Network(Long) 发送 int32_t
时用主机转网络 htons
Host → Network(Short) 发送 int16_t
时用主机转网络 ntohl
Network → Host(Long) 接收 int32_t
时用网络转主机 ntohs
Network → Host(Short) 接收 int16_t
时用网络转主机
4.MuduoConnection连接实现
MuduoConnection
是对 Muduo 中 TcpConnection 的一层封装,目的是统一网络发送/接收的接口,让上层只面向 BaseConnection 编程,屏蔽底层实现细节。
class MuduoConnection:public BaseConnection
{
public:
using ptr=std::shared_ptr<MuduoConnection>;
MuduoConnection(BaseProtocol::ptr protocol,muduo::net::TcpConnectionPtr conn)
:_protocol(protocol),
_conn(conn)
{}
virtual void send(const BaseMessage::ptr &msg)override //发送消息
{
std::string body=_protocol->serialize(msg);
_conn->send(body);
}
virtual void shutdown()override //关闭连接
{
_conn->shutdown();
}
virtual bool connected()override//判断连接是否正常
{
return _conn->connected();
}
private:
BaseProtocol::ptr _protocol; //进行序列化和反序列化
muduo::net::TcpConnectionPtr _conn; //进行网络通信
};
class ConnectionFactory
{
public:
template<typename ...Args>
static BaseConnection::ptr create(Args&& ...args)
{
return std::make_shared<MuduoConnection>(std::forward<Args>(args)...);
}
};
5.MuduoServer 服务端实现
MuduoServer
是整个服务器通信核心模块,它基于 Muduo 实现了 TCP 通信能力,封装了连接管理与消息处理,并统一为BaseServer
抽象,方便整个 Json-RPC 框架的后续拓展。
class MuduoServer:public BaseServer
{
public:
using ptr = std::shared_ptr<MuduoServer>;
MuduoServer(int port)
:_server(&_baseloop, muduo::net::InetAddress("0.0.0.0",port),
"MuduoServer",muduo::net::TcpServer::kNoReusePort),
_protocol(ProtocolFactory::create())
{}
using ptr =std::shared_ptr<MuduoServer>;
virtual void start()override//运行服务端
{
//设置连接建立/断开 消息的回调函数
_server.setConnectionCallback(std::bind(&MuduoServer::onConnection,this,std::placeholders::_1));
_server.setMessageCallback(std::bind(&MuduoServer::onMessage,this,
std::placeholders::_1,std::placeholders::_2,std::placeholders::_3));
_server.start();//开始监听
_baseloop.loop();//开始循环事件监控
}
private:
void onConnection(const muduo::net::TcpConnectionPtr &conn) // 连接建立/断开的回调函数
{
if (conn->connected()) // 判断连接是否存在
{
std::cout << "连接建立" << std::endl;
auto muduo_conn=ConnectionFactory::create(conn,_protocol);
//上锁并限定作用域 出作用域自动解锁
{
std::unique_lock<std::mutex> lock(_mutex);
_conns.insert(std::make_pair(conn,muduo_conn));
}
if(_cb_connection) _cb_connection(muduo_conn);//设置回调函数就调用
}
else
{
std::cout << "连接断开" << std::endl;
BaseConnection::ptr muduo_conn;
{
std::unique_lock<std::mutex> lock(_mutex);
auto it=_conns.find(conn);
//没找到TcpConnectionPtr对应的MuduoConnection::ptr
if(it==_conns.end())
{
return;//连接已经断开了 直接return
}
muduo_conn=it->second;//获取对应的MuduoConn(但是用基类Baseconn指向的)
_conns.erase(conn);//删除联系
}
if(_cb_close) _cb_close(muduo_conn);
}
}
// 接收到消息的回调函数
void onMessage(const muduo::net::TcpConnectionPtr &conn,
muduo::net::Buffer *buf, muduo::Timestamp)
{
auto base_buf=BufferFactory::create(buf);
while(1)
{
if(_protocol->canProcessed(base_buf)==false)
{
//如果缓冲区的数据比2^16还大,但仍不是完整的报文
//说明报文的len字段错误
if(base_buf->readableSize()>maxDataSize)
{
conn->shutdown();//断开连接
ELOG("缓冲区数据过大");
return;
}
break;
}
//有完整的报文
BaseMessage::ptr msg;
//从缓冲区的报文解析出body字段再内容反序列化并填入msg中
bool ret=_protocol->oMessage(base_buf,msg);
if(ret==false)
{
conn->shutdown();
ELOG("缓冲区中数据错误");
return ;
}
//此时反序列化成功body字段信息放入msg中
BaseConnection::ptr base_conn;//获取TcpConn对应的连接 进行回调函数的调用
{
std::unique_lock<std::mutex> lock(_mutex);
auto it =_conns.find(conn);
if(it==_conns.end())
{
conn->shutdown();
return;
}
base_conn=it->second;
}
//缓冲区数据解析完毕 调用回调函数
if(_cb_message) _cb_message(base_conn,msg);
}
}
private:
const size_t maxDataSize=(1<<16);//2^16
BaseProtocol::ptr _protocol;//用于构建MuduoConn
muduo::net::EventLoop _baseloop;
muduo::net::TcpServer _server;
std::mutex _mutex;
std::unordered_map<muduo::net::TcpConnectionPtr,BaseConnection::ptr> _conns;//用MuduoConn封装TcpConn 两者间建立映射
};
class ServerFactory
{
public:
template<typename ...Args>
static MuduoServer::ptr create(Args&& ...args)
{
return std::make_shared<MuduoServer>(std::forward<Args>(args)...);
}
};
std::unordered_map<muduo::net::TcpConnectionPtr, BaseConnection::ptr> _conns;
为什么TcpConnectionPtr 和 BaseConnection::ptr要建立映射关系?
因为 Muduo 回调中只给你原始连接对象 TcpConnectionPtr,而框架中所有处理(协议、发送、抽象)都依赖于 BaseConnection::ptr,所以必须建立它们之间的映射关系,才能实现消息处理、连接管理等功能。
std::unique_lock<std::mutex> lock(_mutex);
含义:
std::unique_lock
:是一个通用的互斥锁管理器,比std::lock_guard
更灵活。
<std::mutex>
:指定要管理的锁类型是std::mutex
。
lock
:定义的锁对象变量名,进入作用域时加锁,离开作用域时自动解锁。
(_mutex)
:传入的互斥量对象(你在类中定义的_mutex
)RAII(资源获取即初始化)机制让你不需要手动
lock()
和unlock()
:
阶段 操作 lock
构造时自动调用 _mutex.lock()
加锁lock
析构时自动调用 _mutex.unlock()
解锁
std::unique_lock<std::mutex> lock(_mutex);
是一种 自动加解锁的写法,让你在并发访问共享资源时更加安全、简洁,避免忘记解锁而造成死锁。
为什么在连接的回调函数中和消息的回调函数中访问_conns时要使用锁?
Muduo 内部采用 Reactor 模型 + 多线程,也就是说:
有可能同时有多个客户端连接/断开/发消息
每个事件(如连接/断开)都可能在不同线程中被回调触发
所以当多个连接同时断开时,如果你不加锁访问
_conns
,可能会导致:onMessage 中加锁是为了线程安全地访问
问题类型 举例说明 ❌ 读写冲突 一个线程正在 find()
,另一个线程erase()
,导致迭代器失效或崩溃❌ 数据不一致 多线程同时访问 _conns
,会出现竞争条件❌ 程序崩溃 unordered_map
本身不是线程安全容器_conns
映射表,避免读写冲突。锁的范围仅限于 find+拷贝,之后使用连接对象是安全的。
6.MuduoClient 客户端实现
MuduoClient
负责与服务端建立 TCP 连接,处理消息的接收、发送,回调处理,并对 Muduo 底层通信封装为框架内统一接口BaseClient
。
class MuduoClient : BaseClient
{
public:
using ptr = std::shared_ptr<BaseClient>;
MuduoClient(const std::string &sip, int sport) : _protocol(ProtocolFactory::create()),
_downlactch(1), // 初始化计数器=1
_baseloop(_loopthread.startLoop()),
_client(_baseloop, muduo::net::InetAddress(sip, sport), "MuduoClient")
{
}
virtual void connect() override // 连接服务端
{
// 连接事件回调
_client.setConnectionCallback(std::bind(&MuduoClient::onConnection, this, std::placeholders::_1));
// 连接消息回调
_client.setMessageCallback(std::bind(&MuduoClient::onMessage, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
// 连接服务器 client的connect接口是一个非阻塞的操作
// 所以可能会出现connect还没建立连接,conntion接口就获取了连接,send()发送数据
_client.connect(); // 此时计数>0
_downlactch.wait(); // 等连接建立完成countDown()后计数-- =0继续
}
virtual void shutdown() override // 断开连接
{
return _client.disconnect();
}
virtual bool send(const BaseMessage::ptr &msg) override // 发送数据
{
if (connected() == false)
{
ELOG("连接已断开!");
return false;
}
_conn->send(msg);
}
virtual BaseConnection::ptr conntion() override // 获取当前连接
{
return _conn;
}
virtual bool connected() override // 判断连接是否断开
{
return (_conn && _conn->connected());
}
private:
void onConnection(const muduo::net::TcpConnectionPtr &conn) // 连接建立/断开的回调函数
{
if (conn->connected()) // 判断连接是否存在
{
std::cout << "连接建立" << std::endl;
_conn = ConnectionFactory::create(conn);
_downlactch.countDown(); // 计数-- =0 唤醒wait
}
else
std::cout << "连接断开" << std::endl;
}
// 接收到消息的回调函数
void onMessage(const muduo::net::TcpConnectionPtr &conn,
muduo::net::Buffer *buf, muduo::Timestamp)
{
auto base_buf = BufferFactory::create(buf);
while (1)
{
if (_protocol->canProcessed(base_buf) == false)
{
// 如果缓冲区的数据比2^16还大,但仍不是完整的报文
// 说明报文的len字段错误
if (base_buf->readableSize() > maxDataSize)
{
conn->shutdown(); // 断开连接
ELOG("缓冲区数据过大");
return;
}
break;
}
// 有完整的报文
BaseMessage::ptr msg;
// 从缓冲区的报文解析出body字段再内容反序列化并填入msg中
bool ret = _protocol->oMessage(base_buf, msg);
if (ret == false)
{
conn->shutdown();
ELOG("缓冲区中数据错误");
return;
}
// 缓冲区数据解析完毕 调用回调函数
if (_cb_message)
_cb_message(_conn, msg);
}
}
private:
const size_t maxDataSize = (1 << 16); // 2^16
BaseProtocol::ptr _protocol;
BaseConnection::ptr _conn;
muduo::CountDownLatch _downlactch; // 做计数同步的类 void wait()计数>0阻塞 countDown()-- 计数=0唤醒wait
muduo::net::EventLoopThread _loopthread; // 实例化后自动创建一个线程执行loop
muduo::net::EventLoop *_baseloop;
muduo::net::TcpClient _client;
};
class ClientFactory
{
public:
template <typename... Args>
static BaseClient::ptr create(Args &&...args)
{
return std::make_shared<MuduoClient>(std::forward<Args>(args)...);
}
};
成员 类型 说明 _client
muduo::net::TcpClient
Muduo 封装的 TCP 客户端对象 _conn
BaseConnection::ptr
封装后的连接对象(包含协议逻辑) _protocol
BaseProtocol::ptr
用于解析、封装通信数据的协议对象(如 LV 协议) _downlatch
CountDownLatch
等待连接完成的同步工具 _loopthread
/_baseloop
EventLoopThread
/EventLoop*
单线程 IO 事件循环
客户端的 BaseConnection::ptr _conn不用map和TcpConn建立映射关系吗?
客户端只有一个连接,直接保存 BaseConnection::ptr _conn 即可,不需要映射。服务端需要处理多个连接,所以必须用 map 建立 TcpConnectionPtr 到 BaseConnection::ptr 的映射关系。
关于客户端设置信息的回调函数,调用的过程:
#include "message.hpp" #include "net.hpp" void onMessage(const wws::BaseConnection::ptr conn,wws::BaseMessage::ptr& msg) { std::string body =msg->serialize(); std::cout<<body<<std::endl; } int main() { auto client=wws::ClientFactory::create("127.0.0.1",9090); client->setMessageCallback(onMessage); client->connect(); return 0; }
1.业务设置回调函数
main(){client->setMessageCallback(onMessage);}
该函数由
BaseClient
定义:此时
_cb_message
成员变量就保存了你业务层提供的onMessage()
。
2.connect 建立连接并注册 Muduo 的消息回调
设置了 Muduo TcpClient 的
onMessage
回调函数:oMessage的三个参数谁传的?
这三个参数由 Muduo 框架底层自动传入的!
我们在connect()中已经提供bind()设置了回调函数并绑定了参数,
从此之后:
每当某个连接
conn
上有数据可读,Muduo 就会:
从 socket 中读取数据进
muduo::net::Buffer
对象获取这个连接的
TcpConnectionPtr
获取当前
Timestamp (
时间戳)
然后 调用你注册的
onMessage(conn, buf, time)
这个是 Muduo 在底层收到数据之后调用的函数。
3.Muduo 收到网络数据 → 调用你的
MuduoClient::onMessage()
这一步完成了解包(如 LV 协议)、反序列化(如 JSON)过程,得到了
BaseMessage::ptr
类型的消息对象。
4.触发你业务设置的
_cb_message
回调函数
【服务端发送 JSON 消息】 ↓ 【muduo TcpClient 收到数据】 ↓ MuduoClient::onMessage(conn, buf, timestamp) ↓ _basebuf = BufferFactory::create(buf) msg = _protocol->onMessage(...) ↓ _cb_message(_conn, msg) ↓ 你在 main() 中的 onMessage()
服务端和客户端为什么设置回调函数的位置不同?
✅ 服务端在 start() 里设置回调
✅ 客户端在 connect() 里设置回调
服务端必须先设置好回调后再启动监听;
start()
之后会立即开启监听并接收客户端连接;所以在调用
start()
之前,必须设置好回调函数(连接、消息、关闭);否则客户端一旦连上,服务端都没来得及注册处理逻辑,就错过了事件,行为未定义或崩溃。
客户端的连接是主动行为。客户端并不会自动发起连接,必须你主动调用,因此在
connect()
前设置回调即可。
对比点 服务端 客户端 连接方式 被动接收连接 主动发起连接 核心操作 _server.start()
启动监听_client.connect()
发起连接设置回调时机 必须在 .start()
前完成必须在 .connect()
前完成为什么 否则客户端连上了没人处理 否则连接成功了没人处理
客户端 服务端通信简单测试
client
#include "message.hpp" #include "net.hpp" #include <thread> void onMessage(const wws::BaseConnection::ptr conn,wws::BaseMessage::ptr& msg) { std::string body =msg->serialize(); std::cout<<body<<std::endl; } int main() { auto client=wws::ClientFactory::create("127.0.0.1",9090); client->setMessageCallback(onMessage); client->connect(); auto rpc_req=wws::MessageFactory::create<wws::RpcRequest>(); rpc_req->setId("111"); rpc_req->setMType(wws::MType::RSP_RPC); rpc_req->setMethod("Add"); Json::Value val; val["num1"]=11; val["num2"]=22; rpc_req->setParams(val); client->send(rpc_req); std::this_thread::sleep_for(std::chrono::seconds(10)); client->shutdown(); return 0; }
server
#include "message.hpp" #include "net.hpp" void onMessage(const wws::BaseConnection::ptr conn,wws::BaseMessage::ptr& msg) { std::string body =msg->serialize(); std::cout<<body<<std::endl; auto rpc_rsp=wws::MessageFactory::create<wws::RpcResponse>(); rpc_rsp->setId("111"); rpc_rsp->setMType(wws::MType::RSP_RPC); rpc_rsp->setRcode(wws::RCode::RCODE_OK); rpc_rsp->setResult(33); conn->send(rpc_rsp); } int main() { auto server=wws::ServerFactory::create(9090); server->setMessageCallback(onMessage); server->start(); return 0; }