muduo库的使用
muduo库的使用
- 1. 下载安装mudu库
- 2. muduo库的原理
- 3. muduo库常见接口介绍
- 3.1 TcpServer类基础介绍
- 3.2 EventLoop类基础介绍
- 3.3TcpConnection类基础介绍
- 3.4 Buffer类基础介绍
- 3.5 TcpClient类基础介绍
- 4. 示例加深理解
- 4.1 服务端
- 4.2 客户端
- 4.3 Makefile编写
1. 下载安装mudu库
- git方法
git clone https://github.com/chenshuo/muduo.git
- 安装依赖环境
sudo apt-get install libz-dev libboost-all-dev
- 进入目录运行脚本编译安装
[……]$ ./build.sh
[……]$ ./build.sh install
此时会在同级目录下生成一个build目录
2. muduo库的原理
Muduoku
是由陈硕大佬开发的一个基于非阻塞IO和时间驱动的C++高并发TCP网络编程库。它是一款使用epoll多路转接,基于主从Reactor模型的网络库,其使用的线程模型是one loop per thread
,所谓one loop per thread
指的是:
- 一个线程只能有一个事件循环(
EventLoop)
,用于响应计时器和IO事件。 - 一个文件描述符只能由一个线程进行读写,换句话说就是一个TCP连接必须属于某个
EventLoop
管理
在我们之前我们讲解的select,poll,epoll
这些多路转接模型的工作原理都是基于单线程的事件监听。将listen_socket
添加到事件监控中,然后根据listen_socket
监听,一旦由client进行连接就创建socket
并也把这个socket
添加到事件监听中,然后就可以对已添加的socket
进行事件监控,一旦触发了某个事件就对其进行处理。这种通过多路转接模型进行事件触发的IO处理的模型就叫做Reactor模型。但是这也会带来很大的缺陷,前面我们说过了他们都是基于单线程实现的,也就是说执行流只会有一个,如果client
有大量的请求连接的话,假如同一时间有1000个请求,那么第1000个请求就需要等带前999个请求处理完后才会执行第1000个请求,而没处理一个请求时需要花费一定时间的,这个效率对用户的体验是非常差的,所以对于高并发场景的单个Reactor
模型是远远不够的。
所以muduo
库就诞生了,muduo
库的模型是由一个主Reactor
和多个子Reactor
组成的,每个Reactor
由一个线程进行维护,主Reactor
负责监听新的连接请求,并将新连接分发给子Reactor
。主Reactor
使用epoll机制监听端口,当有新的连接请求时,将其分配给某个子Reactor
。子Reactor负责处理分配给它的连接的所有IO操作。每个子Reactor
在一个独立的线程中运行,减少了线程间的上下文切换,提高了效率
3. muduo库常见接口介绍
3.1 TcpServer类基础介绍
主要代码逻辑框架
typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
typedef std::function<void(const TcpConnectionPtr &)> ConnectionCallback;
typedef std::function<void(const TcpConnectionPtr &,
Buffer *,
Timestamp)>
MessageCallback;
class InetAddress : public muduo::copyable
{
public:
InetAddress(StringArg ip, uint16_t port, bool ipv6 = false);
};
class TcpServer : noncopyable
{
public:
enum Option
{
kNoReusePort,
kReusePort,
};
TcpServer(EventLoop *loop,
const InetAddress &listenAddr,
const string &nameArg,
Option option = kNoReusePort); // 第四个参数:是否设置为地址重用
// 设置线程数量,也就是设置几个Reactor,默认是只有一个线程也就是主线程
void setThreadNum(int numThreads);
// 开始时间监控,获取新的连接
void start();
/// 当⼀个新连接建⽴成功的时候被调⽤(一个连接建立成功/断开时执行,比如一个广播聊天,一个用户上线里就通知某某上线了,当他线下了也通知某某下线了。)
void setConnectionCallback(const ConnectionCallback &cb)
{
connectionCallback_ = cb;
}
/// 消息的业务处理回调函数---这是收到新连接消息的时候被调⽤的函数
void setMessageCallback(const MessageCallback &cb)
{
messageCallback_ = cb;
}
};
3.2 EventLoop类基础介绍
class EventLoop : noncopyable
{
public:
// 开始事件循环,直到调用quit()方法为止
void loop();
// 请求退出事件循环
void quit();
// 在指定的时间运行回调函数一次
// 参数time是回调应该被调用的时间戳
// 参数cb是当时间到达时应该被调用的回调函数
// 返回TimerId,可用于取消定时器
TimerId runAt(Timestamp time, TimerCallback cb);
// 在当前时间加上指定的延迟后运行回调函数一次
// 参数delay是延迟时间(秒)
// 参数cb是当延迟时间过后应该被调用的回调函数
// 返回TimerId,可用于取消定时器
TimerId runAfter(double delay, TimerCallback cb);
// 每隔指定的时间间隔重复运行回调函数
// 参数interval是时间间隔(秒)
// 参数cb是每隔interval秒应该被调用的回调函数
// 返回TimerId,可用于取消定时器
TimerId runEvery(double interval, TimerCallback cb);
// 取消指定的定时器
// 参数timerId是之前通过runAt、runAfter或runEvery方法返回的定时器ID
void cancel(TimerId timerId);
private:
// 原子变量,用于指示事件循环是否应该退出
std::atomic<bool> quit_;
// 指向Poller对象的智能指针,Poller负责轮询I/O事件
std::unique_ptr<Poller> poller_;
// 互斥锁,用于保护多线程访问共享数据
mutable MutexLock mutex_;
// 存储待执行函数的向量,这些函数将在事件循环的某个点被执行
std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);
};
3.3TcpConnection类基础介绍
class TcpConnection : noncopyable,
public std::enable_shared_from_this<TcpConnection>
{
public:
// 构造函数,用于创建TcpConnection对象
// 参数包括事件循环指针、连接名称、套接字文件描述符、本地地址和远程地址
TcpConnection(EventLoop* loop,
const string& name,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr);
// 检查连接是否已建立
bool connected() const { return state_ == kConnected; }
// 检查连接是否已断开
bool disconnected() const { return state_ == kDisconnected; }
// 发送字符串消息(使用C++11的移动语义)
void send(string&& message);
// 发送原始数据
void send(const void* message, int len);
// 使用StringPiece发送消息(StringPiece是Google的字符串切片类,用于高效处理字符串片段)
void send(const StringPiece& message);
// 发送Buffer对象中的数据
void send(Buffer* message);
// 关闭连接
void shutdown();
// 设置连接上下文,上下文可以是任意类型的数据,通过boost::any存储
void setContext(const boost::any& context)
{ context_ = context; }
// 获取连接上下文(常量引用)
const boost::any& getContext() const
{ return context_; }
// 获取连接上下文的可修改指针(注意:这可能不是线程安全的,使用时需要小心)
boost::any* getMutableContext()
{ return &context_; }
// 设置连接建立时的回调函数
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }
// 设置接收到消息时的回调函数
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }
private:
// 连接的状态枚举
enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
// 指向事件循环的指针,用于在连接上执行定时任务或异步操作
EventLoop* loop_;
// 连接建立时的回调函数
ConnectionCallback connectionCallback_;
// 接收到消息时的回调函数
MessageCallback messageCallback_;
// 发送完成时的回调函数
WriteCompleteCallback writeCompleteCallback_;
// 上下文信息,可以是任意类型的数据,通过boost::any存储
boost::any context_;
// 连接的状态
StateE state_;
};
3.4 Buffer类基础介绍
// 缓冲区类
// Buffer 类是一个字节缓冲区类,支持从两端读写数据,以及处理整数和基本字符串。
// 它继承自 muduo::copyable,表明这个类是可以被拷贝的。
class Buffer : public muduo::copyable
{
public:
// 定义了一个便宜的前置空间大小,用于优化读操作。
static const size_t kCheapPrepend = 8;
// 定义了缓冲区的初始大小。
static const size_t kInitialSize = 1024;
// 构造函数,接受一个可选的初始大小参数。
// 缓冲区实际大小为 kCheapPrepend + initialSize,其中 kCheapPrepend 用于优化读操作。
explicit Buffer(size_t initialSize = kInitialSize)
: buffer_(kCheapPrepend + initialSize),
readerIndex_(kCheapPrepend),
writerIndex_(kCheapPrepend)
{}
// 与另一个Buffer对象交换内容。
void swap(Buffer& rhs);
// 返回可读字节数,即 writerIndex_ - readerIndex_。
size_t readableBytes() const;
// 返回可写字节数,即 buffer_.size() - writerIndex_。
size_t writableBytes() const;
// 返回一个指向可读数据的指针,但不移动读写索引。
const char* peek() const;
// 查找并返回指向缓冲区中第一个EOL(如"\r\n")的指针,从头开始搜索。
const char* findEOL() const;
// 查找并返回指向缓冲区中从指定位置开始的第一个EOL的指针。
const char* findEOL(const char* start) const;
// 从缓冲区中移除指定长度的数据。
void retrieve(size_t len);
// 移除并返回缓冲区中下一个 int64_t 类型的数据。
void retrieveInt64();
// 移除并返回缓冲区中下一个 int32_t 类型的数据。
void retrieveInt32();
// 移除并返回缓冲区中下一个 int16_t 类型的数据。
void retrieveInt16();
// 移除并返回缓冲区中下一个 int8_t 类型的数据。
void retrieveInt8();
// 移除并返回缓冲区中所有可读数据作为字符串。
string retrieveAllAsString();
// 移除并返回缓冲区中指定长度的数据作为字符串。
string retrieveAsString(size_t len);
// 向缓冲区末尾追加 StringPiece 对象。
void append(const StringPiece& str);
// 向缓冲区末尾追加指定长度的数据。
void append(const char* /*restrict*/ data, size_t len);
// 向缓冲区末尾追加指定长度的数据(泛型版本)。
void append(const void* /*restrict*/ data, size_t len);
// 返回一个指向缓冲区末尾(用于写入)的指针。
char* beginWrite();
// 返回一个指向缓冲区末尾(用于写入)的常量指针。
const char* beginWrite() const;
// 更新写入索引,表示已经写入了指定长度的数据。
void hasWritten(size_t len);
// 向缓冲区末尾追加一个 int64_t 类型的数据。
void appendInt64(int64_t x);
// 向缓冲区末尾追加一个 int32_t 类型的数据。
void appendInt32(int32_t x);
// 向缓冲区末尾追加一个 int16_t 类型的数据。
void appendInt16(int16_t x);
// 向缓冲区末尾追加一个 int8_t 类型的数据。
void appendInt8(int8_t x);
// 从缓冲区读取一个 int64_t 类型的数据,并移动读索引。
int64_t readInt64();
// 从缓冲区读取一个 int32_t 类型的数据,并移动读索引。
int32_t readInt32();
// 从缓冲区读取一个 int16_t 类型的数据,并移动读索引。
int16_t readInt16();
// 从缓冲区读取一个 int8_t 类型的数据,并移动读索引。
int8_t readInt8();
// 从缓冲区中查看(不移动读索引)下一个 int64_t 类型的数据。
int64_t peekInt64() const;
// 从缓冲区中查看(不移动读索引)下一个 int32_t 类型的数据。
int32_t peekInt32() const;
// 从缓冲区中查看(不移动读索引)下一个 int16_t 类型的数据。
int16_t peekInt16() const;
// 从缓冲区中查看(不移动读索引)下一个 int8_t 类型的数据。
int8_t peekInt8() const;
// 在缓冲区开头(readerIndex_ 之前)追加一个 int64_t 类型的数据。
void prependInt64(int64_t x);
// 在缓冲区开头(readerIndex_ 之前)追加一个 int32_t 类型的数据。
void prependInt32(int32_t x);
// 在缓冲区开头(readerIndex_ 之前)追加一个 int16_t 类型的数据。
void prependInt16(int16_t x);
// 在缓冲区开头(readerIndex_ 之前)追加一个 int8_t 类型的数据。
void prependInt8(int8_t x);
// 在缓冲区开头(readerIndex_ 之前)追加指定长度的数据。
void prepend(const void* /*restrict*/ data, size_t len);
private:
std::vector<char> buffer_; // 存储字节数据的向量。
size_t readerIndex_; // 读索引,指向下一个可读字节的位置。
size_t writerIndex_; // 写索引,指向下一个可写字节的位置。
static const char kCRLF[]; // 可能的行结束符,如 "\r\n"。
};
3.5 TcpClient类基础介绍
class TcpClient : noncopyable
{
public:
// 构造函数,用于创建 TcpClient 对象。
// 需要提供事件循环指针、服务器地址和客户端名称。
TcpClient(EventLoop* loop,
const InetAddress& serverAddr,
const string& nameArg);
// 析构函数,声明为 out-of-line(在类定义外部实现),
// 以便处理 std::unique_ptr 成员(尽管在这个类的定义中没有直接显示)。
~TcpClient();
// 连接到服务器。
void connect();
// 关闭连接。
void disconnect();
// 停止客户端操作,可能包括关闭连接和清理资源。
void stop();
// 获取客户端对应的通信连接 TcpConnection 对象的接口。
// 注意:在发起 connect 后,连接可能尚未建立成功。
TcpConnectionPtr connection() const
{
MutexLockGuard lock(mutex_); // 加锁以保护 connection_
return connection_; // 返回当前连接(如果有的话)
}
// 设置连接成功时的回调函数。
void setConnectionCallback(ConnectionCallback cb)
{
connectionCallback_ = std::move(cb); // 使用移动语义设置回调
}
// 设置收到服务器发送的消息时的回调函数。
void setMessageCallback(MessageCallback cb)
{
messageCallback_ = std::move(cb); // 使用移动语义设置回调
}
private:
EventLoop* loop_; // 指向事件循环的指针,用于处理异步事件
ConnectionCallback connectionCallback_; // 连接成功时的回调函数
MessageCallback messageCallback_; // 收到消息时的回调函数
// 注意:WriteCompleteCallback 在此类的定义中没有直接出现,但可能在其他地方使用
TcpConnectionPtr connection_ GUARDED_BY(mutex_); // 当前连接(受 mutex_ 保护)
mutable MutexLock mutex_; // 用于保护 connection_ 的互斥锁
};
// CountDownLatch 类是一个同步辅助类,用于让一个或多个线程等待直到其他线程的一系列操作完成。
// 它继承自 noncopyable 以防止被复制。
class CountDownLatch : noncopyable
{
public:
// 构造函数,初始化计数器。
explicit CountDownLatch(int count);
// 等待计数器变为零。如果计数器不为零,则当前线程将阻塞。
void wait()
{
MutexLockGuard lock(mutex_); // 加锁以保护 count_ 和 condition_
while (count_ > 0) // 如果计数器大于零,则等待
{
condition_.wait(); // 释放锁并进入等待状态,直到被唤醒
}
}
// 将计数器减一。如果计数器变为零,则唤醒所有等待的线程。
void countDown()
{
MutexLockGuard lock(mutex_); // 加锁以保护 count_ 和 condition_
--count_; // 计数器减一
if (count_ == 0) // 如果计数器变为零
{
condition_.notifyAll(); // 唤醒所有等待的线程
}
}
// 获取当前计数器的值(主要用于调试)。
int getCount() const;
private:
mutable MutexLock mutex_; // 用于保护 count_ 和 condition_ 的互斥锁
Condition condition_ GUARDED_BY(mutex_); // 条件变量,与 mutex_ 一起使用以实现等待/通知机制
int count_ GUARDED_BY(mutex_); // 计数器,表示需要等待的操作数量
};
4. 示例加深理解
编写一个简单的翻译模块。
4.1 服务端
#include <muduo/net/TcpServer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/Buffer.h>
#include <iostream>
#include <string>
#include <unordered_map>
class DictServer{
public:
DictServer(int port = 9090):_server(&_baseloop,
muduo::net::InetAddress("127.0.0.1",port),
"DictServer",
muduo::net::TcpServer::kReusePort)//第四个参数:是否启动地址重用
{
// 设置回调函数
// 设置连接(连接建立/管理)事件的回调
_server.setConnectionCallback(std::bind(&DictServer::onConnection, this, std::placeholders::_1));
// 设置连接消息的回调
_server.setMessageCallback(std::bind(&DictServer::onMessage,
this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3));
}
// 启动服务器
void start(){
_server.start();// 开始监听
_baseloop.loop();// 开始事件循环监控
}
void onConnection(const muduo::net::TcpConnectionPtr& conn)
{
if (conn->connected()){
std::cout << "连接成功\n";
}else{
std::cout << "连接失败\n";
}
}
void onMessage(const muduo::net::TcpConnectionPtr& conn, muduo::net::Buffer* buf, muduo::Timestamp time){
static std::unordered_map<std::string, std::string> dict_map = {
{"hello", "你好"},
{"world", "世界"},
{"map","图"}
};
// 获取所有数据,不需要提供rceve接收消息的接口,直接从缓冲区中拿数据
std::string msg = buf->retrieveAllAsString();
auto it = dict_map.find(msg);
std::string res;
if (it != dict_map.end()){
res = it->second;
}else{
res = "未知单词!";
}
conn->send(res);
}
private:
// 事件循环对象,用户处理网络事件
muduo::net::EventLoop _baseloop; // 这个一定要声明在_server前面,因为需要用EventLoop来构造TcpServer
// TCP服务对象,用于监听和接收连接
muduo::net::TcpServer _server;
};
int main()
{
DictServer server(9090);
server.start();
return 0;
}
4.2 客户端
#include <muduo/net/TcpClient.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThread.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/Buffer.h>
#include <muduo/base/CountDownLatch.h> // 用来同步连接,即连接成功后才开始发消息
#include <iostream>
#include <string>
class DictClient
{
public:
DictClient(const std::string& sip, int port)
: _baseloop(_threadloop.startLoop()), // 这里threadloop返回的是一个EventLoop类型
_client(_baseloop, muduo::net::InetAddress(sip, port), "DictClient"),
_downlatch(1) // 这里初始化为1,为0时才会被唤醒
{
// 设置回调函数
// 设置连接(连接建立/管理)事件的回调
_client.setConnectionCallback(std::bind(&DictClient::onConnection, this, std::placeholders::_1));
// 设置连接消息的回调
_client.setMessageCallback(std::bind(&DictClient::onMessage,
this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3));
_client.connect();
_downlatch.wait(); // 阻塞等待连接成功后被唤醒
//_baseloop.loop(); // 客户端不能这样写,因为loop是一个死循环,一旦这里执行了loop就一定不会走到send发送数据那一块
// 所以这里要给baseloop.loop在一个新的线程里跑,muduo库提供了一个EventLoopThread可以做到
// 此时我们就不需要使用_baseloop.loop();循环了EventLoopThread里面会帮我们去loop、
}
bool send(const std::string &msg)
{
if (_conn->connected() == false)
{
return false;
std::cout << "连接已断开,发送数据失败" << std::endl;
}
_conn->send(msg);
return true;
}
void onConnection(const muduo::net::TcpConnectionPtr &conn)
{
if (conn->connected())
{
std::cout << "连接成功\n";
_downlatch.countDown(); // 连接成功计数器--,为0时唤醒阻塞
_conn = conn;
}
else
{
std::cout << "连接失败\n";
_conn.reset(); // 清空
}
}
void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp time)
{
std::string res = buf->retrieveAllAsString();
std::cout << res << std::endl;
}
private:
muduo::net::TcpConnectionPtr _conn; // TcpConnection的智能指针,用于存放连接对象
muduo::CountDownLatch _downlatch; // 计数器用于实现同步
muduo::net::EventLoopThread _threadloop; // 定义这个的时候EventLoopThread 内部执行startloop便可以触发EventLoop 的loop事件循环,
muduo::net::EventLoop *_baseloop;指向EventLoopThread中EventLoop指针
muduo::net::TcpClient _client; // TcpClient对象,用于连接和发送数据
};
int main()
{
DictClient client("127.0.0.1", 9090);
while (true)
{
std::string msg;
std::cin >> msg;
client.send(msg);
}
return 0;
}
这里我们可以跳转到startLoop的实现:
EventLoop* EventLoopThread::startLoop()
{
assert(!thread_.started());
thread_.start();
EventLoop* loop = NULL;
{
MutexLockGuard lock(mutex_);
while (loop_ == NULL)
{
cond_.wait();
}
loop = loop_;
}
return loop;
}
在跳转到thread_.start()
void Thread::start()
{
assert(!started_);
started_ = true;
// FIXME: move(func_)
detail::ThreadData* data = new detail::ThreadData(func_, name_, &tid_, &latch_);
if (pthread_create(&pthreadId_, NULL, &detail::startThread, data)) // 这里就会创建一个线程,这个线程实现的方法就是EventLoop的loop事件循环监控行为
{
started_ = false;
delete data; // or no delete?
LOG_SYSFATAL << "Failed in pthread_create";
}
else
{
latch_.wait();
assert(tid_ > 0);
}
}
void* startThread(void* obj)
{
ThreadData* data = static_cast<ThreadData*>(obj);
data->runInThread();
delete data;
return NULL;
}
void runInThread()
{
……
func_();
……
};
这个func_()是一个回调函数,我们可以去看一下EventLoopThread的构造函数,构造了一个
void EventLoopThread::threadFunc()
{
……
EventLoop loop;
loop.loop(); // 这个就是
……
}
4.3 Makefile编写
CFLAG = -std=c++11 -I ../../build/release-install-cpp11/include/ #指定头文件路径(这里需要根据自身的情况具体填写)
LFLAG = -L ../../build/release-install-cpp11/lib -lmuduo_net -lmuduo_base -lpthread #指定包含库文件路径,注意这里lmuduo_net要先连接(这里需要根据自身的情况具体填写)
all:server client
server:server.cpp
g++ -o $@ $^ $(CFLAG) $(LFLAG)
client:client.cpp
g++ -o $@ $^ $(CFLAG) $(LFLAG)
.PHONY:clean
clean:
rm -fr server client