TinyWebSever源码逐行注释(三)_ thread_pool.cpp
前言
项目源码地址
项目详细介绍
项目简介:
Linux下C++轻量级Web服务器,助力初学者快速实践网络编程,搭建属于自己的服务器.
- 使用 线程池 + 非阻塞socket + epoll(ET和LT均实现) + 事件处理(Reactor和模拟Proactor均实现) 的并发模型
- 使用状态机解析HTTP请求报文,支持解析GET和POST请求
- 访问服务器数据库实现web端用户注册、登录功能,可以请求服务器图片和视频文件
- 实现同步/异步日志系统,记录服务器运行状态
- 经Webbench压力测试可以实现上万的并发连接数据交换
thread_pool.cpp用于配置web服务器的线程池,使用一个工作队列完全解除了主线程和工作线程的耦合关系:主线程往工作队列中插入任务,工作线程通过竞争来取得任务并执行它。主要内容如下:
- 同步I/O模拟proactor模式
- 半同步/半反应堆
- 线程池
原项目地址的注释较少不适合初学者,于是我将每行都加上了注释帮助大家更好的理解:
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
#include "../lock/locker.h" // 包含自定义的锁机制(互斥锁和信号量)
#include "../CGImysql/sql_connection_pool.h" // 数据库连接池类,用于处理数据库连接
// 定义一个线程池类,T是模板类型,表示任务的类型
template <typename T>
class threadpool
{
public:
/*
* 构造函数,初始化线程池
* actor_model 表示工作模式,connPool 是数据库连接池,thread_number 是线程池中的线程数,
* max_request 是最大允许的请求队列长度
*/
threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000);
// 析构函数,释放线程池资源
~threadpool();
// 将新的请求任务添加到队列中,附带状态
bool append(T *request, int state);
// 将新的请求任务添加到队列中,不附带状态
bool append_p(T *request);
private:
/* 工作线程运行的函数,它会不断从工作队列中取任务执行 */
static void *worker(void *arg);
// 实际处理任务的函数,从任务队列中取出任务并处理
void run();
private:
int m_thread_number; // 线程池中的线程数
int m_max_requests; // 请求队列中允许的最大请求数
pthread_t *m_threads; // 描述线程池的数组,其大小为 m_thread_number
std::list<T *> m_workqueue; // 请求队列,用于存储需要处理的任务
locker m_queuelocker; // 保护请求队列的互斥锁,避免多线程同时访问时产生竞态条件
sem m_queuestat; // 信号量,表示是否有任务需要处理
connection_pool *m_connPool; // 数据库连接池,用于任务处理时的数据库操作
int m_actor_model; // 模型切换,表示不同的处理模式
};
// 构造函数,初始化线程池
template <typename T>
threadpool<T>::threadpool(int actor_model, connection_pool *connPool, int thread_number, int max_requests)
: m_actor_model(actor_model), m_thread_number(thread_number), m_max_requests(max_requests),
m_threads(NULL), m_connPool(connPool)
{
// 如果线程数或请求数不合法,抛出异常
if (thread_number <= 0 || max_requests <= 0)
throw std::exception();
// 动态分配线程数组
m_threads = new pthread_t[m_thread_number];
if (!m_threads)
throw std::exception();
// 循环创建线程,每个线程调用 worker 函数来执行任务
for (int i = 0; i < thread_number; ++i)
{
// 创建线程,worker 是线程的入口函数
if (pthread_create(m_threads + i, NULL, worker, this) != 0)
{
delete[] m_threads; // 创建线程失败,清理已分配的资源
throw std::exception();
}
// 线程分离模式,线程结束后自动释放资源
if (pthread_detach(m_threads[i]))
{
delete[] m_threads;
throw std::exception();
}
}
}
// 析构函数,释放线程数组的资源
template <typename T>
threadpool<T>::~threadpool()
{
delete[] m_threads;
}
// 将请求任务加入请求队列,并指定任务的状态(读或写)
template <typename T>
bool threadpool<T>::append(T *request, int state)
{
// 加锁,确保对队列的操作是线程安全的
m_queuelocker.lock();
// 如果请求队列已满,解锁并返回 false
if (m_workqueue.size() >= m_max_requests)
{
m_queuelocker.unlock();
return false;
}
// 设置请求的状态
request->m_state = state;
// 将请求添加到队列的末尾
m_workqueue.push_back(request);
// 解锁
m_queuelocker.unlock();
// 通知有新任务要处理
m_queuestat.post();
return true;
}
// 将请求任务加入请求队列,不指定状态
template <typename T>
bool threadpool<T>::append_p(T *request)
{
// 加锁,确保对队列的操作是线程安全的
m_queuelocker.lock();
// 如果请求队列已满,解锁并返回 false
if (m_workqueue.size() >= m_max_requests)
{
m_queuelocker.unlock();
return false;
}
// 将请求添加到队列的末尾
m_workqueue.push_back(request);
// 解锁
m_queuelocker.unlock();
// 通知有新任务要处理
m_queuestat.post();
return true;
}
// 线程入口函数,线程从任务队列中取任务处理
template <typename T>
void *threadpool<T>::worker(void *arg)
{
// 将传入的参数转换为线程池对象
threadpool *pool = (threadpool *)arg;
// 调用线程池的 run 函数,执行任务
pool->run();
return pool;
}
// 处理任务的主函数,循环从请求队列中取任务并处理
template <typename T>
void threadpool<T>::run()
{
while (true)
{
// 等待有任务到来,信号量阻塞线程
m_queuestat.wait();
// 加锁以安全访问请求队列
m_queuelocker.lock();
// 如果队列为空,解锁并继续等待
if (m_workqueue.empty())
{
m_queuelocker.unlock();
continue;
}
// 从队列头取出一个请求
T *request = m_workqueue.front();
// 移除取出的请求
m_workqueue.pop_front();
// 解锁
m_queuelocker.unlock();
// 如果请求为空,继续处理下一个请求
if (!request)
continue;
// 根据不同的 actor 模型处理请求
if (1 == m_actor_model)
{
// 读操作
if (0 == request->m_state)
{
// 调用 read_once 尝试读取数据
if (request->read_once())
{
// 数据读取成功,标记为需要进一步处理
request->improv = 1;
// 使用数据库连接池处理数据库相关任务
connectionRAII mysqlcon(&request->mysql, m_connPool);
// 处理请求
request->process();
}
else
{
// 数据读取失败,设置定时器标志
request->improv = 1;
request->timer_flag = 1;
}
}
else
{
// 写操作
if (request->write())
{
// 数据写入成功,标记为处理完成
request->improv = 1;
}
else
{
// 数据写入失败,设置定时器标志
request->improv = 1;
request->timer_flag = 1;
}
}
}
else
{
// 如果使用另一种模型,不区分读写,直接处理
connectionRAII mysqlcon(&request->mysql, m_connPool);
request->process();
}
}
}
#endif