当前位置: 首页 > article >正文

线程池实现服务端

线程池实现服务端

线程池的实现

文章目录

  • 线程池实现服务端
    • 实现思路
    • 通信任务函数
    • 处理客户端请求任务函数
    • 创建线程池
    • 完整实现
    • 线程池的实现

实现思路

将服务端的任务抽象为两个任务,即接收客户端请求,和客户端通信;这两个任务就作为线程池中任务队列的元素。我们需要做的就是:

  • socket:创建监听的文件描述符(socket) fd;

  • bind:fd 和自身的 ip 和端口绑定;

  • listen:为当前文件描述符设置监听,主要是设置客户端连接数量;

  • 创建线程池;

  • 向任务队列中添加处理客户端请求的任务;

  • 当收到客户端请求并建立连接后,向任务队列中添加与客户端通信的任务;

以上为主要思路;至于线程的创建销毁,线程池都会自动完成,我们只需要关心以上几步即可;前面的思路在前面的文章中已经讲解,因此此处重点阐述后三步;

其中任务即函数,需要定义两个函数,一个用于接收请求,一个和客户端通信;重点需要考虑的即为这两个函数的参数,在下面将详细介绍。

本文为了合乎逻辑的地推算出需要哪些函数参数,因此讲解可能与执行顺序不一致;

通信任务函数

用于通信的函数需要的参数为 用于通信的文件描述符,客户端的 addr;而在添加任务时,任务函数的参数为 void*,因此将两个参数打包为一个结构体:

typedef struct {
    int fd;
    struct sockaddr_in addr;
} SockInfo;

工作函数得到这些信息后即可进行通信:

void working(void* arg) {
    SockInfo* sockInfo = (SockInfo*)arg;
    // 开始通信
    while (1) {
        char buff[1024];
        int ret = recv(sockInfo->fd, buff, sizeof(buff), 0);
        if (ret > 0) {
            printf("客户端说:%s\n", buff);
            send(sockInfo->fd, buff, strlen(buff) + 1, 0);
        } else if (ret == 0) {
            printf("客户端已断开连接\n");
            // 初始化,表示弃用该通信套接字
            sockInfo->fd = -1;
            break;
        } else {
            perror("read fail...\n");
            sockInfo->fd = -1;
            break;
        }
    }
    close(sockInfo->fd);
}

处理客户端请求任务函数

该函数需要的参数为 用于监听的文件描述符,线程池(因为在接收请求后需要向任务队列添加任务),将两个变量进行打包:

typedef struct 
{
    int fd;
    ThreadPool* pool;
} PoolInfo;

在获取以上信息后,即可监听客户端,当收到请求后,会创建连接并得到通信文件描述符以及客户端的地址;由于这两个信息是通信任务函数所需要的,因此定义一个结构体,将这些信息存放进去;然后添加通信任务到任务队列:

void acceptConn(void* arg) {
    PoolInfo* poolInfo = (PoolInfo*)arg;
    // 4. 阻塞等待客户端连接
    while (1) {
        // 创建子线程
        SockInfo* pInfo = (SockInfo*)malloc(sizeof(SockInfo));
        int len = sizeof(struct sockaddr);
        pInfo->fd = accept(poolInfo->fd, (struct sockaddr*)&pInfo->addr, &len);
        if (pInfo->fd == -1) {
            perror("accept fail...\n");
            exit(0);
        }
        printf("建立通信,文件描述符为 %d\n", pInfo->fd);
        // 添加通信任务
        threadPoolAdd(poolInfo->pool, working, pInfo);
    }
    close(poolInfo->fd);
}

创建线程池

该任务在主线程中完成,在完成 1-3 步后,我们需要将任务添加到任务队列,那么我们需要参数,在前面分析过,此处需要的参数为用于监听的文件描述符和线程池,定义该结构体,将任务添加到任务队列即可;在添加完成后,主线程退出,此后,线程池将接管所有子线程。

