线程池实现服务端
线程池实现服务端
线程池的实现
文章目录
- 线程池实现服务端
- 实现思路
- 通信任务函数
- 处理客户端请求任务函数
- 创建线程池
- 完整实现
- 线程池的实现
实现思路
将服务端的任务抽象为两个任务,即接收客户端请求,和客户端通信;这两个任务就作为线程池中任务队列的元素。我们需要做的就是:
-
socket:创建监听的文件描述符(socket) fd;
-
bind:fd 和自身的 ip 和端口绑定;
-
listen:为当前文件描述符设置监听,主要是设置客户端连接数量;
-
创建线程池;
-
向任务队列中添加处理客户端请求的任务;
-
当收到客户端请求并建立连接后,向任务队列中添加与客户端通信的任务;
以上为主要思路;至于线程的创建销毁,线程池都会自动完成,我们只需要关心以上几步即可;前面的思路在前面的文章中已经讲解,因此此处重点阐述后三步;
其中任务即函数,需要定义两个函数,一个用于接收请求,一个和客户端通信;重点需要考虑的即为这两个函数的参数,在下面将详细介绍。
本文为了合乎逻辑的地推算出需要哪些函数参数,因此讲解可能与执行顺序不一致;
通信任务函数
用于通信的函数需要的参数为 用于通信的文件描述符,客户端的 addr;而在添加任务时,任务函数的参数为 void*
,因此将两个参数打包为一个结构体:
typedef struct {
int fd;
struct sockaddr_in addr;
} SockInfo;
工作函数得到这些信息后即可进行通信:
void working(void* arg) {
SockInfo* sockInfo = (SockInfo*)arg;
// 开始通信
while (1) {
char buff[1024];
int ret = recv(sockInfo->fd, buff, sizeof(buff), 0);
if (ret > 0) {
printf("客户端说:%s\n", buff);
send(sockInfo->fd, buff, strlen(buff) + 1, 0);
} else if (ret == 0) {
printf("客户端已断开连接\n");
// 初始化,表示弃用该通信套接字
sockInfo->fd = -1;
break;
} else {
perror("read fail...\n");
sockInfo->fd = -1;
break;
}
}
close(sockInfo->fd);
}
处理客户端请求任务函数
该函数需要的参数为 用于监听的文件描述符,线程池(因为在接收请求后需要向任务队列添加任务),将两个变量进行打包:
typedef struct
{
int fd;
ThreadPool* pool;
} PoolInfo;
在获取以上信息后,即可监听客户端,当收到请求后,会创建连接并得到通信文件描述符以及客户端的地址;由于这两个信息是通信任务函数所需要的,因此定义一个结构体,将这些信息存放进去;然后添加通信任务到任务队列:
void acceptConn(void* arg) {
PoolInfo* poolInfo = (PoolInfo*)arg;
// 4. 阻塞等待客户端连接
while (1) {
// 创建子线程
SockInfo* pInfo = (SockInfo*)malloc(sizeof(SockInfo));
int len = sizeof(struct sockaddr);
pInfo->fd = accept(poolInfo->fd, (struct sockaddr*)&pInfo->addr, &len);
if (pInfo->fd == -1) {
perror("accept fail...\n");
exit(0);
}
printf("建立通信,文件描述符为 %d\n", pInfo->fd);
// 添加通信任务
threadPoolAdd(poolInfo->pool, working, pInfo);
}
close(poolInfo->fd);
}
创建线程池
该任务在主线程中完成,在完成 1-3 步后,我们需要将任务添加到任务队列,那么我们需要参数,在前面分析过,此处需要的参数为用于监听的文件描述符和线程池,定义该结构体,将任务添加到任务队列即可;在添加完成后,主线程退出,此后,线程池将接管所有子线程。
int main () {
// 1. 创建 socket 文件描述符,使用 ipv4 流式协议 TCP
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd == -1) {
perror("socket fail...!\n");
exit(0);
}
// 2. socket 和本机ip 端口绑定
struct sockaddr_in in_addr;
in_addr.sin_family = AF_INET;
// 端口号,短整型,主机转网络
in_addr.sin_port = htons(9898);
// 获取本机 ip 地址
in_addr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(fd, (struct sockaddr*)&in_addr, sizeof(in_addr));
if (ret == -1) {
perror("bind fail...\n");
exit(0);
}
// 3. 监听客户端
ret = listen(fd, 128);
if (ret == -1) {
perror("listen fail...\n");
exit(0);
}
PoolInfo* poolInfo = (PoolInfo*)malloc(sizeof(PoolInfo));
poolInfo->fd = fd;
ThreadPool* pool = threadpoolCreate(3, 8, 100);
poolInfo->pool = pool;
threadPoolAdd(pool, acceptConn, poolInfo);
pthread_exit(NULL);
return 0;
}
27 行之前的代码不再赘述。
首先现在需要创建一个线程,即用于接收客户端请求的线程,任务函数为 acceptConn
;该函数中需要的参数为用于通信的文件描述符 fd
,
PoolInfo* poolInfo = (PoolInfo*)malloc(sizeof(PoolInfo));
poolInfo->fd = fd;
ThreadPool* pool = threadpoolCreate(3, 8, 100);
poolInfo->pool = pool;
threadPoolAdd(pool, acceptConn, poolInfo);
pthread_exit(NULL);
完整实现
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <pthread.h>
#include "threadpool.h"
typedef struct {
int fd;
// pthread_t tid;
struct sockaddr_in addr;
} SockInfo;
typedef struct
{
int fd;
ThreadPool* pool;
} PoolInfo;
void working(void* arg) {
SockInfo* sockInfo = (SockInfo*)arg;
// 开始通信
while (1) {
char buff[1024];
int ret = recv(sockInfo->fd, buff, sizeof(buff), 0);
if (ret > 0) {
printf("客户端说:%s\n", buff);
send(sockInfo->fd, buff, strlen(buff) + 1, 0);
} else if (ret == 0) {
printf("客户端已断开连接\n");
// 初始化,表示弃用该通信套接字
sockInfo->fd = -1;
break;
} else {
perror("read fail...\n");
sockInfo->fd = -1;
break;
}
}
close(sockInfo->fd);
}
void acceptConn(void* arg) {
PoolInfo* poolInfo = (PoolInfo*)arg;
// 4. 阻塞等待客户端连接
while (1) {
// 创建子线程
SockInfo* pInfo = (SockInfo*)malloc(sizeof(SockInfo));
int len = sizeof(struct sockaddr);
pInfo->fd = accept(poolInfo->fd, (struct sockaddr*)&pInfo->addr, &len);
if (pInfo->fd == -1) {
perror("accept fail...\n");
exit(0);
}
printf("建立通信,文件描述符为 %d\n", pInfo->fd);
// 添加通信任务
threadPoolAdd(poolInfo->pool, working, pInfo);
}
close(poolInfo->fd);
}
int main () {
// 1. 创建 socket 文件描述符,使用 ipv4 流式协议 TCP
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd == -1) {
perror("socket fail...!\n");
exit(0);
}
// 2. socket 和本机ip 端口绑定
struct sockaddr_in in_addr;
in_addr.sin_family = AF_INET;
// 端口号,短整型,主机转网络
in_addr.sin_port = htons(9898);
// 获取本机 ip 地址
in_addr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(fd, (struct sockaddr*)&in_addr, sizeof(in_addr));
if (ret == -1) {
perror("bind fail...\n");
exit(0);
}
// 3. 监听客户端
ret = listen(fd, 128);
if (ret == -1) {
perror("listen fail...\n");
exit(0);
}
PoolInfo* poolInfo = (PoolInfo*)malloc(sizeof(PoolInfo));
poolInfo->fd = fd;
ThreadPool* pool = threadpoolCreate(3, 8, 100);
poolInfo->pool = pool;
threadPoolAdd(pool, acceptConn, poolInfo);
pthread_exit(NULL);
return 0;
}
线程池的实现
详见文章(该文章使用C++实现,本文所用的线程池是C语言,单思路一致)
threadpool.h
#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_
typedef struct ThreadPool ThreadPool;
// 创建线程池
ThreadPool* threadpoolCreate(int min, int max, int queueSize);
// 销毁线程池
int threadPoolDestroy(ThreadPool* pool);
// 添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
// 获取线程池中工作的线程数
int threadPoolBusyNum(ThreadPool* pool);
// 获取线程池中活着的线程数
int threadPoolAliveNum(ThreadPool* pool);
void* worker(void* arg);
void* manager(void* arg);
void threadExit(ThreadPool* pool);
#endif // _THREADPOOL_H_
threadpool.c
#include "threadpool.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
// 每次添加的线程数
#define NUMBER 2
// 任务
typedef struct Task {
void (*function)(void*);
void* arg;
} Task;
// 线程池
typedef struct ThreadPool {
// 任务队列,数组的每个元素是一个函数及其参数
Task* taskQ;
int queueCapacity; // 容量
int queueSize; // 任务个数
int queueFront; // 队头
int queueRear; // 队尾
// 线程
pthread_t managerID; // 管理者线程 ID
pthread_t *threadIDs; // 工作的线程 ID
int minNum; // 线程池最小线程数
int maxNum; // 线程池最大线程数
int busyNum; // 忙的线程数
int liveNum; // 存活的线程数
int exitNum; // 需要销毁的线程数
pthread_mutex_t mutexPool; // 锁整个线程池
pthread_mutex_t mutexBusy; // 锁 busyNum 变量
pthread_cond_t notFull; // 条件变量判断队列是否满了
pthread_cond_t notEmpty; // 任务队列是否为空
int shutdown; // 判断是否需要销毁线程池,销毁为 1,反之为 0
} ThreadPool;
ThreadPool *threadpoolCreate(int min, int max, int queueSize)
{
// 创建线程池
ThreadPool* pool = (ThreadPool*) malloc(sizeof(ThreadPool));
do {
if (pool == NULL) {
printf("malloc threadpool fail ... \n");
break;
}
// 创建工作线程 id 数组
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
if (pool->threadIDs == NULL) {
printf("malloc threadIDs fail ...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min; // 目前存活的线程数,初始化时将在后面进行创建
pool->exitNum = 0; // 需要退出的线程数,即管理线程计算后得出需要退出的线程数量
// 初始化锁和条件变量
if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
pthread_cond_init(&pool->notFull, NULL) != 0 ||
pthread_cond_init(&pool->notEmpty, NULL) != 0)
{
printf("mutex or condition init fail ...\n");
break;
}
// 创建任务队列
pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
if (pool->taskQ == NULL) {
printf("malloc taskQ fail...\n");
break;
}
pool->queueCapacity = queueSize;
pool->queueSize = 0;
pool->queueFront = 0;
pool->queueRear = 0;
pool->shutdown = 0;
// 创建管理者线程
pthread_create(&pool->managerID, NULL, manager, pool);
// 工作者线程
for (int i = 0; i < min; i++) {
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
}
return pool;
} while (0);
if (pool && pool->taskQ) free(pool->taskQ);
if (pool && pool->threadIDs) free(pool->threadIDs);
if (pool) free(pool);
return NULL;
}
int threadPoolDestroy(ThreadPool *pool)
{
// 线程池为空
if (pool == NULL) {
return -1;
}
// 关闭线程池
pool->shutdown = 1;
// 阻塞回收管理者线程
pthread_join(pool->managerID, NULL);
// 唤醒消费者线程
for (int i = 0; i < pool->liveNum; i++) {
pthread_cond_signal(&pool->notEmpty);
}
// 释放堆内存
if (pool->taskQ) {
free(pool->taskQ);
}
if (pool->threadIDs) {
free(pool->threadIDs);
}
// 销毁锁和条件变量
pthread_mutex_destroy(&pool->mutexPool);
pthread_mutex_destroy(&pool->mutexBusy);
pthread_cond_destroy(&pool->notEmpty);
pthread_cond_destroy(&pool->notFull);
free(pool);
pool = NULL;
return 0;
}
// 任务添加
void threadPoolAdd(ThreadPool *pool, void (*func)(void*), void *arg)
{
pthread_mutex_lock(&pool->mutexPool);
// 若任务队列满了则阻塞
while (pool->queueSize == pool->queueCapacity && !pool->shutdown) {
pthread_cond_wait(&pool->notFull, &pool->mutexPool);
}
if (pool->shutdown) {
pthread_mutex_unlock(&pool->mutexPool);
return;
}
pool->taskQ[pool->queueRear].function = func;
pool->taskQ[pool->queueRear].arg = arg;
pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
pool->queueSize++;
// 任务添加后,即可唤醒消费者
pthread_cond_signal(&pool->notEmpty);
pthread_mutex_unlock(&pool->mutexPool);
}
int threadPoolBusyNum(ThreadPool *pool)
{
pthread_mutex_lock(&pool->mutexBusy);
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
return busyNum;
}
int threadPoolAliveNum(ThreadPool *pool)
{
pthread_mutex_lock(&pool->mutexPool);
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
return liveNum;
}
void *worker(void *arg)
{
ThreadPool* pool = (ThreadPool*) arg;
while (1) {
pthread_mutex_lock(&pool->mutexPool);
// 任务队列为空则工作线程阻塞
while (pool->queueSize == 0 && !pool->shutdown) {
pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
// 若线程唤醒后需要被销毁,则进行销毁,注意销毁时数量必须多于最小值
if (pool->exitNum > 0) {
pool->exitNum--;
if (pool->liveNum > pool->minNum) {
pool->liveNum--;
// 销毁前需要释放锁
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
}
}
// 若线程池关闭,则销毁当前线程
if (pool->shutdown) {
pthread_mutex_unlock(&pool->mutexPool);
threadExit(pool);
}
// 以下为正常情况,即从任务队列中取出任务进行处理
Task task;
task.function = pool->taskQ[pool->queueFront].function;
task.arg = pool->taskQ[pool->queueFront].arg;
// 取出任务后,头节点后移,此处为循环队列
pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
pool->queueSize--;
// notFull 表示任务队列已满的条件变量,此处处理了一个任务队列
// 即可唤醒生产任务的线程继续生产任务
pthread_cond_signal(&pool->notFull);
pthread_mutex_unlock(&pool->mutexPool);
printf("thread %ld start working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum++;
pthread_mutex_unlock(&pool->mutexBusy);
// 处理任务
task.function(task.arg);
free(task.arg);
task.arg = NULL;
printf("thread %ld end working...\n", pthread_self());
pthread_mutex_lock(&pool->mutexBusy);
pool->busyNum--;
pthread_mutex_unlock(&pool->mutexBusy);
}
return NULL;
}
void *manager(void *arg)
{
ThreadPool* pool = (ThreadPool*) arg;
// 若线程池不关闭,开始循环
while (!pool->shutdown) {
sleep(3);
// 读取线程池信息,作为管理的依据
pthread_mutex_lock(&pool->mutexPool);
// 任务个数
int queueSize = pool->queueSize;
// 存活的线程数
int liveNum = pool->liveNum;
pthread_mutex_unlock(&pool->mutexPool);
pthread_mutex_lock(&pool->mutexBusy);
// 正在忙的线程数
int busyNum = pool->busyNum;
pthread_mutex_unlock(&pool->mutexBusy);
// 现在得到了任务数,正在忙的线程,以及目前总线程数
// 管理者线程将根据这三个数据对线程进行管理
// 若任务个数 > 存活线程个数 && 存活线程数 < 最大线程数 则添加线程
if (queueSize > (liveNum - busyNum) && liveNum < pool->maxNum) {
pthread_mutex_lock(&pool->mutexPool);
int count = 0;
for (int i = 0; i < pool->maxNum &&
count < NUMBER &&
liveNum < pool->maxNum; i++)
{
if (pool->threadIDs[i] == 0) {
pthread_create(&pool->threadIDs[i], NULL, worker, pool);
count++;
pool->liveNum++;
}
}
pthread_mutex_unlock(&pool->mutexPool);
}
// 忙线程 * 2 < 存活的线程数 && 存活线程数 > 最小线程数 需要销毁多余的线程
if (pool->busyNum * 2 < pool->liveNum && pool->liveNum > pool->minNum) {
pthread_mutex_lock(&pool->mutexPool);
pool->exitNum = NUMBER;
pthread_mutex_unlock(&pool->mutexPool);
// 唤醒阻塞的线程,让其自动销毁
for (int i = 0; i < NUMBER; i++) {
pthread_cond_signal(&pool->notEmpty);
}
}
}
return NULL;
}
// 销毁当前线程,并在线程池中注销
void threadExit(ThreadPool *pool)
{
pthread_t tid = pthread_self();
for (int i = 0; i < pool->maxNum; i++) {
if (pool->threadIDs[i] == tid) {
pool->threadIDs[i] = 0;
printf("threadExit() called, %ld exiting...\n", tid);
break;
}
}
pthread_exit(NULL);
}