Json-Rpc框架(Muduo库快速上手)
阅读导航
- 引言
- 一、Muduo库简介
- 二、Muduo库常见接口
- 1. TcpServer类基础介绍
- 2. EventLoop类基础介绍
- 3. TcpConnection类基础介绍
- 4. TcpClient类基础介绍
- 5. Buffer类基础介绍
- 三、Muduo库使用示例
- ⭕英译汉服务器
- ⭕英译汉客户端
引言
在上一篇文章中,我们简要介绍了在项目中使用的JsonCpp库,这是一个广泛使用的C++ JSON解析和生成库,它为我们的项目提供了高效、灵活的数据序列化与反序列化能力。然而,在构建服务架构时,仅依靠数据序列化是远远不够的。高效的网络通信是确保系统稳定运行、提升用户体验的关键环节。为此,在本篇文章中,我们将聚焦于项目中用到的另一个重要库——Muduo网络库。
Muduo是一个用于Linux多线程服务器的C++非阻塞网络库,它基于Reactor模式设计,提供了高性能的网络通信能力。它支持TCP、UDP等多种协议,并且拥有良好的可扩展性和灵活性。
一、Muduo库简介
Muduo,由陈硕大佬精心开发,是一个基于非阻塞IO和事件驱动的高性能C++ TCP网络编程库。它采用了主从Reactor模型,这种模型特别适用于构建高并发的网络服务器。在Muduo中,使用的线程模型被称为“one loop per thread”,这一理念的核心在于:
-
一个线程对应一个事件循环(EventLoop):这意味着每个线程都维护着它自己的事件循环,该循环专门用于响应和处理该线程内的计时器事件以及IO事件。这样的设计有助于减少线程间的竞争和同步开销,提高系统的并发处理能力。
-
一个文件描述符(或TCP连接)由单一线程处理:在Muduo中,每个TCP连接(或更一般地说,每个文件描述符)都被绑定到特定的EventLoop上,并由该EventLoop所在的线程负责其读写操作。这种绑定关系确保了数据的一致性和线程安全,避免了多线程同时操作同一资源可能导致的竞态条件和数据不一致问题。
二、Muduo库常见接口
1. TcpServer类基础介绍
#include <memory>
#include <functional>
#include <string>
#include "muduo/net/EventLoop.h"
#include "muduo/net/InetAddress.h"
#include "muduo/net/Timestamp.h"
#include "muduo/net/Buffer.h"
typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
typedef std::function<void(const TcpConnectionPtr&)> ConnectionCallback;
typedef std::function<void(const TcpConnectionPtr&, Buffer*, Timestamp)> MessageCallback;
class InetAddress : public muduo::copyable {
public:
InetAddress(StringArg ip, uint16_t port, bool ipv6 = false);
};
class TcpServer : noncopyable {
public:
enum Option {
kNoReusePort,
kReusePort,
};
TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const std::string& nameArg,
Option option = kNoReusePort);
void setThreadNum(int numThreads);
void start();
// 当一个新连接建立成功的时候被调用
void setConnectionCallback(const ConnectionCallback& cb) {
connectionCallback_ = cb;
}
// 消息的业务处理回调函数---这是收到新连接消息的时候被调用的函数
void setMessageCallback(const MessageCallback& cb) {
messageCallback_ = cb;
}
private:
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
};
2. EventLoop类基础介绍
class EventLoop : noncopyable
{
public:
// 开始事件循环,直到调用quit()方法为止
void loop();
// 请求退出事件循环
void quit();
// 在指定的时间运行回调函数一次
// 参数time是回调应该被调用的时间戳
// 参数cb是当时间到达时应该被调用的回调函数
// 返回TimerId,可用于取消定时器
TimerId runAt(Timestamp time, TimerCallback cb);
// 在当前时间加上指定的延迟后运行回调函数一次
// 参数delay是延迟时间(秒)
// 参数cb是当延迟时间过后应该被调用的回调函数
// 返回TimerId,可用于取消定时器
TimerId runAfter(double delay, TimerCallback cb);
// 每隔指定的时间间隔重复运行回调函数
// 参数interval是时间间隔(秒)
// 参数cb是每隔interval秒应该被调用的回调函数
// 返回TimerId,可用于取消定时器
TimerId runEvery(double interval, TimerCallback cb);
// 取消指定的定时器
// 参数timerId是之前通过runAt、runAfter或runEvery方法返回的定时器ID
void cancel(TimerId timerId);
private:
// 原子变量,用于指示事件循环是否应该退出
std::atomic<bool> quit_;
// 指向Poller对象的智能指针,Poller负责轮询I/O事件
std::unique_ptr<Poller> poller_;
// 互斥锁,用于保护多线程访问共享数据
mutable MutexLock mutex_;
// 存储待执行函数的向量,这些函数将在事件循环的某个点被执行
std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);
};
3. TcpConnection类基础介绍
class TcpConnection : noncopyable,
public std::enable_shared_from_this<TcpConnection>
{
public:
// 构造函数,用于创建TcpConnection对象
// 参数包括事件循环指针、连接名称、套接字文件描述符、本地地址和远程地址
TcpConnection(EventLoop* loop,
const string& name,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr);
// 检查连接是否已建立
bool connected() const { return state_ == kConnected; }
// 检查连接是否已断开
bool disconnected() const { return state_ == kDisconnected; }
// 发送字符串消息(使用C++11的移动语义)
void send(string&& message);
// 发送原始数据
void send(const void* message, int len);
// 使用StringPiece发送消息(StringPiece是Google的字符串切片类,用于高效处理字符串片段)
void send(const StringPiece& message);
// 发送Buffer对象中的数据
void send(Buffer* message);
// 关闭连接
void shutdown();
// 设置连接上下文,上下文可以是任意类型的数据,通过boost::any存储
void setContext(const boost::any& context)
{ context_ = context; }
// 获取连接上下文(常量引用)
const boost::any& getContext() const
{ return context_; }
// 获取连接上下文的可修改指针(注意:这可能不是线程安全的,使用时需要小心)
boost::any* getMutableContext()
{ return &context_; }
// 设置连接建立时的回调函数
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }
// 设置接收到消息时的回调函数
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }
private:
// 连接的状态枚举
enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
// 指向事件循环的指针,用于在连接上执行定时任务或异步操作
EventLoop* loop_;
// 连接建立时的回调函数
ConnectionCallback connectionCallback_;
// 接收到消息时的回调函数
MessageCallback messageCallback_;
// 发送完成时的回调函数
WriteCompleteCallback writeCompleteCallback_;
// 上下文信息,可以是任意类型的数据,通过boost::any存储
boost::any context_;
// 连接的状态
StateE state_;
};
4. TcpClient类基础介绍
class TcpClient : noncopyable
{
public:
// 构造函数,用于创建 TcpClient 对象。
// 需要提供事件循环指针、服务器地址和客户端名称。
TcpClient(EventLoop* loop,
const InetAddress& serverAddr,
const string& nameArg);
// 析构函数,声明为 out-of-line(在类定义外部实现),
// 以便处理 std::unique_ptr 成员(尽管在这个类的定义中没有直接显示)。
~TcpClient();
// 连接到服务器。
void connect();
// 关闭连接。
void disconnect();
// 停止客户端操作,可能包括关闭连接和清理资源。
void stop();
// 获取客户端对应的通信连接 TcpConnection 对象的接口。
// 注意:在发起 connect 后,连接可能尚未建立成功。
TcpConnectionPtr connection() const
{
MutexLockGuard lock(mutex_); // 加锁以保护 connection_
return connection_; // 返回当前连接(如果有的话)
}
// 设置连接成功时的回调函数。
void setConnectionCallback(ConnectionCallback cb)
{
connectionCallback_ = std::move(cb); // 使用移动语义设置回调
}
// 设置收到服务器发送的消息时的回调函数。
void setMessageCallback(MessageCallback cb)
{
messageCallback_ = std::move(cb); // 使用移动语义设置回调
}
private:
EventLoop* loop_; // 指向事件循环的指针,用于处理异步事件
ConnectionCallback connectionCallback_; // 连接成功时的回调函数
MessageCallback messageCallback_; // 收到消息时的回调函数
// 注意:WriteCompleteCallback 在此类的定义中没有直接出现,但可能在其他地方使用
TcpConnectionPtr connection_ GUARDED_BY(mutex_); // 当前连接(受 mutex_ 保护)
mutable MutexLock mutex_; // 用于保护 connection_ 的互斥锁
};
// CountDownLatch 类是一个同步辅助类,用于让一个或多个线程等待直到其他线程的一系列操作完成。
// 它继承自 noncopyable 以防止被复制。
class CountDownLatch : noncopyable
{
public:
// 构造函数,初始化计数器。
explicit CountDownLatch(int count);
// 等待计数器变为零。如果计数器不为零,则当前线程将阻塞。
void wait()
{
MutexLockGuard lock(mutex_); // 加锁以保护 count_ 和 condition_
while (count_ > 0) // 如果计数器大于零,则等待
{
condition_.wait(); // 释放锁并进入等待状态,直到被唤醒
}
}
// 将计数器减一。如果计数器变为零,则唤醒所有等待的线程。
void countDown()
{
MutexLockGuard lock(mutex_); // 加锁以保护 count_ 和 condition_
--count_; // 计数器减一
if (count_ == 0) // 如果计数器变为零
{
condition_.notifyAll(); // 唤醒所有等待的线程
}
}
// 获取当前计数器的值(主要用于调试)。
int getCount() const;
private:
mutable MutexLock mutex_; // 用于保护 count_ 和 condition_ 的互斥锁
Condition condition_ GUARDED_BY(mutex_); // 条件变量,与 mutex_ 一起使用以实现等待/通知机制
int count_ GUARDED_BY(mutex_); // 计数器,表示需要等待的操作数量
};
5. Buffer类基础介绍
// Buffer 类是一个字节缓冲区类,支持从两端读写数据,以及处理整数和基本字符串。
// 它继承自 muduo::copyable,表明这个类是可以被拷贝的。
class Buffer : public muduo::copyable
{
public:
// 定义了一个便宜的前置空间大小,用于优化读操作。
static const size_t kCheapPrepend = 8;
// 定义了缓冲区的初始大小。
static const size_t kInitialSize = 1024;
// 构造函数,接受一个可选的初始大小参数。
// 缓冲区实际大小为 kCheapPrepend + initialSize,其中 kCheapPrepend 用于优化读操作。
explicit Buffer(size_t initialSize = kInitialSize)
: buffer_(kCheapPrepend + initialSize),
readerIndex_(kCheapPrepend),
writerIndex_(kCheapPrepend)
{}
// 与另一个Buffer对象交换内容。
void swap(Buffer& rhs);
// 返回可读字节数,即 writerIndex_ - readerIndex_。
size_t readableBytes() const;
// 返回可写字节数,即 buffer_.size() - writerIndex_。
size_t writableBytes() const;
// 返回一个指向可读数据的指针,但不移动读写索引。
const char* peek() const;
// 查找并返回指向缓冲区中第一个EOL(如"\r\n")的指针,从头开始搜索。
const char* findEOL() const;
// 查找并返回指向缓冲区中从指定位置开始的第一个EOL的指针。
const char* findEOL(const char* start) const;
// 从缓冲区中移除指定长度的数据。
void retrieve(size_t len);
// 移除并返回缓冲区中下一个 int64_t 类型的数据。
void retrieveInt64();
// 移除并返回缓冲区中下一个 int32_t 类型的数据。
void retrieveInt32();
// 移除并返回缓冲区中下一个 int16_t 类型的数据。
void retrieveInt16();
// 移除并返回缓冲区中下一个 int8_t 类型的数据。
void retrieveInt8();
// 移除并返回缓冲区中所有可读数据作为字符串。
string retrieveAllAsString();
// 移除并返回缓冲区中指定长度的数据作为字符串。
string retrieveAsString(size_t len);
// 向缓冲区末尾追加 StringPiece 对象。
void append(const StringPiece& str);
// 向缓冲区末尾追加指定长度的数据。
void append(const char* /*restrict*/ data, size_t len);
// 向缓冲区末尾追加指定长度的数据(泛型版本)。
void append(const void* /*restrict*/ data, size_t len);
// 返回一个指向缓冲区末尾(用于写入)的指针。
char* beginWrite();
// 返回一个指向缓冲区末尾(用于写入)的常量指针。
const char* beginWrite() const;
// 更新写入索引,表示已经写入了指定长度的数据。
void hasWritten(size_t len);
// 向缓冲区末尾追加一个 int64_t 类型的数据。
void appendInt64(int64_t x);
// 向缓冲区末尾追加一个 int32_t 类型的数据。
void appendInt32(int32_t x);
// 向缓冲区末尾追加一个 int16_t 类型的数据。
void appendInt16(int16_t x);
// 向缓冲区末尾追加一个 int8_t 类型的数据。
void appendInt8(int8_t x);
// 从缓冲区读取一个 int64_t 类型的数据,并移动读索引。
int64_t readInt64();
// 从缓冲区读取一个 int32_t 类型的数据,并移动读索引。
int32_t readInt32();
// 从缓冲区读取一个 int16_t 类型的数据,并移动读索引。
int16_t readInt16();
// 从缓冲区读取一个 int8_t 类型的数据,并移动读索引。
int8_t readInt8();
// 从缓冲区中查看(不移动读索引)下一个 int64_t 类型的数据。
int64_t peekInt64() const;
// 从缓冲区中查看(不移动读索引)下一个 int32_t 类型的数据。
int32_t peekInt32() const;
// 从缓冲区中查看(不移动读索引)下一个 int16_t 类型的数据。
int16_t peekInt16() const;
// 从缓冲区中查看(不移动读索引)下一个 int8_t 类型的数据。
int8_t peekInt8() const;
// 在缓冲区开头(readerIndex_ 之前)追加一个 int64_t 类型的数据。
void prependInt64(int64_t x);
// 在缓冲区开头(readerIndex_ 之前)追加一个 int32_t 类型的数据。
void prependInt32(int32_t x);
// 在缓冲区开头(readerIndex_ 之前)追加一个 int16_t 类型的数据。
void prependInt16(int16_t x);
// 在缓冲区开头(readerIndex_ 之前)追加一个 int8_t 类型的数据。
void prependInt8(int8_t x);
// 在缓冲区开头(readerIndex_ 之前)追加指定长度的数据。
void prepend(const void* /*restrict*/ data, size_t len);
private:
std::vector<char> buffer_; // 存储字节数据的向量。
size_t readerIndex_; // 读索引,指向下一个可读字节的位置。
size_t writerIndex_; // 写索引,指向下一个可写字节的位置。
static const char kCRLF[]; // 可能的行结束符,如 "\r\n"。
};
三、Muduo库使用示例
接下来,我们将利用Muduo网络库来构建一个基础的英译汉翻译服务器及其对应的客户端。
⭕英译汉服务器
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/Buffer.h>
#include <iostream>
#include <string>
#include <unordered_map>
// 定义一个翻译服务器类
class DictServer {
public:
// 构造函数,初始化服务器监听端口
DictServer(int port)
: _server(&_baseloop, // 使用_baseloop事件循环
muduo::net::InetAddress("0.0.0.0", port), // 监听地址和端口
"DictServer", // 服务器名称
muduo::net::TcpServer::kReusePort) // 启用端口复用
{
// 设置连接建立/断开的回调函数
_server.setConnectionCallback(std::bind(&DictServer::onConnection, this, std::placeholders::_1));
// 设置接收到消息的回调函数
_server.setMessageCallback(std::bind(&DictServer::onMessage, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
}
// 启动服务器
void start() {
_server.start(); // 开始监听端口
_baseloop.loop(); // 进入事件循环,等待并处理事件
}
private:
// 连接建立或断开的回调函数
void onConnection(const muduo::net::TcpConnectionPtr &conn) {
if (conn->connected()) {
std::cout << "连接建立!\n";
} else {
std::cout << "连接断开!\n";
}
}
// 接收消息的回调函数
void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp) {
// 静态的字典映射,用于翻译
static std::unordered_map<std::string, std::string> dict_map = {
{"hello", "你好"},
{"world", "世界"},
{"apple", "苹果"}
};
// 从缓冲区中检索所有消息作为字符串
std::string msg = buf->retrieveAllAsString();
std::string res;
// 在字典中查找单词
auto it = dict_map.find(msg);
if (it != dict_map.end()) {
res = it->second; // 找到,返回对应的翻译
} else {
res = "未知单词!"; // 未找到,返回未知单词消息
}
// 发送翻译结果给客户端
conn->send(res);
}
private:
// 事件循环对象,用于处理网络事件
muduo::net::EventLoop _baseloop;
// TCP服务器对象,用于监听和接受连接
muduo::net::TcpServer _server;
};
int main()
{
// 创建并初始化翻译服务器,监听9090端口
DictServer server(9090);
// 启动服务器
server.start();
return 0;
}
⭕英译汉客户端
#include <muduo/net/TcpClient.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/Buffer.h>
#include <muduo/base/CountDownLatch.h>
#include <iostream>
#include <string>
// 定义一个字典客户端类
class DictClient {
public:
// 构造函数,初始化客户端
DictClient(const std::string &sip, int sport)
: _loopthread(), // 创建EventLoopThread对象,但此时不启动
_baseloop(_loopthread.startLoop()), // 启动EventLoopThread并获取其EventLoop
_downlatch(1), // 初始化CountDownLatch,计数为1,用于等待连接建立
_client(_baseloop, muduo::net::InetAddress(sip, sport), "DictClient") // 初始化TcpClient
{
// 设置连接事件(连接建立/断开)的回调函数
_client.setConnectionCallback(std::bind(&DictClient::onConnection, this, std::placeholders::_1));
// 设置接收到消息的回调函数
_client.setMessageCallback(std::bind(&DictClient::onMessage, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
// 连接到服务器
_client.connect();
// 等待连接建立,这里使用CountDownLatch来阻塞当前线程
_downlatch.wait();
}
// 发送消息到服务器
bool send(const std::string &msg) {
if (!_conn->connected()) { // 检查连接是否仍然有效
std::cout << "连接已经断开,发送数据失败!\n";
return false;
}
_conn->send(msg); // 发送消息
return true;
}
private:
// 连接事件回调,处理连接建立或断开的情况
void onConnection(const muduo::net::TcpConnectionPtr &conn) {
if (conn->connected()) {
std::cout << "连接建立!\n";
_downlatch.countDown(); // 连接建立后,减少CountDownLatch的计数
_conn = conn; // 保存TcpConnectionPtr对象
} else {
std::cout << "连接断开!\n";
_conn.reset(); // 清除TcpConnectionPtr对象
}
}
// 消息接收回调,处理从服务器接收到的消息
void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp) {
std::string res = buf->retrieveAllAsString(); // 从缓冲区中检索所有消息
std::cout << res << std::endl; // 打印接收到的消息
}
private:
muduo::net::TcpConnectionPtr _conn; // TcpConnection的智能指针,用于存储连接对象
muduo::CountDownLatch _downlatch; // 计数器,用于等待连接建立
muduo::net::EventLoopThread _loopthread; // 事件循环线程
muduo::net::EventLoop *_baseloop; // 指向EventLoopThread中EventLoop的指针
muduo::net::TcpClient _client; // TcpClient对象,用于连接和发送消息
};
int main()
{
// 创建字典客户端实例,连接到本地9090端口的服务器
DictClient client("127.0.0.1", 9090);
while(1) {
std::string msg;
std::cin >> msg; // 读取用户输入的消息
client.send(msg); // 发送消息到服务器
}
return 0;
}