int main () {
    // 1. 创建 socket 文件描述符,使用 ipv4 流式协议 TCP
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) {
        perror("socket fail...!\n");
        exit(0);
    }
    // 2. socket 和本机ip 端口绑定
    struct sockaddr_in in_addr;
    in_addr.sin_family = AF_INET;
    // 端口号,短整型,主机转网络
    in_addr.sin_port = htons(9898);
    // 获取本机 ip 地址
    in_addr.sin_addr.s_addr = INADDR_ANY;
    int ret = bind(fd, (struct sockaddr*)&in_addr, sizeof(in_addr));
    if (ret == -1) {
        perror("bind fail...\n");
        exit(0);
    }
    // 3. 监听客户端
    ret = listen(fd, 128);
    if (ret == -1) {
        perror("listen fail...\n");
        exit(0);
    }
    
    PoolInfo* poolInfo = (PoolInfo*)malloc(sizeof(PoolInfo));
    poolInfo->fd = fd;

    ThreadPool* pool = threadpoolCreate(3, 8, 100);
    poolInfo->pool = pool;
    threadPoolAdd(pool, acceptConn, poolInfo);

    pthread_exit(NULL);
    return 0;
}

27 行之前的代码不再赘述。

首先现在需要创建一个线程,即用于接收客户端请求的线程,任务函数为 acceptConn;该函数中需要的参数为用于通信的文件描述符 fd

PoolInfo* poolInfo = (PoolInfo*)malloc(sizeof(PoolInfo));
poolInfo->fd = fd;

ThreadPool* pool = threadpoolCreate(3, 8, 100);
poolInfo->pool = pool;
threadPoolAdd(pool, acceptConn, poolInfo);

pthread_exit(NULL);

完整实现

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <pthread.h>
#include "threadpool.h"

typedef struct {
    int fd;
    // pthread_t tid;
    struct sockaddr_in addr;
} SockInfo;

typedef struct 
{
    int fd;
    ThreadPool* pool;
} PoolInfo;



void working(void* arg) {
    SockInfo* sockInfo = (SockInfo*)arg;
    // 开始通信
    while (1) {
        char buff[1024];
        int ret = recv(sockInfo->fd, buff, sizeof(buff), 0);
        if (ret > 0) {
            printf("客户端说:%s\n", buff);
            send(sockInfo->fd, buff, strlen(buff) + 1, 0);
        } else if (ret == 0) {
            printf("客户端已断开连接\n");
            // 初始化,表示弃用该通信套接字
            sockInfo->fd = -1;
            break;
        } else {
            perror("read fail...\n");
            sockInfo->fd = -1;
            break;
        }
    }
    close(sockInfo->fd);
}

void acceptConn(void* arg) {
    PoolInfo* poolInfo = (PoolInfo*)arg;
    // 4. 阻塞等待客户端连接
    while (1) {
        // 创建子线程
        SockInfo* pInfo = (SockInfo*)malloc(sizeof(SockInfo));
        int len = sizeof(struct sockaddr);
        pInfo->fd = accept(poolInfo->fd, (struct sockaddr*)&pInfo->addr, &len);
        if (pInfo->fd == -1) {
            perror("accept fail...\n");
            exit(0);
        }
        printf("建立通信,文件描述符为 %d\n", pInfo->fd);
        // 添加通信任务
        threadPoolAdd(poolInfo->pool, working, pInfo);
    }
    close(poolInfo->fd);
}


int main () {
    // 1. 创建 socket 文件描述符,使用 ipv4 流式协议 TCP
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) {
        perror("socket fail...!\n");
        exit(0);
    }
    // 2. socket 和本机ip 端口绑定
    struct sockaddr_in in_addr;
    in_addr.sin_family = AF_INET;
    // 端口号,短整型,主机转网络
    in_addr.sin_port = htons(9898);
    // 获取本机 ip 地址
    in_addr.sin_addr.s_addr = INADDR_ANY;
    int ret = bind(fd, (struct sockaddr*)&in_addr, sizeof(in_addr));
    if (ret == -1) {
        perror("bind fail...\n");
        exit(0);
    }
    // 3. 监听客户端
    ret = listen(fd, 128);
    if (ret == -1) {
        perror("listen fail...\n");
        exit(0);
    }
    PoolInfo* poolInfo = (PoolInfo*)malloc(sizeof(PoolInfo));
    poolInfo->fd = fd;

    ThreadPool* pool = threadpoolCreate(3, 8, 100);
    poolInfo->pool = pool;
    threadPoolAdd(pool, acceptConn, poolInfo);

    pthread_exit(NULL);
    return 0;
}

