【从零实现Json-Rpc框架】- 第三方库介绍 - Muduo篇
📢博客主页:https://blog.csdn.net/2301_779549673
📢博客仓库:https://gitee.com/JohnKingW/linux_test/tree/master/lesson
📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
📢本文由 JohnKi 原创,首发于 CSDN🙉
📢未来很长,值得我们全力奔赴更美好的生活✨
文章目录
- 📢前言
- 🏳️🌈1 Muduo库是什么
- 🏳️🌈2 Muduo库常见接口介绍
- 2.1 **TcpServer类基础介绍**
- 2.2 EventLoop类基础介绍
- 2.3 TcpConnection类基础介绍
- 2.4 TcpClient类基础介绍
- 2.5 Buffer类基础介绍
- 🏳️🌈3 Muduo 使用案例
- 3.1 server.cpp
- 3.2 client.cpp
- 👥总结
📢前言
紧接上回,接下来笔者将介绍rpc框架的最重要的三个第三方库,方便大家理解
这回是 Muduo 篇
🏳️🌈1 Muduo库是什么
Muduo
由陈硕大佬开发,是一个基于非阻塞I0和事件驱动的C++高并发TCP网络编程库。它是一款基于主从Reactor模型的网络库,其使用的线程模型是oneloop per thread
,所谓one loop per thread
指的是:
- 一个线程只能有一个事件循环(EventLoop),用于响应计时器和I0事件
- 文件描述符只能由一个线程进行读写,换句话说就是一个TCP连接必须归属于某个EventLoop管理
🏳️🌈2 Muduo库常见接口介绍
这部分并不需要很大掌握,只需要知道大约流程和部分接口需要干什么
这部分文字介绍得不方便,大家根据注释理解下就行,重要的参数和方法都已经标注了
用CSDN博客看可能会不清晰,大家可以复制到各自的编译器中,代码和注释会更加清楚明白,刷题平台也行
2.1 TcpServer类基础介绍
这部分是辅助的
#include <?>
// 使用 TcpConnectionPtr(shared_ptr)管理连接生命周期,避免内存泄漏
typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
// 参数 TcpConnectionPtr 是对当前连接的封装,用户可操作连接对象(如发送数据、关闭连接)。
typedef std::function<void(const TcpConnectionPtr&)> ConnectionCallback;
// TcpConnectionPtr:当前连接对象。
// Buffer*:接收数据的缓冲区指针,用户可从中读取字节流。
// Timestamp:消息到达的时间戳。
typedef std::function<void(const TcpConnectionPtr&, Buffer*, Timestamp)> MessageCallback;
// 辅助类:封装网络地址信息(IP + 端口),用于服务器或客户端的地址绑定。
// ip:IP 地址字符串(如 "127.0.0.1")。
// port:端口号。
// ipv6:是否使用 IPv6(默认为 IPv4)。
class InetAddress : public muduo::copyable {
public:
InetAddress(StringArg ip, uint16_t port, bool ipv6 = false);
};
这部分才是主体
class TcpServer : noncopyable {
public:
// 定义服务器套接字选项,用于控制端口复用行为。
enum Option {
kNoReusePort, // 禁止端口复用(默认选项)。
kReusePort, //允许端口复用(需操作系统支持),允许多个进程/线程绑定同一端口,提升高并发性能。
};
// 构造函数:初始化 TCP 服务器,绑定监听地址和事件循环。
TcpServer( EventLoop* loop, // 事件循环对象指针(基于 Reactor 模型,负责处理 I/O 事件)。
const InetAddress& listenAddr, // 服务器监听的地址(IP + 端口),通过 InetAddress 封装。
const string& nameArg, // 服务器名称标识符。
Option option = kNoReusePort); // 端口复用选项(默认 kNoReusePort)。
// 方法:设置服务器线程池中的工作线程数量。
void setThreadNum(int numThreads);
// 方法:启动服务器,开始监听客户端连接。
void start();
// 方法:设置新连接建立或关闭时的回调函数。
void setConnectionCallback(const ConnectionCallback& cb) {
connectionCallback_ = cb;
}
// 方法:设置消息到达时的回调函数,处理接收到的数据。
void setMessageCallback(const MessageCallback& cb) {
messageCallback_ = cb;
}
};
2.2 EventLoop类基础介绍
#include <?>
class EventLoop : noncopyable
{
public:
// 启动事件循环的核心函数,阻塞当前线程并持续监听和处理事件(如 I/O 事件、定时器事件等)。
void loop();
// 安全退出事件循环,通常在回调函数中调用。
void quit();
// 在指定的绝对时间 time 触发回调函数 cb。
TimerId runAt(Timestamp time, TimerCallback cb);
// 在指定的相对时间 delay 后触发回调函数 cb。
TimerId runAfter(double delay, TimerCallback cb);
// 每隔指定的时间间隔 interval 触发回调函数 cb。
TimerId runEvery(double interval, TimerCallback cb);
// 取消指定 ID 的定时器。
void cancel(TimerId timerId);
private:
std::atomic<bool> quit_; // 原子布尔标志,控制事件循环是否退出
std::unique_ptr<Poller> poller_; // 封装底层 I/O 多路复用机制(如 epoll、poll 或 select),负责监听和分发 I/O 事件。
mutable MutexLock mutex_; // 保护 pendingFunctors_ 的线程安全,防止多线程同时修改导致数据竞争。
std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_); // 保存其他线程提交到事件循环线程中待执行的回调函数。
};
其中最重要的 Loop()
内部流程:
- 调用 Poller::poll() 监听所有注册的 I/O 事件(如 socket 可读/可写)。
- 处理就绪的 I/O 事件(如接收新连接、读取数据)。
- 执行到期定时器回调(通过 TimerQueue 管理)。
- 执行其他线程提交到 pendingFunctors_ 中的回调(通过互斥锁保证线程安全)。
- 循环持续直到 quit_ 被设置为 true。
2.3 TcpConnection类基础介绍
#include <?>
// TcpConnection 类:封装 TCP 连接的生命周期管理、数据读写及回调机制
// 继承 noncopyable 禁止拷贝,继承 enable_shared_from_this 支持安全获取 shared_ptr
class TcpConnection : noncopyable, public std::enable_shared_from_this<TcpConnection> {
public:
// 构造函数:初始化 TCP 连接对象
// 参数:
// - loop: 所属的事件循环(I/O 线程)
// - name: 连接的唯一标识名称
// - sockfd: 已建立的 TCP 套接字文件描述符
// - localAddr: 本地地址(服务器地址)
// - peerAddr: 对端地址(客户端地址)
TcpConnection(EventLoop *loop,
const string &name,
int sockfd,
const InetAddress &localAddr,
const InetAddress &peerAddr);
// 判断连接状态是否为已连接(kConnected)
bool connected() const { return state_ == kConnected; }
// 判断连接状态是否为已断开(kDisconnected)
bool disconnected() const { return state_ == kDisconnected; }
// 发送数据(多个重载版本,支持不同数据类型)
// 方法说明:将数据写入发送缓冲区,若直接发送失败则注册可写事件等待重试
// 注意:线程安全,但需确保在 I/O 线程调用(或通过跨线程任务提交)
// 发送字符串(右值引用,高效移动语义)
void send(string &&message);
// 发送原始数据(指针 + 长度)
void send(const void *message, int len);
// 发送字符串片段(StringPiece 类型,避免拷贝)
void send(const StringPiece &message);
// 发送 Buffer 对象(支持缓冲区链式管理)
void send(Buffer *message);
// 主动关闭连接(半关闭,停止写入数据)
// 触发条件:发送完剩余数据后关闭写端,状态转为 kDisconnecting
void shutdown();
// 上下文管理:允许用户附加任意类型数据到连接对象
// 用途:例如存储会话 ID、用户认证信息等
void setContext(const boost::any &context) { context_ = context; }
const boost::any &getContext() const { return context_; }
boost::any *getMutableContext() { return &context_; }
// 设置回调函数(由 TcpServer 或用户代码调用)
// 连接状态变化回调(建立/断开时触发)
void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; }
// 消息到达回调(收到数据时触发)
void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; }
private:
// 连接状态枚举
enum StateE {
kDisconnected, // 已断开(默认状态)
kConnecting, // 正在连接中(未完全建立)
kConnected, // 已连接(正常工作状态)
kDisconnecting // 正在断开中(等待数据发送完毕)
};
// 私有成员变量
EventLoop *loop_; // 所属事件循环(I/O 线程)
ConnectionCallback connectionCallback_; // 连接状态回调函数
MessageCallback messageCallback_; // 消息到达回调函数
WriteCompleteCallback writeCompleteCallback_; // 数据发送完成回调(未在接口中暴露)
boost::any context_; // 用户自定义上下文数据
};
关键设计说明
- 连接状态管理
- 状态通过 StateE 枚举控制,确保生命周期安全(如禁止在断开后发送数据)。
- 状态转换逻辑:
连接建立:kConnecting → kConnected
主动关闭:kConnected → kDisconnecting → kDisconnected
被动关闭:直接进入 kDisconnected
- 数据发送机制
- 缓冲队列:发送数据时先写入内核缓冲区,若缓冲区满则暂存到应用层缓冲队列,等待可写事件触发重试。
- 线程安全:send() 方法通过 loop_->runInLoop() 确保在 I/O 线程执行(若跨线程调用)。
- 回调函数
- connectionCallback_:在连接建立(kConnected)或断开(kDisconnected)时触发。
- messageCallback_:当收到对端数据时触发,参数通常为 TcpConnectionPtr 和 Buffer*(数据缓冲区)。
2.4 TcpClient类基础介绍
#include <?>
class TcpClient : noncopyable
{
public:
// TcpClient(EventLoop* loop);
// TcpClient(EventLoop* loop, const string& host, uint16_t port);
TcpClient(EventLoop *loop,
const InetAddress &serverAddr,
const string &nameArg);
~TcpClient(); // force out-line dtor, for std::unique_ptr members.
void connect(); // 连接服务器
void disconnect(); // 关闭连接
void stop();
// 获取客⼾端对应的通信连接Connection对象的接⼝,发起connect后,有可能还没有连接建⽴成功
TcpConnectionPtr connection() const{
MutexLockGuard lock(mutex_);
return connection_;
}
/// 连接服务器成功时的回调函数
void setConnectionCallback(ConnectionCallback cb){
connectionCallback_ = std::move(cb);
}
/// 收到服务器发送的消息时的回调函数
void setMessageCallback(MessageCallback cb){
messageCallback_ = std::move(cb);
}
private:
EventLoop *loop_;
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_;
TcpConnectionPtr connection_ GUARDED_BY(mutex_);
};
/*
需要注意的是,因为muduo库不管是服务端还是客⼾端都是异步操作,
对于客⼾端来说如果我们在连接还没有完全建⽴成功的时候发送数据,这是不被允许的。
因此我们可以使⽤内置的CountDownLatch类进⾏同步控制
*/
class CountDownLatch : noncopyable
{
public:
explicit CountDownLatch(int count);
void wait()
{
MutexLockGuard lock(mutex_);
while (count_ > 0)
{
condition_.wait();
}
}
void countDown()
{
MutexLockGuard lock(mutex_);
--count_;
if (count_ == 0)
{
condition_.notifyAll();
}
}
int getCount() const;
private:
mutable MutexLock mutex_;
Condition condition_ GUARDED_BY(mutex_);
int count_ GUARDED_BY(mutex_);
};
2.5 Buffer类基础介绍
#include <?>
class Buffer : public muduo::copyable
{
public:
// 缓冲区头部预留空间大小(用于快速添加协议头等前置数据)
static const size_t kCheapPrepend = 8;
// 缓冲区初始数据区大小
static const size_t kInitialSize = 1024;
// 构造函数:创建指定初始容量的缓冲区(总容量=预留空间+初始数据区)
explicit Buffer(size_t initialSize = kInitialSize);
// 交换两个缓冲区内容(高效的内存交换操作)
void swap(Buffer& rhs);
// 获取可读取数据的字节数(writerIndex_ - readerIndex_)
size_t readableBytes() const;
// 获取可写入空间的剩余字节数(buffer_.size() - writerIndex_)
size_t writableBytes() const;
// 返回指向可读数据起始位置的指针(不移动读索引)
const char* peek() const;
// 查找缓冲区中的行结束符(CRLF),返回位置指针,未找到返回nullptr
const char* findEOL() const;
// 从指定起始位置查找行结束符(CRLF)
const char* findEOL(const char* start) const;
// 从缓冲区消费指定长度的数据(前移读索引)
void retrieve(size_t len);
// 从缓冲区消费8字节数据(对应int64类型)
void retrieveInt64();
// 从缓冲区消费4字节数据(对应int32类型)
void retrieveInt16();
// 从缓冲区消费2字节数据(对应int16类型)
void retrieveInt8();
// 从缓冲区消费1字节数据(对应int8类型)
// 取出全部可读数据并转为字符串(同时清空缓冲区)
string retrieveAllAsString();
// 取出指定长度的数据并转为字符串
string retrieveAsString(size_t len);
// 向缓冲区追加数据(自动扩容)
void append(const StringPiece& str);
void append(const char* data, size_t len);
void append(const void* data, size_t len);
// 获取可写空间起始指针(非const版本允许写入操作)
char* beginWrite();
// 获取可写空间起始指针(const版本)
const char* beginWrite() const;
// 更新写索引位置(表示已写入指定长度的数据)
void hasWritten(size_t len);
// 追加网络字节序的整型数据(自动转换主机字节序)
void appendInt64(int64_t x);
void appendInt32(int32_t x);
void appendInt16(int16_t x);
void appendInt8(int8_t x);
// 读取并消费网络字节序的整型数据(自动转换为主机字节序)
int64_t readInt64();
int32_t readInt32();
int16_t readInt16();
int8_t readInt8();
// 查看网络字节序的整型数据(不移动读索引)
int64_t peekInt64() const;
int32_t peekInt32() const;
int16_t peekInt16() const;
int8_t peekInt8() const;
// 在预留空间前部添加网络字节序的整型数据(用于协议头等场景)
void prependInt64(int64_t x);
void prependInt32(int32_t x);
void prependInt16(int16_t x);
void prependInt8(int8_t x);
// 在预留空间前部添加任意数据
void prepend(const void* data, size_t len);
private:
std::vector<char> buffer_; // 数据存储容器
size_t readerIndex_; // 当前读位置索引
size_t writerIndex_; // 当前写位置索引
static const char kCRLF[]; // 行结束符常量(值为"\r\n")
};
🏳️🌈3 Muduo 使用案例
3.1 server.cpp
// 实现一个翻译服务器,客户端发送过来一个英语单词,返回一个汉语词语
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/Buffer.h>
#include <muduo/net/TcpConnection.h>
#include <iostream>
#include <string>
#include <unordered_map>
class DictServer{
public:
DictServer(int port):
// 0.0.0.0 表示监听所有地址
_server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), "DictServer", muduo::net::TcpServer::kReusePort)
{
// 设置回调函数
// std::placeholders::_1 表示第一个参数,即 TcpConnectionPtr
_server.setConnectionCallback(std::bind(&DictServer::onConnection, this, std::placeholders::_1));
_server.setMessageCallback(std::bind(&DictServer::onMessage, this,
std::placeholders::_1, // TcpConnectionPtr
std::placeholders::_2, // Buffer*
std::placeholders::_3 // Timestamp
));
}
void Start(){
_server.start(); // 先开始监听
_baseloop.loop(); // 开始死循环事件监控
}
private:
// 方法: 连接
void onConnection(const muduo::net::TcpConnectionPtr& conn){
if(conn->connected())
std::cout << "连接成功!\n";
else
std::cout << "连接断开!\n";
}
// 方法: 处理客户端请求
void onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buffer, muduo::Timestamp){
static std::unordered_map<std::string, std::string> dict_map = {
{"hello", "你好"},
{"world", "世界"},
{"apple", "苹果"},
{"banana", "香蕉"},
{"orange", "橘子"}
};
std::cout << "0000000000000" << std::endl;
std::string msg = buffer->retrieveAllAsString(); // 获取所有数据
std::string res;
auto it = dict_map.find(msg);
std::cout << "1111111111111" << std::endl;
// 找到了
if(it != dict_map.end())
res = it->second; // 发送翻译结果
else
res = "没有找到对应的翻译!";
conn->send(res); // 发送翻译结果
}
private:
muduo::net::EventLoop _baseloop;
muduo::net::TcpServer _server;
};
int main(){
DictServer server(9090);
server.Start();
return 0;
}
- 服务器初始化
-
构造函数 DictServer(int port)
创建 TcpServer 实例,绑定到 0.0.0.0:port,支持端口复用(kReusePort)。
设置两个回调函数:
- 连接回调 (onConnection): 处理客户端连接/断开事件。
- 消息回调 (onMessage): 处理客户端发送的数据。 -
启动服务器 Start()
- 调用 _server.start() 开始监听端口。
- 进入事件循环 _baseloop.loop(),等待客户端请求。
- 连接管理:onConnection
- 功能:
当客户端连接建立或断开时触发,输出日志。
- 消息处理:onMessage
核心流程:- 读取数据:
从 Buffer* 中提取所有数据作为字符串(retrieveAllAsString())。 - 查询字典:
在预定义的 unordered_map 中查找英文单词对应的中文翻译。 - 返回结果:
若找到则返回翻译,否则返回错误提示。
- 读取数据:
4. 主函数
- 创建 DictServer 实例并启动,监听端口 9090。
- 事件循环持续运行,直到程序终止。
- 代码流程图
启动服务器 (端口 9090)
│
├── 客户端连接
│ ├── onConnection: 打印日志
│ └── 等待消息...
│
└── 接收消息
├── onMessage: 读取数据 → 查询字典 → 返回结果
└── 持续监听...
3.2 client.cpp
#include <muduo/net/TcpClient.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
#include <muduo/net/Buffer.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/base/CountDownLatch.h>
#include <iostream>
#include <string>
class DictClient{
public:
DictClient(const std::string& ip, int sport):
_baseloop(_loopthread.startLoop()),
_downlatch(1), // 计数器初始化为1
_client(_baseloop, muduo::net::InetAddress(ip, sport), "DictClient")
{
// 设置连接事件(连接建立/管理)的回调
_client.setConnectionCallback(std::bind(&DictClient::onConnection, this, std::placeholders::_1));
// 设置连接消息的回调
_client.setMessageCallback(std::bind(&DictClient::onMessage, this,
std::placeholders::_1, // TcpConnectionPtr
std::placeholders::_2, // Buffer*
std::placeholders::_3 // Timestamp
));
// 连接服务器
_client.connect();
_downlatch.wait(); // 等待连接成功
// _baseloop.loop(); // 开始
}
bool send(const std::string& msg){
if(_conn->connected() == false){
std::cout << "连接已经断开,发送数据失败\n";
return false;
}
_conn->send(msg);
return true;
}
private:
// 方法: 连接
void onConnection(const muduo::net::TcpConnectionPtr& conn){
if(conn->connected()){
std::cout << "连接建立!\n";
_downlatch.countDown(); // 计数--,为0时,唤醒阻塞,表示连接成功
_conn = conn;
}
else{
std::cout << "连接失败!\n";
_conn.reset(); // 清空连接
}
}
// 方法: 处理客户端请求
void onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buffer, muduo::Timestamp){
std::string res = buffer->retrieveAllAsString();
std::cout << res << std::endl;
}
private:
muduo::net::TcpConnectionPtr _conn; // 连接对象
muduo::CountDownLatch _downlatch; // 计数器
muduo::net::EventLoopThread _loopthread; // 事件循环线程
muduo::net::EventLoop* _baseloop; // 事件循环对象
muduo::net::TcpClient _client; // 客户端对象
};
int main(){
DictClient client("127.0.0.1", 9090);
while(1){
std::string msg;
std::cin >> msg;
client.send(msg);
}
return 0;
}
-
客户端初始化
构造函数 DictClient(ip, port)- 创建 EventLoopThread,生成独立的事件循环线程 _baseloop。
- 初始化 CountDownLatch(计数器为1),用于同步等待连接建立。
- 创建 TcpClient 实例,绑定到服务器地址 ip:port。
- 设置回调函数:
- 连接回调 (onConnection): 处理连接成功/断开事件。
- 消息回调 (onMessage): 处理服务器返回的响应数据。 - 发起连接 (_client.connect()),并通过 _downlatch.wait() 阻塞主线程,直到连接成功。
-
连接管理:onConnection
功能:- 连接建立时:
输出日志 “连接建立!”。
调用 _downlatch.countDown() 将计数器减至0,唤醒主线程继续执行。
保存有效的 TcpConnectionPtr 到 _conn。 - 连接断开时:
输出日志 “连接失败!”。
清空 _conn,避免使用无效连接。
- 连接建立时:
-
消息发送:send
流程:- 检查 _conn->connected() 确认连接有效。
- 调用 _conn->send(msg) 发送用户输入的英文单词。
- 返回 bool 表示发送是否成功。
-
消息接收:onMessage
流程:- 从 Buffer* 中提取所有数据 (retrieveAllAsString())。
- 直接输出到控制台,显示服务器返回的中文翻译。
-
主函数
- 创建 DictClient 实例,连接到本地服务器 127.0.0.1:9090。
- 进入循环,等待用户输入英文单词并发送。
-
代码流程图
启动客户端 (连接 127.0.0.1:9090)
│
├── 连接服务器
│ ├── 成功:触发 onConnection → 计数器归零 → 主线程继续
│ └── 失败:清理连接并提示
│
├── 用户输入单词
│ └── 调用 send() 发送
│
└── 接收服务器响应
└── onMessage: 提取数据 → 打印结果
👥总结
本篇博文对 【从零实现Json-Rpc框架】- 第三方库介绍 - Muduo篇 做了一个较为详细的介绍,不知道对你有没有帮助呢
觉得博主写得还不错的三连支持下吧!会继续努力的~