当前位置: 首页 > article >正文

c++如何利用线程池和epool设计高并发服务器

设计一个高并发服务器需要有效地处理大量同时连接的客户端请求。结合线程池和epoll可以实现高效的I/O多路复用和任务并发处理。

1. 基本概念

  • 线程池:用于管理和重用线程,避免频繁创建和销毁线程带来的开销。
  • epoll:Linux下的高效I/O多路复用机制,适合处理大量并发连接。

2. 设计步骤

1. 初始化
  • 创建一个线程池,预先启动一定数量的线程以备使用。
  • 创建一个epoll实例,用于监控多个文件描述符上的I/O事件。
2. 监听和接受连接
  • 使用socket()创建服务器套接字,并使用bind()listen()绑定到特定的IP和端口。
  • 使用epoll_ctl()注册服务器套接字上的EPOLLIN事件,以便在有新连接时得到通知。
3. 事件循环
  • 使用epoll_wait()等待事件的发生。
  • 对于每个事件,根据其类型进行处理:
    • 新连接:接受新连接,设置为非阻塞模式,并使用epoll_ctl()注册到epoll实例中,关注EPOLLIN事件。
    • 可读事件:从套接字读取数据,并将读取任务提交到线程池进行处理。
    • 可写事件:将待发送的数据写入套接字。
4. 线程池处理
  • 在线程池中处理业务逻辑,如解析请求、处理数据、生成响应。
  • 处理完成后,将响应数据写入对应的套接字。
5. 清理和关闭
  • 关闭所有打开的文件描述符。
  • 销毁线程池,释放资源。

3. 代码示例

#include <iostream>              
#include <sys/epoll.h>            // epoll相关函数和数据结构
#include <netinet/in.h>           // sockaddr_in结构体和相关常量
#include <unistd.h>               // POSIX操作系统API,包括read, write, close等
#include <fcntl.h>                // 文件控制选项,如fcntl函数
#include <vector>                
#include <thread>                 // C++11线程库
#include <functional>             // std::function,用于存储可调用对象
#include <queue>                  // 队列容器
#include <mutex>                  // 互斥锁,用于线程同步
#include <condition_variable>     // 条件变量,用于线程同步

// 设置文件描述符为非阻塞模式
int setNonBlocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);          // 获取文件描述符的当前状态标志
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK); // 设置为非阻塞模式
}

// 线程池的简单实现
class ThreadPool {
public:
    // 构造函数,初始化线程池
    ThreadPool(size_t numThreads) {
        for (size_t i = 0; i < numThreads; ++i) {
            // 创建工作线程
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(mutex); // 加锁
                        // 等待条件变量,直到有任务或停止标志为true
                        condition.wait(lock, [this] { return stop || !tasks.empty(); });
                        if (stop && tasks.empty()) return; // 如果停止并且任务队列为空,则退出
                        task = std::move(tasks.front()); // 从任务队列取出任务
                        tasks.pop(); // 移除任务
                    }
                    task(); // 执行任务
                }
            });
        }
    }

    // 将任务加入任务队列
    template<class F>
    void enqueue(F&& task) {
        {
            std::unique_lock<std::mutex> lock(mutex); // 加锁
            tasks.push(std::forward<F>(task)); // 将任务加入队列
        }
        condition.notify_one(); // 通知一个等待线程
    }

    // 析构函数,清理线程池
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(mutex); // 加锁
            stop = true; // 设置停止标志
        }
        condition.notify_all(); // 通知所有等待线程
        for (std::thread &worker : workers) {
            worker.join(); // 等待所有线程完成
        }
    }

private:
    std::vector<std::thread> workers; // 工作线程
    std::queue<std::function<void()>> tasks; // 任务队列
    std::mutex mutex; // 互斥锁
    std::condition_variable condition; // 条件变量
    bool stop = false; // 停止标志
};

