网络编程(tcp线程池)
1.有一个水果盘,一次只能放一个水果,妈妈买苹果或者香蕉,让儿子和女儿吃水果。儿子只吃苹果,女儿只吃香蕉。请设计代码实现这个过程。
提示:创建3个信号量,分别用于控制妈妈,儿子,女儿线程对果盘的访问,妈妈线程先运行,当妈妈放置苹果时给儿子线程的信号量V操作,
当放置香蕉时候给女儿线程的信号量V操作
main.c
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <stdlib.h>
#include <time.h>
sem_t sem_mom; // 控制妈妈访问果盘的信号量
sem_t sem_son; // 控制儿子访问果盘的信号量
sem_t sem_daughter; // 控制女儿访问果盘的信号量
void *mom(void *arg)
{
while (1)
{
sem_wait(&sem_mom); // 等待果盘空闲
// 随机放置苹果或香蕉
int fruit = rand() % 2;
if (fruit == 0)
{
printf("妈妈放了一个苹果。\n");
sem_post(&sem_son); // 唤醒儿子
}
else
{
printf("妈妈放了一个香蕉。\n");
sem_post(&sem_daughter); // 唤醒女儿
}
sleep(1); // 模拟放置水果的耗时
}
}
void *son(void *arg)
{
while (1)
{
sem_wait(&sem_son); // 等待苹果
printf("儿子吃了一个苹果。\n");
sem_post(&sem_mom); // 唤醒妈妈
sleep(1); // 模拟吃水果的耗时
}
}
void *daughter(void *arg)
{
while (1)
{
sem_wait(&sem_daughter); // 等待香蕉
printf("女儿吃了一个香蕉。\n");
sem_post(&sem_mom); // 唤醒妈妈
sleep(1); // 模拟吃水果的耗时
}
}
int main()
{
srand(time(NULL)); // 初始化随机种子
// 初始化信号量
sem_init(&sem_mom, 0, 1);
sem_init(&sem_son, 0, 0);
sem_init(&sem_daughter, 0, 0);
// 创建线程
pthread_t tid_mom, tid_son, tid_daughter;
pthread_create(&tid_mom, NULL, mom, NULL);
pthread_create(&tid_son, NULL, son, NULL);
pthread_create(&tid_daughter, NULL, daughter, NULL);
// 等待线程结束(实际上不会结束)
pthread_join(tid_mom, NULL);
pthread_join(tid_son, NULL);
pthread_join(tid_daughter, NULL);
// 销毁信号量
sem_destroy(&sem_mom);
sem_destroy(&sem_son);
sem_destroy(&sem_daughter);
return 0;
}
信号量的作用
sem_mom:
初始值为 1,表示果盘初始为空,妈妈可以放置水果。
每当儿子或女儿吃完水果后,会释放 sem_mom,通知妈妈可以继续放置水果。
sem_son:
初始值为 0,表示初始时没有苹果。
当妈妈放置苹果后,会释放 sem_son,通知儿子可以吃苹果。
sem_daughter:
初始值为 0,表示初始时没有香蕉。
当妈妈放置香蕉后,会释放 sem_daughter,通知女儿可以吃香蕉。
线程的工作流程
妈妈线程 (mom 函数)
等待果盘空闲:
通过 sem_wait(&sem_mom) 等待果盘空闲(sem_mom 的值大于 0)。
随机放置水果:
使用 rand() % 2 随机选择放置苹果或香蕉。
如果放置苹果,通过 sem_post(&sem_son) 通知儿子。
如果放置香蕉,通过 sem_post(&sem_daughter) 通知女儿。
模拟耗时:
使用 sleep(1) 模拟放置水果的耗时。
循环执行:
无限循环,持续放置水果。
儿子线程 (son 函数)
等待苹果:
通过 sem_wait(&sem_son) 等待苹果(sem_son 的值大于 0)。
吃苹果:
打印日志 “儿子吃了一个苹果。”。
通过 sem_post(&sem_mom) 通知妈妈果盘已空,可以继续放置水果。
模拟耗时:
使用 sleep(1) 模拟吃水果的耗时。
循环执行:
无限循环,持续吃苹果。
女儿线程 (daughter 函数)
等待香蕉:
通过 sem_wait(&sem_daughter) 等待香蕉(sem_daughter 的值大于 0)。
吃香蕉:
打印日志 “女儿吃了一个香蕉。”。
通过 sem_post(&sem_mom) 通知妈妈果盘已空,可以继续放置水果。
模拟耗时:
使用 sleep(1) 模拟吃水果的耗时。
循环执行:
无限循环,持续吃香蕉。
程序的整体流程
初始化:
初始化随机数种子 srand(time(NULL))。
初始化信号量:sem_mom = 1,sem_son = 0,sem_daughter = 0。
创建线程:
创建妈妈、儿子、女儿三个线程,分别执行 mom、son、daughter 函数。
线程运行:
妈妈线程随机放置水果,并通知对应的儿子或女儿线程。
儿子或女儿线程吃完水果后,通知妈妈线程继续放置水果。
通过信号量的同步机制,确保果盘的状态一致。
等待线程结束:
使用 pthread_join 等待线程结束(实际是无限循环,不会结束)。
释放资源:
销毁信号量 sem_destroy。
代码的优点
线程同步:
使用信号量实现了线程之间的同步,确保生产者和消费者之间的操作有序。
随机性:
妈妈放置水果的类型是随机的,增加了程序的动态性。
模拟耗时:
使用 sleep(1) 模拟操作耗时,使程序更贴近真实场景。
代码的改进建议
线程退出机制:
当前代码是无限循环,无法正常退出。可以添加一个退出条件(如按特定键退出),并在退出时优雅地终止线程。
果盘容量限制:
当前果盘只能容纳一个水果。可以通过增加信号量或缓冲区来支持多个水果的放置。
日志优化:
可以添加时间戳或线程 ID 到日志中,方便调试和观察线程行为。
错误处理:
在执行信号量操作时,可以添加错误检查,确保程序健壮性。
2.任一并发服务器设计代码实现一远程登录,注册的功能:
要求:
2.1. 客户端发送请求类型(登录/注册) 及用户名和密码,服务器作出响应:
如果客户端发送登录请求,则服务器需要读取保存了已注册用户的文件(user.dat),比对此刻客户端发送的用户名和密码是否
是
文件中已记录的数据,并发送登录成功或失败的信息给客户端
如果客户端发送注册请求,服务器将客户端发送的用户名和密码数据写入文件(user.dat)
2.2. 客户端交互界面设计如下
=======================
1. 用户注册
2. 用户登录
0. 退出
=======================
当客户端注册成功后可选择登录操作,登录成功会退出进程,否则一直停留在本界面,继续登录,或选择退出,结束进程
提示: 网络数据可借助结构体类型传递,例如
typedef struct
{
char user[10];
int16_t request;
char pass[10];
}__attribute__((packed)) param_t;
当服务器接收到网络数据后,可解析数据,根据 request 成员获知客户端的请求类型。并完成对客户端的响应。
服务端
header.h
#ifndef __HEADER_H
#define __HEADER_H
// 常用头文件
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
// 网络编程涉及的头文件
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <sys/types.h>
// 本机字节序和网络字节序转换相关函数的头文件
#include <arpa/inet.h>
// 关闭套接字用close函数需要的头文件
#include <unistd.h>
//线程相关的函数头, mutex相关的函数
#include <pthread.h>
// 类型重命名:地址结构体的规范
typedef struct sockaddr SockAddr;
// 地址结构体的规范的实现结构体
typedef struct sockaddr_in SockAddrIn;
#include <net/if.h>
#endif
mysock.h
#ifndef __MY_SOCK_H
#define __MY_SOCK_H
// 导入头文件
#include "header.h"
typedef struct
{
char user[10]; // 用户名
int16_t request; // 请求类型(1: 注册, 2: 登录)
char pass[10]; // 密码
} __attribute__((packed)) param_t;
// 定义函数接口
// 1.根据给定的ip地址和端口号初始化socket描述符
int socket_init(const char *ip, uint16_t port);
// 2.根据socket的描述符建立连接
int build_connet(int socket);
// 3.关闭socket
int close_socket(int socket);
// 4.提供服务的函数----线程要执行的任务函数
void *doService(void *args);
#endif
threadpool.h
#ifndef __THREAD_POOL_H
#define __THREAD_POOL_H
// 引入头文件
#include "header.h"
// 定义任务所需的结构体
typedef struct _task
{
// 任务函数的指针变量
void *(*task)(void *);
// 任务函数执行时需要的参数
void *args;
// 下一个任务节点的地址
struct _task *next;
} task_t;
// 线程池相关的结构体
typedef struct
{
// 线程池的信息相关的成员
int running; // 线程池中线程的状态(1:在运行,0:不在运行)
int thread_num; // 线程池中线程的数量
int queue_num; // 任务队列当前的任务数量
int queue_max_size; // 任务队列的最大容量
// 线程同步相关的成员
pthread_mutex_t mutex; // 互斥锁
pthread_cond_t cond_queue_empty; // 条件变量:检测任务队列是否为空
pthread_cond_t cond_queue_full; // 条件变量:检测任务队列是否存满
// 线程相关的成员
pthread_t *id;
task_t *head;
task_t *tail;
} threadpool_t;
// 线程池相关的接口
// 1.线程池初始化
int threadpool_init(threadpool_t *pool, int tnum, int qsize);
// 2.给任务队列添加任务
int threadpool_addtask(threadpool_t *pool, void *(*task)(void *), void *args);
// 3.销毁线程池
int threadpool_destroy(threadpool_t *pool);
#endif
mysock.c
#include "mysock.h"
#define USER_FILE "user.dat"
#define BUFFER_SIZE 256
// 1.根据给定的ip地址和端口号初始化socket描述符
int socket_init(const char *ip, uint16_t port)
{
// 1.创建socket
int s_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == s_sockfd)
{
perror("socket failed");
return -1;
}
// 2.绑定地址
SockAddrIn serverAddr;
// ipv4协议
serverAddr.sin_family = AF_INET;
// 端口
serverAddr.sin_port = htons(port);
// ip地址
serverAddr.sin_addr.s_addr = inet_addr(ip);
// 绑定地址
int r = bind(s_sockfd, (SockAddr *)&serverAddr, sizeof(serverAddr));
if (-1 == r)
{
perror("bind failed");
close(s_sockfd);
return -1;
}
// 3.监听客户端
int r2 = listen(s_sockfd, 1);
if (-1 == r2)
{
perror("listen failed");
close(s_sockfd);
return -1;
}
return s_sockfd;
}
// 2.根据socket的描述符建立连接
int build_connet(int socket)
{
// 4.接受客户端的连接
SockAddrIn clientAddr;
socklen_t len = sizeof(clientAddr);
puts("等待客户端的连接......");
int c_sockfd = accept(socket, (SockAddr *)&clientAddr, &len);
// int c_sockfd = -1;
if (-1 == c_sockfd)
{
perror("accept failed");
close(socket);
return -1;
}
/* 获取连接成功的客户端ip地址
char *ip = inet_ntoa(clientAddr.sin_addr);
// 获取连接成功的客户端端口号
unsigned short port = ntohs(clientAddr.sin_port);
printf("[%s: %d]: 客户端连接成功\n", ip, port);
// 给客户端做出响应---发送连接成功的标识
char *msg = "连接服务器成功";
send(c_sockfd, msg, strlen(msg), 0);*/
// 返回客户端的套接字描述符
return c_sockfd;
}
// 3.关闭socket
int close_socket(int socket)
{
return close(socket);
}
// 4.提供服务的函数----线程要执行的任务函数
// 服务函数
pthread_mutex_t file_mutex = PTHREAD_MUTEX_INITIALIZER;
// 处理客户端请求的函数
void *doService(void *arg)
{
int c_sockfd = *(int *)arg;
param_t param;
char buffer[BUFFER_SIZE];
while (1)
{
// 接收客户端发送的数据
int len = recv(c_sockfd, ¶m, sizeof(param), 0);
if (len <= 0)
{
if (len == 0)
{
printf("客户端已断开连接。\n");
}
else
{
perror("接收数据失败");
}
break;
}
// 处理注册请求
if (param.request == 1)
{
pthread_mutex_lock(&file_mutex);
FILE *file = fopen(USER_FILE, "a+");
if (file == NULL)
{
perror("打开文件失败");
strcpy(buffer, "注册失败,无法写入文件!");
}
else
{
int found = 0;
char user[10], pass[10];
while (fscanf(file, "%s %s", user, pass) != EOF)
{
if (strcmp(user, param.user) == 0)
{
found = 1;
break;
}
}
if (found)
{
strcpy(buffer, "注册失败,用户名已存在!");
}
else
{
fprintf(file, "%s %s\n", param.user, param.pass);
strcpy(buffer, "注册成功!");
}
fclose(file);
}
pthread_mutex_unlock(&file_mutex);
}
// 处理登录请求
else if (param.request == 2)
{
pthread_mutex_lock(&file_mutex);
FILE *file = fopen(USER_FILE, "r");
if (file == NULL)
{
perror("打开文件失败");
strcpy(buffer, "登录失败,用户数据不存在!");
}
else
{
int found = 0;
char user[10], pass[10];
while (fscanf(file, "%s %s", user, pass) != EOF)
{
if (strcmp(user, param.user) == 0 && strcmp(pass, param.pass) == 0)
{
found = 1;
break;
}
}
fclose(file);
if (found)
{
strcpy(buffer, "登录成功!");
}
else
{
strcpy(buffer, "登录失败,用户名或密码错误!");
}
}
pthread_mutex_unlock(&file_mutex);
}
// 处理无效请求
else
{
strcpy(buffer, "无效请求类型!");
}
// 发送响应给客户端
if (send(c_sockfd, buffer, strlen(buffer), 0) < 0)
{
perror("发送响应失败");
break;
}
}
// 关闭连接
close(c_sockfd);
printf("客户端连接已关闭。\n");
return NULL;
}
这段代码实现了一个简单的服务器程序,核心功能包括:
初始化服务端套接字。
接受客户端连接。
处理客户端的注册和登录请求。
通过线程处理多个客户端的请求。
以下是对代码的详细分析:
代码结构
头文件和宏定义:
包含了自定义头文件 mysock.h(假设其中定义了 SockAddr 和 SockAddrIn)。
定义了 USER_FILE 和 BUFFER_SIZE 两个宏。
socket 初始化函数 (socket_init):
创建并初始化服务端套接字,绑定地址和端口,开始监听客户端连接。
连接建立函数 (build_connet):
接受客户端的连接请求,返回客户端的套接字描述符。
socket 关闭函数 (close_socket):
关闭指定的套接字。
服务函数 (doService):
处理客户端的注册和登录请求。
使用文件 user.dat 存储用户数据。
支持多线程处理多个客户端的请求。
函数详细分析
1. socket_init
int socket_init(const char *ip, uint16_t port)
{
// 1.创建socket
int s_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (-1 == s_sockfd)
{
perror("socket failed");
return -1;
}
// 2.绑定地址
SockAddrIn serverAddr;
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(port);
serverAddr.sin_addr.s_addr = inet_addr(ip);
int r = bind(s_sockfd, (SockAddr *)&serverAddr, sizeof(serverAddr));
if (-1 == r)
{
perror("bind failed");
close(s_sockfd);
return -1;
}
// 3.监听客户端
int r2 = listen(s_sockfd, 1);
if (-1 == r2)
{
perror("listen failed");
close(s_sockfd);
return -1;
}
return s_sockfd;
}
功能:初始化服务端套接字。
流程:
创建套接字(AF_INET 表示 IPv4,SOCK_STREAM 表示 TCP)。
绑定 IP 地址和端口。
开始监听客户端连接(listen 的第二个参数 1 表示最大等待连接数)。
返回值:成功返回套接字描述符,失败返回 -1。
2. build_connet
<C>
int build_connet(int socket)
{
// 接受客户端连接
SockAddrIn clientAddr;
socklen_t len = sizeof(clientAddr);
puts("等待客户端的连接......");
int c_sockfd = accept(socket, (SockAddr *)&clientAddr, &len);
if (-1 == c_sockfd)
{
perror("accept failed");
close(socket);
return -1;
}
return c_sockfd;
}
功能:接受客户端的连接请求。
流程:
调用 accept 接受客户端连接。
如果连接成功,返回客户端的套接字描述符。
如果失败,返回 -1 并关闭服务端套接字。
返回值:成功返回客户端套接字描述符,失败返回 -1。
3. close_socket
int close_socket(int socket)
{
return close(socket);
}
功能:关闭指定的套接字。
返回值:返回 close 函数的返回值。
4. doService
<C>
void *doService(void *arg)
{
int c_sockfd = *(int *)arg;
param_t param;
char buffer[BUFFER_SIZE];
while (1)
{
// 接收客户端请求
int len = recv(c_sockfd, ¶m, sizeof(param), 0);
if (len <= 0)
{
if (len == 0)
{
printf("客户端已断开连接。\n");
}
else
{
perror("接收数据失败");
}
break;
}
// 处理注册请求
if (param.request == 1)
{
pthread_mutex_lock(&file_mutex);
FILE *file = fopen(USER_FILE, "a+");
if (file == NULL)
{
perror("打开文件失败");
strcpy(buffer, "注册失败,无法写入文件!");
}
else
{
int found = 0;
char user[10], pass[10];
while (fscanf(file, "%s %s", user, pass) != EOF)
{
if (strcmp(user, param.user) == 0)
{
found = 1;
break;
}
}
if (found)
{
strcpy(buffer, "注册失败,用户名已存在!");
}
else
{
fprintf(file, "%s %s\n", param.user, param.pass);
strcpy(buffer, "注册成功!");
}
fclose(file);
}
pthread_mutex_unlock(&file_mutex);
}
// 处理登录请求
else if (param.request == 2)
{
pthread_mutex_lock(&file_mutex);
FILE *file = fopen(USER_FILE, "r");
if (file == NULL)
{
perror("打开文件失败");
strcpy(buffer, "登录失败,用户数据不存在!");
}
else
{
int found = 0;
char user[10], pass[10];
while (fscanf(file, "%s %s", user, pass) != EOF)
{
if (strcmp(user, param.user) == 0 && strcmp(pass, param.pass) == 0)
{
found = 1;
break;
}
}
fclose(file);
if (found)
{
strcpy(buffer, "登录成功!");
}
else
{
strcpy(buffer, "登录失败,用户名或密码错误!");
}
}
pthread_mutex_unlock(&file_mutex);
}
// 处理无效请求
else
{
strcpy(buffer, "无效请求类型!");
}
// 发送响应给客户端
if (send(c_sockfd, buffer, strlen(buffer), 0) < 0)
{
perror("发送响应失败");
break;
}
}
// 关闭连接
close(c_sockfd);
printf("客户端连接已关闭。\n");
return NULL;
}
功能:处理客户端的注册和登录请求。
流程:
接收客户端发送的请求数据。
根据请求类型(注册或登录)处理:
注册:检查用户名是否已存在,如果不存在则写入文件。
登录:检查用户名和密码是否匹配。
发送响应给客户端。
如果连接断开或出错,关闭客户端套接字。
线程安全:使用互斥锁 pthread_mutex_t 确保文件操作的线程安全。
代码的优点
模块化设计:
将初始化、连接、服务等功能分离,代码结构清晰。
多线程支持:
通过 doService 函数支持多线程处理客户端请求。
线程安全:
使用互斥锁保护文件操作,避免竞争条件。
代码的改进建议
日志优化:
增加日志输出(如客户端 IP 和端口),方便调试。
文件操作优化:
使用 fopen 和 fclose 频繁操作文件效率较低,可以改用内存缓存或数据库。
错误处理:
在文件操作和网络通信中,添加更详细的错误处理逻辑。
支持更多请求类型:
扩展 doService 函数,支持更多操作(如修改密码、删除用户)。
退出机制:
添加退出机制,允许服务器优雅地关闭。
示例运行流程
服务端启动:
调用 socket_init 初始化套接字,绑定 IP 和端口。
调用 build_connet 接受客户端连接。
客户端连接:
连接到服务端,发送注册或登录请求。
服务端处理:
调用 doService 处理请求,返回响应。
连接关闭:
客户端断开连接后,服务端关闭套接字。
通过以上分析,可以看出这段代码是一个功能完善的服务器程序,适合用于处理客户端的注册和登录请求。
threadpool.c
#include "threadpool.h"
void *task_fun(void *args)
{
threadpool_t *pool = (threadpool_t *)args;
// 循环执行服务端的代码
while (1)
{
// 加互斥锁
pthread_mutex_lock(&pool->mutex);
// 循环判断线程池中队列的数量是否为0,线程的状态是否是运行状态
while (pool->queue_num == 0 && pool->running == 1)
{
// 线程池中队列的数量为0,并且当前有任务正在运行
// 条件变量为空的条件阻塞
pthread_cond_wait(&pool->cond_queue_empty, &pool->mutex);
}
// 如果任务数量不为空,或者当前没有正在运行的任务
if (pool->running == 0)
{
// 当前没有正在运行的任务,无需执行操作,直接返回
// 解互斥锁
pthread_mutex_unlock(&pool->mutex);
// break;
return NULL;
}
// 记录当前任务队列中的头部任务
task_t *p = pool->head;
// 任务数量减去1
pool->queue_num--;
// 判断任务的数量是否为0
if (pool->queue_num == 0)
{
// 执行了当前任务没有等待执行的任务
// 让头指针和尾指针都移动到空位值(NULL)
pool->head = pool->tail = NULL;
}
else
{
// 执行了当前任务还有等待执行的任务
// 移动头节点到下一个节点处
pool->head = pool->head->next;
}
// 如果任务队列数量等于任务最大值,表示任务队列满了
if (pool->queue_num == pool->queue_max_size - 1)
{
// 给条件变量为满的发送信号
pthread_cond_signal(&pool->cond_queue_full);
}
// 解互斥锁
pthread_mutex_unlock(&pool->mutex);
// 回调任务函数
// p->task(p->args);
(*p->task)(p->args);
// 回收执行了的头节点
free(p);
p = NULL;
}
return NULL;
}
// 1.线程池初始化
int threadpool_init(threadpool_t *pool, int tnum, int qsize)
{
// 初始化结构体成员
pool->running = 1; // 1表示运行, 0表示未运行
pool->thread_num = tnum;
pool->queue_num = 0;
pool->queue_max_size = qsize;
pool->head = NULL;
pool->tail = NULL;
// 初始化互斥锁
if (pthread_mutex_init(&pool->mutex, NULL) == -1)
{
return -1;
}
// 初始化条件变量
if (pthread_cond_init(&pool->cond_queue_empty, NULL) == -1)
{
// 销毁上一步初始化的互斥量
pthread_mutex_destroy(&pool->mutex);
return -1;
}
if (pthread_cond_init(&pool->cond_queue_full, NULL) == -1)
{
// 销毁上一步初始化的互斥量
pthread_mutex_destroy(&pool->mutex);
// 销毁上一步的条件变量
pthread_cond_destroy(&pool->cond_queue_empty);
return -1;
}
// 在堆内存上为线程id申请一段内存空间
pool->id = (pthread_t *)calloc(tnum, sizeof(pthread_t));
if (pool->id == NULL)
{
// 销毁上一步初始化的互斥量
pthread_mutex_destroy(&pool->mutex);
// 销毁上一步的条件变量
pthread_cond_destroy(&pool->cond_queue_empty);
// 销毁上一步的条件变量
pthread_cond_destroy(&pool->cond_queue_full);
return -1;
}
// 循环创建线程
for (int i = 0; i < tnum; i++)
{
pthread_create(pool->id + i, NULL, task_fun, pool);
}
return 0;
}
// 2.给任务队列添加任务
int threadpool_addtask(threadpool_t *pool, void *(*task_fun)(void*), void *args)
{
if (pool == NULL || task_fun == NULL)
{
// 池子为空,或者要执行的任务函数指针为空,就直接返回
return -1;
}
// 加互斥锁
pthread_mutex_lock(&pool->mutex);
// 判断池子中任务的数量是否已经满了
while (pool->queue_num == pool->queue_max_size)
{
// 池子中任务已经存满了---让条件变量阻塞
pthread_cond_wait(&pool->cond_queue_full, &pool->mutex);
}
// 为任务申请动态内存
task_t *p = (task_t *)calloc(1, sizeof(task_t));
if (p == NULL)
{
// 拆互斥锁
pthread_mutex_unlock(&pool->mutex);
return -1;
}
// 给任务结构体成员赋值
p->task = task_fun;
p->args = args;
p->next = NULL;
// 判断任务队列是否为空
if (pool->tail == NULL)
{
// 任务队列为空,就将任务结构存放到任务队列中,并修改头和尾指针指向该任务
pool->head = pool->tail = p;
// 等待的线程全部唤醒
pthread_cond_broadcast(&pool->cond_queue_empty);
}
else
{
// 任务队列不为空
// 前一个尾指针的next指向新的任务
pool->tail->next = p;
pool->tail = p;
}
// 任务数量加1
pool->queue_num++;
// 拆互斥锁
pthread_mutex_unlock(&pool->mutex);
return 0;
}
// 3.销毁线程池
int threadpool_destroy(threadpool_t *pool)
{
// 先修改线程的运行状态,让它处于停止状态
pool->running = 0;
// 等待的所有线程全部唤醒
pthread_cond_broadcast(&pool->cond_queue_empty);
pthread_cond_broadcast(&pool->cond_queue_full);
// 销毁互斥量
pthread_mutex_destroy(&pool->mutex);
// 销毁条件变量
pthread_cond_destroy(&pool->cond_queue_empty);
pthread_cond_destroy(&pool->cond_queue_full);
// 等待所有线程结束
for (int i = 0; i < pool->thread_num; i++)
{
pthread_cancel(pool->id[i]);
// 等待线程的结束
pthread_join(pool->id[i], NULL);
}
// 回收存储id的堆内存
free(pool->id);
pool->id = NULL;
// 将任务结构体中的指针置为NULL---使用指针尾随思想
task_t *curr = pool->head;
task_t *follow = NULL;
// 循环回收任务指针
while (curr)
{
// 尾随指针移动到当前指针指向的节点处
follow = curr;
// 当前指针移动到下一个节点处
curr = curr->next;
// 回收尾随指针指向的节点
free(follow);
follow = NULL;
}
// 将头指针位尾指针设置为NULL
pool->head = NULL;
pool->tail = NULL;
return 0;
}
这段代码实现了一个简单的线程池(Thread Pool),主要功能包括:
线程池的初始化。
向线程池中添加任务。
销毁线程池。
以下是代码的详细分析:
代码结构
任务函数 (task_fun):
线程池中每个线程执行的任务函数,负责从任务队列中获取任务并执行。
线程池初始化函数 (threadpool_init):
初始化线程池,包括线程数量、任务队列大小、互斥锁、条件变量等。
添加任务函数 (threadpool_addtask):
向线程池的任务队列中添加任务。
销毁线程池函数 (threadpool_destroy):
销毁线程池,释放所有资源。
关键数据结构
typedef struct task_t
{
void *(*task)(void *); // 任务函数指针
void *args; // 任务函数参数
struct task_t *next; // 下一个任务
} task_t;
typedef struct threadpool_t
{
pthread_t *id; // 线程 ID 数组
int thread_num; // 线程数量
int queue_num; // 当前任务队列中的任务数量
int queue_max_size; // 任务队列的最大容量
task_t *head; // 任务队列的头指针
task_t *tail; // 任务队列的尾指针
pthread_mutex_t mutex; // 互斥锁
pthread_cond_t cond_queue_empty; // 条件变量:任务队列为空
pthread_cond_t cond_queue_full; // 条件变量:任务队列为满
int running; // 线程池运行状态(1:运行,0:停止)
} threadpool_t;
函数详细分析
1. task_fun
void *task_fun(void *args)
{
threadpool_t *pool = (threadpool_t *)args;
while (1)
{
pthread_mutex_lock(&pool->mutex);
while (pool->queue_num == 0 && pool->running == 1)
{
pthread_cond_wait(&pool->cond_queue_empty, &pool->mutex);
}
if (pool->running == 0)
{
pthread_mutex_unlock(&pool->mutex);
return NULL;
}
task_t *p = pool->head;
pool->queue_num--;
if (pool->queue_num == 0)
{
pool->head = pool->tail = NULL;
}
else
{
pool->head = pool->head->next;
}
if (pool->queue_num == pool->queue_max_size - 1)
{
pthread_cond_signal(&pool->cond_queue_full);
}
pthread_mutex_unlock(&pool->mutex);
(*p->task)(p->args);
free(p);
p = NULL;
}
return NULL;
}
功能:线程池中每个线程的任务函数。
流程:
加锁,确保线程安全。
如果任务队列为空且线程池正在运行,则阻塞等待条件变量 cond_queue_empty。
如果线程池停止运行,则解锁并退出线程。
从任务队列头部获取任务。
更新任务队列状态(任务数量、头指针、尾指针)。
如果任务队列从满变未满,则发送条件变量信号 cond_queue_full。
解锁并执行任务。
释放任务节点的内存。
2. threadpool_init
int threadpool_init(threadpool_t *pool, int tnum, int qsize)
{
pool->running = 1;
pool->thread_num = tnum;
pool->queue_num = 0;
pool->queue_max_size = qsize;
pool->head = NULL;
pool->tail = NULL;
if (pthread_mutex_init(&pool->mutex, NULL) == -1)
{
return -1;
}
if (pthread_cond_init(&pool->cond_queue_empty, NULL) == -1)
{
pthread_mutex_destroy(&pool->mutex);
return -1;
}
if (pthread_cond_init(&pool->cond_queue_full, NULL) == -1)
{
pthread_mutex_destroy(&pool->mutex);
pthread_cond_destroy(&pool->cond_queue_empty);
return -1;
}
pool->id = (pthread_t *)calloc(tnum, sizeof(pthread_t));
if (pool->id == NULL)
{
pthread_mutex_destroy(&pool->mutex);
pthread_cond_destroy(&pool->cond_queue_empty);
pthread_cond_destroy(&pool->cond_queue_full);
return -1;
}
for (int i = 0; i < tnum; i++)
{
pthread_create(pool->id + i, NULL, task_fun, pool);
}
return 0;
}
功能:初始化线程池。
流程:
初始化线程池结构体成员。
初始化互斥锁和条件变量。
动态分配线程 ID 数组的内存。
创建指定数量的线程,每个线程执行 task_fun。
返回值:成功返回 0,失败返回 -1。
3. threadpool_addtask
int threadpool_addtask(threadpool_t *pool, void *(*task_fun)(void *), void *args)
{
if (pool == NULL || task_fun == NULL)
{
return -1;
}
pthread_mutex_lock(&pool->mutex);
while (pool->queue_num == pool->queue_max_size)
{
pthread_cond_wait(&pool->cond_queue_full, &pool->mutex);
}
task_t *p = (task_t *)calloc(1, sizeof(task_t));
if (p == NULL)
{
pthread_mutex_unlock(&pool->mutex);
return -1;
}
p->task = task_fun;
p->args = args;
p->next = NULL;
if (pool->tail == NULL)
{
pool->head = pool->tail = p;
pthread_cond_broadcast(&pool->cond_queue_empty);
}
else
{
pool->tail->next = p;
pool->tail = p;
}
pool->queue_num++;
pthread_mutex_unlock(&pool->mutex);
return 0;
}
功能:向线程池的任务队列中添加任务。
流程:
加锁,确保线程安全。
如果任务队列已满,则阻塞等待条件变量 cond_queue_full。
动态分配任务节点的内存。
初始化任务节点(任务函数指针、参数、next 指针)。
将任务节点加入任务队列。
如果任务队列从空变为非空,则发送条件变量信号 cond_queue_empty。
解锁。
返回值:成功返回 0,失败返回 -1。
4. threadpool_destroy
<C>
int threadpool_destroy(threadpool_t *pool)
{
pool->running = 0;
pthread_cond_broadcast(&pool->cond_queue_empty);
pthread_cond_broadcast(&pool->cond_queue_full);
pthread_mutex_destroy(&pool->mutex);
pthread_cond_destroy(&pool->cond_queue_empty);
pthread_cond_destroy(&pool->cond_queue_full);
for (int i = 0; i < pool->thread_num; i++)
{
pthread_cancel(pool->id[i]);
pthread_join(pool->id[i], NULL);
}
free(pool->id);
pool->id = NULL;
task_t *curr = pool->head;
task_t *follow = NULL;
while (curr)
{
follow = curr;
curr = curr->next;
free(follow);
follow = NULL;
}
pool->head = NULL;
pool->tail = NULL;
return 0;
}
功能:销毁线程池,释放所有资源。
流程:
设置线程池运行状态为 0(停止)。
唤醒所有等待的线程。
销毁互斥锁和条件变量。
取消并等待所有线程结束。
释放线程 ID 数组的内存。
释放任务队列中的节点。
将头指针和尾指针置为 NULL。
返回值:成功返回 0。
代码的优点
线程安全:
使用互斥锁和条件变量确保线程池的线程安全。
任务队列管理:
动态管理任务队列,支持任务的最大数量限制。
资源回收:
在销毁线程池时,释放所有动态分配的内存。
代码的改进建议
增加日志:
添加日志输出,方便调试和监控线程池的运行状态。
任务优先级:
支持任务优先级,高优先级任务优先执行。
动态调整线程数量:
根据任务数量动态调整线程池中的线程数量,提高资源利用率。
错误处理增强:
在动态内存分配和线程操作中,增加错误处理逻辑。
示例使用场景
void *my_task(void *args)
{
printf("执行任务: %s\n", (char *)args);
return NULL;
}
int main()
{
threadpool_t pool;
threadpool_init(&pool, 4, 10); // 初始化线程池,4 个线程,任务队列最大容量为 10
for (int i = 0; i < 20; i++)
{
char *task_args = malloc(32);
sprintf(task_args, "任务 %d", i + 1);
threadpool_addtask(&pool, my_task, task_args);
}
sleep(5); // 等待所有任务执行完成
threadpool_destroy(&pool); // 销毁线程池
return 0;
}
通过以上分析,可以看出这段代码实现了一个功能完善的线程池,适合用于高并发任务处理的场景。
main.c
#include "mysock.h"
#include "threadpool.h"
int main(int argc, char const *argv[])
{
if (argc < 3)
{
puts("命令行缺少参数");
return -1;
}
printf("%s---%s\n", argv[1], argv[2]);
// 调用自己封装的init函数,获取服务端的套接字
int s_sockfd = socket_init(argv[1], atoi(argv[2]));
if (s_sockfd == -1)
{
return -1;
}
// 定义线程池结构体变量
threadpool_t pool;
// 调用线程池的初始化函数
if (threadpool_init(&pool, 3, 10) == -1)
{
puts("线程池初始化失败");
close_socket(s_sockfd);
return -1;
}
// 循环接收客户端的连接请求
while (1)
{
// 调用自己封装的连接函数,获取客户端的套接字
int c_sockfd = build_connet(s_sockfd);
if (c_sockfd == -1)
{
close_socket(s_sockfd);
return -1;
}
// 调用线程池函数添加任务
threadpool_addtask(&pool, doService, &c_sockfd);
}
// 销毁线程池
threadpool_destroy(&pool);
return 0;
}
这段代码实现了一个简单的多线程服务器,核心功能包括:
初始化服务端套接字。
接收客户端的连接请求。
使用线程池处理客户端的请求。
以下是对代码的详细分析:
代码结构
参数检查:
检查命令行参数是否完整。
服务端套接字初始化:
调用 socket_init 函数初始化服务端套接字。
线程池初始化:
调用 threadpool_init 函数初始化线程池。
客户端连接处理:
循环接收客户端的连接请求。
调用 build_connet 函数获取客户端套接字。
使用线程池添加任务,处理客户端请求。
线程池销毁:
调用 threadpool_destroy 函数释放线程池资源。
代码逐行分析
1. 参数检查
if (argc < 3)
{
puts("命令行缺少参数");
return -1;
}
printf(“%s—%s\n”, argv[1], argv[2]);
功能:检查命令行参数的数量是否满足要求。
参数:
argv[1]:IP 地址。
argv[2]:端口号。
逻辑:如果参数少于 3 个,输出错误信息并退出程序。
2. 服务端套接字初始化
int s_sockfd = socket_init(argv[1], atoi(argv[2]));
if (s_sockfd == -1)
{
return -1;
}
功能:初始化服务端套接字。
函数调用:
socket_init(argv[1], atoi(argv[2])):创建并绑定服务端套接字。
逻辑:如果初始化失败,返回 -1。
3. 线程池初始化
threadpool_t pool;
if (threadpool_init(&pool, 3, 10) == -1)
{
puts("线程池初始化失败");
close_socket(s_sockfd);
return -1;
}
功能:初始化线程池。
参数:
&pool:线程池结构体变量。
3:线程数量。
10:任务队列的最大容量。
逻辑:如果初始化失败,关闭服务端套接字并退出程序。
4. 客户端连接处理
while (1)
{
int c_sockfd = build_connet(s_sockfd);
if (c_sockfd == -1)
{
close_socket(s_sockfd);
return -1;
}
threadpool_addtask(&pool, doService, &c_sockfd);
}
功能:循环接收客户端的连接请求,并使用线程池处理请求。
函数调用:
build_connet(s_sockfd):接受客户端连接,返回客户端套接字描述符。
threadpool_addtask(&pool, doService, &c_sockfd):将客户端请求添加到线程池的任务队列中。
逻辑:
如果客户端连接失败,关闭服务端套接字并退出程序。
使用线程池处理客户端的请求,避免阻塞主线程。
5. 线程池销毁
threadpool_destroy(&pool);
功能:销毁线程池,释放资源。
函数调用:
threadpool_destroy(&pool):销毁线程池。
代码的优点
多线程支持:
使用线程池处理客户端请求,避免频繁创建和销毁线程,提高性能。
模块化设计:
将服务端套接字初始化和线程池初始化分离,代码结构清晰。
资源管理:
在线程池销毁时释放所有资源,避免内存泄漏。
客户端
main.c
#include "mysock.h"
#include "threadpool.h"
int main(int argc, char const *argv[])
{
if (argc < 3)
{
puts("用法: ./client <服务器IP> <端口号>");
return -1;
}
int sockfd;
struct sockaddr_in server_addr;
param_t param;
char buffer[256];
// 创建套接字
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
perror("套接字创建失败");
return -1;
}
// 配置服务器地址
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(atoi(argv[2]));
server_addr.sin_addr.s_addr = inet_addr(argv[1]);
// 连接服务器
if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0)
{
perror("连接服务器失败");
close(sockfd);
return -1;
}
printf("成功连接到服务器: %s:%s\n", argv[1], argv[2]);
// 客户端交互界面
while (1)
{
printf("=======================\n");
printf("1. 用户注册\n");
printf("2. 用户登录\n");
printf("0. 退出\n");
printf("=======================\n");
printf("请输入选项: ");
int choice;
if (scanf("%d", &choice) != 1)
{
// 清除无效输入
while (getchar() != '\n')
;
printf("无效选项,请输入数字!\n");
continue;
}
if (choice == 0)
{
printf("退出程序。\n");
break;
}
if (choice != 1 && choice != 2)
{
printf("无效选项,请输入 0、1 或 2!\n");
continue;
}
printf("用户名: ");
scanf("%s", param.user);
printf("密码: ");
scanf("%s", param.pass);
param.request = choice;
// 打印发送的数据
printf("发送数据: 用户名=%s, 密码=%s, 请求类型=%d\n", param.user, param.pass, param.request);
// 发送请求数据
if (send(sockfd, ¶m, sizeof(param), 0) < 0)
{
perror("发送数据失败");
close(sockfd);
return -1;
}
// 接收服务器响应
int len = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
if (len <= 0)
{
if (len == 0)
{
printf("服务器关闭连接。\n");
}
else
{
perror("接收数据失败");
}
close(sockfd);
return -1;
}
buffer[len] = '\0';
printf("服务器响应: %s\n", buffer);
// 登录成功后提供继续或退出选项
if (choice == 2 && strcmp(buffer, "登录成功!") == 0)
{
while (1)
{
printf("==========================\n");
printf("1. 继续操作\n");
printf("0. 退出程序\n");
printf("==========================\n");
printf("请输入选项: ");
int exit_choice;
if (scanf("%d", &exit_choice) != 1)
{
// 清除无效输入
while (getchar() != '\n')
;
printf("无效选项,请输入数字!\n");
continue;
}
if (exit_choice == 0)
{
printf("退出程序。\n");
close(sockfd);
return 0; // 退出程序
}
else if (exit_choice == 1)
{
break; // 继续操作
}
else
{
printf("无效选项,请重新输入。\n");
}
}
}
}
// 关闭套接字
close(sockfd);
printf("关闭连接,程序结束。\n");
return 0;
}
代码结构
参数检查:
检查命令行参数是否完整。
套接字创建和连接:
创建套接字并连接到服务器。
客户端交互界面:
提供用户注册和登录功能。
支持用户退出程序。
服务器交互:
发送用户请求数据到服务器。
接收并显示服务器的响应。
登录成功后的处理:
提供继续操作或退出程序的选项。
资源释放:
关闭套接字,释放资源。
代码逐行分析
1. 参数检查
if (argc < 3)
{
puts("用法: ./client <服务器IP> <端口号>");
return -1;
}
功能:检查命令行参数的数量是否满足要求。
参数:
argv[1]:服务器 IP 地址。
argv[2]:服务器端口号。
逻辑:如果参数少于 3 个,输出使用说明并退出程序。
2. 套接字创建和连接
int sockfd;
struct sockaddr_in server_addr;
// 创建套接字
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
perror("套接字创建失败");
return -1;
}
// 配置服务器地址
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(atoi(argv[2]));
server_addr.sin_addr.s_addr = inet_addr(argv[1]);
// 连接到服务器
if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0)
{
perror("连接服务器失败");
close(sockfd);
return -1;
}
printf("成功连接到服务器: %s:%s\n", argv[1], argv[2]);
功能:创建 TCP 套接字并连接到服务器。
逻辑:
使用 socket 函数创建套接字。
配置服务器地址结构体 server_addr。
使用 connect 函数连接到服务器。
如果连接失败,输出错误信息并关闭套接字。
3. 客户端交互界面
while (1)
{
printf("=======================\n");
printf("1. 用户注册\n");
printf("2. 用户登录\n");
printf("0. 退出\n");
printf("=======================\n");
printf("请输入选项: ");
int choice;
if (scanf("%d", &choice) != 1)
{
// 清除无效输入
while (getchar() != '\n')
;
printf("无效选项,请输入数字!\n");
continue;
}
if (choice == 0)
{
printf("退出程序。\n");
break;
}
if (choice != 1 && choice != 2)
{
printf("无效选项,请输入 0、1 或 2!\n");
continue;
}
printf("用户名: ");
scanf("%s", param.user);
printf("密码: ");
scanf("%s", param.pass);
param.request = choice;
...
}
功能:提供用户交互界面,支持用户注册、登录和退出。
逻辑:
输出菜单选项(注册、登录、退出)。
获取用户输入,并进行验证。
如果用户选择退出(0),退出循环。
获取用户名和密码,并设置请求类型。
4. 服务器交互
// 打印发送的数据
printf("发送数据: 用户名=%s, 密码=%s, 请求类型=%d\n", param.user, param.pass, param.request);
// 发送请求数据
if (send(sockfd, ¶m, sizeof(param), 0) < 0)
{
perror("发送数据失败");
close(sockfd);
return -1;
}
// 接收服务器响应
int len = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
if (len <= 0)
{
if (len == 0)
{
printf("服务器关闭连接。\n");
}
else
{
perror("接收数据失败");
}
close(sockfd);
return -1;
}
buffer[len] = '\0';
printf("服务器响应: %s\n", buffer);
功能:发送用户请求数据到服务器,并接收服务器的响应。
逻辑:
打印发送的数据,便于调试。
使用 send 函数发送请求数据到服务器。
使用 recv 函数接收服务器的响应。
如果接收失败或服务器关闭连接,输出错误信息并关闭套接字。
打印服务器的响应。
5. 登录成功后的处理
if (choice == 2 && strcmp(buffer, "登录成功!") == 0)
{
while (1)
{
printf("==========================\n");
printf("1. 继续操作\n");
printf("0. 退出程序\n");
printf("==========================\n");
printf("请输入选项: ");
int exit_choice;
if (scanf("%d", &exit_choice) != 1)
{
// 清除无效输入
while (getchar() != '\n')
;
printf("无效选项,请输入数字!\n");
continue;
}
if (exit_choice == 0)
{
printf("退出程序。\n");
close(sockfd);
return 0; // 退出程序
}
else if (exit_choice == 1)
{
break; // 继续操作
}
else
{
printf("无效选项,请重新输入。\n");
}
}
}
功能:在登录成功后,提供继续操作或退出程序的选项。
逻辑:
如果用户登录成功,输出菜单选项(继续操作、退出程序)。
获取用户输入,并进行验证。
如果用户选择退出(0),关闭套接字并退出程序。
如果用户选择继续操作(1),返回主循环。
6. 资源释放
close(sockfd);
printf(“关闭连接,程序结束。\n”);
return 0;
功能:关闭套接字,释放资源。
逻辑:
使用 close 函数关闭套接字。
输出提示信息,程序结束。
代码的优点
结构清晰:
代码分为多个逻辑块,功能明确,易于理解。
用户交互友好:
提供清晰的菜单选项和错误提示。
健壮性:
处理了多种错误情况(如无效输入、网络连接失败等)。
可扩展性:
可以轻松扩展功能(如新增选项或修改逻辑)。