【Linux】使用管道实现一个简易版本的进程池
文章目录
- 使用管道实现一个简易版本的进程池
- 流程图
- 代码
- makefile
- Task.hpp
- ProcessPool.cc
- 程序流程:
使用管道实现一个简易版本的进程池
流程图
代码
makefile
ProcessPool:ProcessPool.cc
g++ -o $@ $^ -g -std=c++11
.PHONY:clean
clean:
rm -f ProcessPool
Task.hpp
#pragma once
#include <iostream>
#include <vector>
typedef void (*task_t)(); //定义了一个函数指针类型task_t,它指向返回类型为void且不接受任何参数的函数。
void task1()
{
std::cout << "lol 刷新日志" << std::endl;
}
void task2()
{
std::cout << "lol 更新野区,刷新出来野怪" << std::endl;
}
void task3()
{
std::cout << "lol 检测软件是否更新,如果需要,就提示用户" << std::endl;
}
void task4()
{
std::cout << "lol 用户释放技能,更新用的血量和蓝量" << std::endl;
}
void LoadTask(std::vector<task_t> *tasks) // 该函数接受一个指向std::vector<task_t>的指针,并将其作为参数
{
tasks->push_back(task1); //将task1函数的地址添加到向量中。
tasks->push_back(task2);
tasks->push_back(task3);
tasks->push_back(task4);
}
ProcessPool.cc
#include "Task.hpp" // 包含任务相关的头文件
#include <string>
#include <vector>
#include <cstdlib>
#include <ctime>
#include <cassert>
#include <unistd.h>
#include <sys/stat.h>
#include <sys/wait.h>
const int processnum = 10; // 设定进程池大小为10
std::vector<task_t> tasks; // 存储任务的向量
// 定义channel类,用于管理进程间通信
class channel
{
public:
channel(int cmdfd, int slaverid, const std::string &processname)
:_cmdfd(cmdfd), _slaverid(slaverid), _processname(processname)
{}
public:
int _cmdfd; // 用于向子进程发送命令的文件描述符
pid_t _slaverid; // 子进程ID
std::string _processname; // 子进程名称,用于日志显示
};
// 子进程执行的函数
void slaver()
{
while(true)
{
int cmdcode = 0;
// 从标准输入(被重定向到管道)读取命令
int n = read(0, &cmdcode, sizeof(int));
if(n == sizeof(int))
{
std::cout <<"slaver say@ get a command: "<< getpid() << " : cmdcode: " << cmdcode << std::endl;
// 执行对应的任务
if(cmdcode >= 0 && cmdcode < tasks.size()) tasks[cmdcode]();
}
if(n == 0) break; // 管道关闭时退出
}
}
// 初始化进程池
void InitProcessPool(std::vector<channel> *channels)
{
std::vector<int> oldfds; // 存储历史文件描述符
for(int i = 0; i < processnum; i++)
{
int pipefd[2];
int n = pipe(pipefd); // 创建管道
assert(!n);
(void)n;
pid_t id = fork(); // 创建子进程
if(id == 0) // 子进程
{
// 关闭历史文件描述符
std::cout << "child: " << getpid() << " close history fd: ";
for(auto fd : oldfds) {
std::cout << fd << " "; // 打印当前文件描述符的值,用于显示子进程正在关闭哪些文件描述符。
close(fd); // 关闭文件描述符
}
std::cout << "\n";
close(pipefd[1]); // 关闭写端
dup2(pipefd[0], 0); // 将管道读端重定向到标准输入
close(pipefd[0]); //关闭读端
slaver(); // 执行子进程任务
std::cout << "process : " << getpid() << " quit" << std::endl;
exit(0);
}
// 父进程
close(pipefd[0]); // 关闭读端
// 创建新的channel并添加到channels中
std::string name = "process-" + std::to_string(i);
channels->push_back(channel(pipefd[1], id, name));
oldfds.push_back(pipefd[1]);
sleep(1);
}
}
// 打印调试信息
void Debug(const std::vector<channel> &channels)
{
for(const auto &c :channels)
{
std::cout << c._cmdfd << " " << c._slaverid << " " << c._processname << std::endl;
}
}
// 显示菜单
void Menu()
{
std::cout << "################################################" << std::endl;
std::cout << "# 1. 刷新日志 2. 刷新出来野怪 #" << std::endl;
std::cout << "# 3. 检测软件是否更新 4. 更新用的血量和蓝量 #" << std::endl;
std::cout << "# 0. 退出 #" << std::endl;
std::cout << "#################################################" << std::endl;
}
// 控制子进程执行任务
void ctrlSlaver(const std::vector<channel> &channels)
{
int which = 0;
while(true)
{
int select = 0;
Menu();
std::cout << "Please Enter@ ";
std::cin >> select;
if(select <= 0 || select >= 5) break;
int cmdcode = select - 1;
// 轮询方式分配任务给子进程
std::cout << "father say: " << " cmdcode: " <<
cmdcode << " already sendto " << channels[which]._slaverid << " process name: "
<< channels[which]._processname << std::endl;
write(channels[which]._cmdfd, &cmdcode, sizeof(cmdcode));
which++;
which %= channels.size();
}
}
// 清理进程池
void QuitProcess(const std::vector<channel> &channels)
{
for(const auto &c : channels){
close(c._cmdfd); // 关闭所有管道
waitpid(c._slaverid, nullptr, 0); // 等待所有子进程结束
}
}
int main()
{
LoadTask(&tasks); // 加载任务列表
srand(time(nullptr)^getpid()^1023); // 初始化随机数种子
std::vector<channel> channels; //
InitProcessPool(&channels); // 初始化进程池
ctrlSlaver(channels); // 控制子进程执行任务
QuitProcess(channels); // 清理进程池
return 0;
}
程序流程:
1.main
函数首先调用LoadTask(&tasks)
,将task1
到task4
四个任务的函数地址存入全局tasks
向量。
2.srand(time(nullptr)^getpid()^1023);
初始化随机数种子
3.std::vector<channel> channels;
,这行代码的作用是定义一个名为 channels
的向量(std::vector
),用于存储 channel
类型的对象。它的主要作用是管理多个 channel
对象,每个 channel
对象代表一个子进程的通信通道。
-
每个
channel
对象包含以下信息:-
_cmdfd
:用于向子进程发送命令的文件描述符(管道写端)。 -
_slaverid
:子进程的进程ID(PID)。 -
_processname
:子进程的名称,用于日志和调试。
-
-
channels
向量存储了所有子进程的通信信息,父进程可以通过它管理所有子进程。
4.InitProcessPool(&channels);
,初始化进程池
// 初始化进程池
void InitProcessPool(std::vector<channel> *channels)
{
std::vector<int> oldfds; // 存储历史文件描述符
for(int i = 0; i < processnum; i++)
{
int pipefd[2];
int n = pipe(pipefd); // 创建管道
assert(!n);
(void)n;
pid_t id = fork(); // 创建子进程
if(id == 0) // 子进程
{
// 关闭历史文件描述符
std::cout << "child: " << getpid() << " close history fd: ";
for(auto fd : oldfds) {
std::cout << fd << " ";
close(fd);
}
std::cout << "\n";
close(pipefd[1]); // 关闭写端
dup2(pipefd[0], 0); // 将管道读端重定向到标准输入
close(pipefd[0]); //关闭读端
slaver(); // 执行子进程任务
std::cout << "process : " << getpid() << " quit" << std::endl;
exit(0);
}
// 父进程
close(pipefd[0]); // 关闭读端
// 创建新的channel并添加到channels中
std::string name = "process-" + std::to_string(i);
channels->push_back(channel(pipefd[1], id, name));
oldfds.push_back(pipefd[1]);
sleep(1);
}
}
5.std::vector<int> oldfds;
的作用是存储父进程中已经创建的管道的写端文件描述符(pipefd[1]
)。它的主要目的是在创建新的子进程时,确保子进程能够关闭不需要的文件描述符,避免资源泄露和潜在的问题。
为什么需要
oldfds
?
文件描述符的继承:
当父进程通过
fork()
创建子进程时,子进程会继承父进程的所有打开的文件描述符。如果父进程创建了多个管道(每个子进程对应一个管道),那么每个子进程都会继承所有管道的文件描述符,即使这些管道是用于其他子进程的。
资源泄露问题:
如果子进程不关闭不需要的文件描述符,这些文件描述符会一直保持打开状态,导致资源泄露。
例如,假设父进程创建了 10 个子进程,每个子进程都会继承 10 个管道的文件描述符,但实际上每个子进程只需要一个管道的读端文件描述符。
避免干扰:
- 如果子进程不关闭不需要的文件描述符,可能会导致意外的行为。例如,某个子进程可能会错误地读取其他子进程的管道数据。
6.for(int i = 0; i < processnum; i++)
,循环 processnum=10
次,每次创建一个子进程和一个管道。
7.int pipefd[2];
pipefd
是一个长度为 2 的整型数组,用于存储管道的两个文件描述符:
pipefd[0]
:管道的 读端文件描述符,用于从管道中读取数据。pipefd[1]
:管道的 写端文件描述符,用于向管道中写入数据。
8.int n = pipe(pipefd);
调用 pipe
系统函数来创建一个管道,并将结果存储在变量 n
中。
1.
pipe
系统函数的作用
pipe
是一个系统调用,用于创建一个管道。管道的本质是一个内核缓冲区,用于在两个进程之间传递数据。管道有两个端点:
- 读端:用于从管道中读取数据。
- 写端:用于向管道中写入数据。
pipe
函数的原型如下:int pipe(int pipefd[2]);
2. 参数
pipefd[2]
pipefd
是一个长度为 2 的整型数组,用于存储管道的两个文件描述符:
pipefd[0]
:管道的 读端文件描述符,用于从管道中读取数据。pipefd[1]
:管道的 写端文件描述符,用于向管道中写入数据。
3. 返回值
n
- 如果
pipe
调用成功,返回0
。- 如果
pipe
调用失败,返回-1
,并设置errno
表示错误原因。
4. 代码解析
int n = pipe(pipefd);
pipe(pipefd)
:调用pipe
函数创建管道。n
:存储pipe
函数的返回值,用于检查管道是否创建成功。
9.assert(!n);
,(void)n;
assert(!n)
:确保管道创建成功。如果pipe
调用失败,程序会终止。(void)n
:忽略未使用的变量警告。
10.pid_t id = fork();
,创建子进程
if(id == 0) // 子进程
{
// 关闭历史文件描述符
std::cout << "child: " << getpid() << " close history fd: ";
for(auto fd : oldfds) {
std::cout << fd << " ";
close(fd);
}
std::cout << "\n";
close(pipefd[1]); // 关闭写端
dup2(pipefd[0], 0); // 将管道读端重定向到标准输入
close(pipefd[0]); //关闭读端
slaver(); // 执行子进程任务
std::cout << "process : " << getpid() << " quit" << std::endl;
exit(0);
}
11.在子进程中,id
为 0。
12.std::cout << "child: " << getpid() << " close history fd: ";
打印当前子进程的PID,用于区分不同子进程
" close history fd: "
,说明接下来要关闭的文件描述符
for(auto fd : oldfds) {
std::cout << fd << " ";// 打印当前文件描述符的值,用于显示子进程正在关闭哪些文件描述符。
close(fd);// 关闭文件描述符
}
在子进程中遍历 oldfds
向量,关闭所有不需要的文件描述符。
具体来说,它的目的是确保子进程只保留与自己相关的文件描述符,关闭其他无关的文件描述符,从而避免资源泄露和潜在的问题。
close(pipefd[1]); // 子进程关闭写端,因为子进程只需要读取命令
dup2(pipefd[0], 0); // 将父进程管道读端重定向到标准输入
close(pipefd[0]); //关闭父进程读端
slaver(); // 执行子进程任务
dup2函数将管道的读端(pipefd[0])复制到标准输入(0)
这意味着之后从标准输入读取的数据实际上是从管道读取的
后续代码中可以直接使用read(0,…)来读取父进程发送的数据
数据流向:
父进程 ---> 写端(pipefd[1]) ---> 管道 ---> 读端(重定向到标准输入) ---> 子进程
子进程:
- 关闭写端(pipefd[1])
- 将读端重定向到标准输入
- 关闭原读端(因为已重定向)
15.进入子进程函数
// 子进程执行的函数
void slaver()
{
while(true)
{
int cmdcode = 0;
// 从标准输入(被重定向到管道)读取命令
int n = read(0, &cmdcode, sizeof(int));
if(n == sizeof(int))
{
std::cout <<"slaver say@ get a command: "<< getpid() << " : cmdcode: " << cmdcode << std::endl;
// 执行对应的任务
if(cmdcode >= 0 && cmdcode < tasks.size()) tasks[cmdcode]();
}
if(n == 0) break; // 管道关闭时退出
}
}
while(true)
,无限循环,持续监听命令
int cmdcode = 0;
int n = read(0, &cmdcode, sizeof(int));
read(0, …):从标准输入读取数据,因为前面做了重定向,实际是从管道读取
&cmdcode:存储读取数据的地址
sizeof(int):读取int大小的数据
n:返回实际读取的字节数
if(n == sizeof(int)) { // 成功读取到完整的命令
// 打印调试信息
std::cout <<"slaver say@ get a command: "<< getpid() << " : cmdcode: " << cmdcode << std::endl;
// 执行对应任务
if(cmdcode >= 0 && cmdcode < tasks.size())
tasks[cmdcode](); // 调用任务函数
}
if(cmdcode >= 0 && cmdcode < tasks.size())
,确保cmdcode
非负,确保cmdcode
小于任务数组大小,防止数组越界访问
tasks[cmdcode]();
,tasks[cmdcode]
获取对应的函数指针,()
操作符调用该函数。
// 假设cmdcode = 0
tasks[0](); // 调用task1(),输出"lol 刷新日志"
// 假设cmdcode = 1
tasks[1](); // 调用task2(),输出"lol 更新野区,刷新出来野怪"
// 假设cmdcode = 2
tasks[2](); // 调用task3(),输出"lol 检测软件是否更新"
// 假设cmdcode = 3
tasks[3](); // 调用task4(),输出"lol 更新用户血量和蓝量"
if(n == 0) break;
,管道关闭时退出
16.slaver()
结束,返回刚刚的
std::cout << "process : " << getpid() << " quit" << std::endl; //打印退出信息,getpid帮助我们确认哪个进程正在退出
exit(0); // 立即终止当前进程
17.然后执行InitProcessPool()
函数的剩下来部分
// 父进程
close(pipefd[0]); // 关闭读端
// 创建新的channel并添加到channels中
std::string name = "process-" + std::to_string(i);
channels->push_back(channel(pipefd[1], id, name));
oldfds.push_back(pipefd[1]);
sleep(1);
close(pipefd[0]);
,父进程只需写入命令,不需要读。及时关闭不需要的文件描述符
std::string name = "process-" + std::to_string(i);
,为每个子进程创建唯一名称。
std::to_string(i) : 将数字i转为字符串,“+” : 字符串拼接运算符。
效果如:process-0, process-1, process-2…
channels->push_back(channel(pipefd[1], id, name));
,push_back
在容器末尾添加新元素。创建临时 channel 对象并添加到 vector
channel
是一个结构体,存储子进程信息:
void InitProcessPool(std::vector<channel> *channels)
struct channel {
int fd; // 管道写端
pid_t pid; // 子进程ID
std::string name; // 进程名称
channel(int _fd, pid_t _pid, const std::string& _name)
: fd(_fd), pid(_pid), name(_name)
{}
};
oldfds.push_back(pipefd[1]);
,添加管道写端的文件描述符。
保存文件描述符的用途:
- 用于后续关闭文件描述符
- 防止文件描述符泄漏
- 进程间通信的管理
- 资源清理
sleep(1);
,休眠1s。
18.进入main函数,执行ctrlSlaver(channels);
// 控制子进程执行任务
void ctrlSlaver(const std::vector<channel> &channels)
{
int which = 0;
while(true)
{
int select = 0;
Menu();
std::cout << "Please Enter@ ";
std::cin >> select;
if(select <= 0 || select >= 5) break;
int cmdcode = select - 1;
// 轮询方式分配任务给子进程
std::cout << "father say: " << " cmdcode: " <<
cmdcode << " already sendto " << channels[which]._slaverid << " process name: "
<< channels[which]._processname << std::endl;
write(channels[which]._cmdfd, &cmdcode, sizeof(cmdcode));
which++;
which %= channels.size();
}
}
轮询机制
int which = 0; // 轮询索引
which++;
which %= channels.size(); // 循环轮询
实现了循环分配任务给不同子进程
如果有3个进程,which的值会是 0,1,2,0,1,2…
任务选择
while(true) {
int select = 0;
Menu(); // 显示菜单
std::cout << "Please Enter@ ";
std::cin >> select; // 获取用户输入
if(select <= 0 || select >= 5) break; // 退出条件
int cmdcode = select - 1; // 将用户输入的选项编号转换为程序内部使用的命令代码。
}
发送任务示例
// 显示任务分配信息
std::cout << "father say: " << " cmdcode: " << cmdcode
<< " already sendto " << channels[which]._slaverid
<< " process name: " << channels[which]._processname << std::endl;
// 向子进程发送命令
write(channels[which]._cmdfd, &cmdcode, sizeof(cmdcode));
cmdcode要执行的命令编号(0代表hello,1代表calc等)
_slaverid: 子进程的PID(进程ID)
_processname: 子进程的名称
write(channels[which]._cmdfd, &cmdcode, sizeof(cmdcode));
channels[which]._cmdfd:管道的写端文件描述符
&cmdcode:命令代码的地址
sizeof(cmdcode):发送的字节数(int类型通常是4字节)
19.返回主函数,执行QuitProcess(channels);
,清理进程池。
void QuitProcess(const std::vector<channel> &channels)
{
// 遍历所有channel对象
for(const auto &c : channels){
// 1. 关闭管道
close(c._cmdfd); // 关闭管道写端
// 2. 等待子进程结束
waitpid(c._slaverid, nullptr, 0); // 阻塞等待直到子进程结束
}
}
20.return 0;