// 服务器主逻辑
void runServer(int port) {
    int serverFd = socket(AF_INET, SOCK_STREAM, 0); // 创建套接字
    setNonBlocking(serverFd); // 设置非阻塞

    sockaddr_in serverAddr; // 服务器地址结构体
    serverAddr.sin_family = AF_INET; // 地址族
    serverAddr.sin_addr.s_addr = INADDR_ANY; // 监听所有接口
    serverAddr.sin_port = htons(port); // 端口号,网络字节序

    bind(serverFd, (sockaddr*)&serverAddr, sizeof(serverAddr)); // 绑定地址
    listen(serverFd, SOMAXCONN); // 监听,最大连接数为系统默认

    int epollFd = epoll_create1(0); // 创建epoll实例
    epoll_event event; // epoll事件结构体
    event.data.fd = serverFd; // 设置事件关联的文件描述符
    event.events = EPOLLIN; // 关注可读事件
    epoll_ctl(epollFd, EPOLL_CTL_ADD, serverFd, &event); // 注册事件

    ThreadPool pool(4); // 创建线程池,包含4个线程
    std::vector<epoll_event> events(1024); // 用于存储epoll_wait返回的事件

    while (true) {
        int n = epoll_wait(epollFd, events.data(), events.size(), -1); // 等待事件
        for (int i = 0; i < n; ++i) {
            if (events[i].data.fd == serverFd) {
                // 处理新连接
                int clientFd = accept(serverFd, nullptr, nullptr); // 接受新连接
                setNonBlocking(clientFd); // 设置非阻塞
                event.data.fd = clientFd; // 设置事件关联的文件描述符
                event.events = EPOLLIN; // 关注可读事件
                epoll_ctl(epollFd, EPOLL_CTL_ADD, clientFd, &event); // 注册事件
            } else {
                // 处理已有连接的数据
                int clientFd = events[i].data.fd;
                pool.enqueue([clientFd] { // 将任务加入线程池
                    char buffer[1024];
                    int bytesRead = read(clientFd, buffer, sizeof(buffer)); // 读取数据
                    if (bytesRead > 0) {
                        // 处理读取的数据
                        std::cout << "Received: " << std::string(buffer, bytesRead) << std::endl;
                        // 发送响应
                        write(clientFd, buffer, bytesRead); // 回显收到的数据
                    } else {
                        // 关闭连接
                        close(clientFd); // 关闭客户端连接
                    }
                });
            }
        }
    }
}

int main() {
    runServer(8080); // 启动服务器,监听8080端口
    return 0;
}


http://www.kler.cn/a/590065.html

相关文章:

  • 高效手机检测:视觉分析技术的优势
  • 【css酷炫效果】纯CSS实现3D翻转卡片动画
  • Java 大视界 -- Java 大数据在智能教育虚拟实验室建设与实验数据分析中的应用(132)
  • LeRobot源码剖析——对机器人各个动作策略的统一封装:包含ALOHA ACT、Diffusion Policy、VLA模型π0
  • 【binlog2sql实践】MySQL数据库binlog日志ROW格式转换标准SQL
  • Linux 蓝牙音频软件栈实现分析
  • 美团Leaf分布式ID生成器:使用详解与核心原理解析
  • 关于虚拟网络编辑器还原默认设置那些坑
  • Pandas DataFrame:数据分析的利器
  • 解决从deepseek接口获取的流式响应输出到前端都是undefined的问题
  • 微服务架构中10个常用的设计模式
  • 平面阵列天线波束形成的Matlab仿真
  • 一场由 ES 分片 routing 引发的问题
  • 2025年Postman的五大替代工具
  • 【软考-架构】5.3、IPv6-网络规划-网络存储-补充考点
  • SpringData Redis:RedisTemplate配置与数据操作
  • 02 windows qt配置ffmpeg开发环境搭建
  • 电脑如何录屏
  • failed to load elasticsearch nodes
  • SpringBoot + Mybatis Plus 整合 Redis