Linux | 进程池技术解析:利用无名管道实现并发任务处理(含实现代码)
在血海里游泳,一直游到海水变蓝。——何小鹏
2024.8.31
目录
一、进程池
二、使用匿名管道实现进程池的核心
前置知识:管道的四种情况和五个特征
三、代码实现
四、实现代码详解
main()
2、loadTask()
3、channelInit()
问题:为什么要将子进程的管道读端重定向至标准输入?
4、ctrlProcess()
5、channelClose()
一、进程池
多任务处理是提高系统性能和响应速度的关键。进程池技术作为一种有效的资源管理和任务调度策略,已经成为并发编程中不可或缺的一部分。本文探讨进程池的工作原理,以及如何通过使用无名管道来实现高效的并发任务处理。
- 定义:进程池由一组预先创建的空闲进程(资源进程)和管理这些进程的管理进程组成。
- 作用:优化资源管理和提高系统效率,通过预先创建进程减少频繁创建和销毁进程的开销。
- 预期并发效果:虽然进程池中的进程数量固定,但可以并行处理多个任务,实现并发效果。
二、使用匿名管道实现进程池的核心
- 主函数逻辑:创建任务,创建子进程池,发送任务,关闭写端和回收子进程。
- 父进程创建进程池:使用
fork()
函数创建子进程,子进程阻塞在read()
函数处等待任务。 - 子进程退出时机:当所有管道写端关闭时,子进程通过
read()
函数返回0值判断退出。
前置知识:管道的四种情况和五个特征
管道的四种情况:
- 如果管道是空的,则读取端被阻塞
- 如果管道是满的,则写入端被阻塞
- 如果关闭了管道的读端,那管道没必要存在,被13号信号杀死
- 如果关闭了管道的写端,读取完毕后管道返回0,表示读到了文件末尾
管道的五个特征:
- 匿名管道只能用于有血缘关系的进程通信,常用于父子间通信。
- 管道内部实现了同步机制,读写具有明显的顺序性。
- 管道的生命周期是随进程的,随着进程使用管道而创建缓冲区, 随进程的退出而释放销毁。
- 管道通通信是面向字节流的,读写次数是可以不匹配的,读到的数据可能是单次残缺的,也可能是多次堆积的
- 管道通信是特殊的半双工模式,半双工是指支持读写,但不能同时读写,特殊在只支持信息的单向传递。
三、代码实现
#include<iostream>
#include<unistd.h>
#include<vector>
#include<string>
#include<sys/wait.h>
#include<sys/types.h>
// 定义任务函数指针类型
typedef void (*task_t)();
#define taskNum 3 // 定义任务数量
int n; // 进程数量
task_t tasks[taskNum]; // 任务函数指针数组
struct channel
{
int _wfd; // 管道写端文件描述符
pid_t _task; // 子进程ID
std::string _name; // 通道名称
// 构造函数
channel(int wfd, pid_t task, const std::string& name = "channel")
:_wfd(wfd),
_task(task),
_name(name)
{}
// 关闭管道写端
void Close()
{
close(_wfd);
}
};
std::vector<channel> v; // 通道向量
// 打印任务
void print()
{
std::cout << "this is print task" << std::endl;
}
// 下载任务
void download()
{
std::cout << "this is download task" << std::endl;
}
// 刷新任务
void flush()
{
std::cout << "this is flush task" << std::endl;
}
// 加载任务
void loadTask()
{
tasks[0] = print;
tasks[1] = download;
tasks[2] = flush;
}
// 执行任务
void ExecuteTask()
{
int read_num = 0;
int task_index = 0;
while (true)
{
read_num = read(0, &task_index, sizeof(int));
if (read_num == -1)
{
std::cerr << "管道读取失败!错误码:" << errno << std::endl;
exit(-1);
}
else if (read_num == 0) // 读到0代表写端关闭,直接停止
{
std::cout << "子进程 " << getpid() << " 任务读取完成或管道关闭,退出。" << std::endl;
exit(0);
}
else
{
if (task_index >= 0 && task_index < 3)
{
std::cout << "子进程 " << getpid() << " 执行任务 " << task_index << std::endl;
tasks[task_index]();
}
else
{
std::cerr << "无效的任务索引:" << task_index << std::endl;
}
}
}
}
// 初始化通道
void channelInit()
{
for(int i = 0; i < n; i++)
{
int pipefd[2];
pipe(pipefd);
pid_t id = fork();
if(id < 0)
return;
else if(id == 0)
{
close(pipefd[1]);
dup2(pipefd[0], 0);
for(int i = 0; i < v.size(); i++)
{
v[i].Close();
}
ExecuteTask();
}
std::string channel_name = "channel_"+std::to_string(i);
close(pipefd[0]);
v.push_back(channel(pipefd[1], id, channel_name));
}
}
// 获取下一个通道索引
int nextChannel()
{
static int next = 0;
int channel = next;
next++;
next %= n;
return channel;
}
// 向通道发送任务命令
void sendTaskCommand(channel& chan, int taskCommand)
{
write(chan._wfd, &taskCommand, sizeof(taskCommand));
}
// 选择任务
int selectTask()
{
return rand() % taskNum;
}
// 控制进程执行一次
void ctrlProcessOnce()
{
int taskCommand = selectTask();
int channel_index = nextChannel();
sendTaskCommand(v[channel_index], taskCommand);
}
// 控制进程执行多次
void ctrlProcess(int times = -1)
{
while(times--)
{
ctrlProcessOnce();
sleep(1);
}
}
// 关闭通道并等待子进程退出
void channelClose()
{
int status = 0;
for(int i = 0; i < n; i++)
{
v[i].Close();
wait(&status);
int exit_status = (status&0x7f);
int exit_code = ((status>>8)&0xff);
std::cout << "子进程 " << getpid() << " 的退出状态为 " << exit_status << " 退出码为 " << exit_code << std::endl;
std::cout << std::endl;
}
}
// 主函数
int main()
{
loadTask();
std::cout << "请输入你要创建的进程个数"<<std::endl;
std::cin >> n;
int count = -1;
std::cout << "请输入要执行程序的次数" << std::endl;
std::cin >> count;
channelInit();
ctrlProcess(count);
channelClose();
sleep(3);
return 0;
}
四、实现代码详解
main()
负责执行初始化任务、创建进程池、控制进程执行任务,以及关闭通道并等待子进程退出。
#include<iostream>
#include<unistd.h>
#include<vector>
#include<string>
#include<sys/types.h>
typedef void (*task_t)();
#define taskNum 3
int n;
task_t tasks[taskNum];
struct channel
{
int _wfd;
pid_t _task;
std::string _name;
channel(int wfd, pid_t task, const std::string& name = "channel")
:_wfd(wfd),
_task(task),
_name(name)
{}
void Close()
{
close(_wfd);
}
};
std::vector<channel> v; // 存储创建的子进程信息,包括读wid/进程id/进程名称
// 需要执行的三个函数
void print()
{
std::cout << "this is print task" << std::endl;
}
void download()
{
std::cout << "this is download task" << std::endl;
}
void flush()
{
std::cout << "this is flush task" << std::endl;
}
int main()
{
std::cout << "请输入你要创建的进程个数"<<std::endl;
std::cin >> n;
loadTask(); // 通过函数指针解耦调用函数,将需要运行的函数放入函数指针数组中
channelInit();
ctrlProcess();
return 0;
}
2、loadTask()
将函数通过函数指针将不同的任务函数加载到数组中,以便后续可以通过索引来调用相应的任务
// 通过函数指针解耦调用函数,将需要运行的函数放入函数指针数组中
void loadTask()
{
tasks[0] = print;
tasks[1] = download;
tasks[2] = flush;
}
3、channelInit()
创建子进程并让子进程read阻塞等待任务。子进程通过管道的读端来接收任务索引,并执行相应的任务函数。
void channelInit()
{
for(int i = 0; i < n; i++)
{
int pipefd[2];
pipe(pipefd);
pid_t id = fork();
if(id < 0)
return ;
else if(id == 0)
{
close(pipefd[1]);
dup2(pipefd[0], 0); // 为什么要重定向?——实现监听通道和子进程的解耦
// 重定向后只需要固定监听0号文件描述符收到的文件即可,无需根据特定子进程监听不同的wid
for(int i = 0; i < v.size(); i++)
{
v[i].Close(); //子进程必须关闭继承父进程的、多余的wfd
}
ExecuteTask(); // 子进程在此阻塞等待
}
std::string channel_name = "channel_"+std::to_string(i);
close(pipefd[0]);
v.push_back(channel(pipefd[1], id, channel_name));
}
}
// 监听被重定向的wid 获得函数指针数组下标,执行相应函数
void ExecuteTask()
{
int read_num = 0;
int task_index = 0;
while (true)
{
read_num = read(0, &task_index, sizeof(int));
if (read_num == -1)
{
std::cerr << "管道读取失败!错误码:" << errno << std::endl;
exit(-1);
}
else if (read_num == 0) // 读到0代表写端关闭,直接停止
{
std::cout << "子进程 " << getpid() << " 任务读取完成或管道关闭,退出。" << std::endl;
break;
}
else
{
if (task_index >= 0 && task_index < 3)
{
std::cout << "子进程 " << getpid() << " 执行任务 " << task_index << std::endl;
tasks[task_index]();
}
else
{
std::cerr << "无效的任务索引:" << task_index << std::endl;
}
}
}
}
问题:为什么要将子进程的管道读端重定向至标准输入?
实现监听通道和子进程的解耦。每个子进程的读端rid不同,后续read监听需要根据不同子进程调整rid。而重定向后只需要固定监听0号文件描述符收到的文件即可,实现了不同子进程监听同一个文件描述符的解耦。
4、ctrlProcess()
父进程根据轮询策略向子进程发送任务,控制子进程执行任务。
// 获取下一个通道索引
int nextChannel()
{
static int next = 0;
int channel = next;
next++;
next %= n;
return channel;
}
// 向通道发送任务命令
void sendTaskCommand(channel& chan, int taskCommand)
{
write(chan._wfd, &taskCommand, sizeof(taskCommand));
}
// 选择任务
int selectTask()
{
return rand() % taskNum;
}
// 控制进程执行一次
void ctrlProcessOnce()
{
int taskCommand = selectTask();
int channel_index = nextChannel();
sendTaskCommand(v[channel_index], taskCommand);
}
// 控制进程执行多次
void ctrlProcess(int times = -1)
{
while(times--)
{
ctrlProcessOnce();
sleep(1);
}
}
5、channelClose()
用于关闭所有通道,并等待所有子进程退出,同时获取子进程的退出状态和退出码。
// 关闭通道并等待子进程退出
void channelClose()
{
int status = 0;
for(int i = 0; i < n; i++)
{
v[i].Close();
wait(&status);
int exit_status = (status&0x7f);
int exit_code = ((status>>8)&0xff);
std::cout << "子进程 " << getpid() << " 的退出状态为 " << exit_status << " 退出码为 " << exit_code << std::endl;
std::cout << std::endl;
}
}