线程池的实现

详见文章(该文章使用C++实现,本文所用的线程池是C语言,单思路一致)
threadpool.h

#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_
typedef struct ThreadPool ThreadPool;
// 创建线程池
ThreadPool* threadpoolCreate(int min, int max, int queueSize);

// 销毁线程池
int threadPoolDestroy(ThreadPool* pool);

// 添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);

// 获取线程池中工作的线程数
int threadPoolBusyNum(ThreadPool* pool);

// 获取线程池中活着的线程数
int threadPoolAliveNum(ThreadPool* pool);



void* worker(void* arg);
void* manager(void* arg);

void threadExit(ThreadPool* pool);


#endif  // _THREADPOOL_H_

threadpool.c

#include "threadpool.h"
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>

// 每次添加的线程数
#define NUMBER 2

// 任务
typedef struct Task {
    void (*function)(void*);
    void* arg;
} Task;

// 线程池
typedef struct ThreadPool {
    //  任务队列,数组的每个元素是一个函数及其参数
    Task* taskQ;
    int queueCapacity;  // 容量
    int queueSize;       // 任务个数
    int queueFront;     // 队头
    int queueRear;      // 队尾

    // 线程
    pthread_t managerID;    // 管理者线程 ID
    pthread_t *threadIDs;   // 工作的线程 ID
    int minNum;             // 线程池最小线程数
    int maxNum;             // 线程池最大线程数
    int busyNum;            // 忙的线程数
    int liveNum;            // 存活的线程数
    int exitNum;            // 需要销毁的线程数
    pthread_mutex_t mutexPool;  // 锁整个线程池
    pthread_mutex_t mutexBusy;  // 锁 busyNum 变量
    pthread_cond_t notFull;     // 条件变量判断队列是否满了
    pthread_cond_t notEmpty;    // 任务队列是否为空

    int shutdown;               // 判断是否需要销毁线程池,销毁为 1,反之为 0
} ThreadPool;

ThreadPool *threadpoolCreate(int min, int max, int queueSize)
{
    // 创建线程池
    ThreadPool* pool = (ThreadPool*) malloc(sizeof(ThreadPool));
    do {
        if (pool == NULL) {
            printf("malloc threadpool fail ... \n");
            break;
        }
        
        // 创建工作线程 id 数组
        pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
        if (pool->threadIDs == NULL) {
            printf("malloc threadIDs fail ...\n");
            break;
        }
        memset(pool->threadIDs, 0, sizeof(pthread_t) * max);

        pool->minNum = min;
        pool->maxNum = max;
        pool->busyNum = 0;
        pool->liveNum = min;   // 目前存活的线程数,初始化时将在后面进行创建
        pool->exitNum = 0;     // 需要退出的线程数,即管理线程计算后得出需要退出的线程数量


        // 初始化锁和条件变量
        if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
            pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
            pthread_cond_init(&pool->notFull, NULL) != 0 ||
            pthread_cond_init(&pool->notEmpty, NULL) != 0) 
        {
            printf("mutex or condition init fail ...\n");
            break;
        }
        // 创建任务队列
        pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);
        if (pool->taskQ == NULL) {
            printf("malloc taskQ fail...\n");
            break;
        }
        pool->queueCapacity = queueSize;
        pool->queueSize = 0;
        pool->queueFront = 0;
        pool->queueRear = 0;

        pool->shutdown = 0;

        // 创建管理者线程
        pthread_create(&pool->managerID, NULL, manager, pool);

        // 工作者线程
        for (int i = 0; i < min; i++) {
            pthread_create(&pool->threadIDs[i], NULL, worker, pool);
        }
        return pool;
    } while (0);

    if (pool && pool->taskQ) free(pool->taskQ);
    if (pool && pool->threadIDs) free(pool->threadIDs);
    if (pool) free(pool);
    return NULL;
}

