c++如何利用线程池和epool设计高并发服务器
设计一个高并发服务器需要有效地处理大量同时连接的客户端请求。结合线程池和epoll可以实现高效的I/O多路复用和任务并发处理。
1. 基本概念
- 线程池:用于管理和重用线程,避免频繁创建和销毁线程带来的开销。
- epoll:Linux下的高效I/O多路复用机制,适合处理大量并发连接。
2. 设计步骤
1. 初始化
- 创建一个线程池,预先启动一定数量的线程以备使用。
- 创建一个epoll实例,用于监控多个文件描述符上的I/O事件。
2. 监听和接受连接
- 使用
socket()
创建服务器套接字,并使用bind()
和listen()
绑定到特定的IP和端口。 - 使用
epoll_ctl()
注册服务器套接字上的EPOLLIN
事件,以便在有新连接时得到通知。
3. 事件循环
- 使用
epoll_wait()
等待事件的发生。 - 对于每个事件,根据其类型进行处理:
- 新连接:接受新连接,设置为非阻塞模式,并使用
epoll_ctl()
注册到epoll实例中,关注EPOLLIN
事件。 - 可读事件:从套接字读取数据,并将读取任务提交到线程池进行处理。
- 可写事件:将待发送的数据写入套接字。
- 新连接:接受新连接,设置为非阻塞模式,并使用
4. 线程池处理
- 在线程池中处理业务逻辑,如解析请求、处理数据、生成响应。
- 处理完成后,将响应数据写入对应的套接字。
5. 清理和关闭
- 关闭所有打开的文件描述符。
- 销毁线程池,释放资源。
3. 代码示例
#include <iostream>
#include <sys/epoll.h> // epoll相关函数和数据结构
#include <netinet/in.h> // sockaddr_in结构体和相关常量
#include <unistd.h> // POSIX操作系统API,包括read, write, close等
#include <fcntl.h> // 文件控制选项,如fcntl函数
#include <vector>
#include <thread> // C++11线程库
#include <functional> // std::function,用于存储可调用对象
#include <queue> // 队列容器
#include <mutex> // 互斥锁,用于线程同步
#include <condition_variable> // 条件变量,用于线程同步
// 设置文件描述符为非阻塞模式
int setNonBlocking(int fd) {
int flags = fcntl(fd, F_GETFL, 0); // 获取文件描述符的当前状态标志
return fcntl(fd, F_SETFL, flags | O_NONBLOCK); // 设置为非阻塞模式
}
// 线程池的简单实现
class ThreadPool {
public:
// 构造函数,初始化线程池
ThreadPool(size_t numThreads) {
for (size_t i = 0; i < numThreads; ++i) {
// 创建工作线程
workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mutex); // 加锁
// 等待条件变量,直到有任务或停止标志为true
condition.wait(lock, [this] { return stop || !tasks.empty(); });
if (stop && tasks.empty()) return; // 如果停止并且任务队列为空,则退出
task = std::move(tasks.front()); // 从任务队列取出任务
tasks.pop(); // 移除任务
}
task(); // 执行任务
}
});
}
}
// 将任务加入任务队列
template<class F>
void enqueue(F&& task) {
{
std::unique_lock<std::mutex> lock(mutex); // 加锁
tasks.push(std::forward<F>(task)); // 将任务加入队列
}
condition.notify_one(); // 通知一个等待线程
}
// 析构函数,清理线程池
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(mutex); // 加锁
stop = true; // 设置停止标志
}
condition.notify_all(); // 通知所有等待线程
for (std::thread &worker : workers) {
worker.join(); // 等待所有线程完成
}
}
private:
std::vector<std::thread> workers; // 工作线程
std::queue<std::function<void()>> tasks; // 任务队列
std::mutex mutex; // 互斥锁
std::condition_variable condition; // 条件变量
bool stop = false; // 停止标志
};
// 服务器主逻辑
void runServer(int port) {
int serverFd = socket(AF_INET, SOCK_STREAM, 0); // 创建套接字
setNonBlocking(serverFd); // 设置非阻塞
sockaddr_in serverAddr; // 服务器地址结构体
serverAddr.sin_family = AF_INET; // 地址族
serverAddr.sin_addr.s_addr = INADDR_ANY; // 监听所有接口
serverAddr.sin_port = htons(port); // 端口号,网络字节序
bind(serverFd, (sockaddr*)&serverAddr, sizeof(serverAddr)); // 绑定地址
listen(serverFd, SOMAXCONN); // 监听,最大连接数为系统默认
int epollFd = epoll_create1(0); // 创建epoll实例
epoll_event event; // epoll事件结构体
event.data.fd = serverFd; // 设置事件关联的文件描述符
event.events = EPOLLIN; // 关注可读事件
epoll_ctl(epollFd, EPOLL_CTL_ADD, serverFd, &event); // 注册事件
ThreadPool pool(4); // 创建线程池,包含4个线程
std::vector<epoll_event> events(1024); // 用于存储epoll_wait返回的事件
while (true) {
int n = epoll_wait(epollFd, events.data(), events.size(), -1); // 等待事件
for (int i = 0; i < n; ++i) {
if (events[i].data.fd == serverFd) {
// 处理新连接
int clientFd = accept(serverFd, nullptr, nullptr); // 接受新连接
setNonBlocking(clientFd); // 设置非阻塞
event.data.fd = clientFd; // 设置事件关联的文件描述符
event.events = EPOLLIN; // 关注可读事件
epoll_ctl(epollFd, EPOLL_CTL_ADD, clientFd, &event); // 注册事件
} else {
// 处理已有连接的数据
int clientFd = events[i].data.fd;
pool.enqueue([clientFd] { // 将任务加入线程池
char buffer[1024];
int bytesRead = read(clientFd, buffer, sizeof(buffer)); // 读取数据
if (bytesRead > 0) {
// 处理读取的数据
std::cout << "Received: " << std::string(buffer, bytesRead) << std::endl;
// 发送响应
write(clientFd, buffer, bytesRead); // 回显收到的数据
} else {
// 关闭连接
close(clientFd); // 关闭客户端连接
}
});
}
}
}
}
int main() {
runServer(8080); // 启动服务器,监听8080端口
return 0;
}