int threadPoolDestroy(ThreadPool *pool)
{
    // 线程池为空
    if (pool == NULL) {
        return -1;
    }
    // 关闭线程池
    pool->shutdown = 1;

    // 阻塞回收管理者线程
    pthread_join(pool->managerID, NULL);

    // 唤醒消费者线程
    for (int i = 0; i < pool->liveNum; i++) {
        pthread_cond_signal(&pool->notEmpty);
    }
    // 释放堆内存
    if (pool->taskQ) {
        free(pool->taskQ);
    }
    if (pool->threadIDs) {
        free(pool->threadIDs);
    }
    // 销毁锁和条件变量
    pthread_mutex_destroy(&pool->mutexPool);
    pthread_mutex_destroy(&pool->mutexBusy);
    pthread_cond_destroy(&pool->notEmpty);
    pthread_cond_destroy(&pool->notFull);

    free(pool);
    pool = NULL;

    return 0;
}

// 任务添加
void threadPoolAdd(ThreadPool *pool, void (*func)(void*), void *arg)
{
    pthread_mutex_lock(&pool->mutexPool);
    // 若任务队列满了则阻塞
    while (pool->queueSize == pool->queueCapacity && !pool->shutdown) {
        pthread_cond_wait(&pool->notFull, &pool->mutexPool);
    }
    if (pool->shutdown) {
        pthread_mutex_unlock(&pool->mutexPool);
        return;
    }
    pool->taskQ[pool->queueRear].function = func;
    pool->taskQ[pool->queueRear].arg = arg;
    pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
    pool->queueSize++;

    // 任务添加后,即可唤醒消费者
    pthread_cond_signal(&pool->notEmpty);
    pthread_mutex_unlock(&pool->mutexPool);
}

int threadPoolBusyNum(ThreadPool *pool)
{
    pthread_mutex_lock(&pool->mutexBusy);
    int busyNum = pool->busyNum;
    pthread_mutex_unlock(&pool->mutexBusy);
    return busyNum;
}

int threadPoolAliveNum(ThreadPool *pool)
{
    pthread_mutex_lock(&pool->mutexPool);
    int liveNum = pool->liveNum;
    pthread_mutex_unlock(&pool->mutexPool);
    return liveNum;
}

void *worker(void *arg)
{
    ThreadPool* pool = (ThreadPool*) arg;
    while (1) {
        pthread_mutex_lock(&pool->mutexPool);
        // 任务队列为空则工作线程阻塞
        while (pool->queueSize == 0 && !pool->shutdown) {
            pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);
            // 若线程唤醒后需要被销毁,则进行销毁,注意销毁时数量必须多于最小值
            if (pool->exitNum > 0) {
                pool->exitNum--;
                if (pool->liveNum > pool->minNum) {
                    pool->liveNum--;
                    // 销毁前需要释放锁
                    pthread_mutex_unlock(&pool->mutexPool);
                    threadExit(pool);
                }
            }
        }
        // 若线程池关闭,则销毁当前线程
        if (pool->shutdown) {
            pthread_mutex_unlock(&pool->mutexPool);
            threadExit(pool);
        }

        // 以下为正常情况,即从任务队列中取出任务进行处理
        Task task;
        task.function = pool->taskQ[pool->queueFront].function;
        task.arg = pool->taskQ[pool->queueFront].arg;
        // 取出任务后,头节点后移,此处为循环队列
        pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
        pool->queueSize--;

        // notFull 表示任务队列已满的条件变量,此处处理了一个任务队列
        // 即可唤醒生产任务的线程继续生产任务
        pthread_cond_signal(&pool->notFull);
        pthread_mutex_unlock(&pool->mutexPool);

        printf("thread %ld start working...\n", pthread_self());
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum++;
        pthread_mutex_unlock(&pool->mutexBusy);
        // 处理任务
        task.function(task.arg);
        free(task.arg);
        task.arg = NULL;
        printf("thread %ld end working...\n", pthread_self());

        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum--;
        pthread_mutex_unlock(&pool->mutexBusy);
    }
    return NULL;
}

void *manager(void *arg)
{
    ThreadPool* pool = (ThreadPool*) arg;
    // 若线程池不关闭,开始循环
    while (!pool->shutdown) {
        sleep(3);
        // 读取线程池信息,作为管理的依据
        pthread_mutex_lock(&pool->mutexPool);
        // 任务个数
        int queueSize = pool->queueSize;
        // 存活的线程数
        int liveNum = pool->liveNum;
        pthread_mutex_unlock(&pool->mutexPool);

        pthread_mutex_lock(&pool->mutexBusy);
        // 正在忙的线程数
        int busyNum = pool->busyNum;
        pthread_mutex_unlock(&pool->mutexBusy);
        // 现在得到了任务数,正在忙的线程,以及目前总线程数
        // 管理者线程将根据这三个数据对线程进行管理

        // 若任务个数 > 存活线程个数 && 存活线程数 < 最大线程数 则添加线程
        if (queueSize > (liveNum - busyNum) && liveNum < pool->maxNum) {
            pthread_mutex_lock(&pool->mutexPool);
            int count = 0;
            for (int i = 0; i < pool->maxNum && 
                            count < NUMBER && 
                            liveNum < pool->maxNum; i++)
            {
                if (pool->threadIDs[i] == 0) {
                    pthread_create(&pool->threadIDs[i], NULL, worker, pool);
                    count++;
                    pool->liveNum++;
                }
            }
            pthread_mutex_unlock(&pool->mutexPool);
        }

        // 忙线程 * 2 < 存活的线程数 && 存活线程数 > 最小线程数 需要销毁多余的线程
        if (pool->busyNum * 2 < pool->liveNum && pool->liveNum > pool->minNum) {
            pthread_mutex_lock(&pool->mutexPool);
            pool->exitNum = NUMBER;
            pthread_mutex_unlock(&pool->mutexPool);
            // 唤醒阻塞的线程,让其自动销毁
            for (int i = 0; i < NUMBER; i++) {
                pthread_cond_signal(&pool->notEmpty);
            }
        }
    }
    return NULL;
}

// 销毁当前线程,并在线程池中注销
void threadExit(ThreadPool *pool)
{
    pthread_t tid = pthread_self();
    for (int i = 0; i < pool->maxNum; i++) {
        if (pool->threadIDs[i] == tid) {
            pool->threadIDs[i] = 0;
            printf("threadExit() called, %ld exiting...\n", tid);
            break;
        }
    }
    pthread_exit(NULL);
}

http://www.kler.cn/a/297160.html

相关文章:

  • Linux Runtime PM(运行时电源管理)框架API
  • 97.游戏的启动与多开-共享内存多开检测
  • 九,自定义转换器详细操作(附+详细源码解析)
  • c语言--力扣简单题目(移除链表元素)讲解
  • UE4_后期处理_后期处理材质及后期处理体积一
  • 从Milvus迁移DashVector
  • 久久派安装启用USB摄像头(基于mjpg-streamer)
  • TCP 拥塞控制
  • 本地服务器部署Text generation并添加code llama实现远程多人协作
  • C#复习之内部类和分布类
  • 2024年CCPC网络赛A题题解 —— 军训Ⅰ(gym105336A)
  • 无人机之报警器的工作原理
  • CH9114 USB转四串口替换FT4232H系列芯片
  • 基于Flink的流式计算可视化开发实践之配置->任务生成->任务部署过程
  • Flutter 开发常用第三方库总结
  • 十分钟学会Kubernetes(K8S) 部署SpringBoot3.0
  • 【数学建模经验贴】国赛拿到赛题后,该如何选题?
  • 如何在 Adobe Admin Console 中创建和管理 Marketo Engage 产品配置文件
  • 【自用19.3】C++构造函数复盘
  • go程序解说