Linux——进程池
前言:大佬写博客给别人看,菜鸟写博客给自己看,我是菜鸟。
1.实现思路
思路:通过创建匿名管道,来实现父子进程之间的通信
注1:父写,子读
注2:匿名管道只能用来进行具有血管关系的进程进行进程间的通信
①.父进程通过for循环创建五个管道和子进程,管道与子进程间一一对应。五个子进程处于阻塞状态,等待父进程向管道内写入数据。
注:父进程在创建子进程时,子进程会继承父进程的所有数据,包括管道。
②.通过自定义类Channel,管理类成员 pipefd[0]和子进程的pid;
注:当管道创建时,同一管道间的pipe[0]和pipe[1]是一一对应的,即假设我向管道1写入,那么子进程则会从管道1读取数据;
③.大体结构——创建三个自定义类
Ⅰ.管道(Channel):
成员包含:pipe[0]以及对应子进程,建立管道和子进程间一一对应的关系;
函数包含:向管道写入数据、管道关闭、等待子进程
Ⅱ.管道管理(Channel),
成员包含:通过vector<Channel> _channel 对管道数据进行管理;
函数包含:管道选取、管道关闭、子进程回收。
Ⅲ.进程池(ProcessPool),
成员包含:管道管理实例化对象_cm、任务管理实例化对象_tm、进程数量/管道数量;
函数包含:进程池初始化/创建、子进程阻塞等待管道写入并执行、执行任务
注:父进程创建管道后(该管道会返回文件描述符,假设为3,4),再创建子进程,子进程能够进程父进程的管道。因为父进程负责写入,因此需要把读取端口关掉,假设关闭3。这样父进程就通过文件描述符4向管道写入数据。而整个管道的创建过程是循环创建的,当父进程再次创建管道时,此时文件描述符为3、5,重复刚才的操作,这样父进程就能通过文件描述符5向管道写入数据,直至所有管道创建完毕。
问:既然子进程会继承父进程,那在第二次或者后面的循环过程中,子进程会不会继承父进程的写端?从而实现子进程向管道写入?
答:会!
问:这样子会出现什么问题?
答:当我们通过父进程关闭管道,回收子进程时,你以为管道已经关闭了,但实际上还有子进程能够向这个管道进行写入操作,实际上管道未关闭,子进程并不会退出,而是会阻塞。
解决措施1:从后往前关闭管道,对于最后一个管道而言,没有子进程能够向其写入,父进程是唯一的写入端,因此关闭这个管道后,子进程能够顺利关闭,因为信道关闭了,子进程也就退出了,而当前子进程能够向上一个管道写入,但是当前子进程退出了,因此上一个管道的写入端就少了一个(实际上关闭当前子进程后,上一个管道的写入端就只剩父进程一个了,这样循环向上,就能够确保通过父进程能够关闭所有的管道,从而使得所有的子进程关闭而非处于阻塞状态)
解决措施2:创建子进程后,在处理子进程时,遍历vector<Channel> _channel,因为内部存储了所有管道对应的写端,然后在子进程内调用 _channel.Close();关闭所有写端
问:这样调用,子进程是否会关闭父进程的管道?
答:不会,父子进程相互独立。父子进程如果要修改原始数据会发生写实拷贝。
匿名管道的创建方式:
int pipefd[2] = {0};
int n = pipe(pipefd);
其中
pipefd[0]为输入\向管道读
pipefd[1]为输出\从管道写
2.实现代码
2.1Main.cc
主要分三部分:
①.进程池初始化
②.执行任务
③.回收、结束进程池
int main()
{
// 创建进程池对象
ProcessPool pp(gdefaultnum);
// 启动进程池
pp.Start();
// 自动派发任务
int cnt = 10;
while(cnt--)
{
pp.Run();
sleep(1);
}
// 回收,结束进程池
pp.Stop();
return 0;
}
2.2进程池(ProcessPool)
2.2.1进程池大体成员以及函数
class ProcessPool
{
public:
ProcessPool(int num) : _process_num(num)
{
//进程池创建时,等级需要执行的任务,这部分通过另一个自定义类来实现
}
void Work(int rfd)
{
//子进程阻塞等待父进程向管道写入数据,读取后执行相应任务
}
bool Start()
{
//进程池初始化
}
void Run()
{
//执行任务
}
void Stop()
{
//关闭管道、回收子进程
}
~ProcessPool()
{}
private:
ChannelManager _cm;
int _process_num;
TaskManager _tm;
};
2.2.2任务登记/管理
TaskManager自定义类:
通过函数指针/随机函数来实现任务的随机选取
typedef void (*task_t)(); //函数指针 void Task1() { std::cout << "印日志的任务" << std::endl; } void Task2() { std::cout << "下载的任务" << std::endl; } void Task3() { std::cout << "上传的任务" << std::endl; } class TaskManager { public: TaskManager() { srand(time(nullptr));//生成随机数种子 } void Register(task_t t) { _tasks.push_back(t);//将当前函数指针插入到vector数组中 } int Code() { return rand() % _tasks.size();//返回一个随机数,大小不超过最大任务数量-1 } void Execute(int code) //根据code执行相应任务 { if(code >= 0 && code < _tasks.size()) { _tasks[code]();//回调函数 } } ~TaskManager() {} private: std::vector<task_t> _tasks; };
开始时将所有任务插入vector<task_t> _tasks中:
ProcessPool(int num) : _process_num(num) { _tm.Register(Task1); _tm.Register(Task2); _tm.Register(Task3); }
注:这部分可以自定义,写你想要实现的任务即可。
2.2.3进程池初始化
大致思路为:外循环循环五次,
①.每次创建一个管道
②.每次创建一个子进程
③.关闭父子进程相应的管道,父写子读
④.子进程执行阻塞等待 / 父进程执行将当前通道信息(pipefd[1])和对应子进程subid通过ChannelManager,插入到verctor<Channel> _channels 中。
bool Start() { for (int i = 0; i < _process_num; i++) { // 1. 创建管道 int pipefd[2] = {0}; int n = pipe(pipefd); if (n < 0) return false; // 2. 创建子进程 pid_t subid = fork(); if (subid < 0) return false; else if (subid == 0) { // 子进程 //关闭不需要的写端 _cm.CloseAll(); // 3. 关闭不需要的文件描述符 close(pipefd[1]); Work(pipefd[0]); close(pipefd[0]); exit(0); } else { // 父进程 // 3. 关闭不需要的文件描述符 close(pipefd[0]); // 写端:pipefd[1]; _cm.Insert(pipefd[1], subid); } } return true; }
注:上述循环过程结束后,通过_cm.Insert();创建了五个自定义类_channels,这五个_channels中,包含了:
1.各自的管道
2.对应的子进程pid
后续在关闭管道后,可以通过pid来回收子进程(对应管道关闭了,子进程就没有存在的必要了)
2.2.4子进程阻塞等待父进程写入
void Work(int rfd) { while (true) { int code = 0; ssize_t n = read(rfd, &code, sizeof(code)); if (n > 0) { if (n != sizeof(code)) { continue; } std::cout << "子进程[" << getpid() << "]收到一个任务码: " << code << std::endl; _tm.Execute(code); } else if (n == 0) { std::cout << "子进程退出" << std::endl; break; } else { std::cout << "读取错误" << std::endl; break; } } }
注:read返回值的含义:
当 n > 0: 成功读取数据,n表示实际读取的字节数
当 n == 0:管道关闭或者没有更多数据可读,表示读到了流的末尾
当 n < 0:读取出错
如果管道内没有数据,read()调用会阻塞,直到有数据可读取。因此不会直接调用n==0的逻辑。
如果管道关闭或没有更多数据,read返回0,进入 n==0 的逻辑,表示子进程退出
2.2.5父进程向管道写入数据
void Run()
{
// 1. 选择一个任务
int taskcode = _tm.Code();//返回一个随机数,随机数不大于总任务个数
// 2. 选择一个信道[子进程],负载均衡的选择一个子进程,完成任务
auto &c = _cm.Select();
// 3. 向管道写入数据/发送任务,然后2.2.4中的子进程会读取到管道中的数据,并开始执行任务
c.Send(taskcode);
}
2.3管道(Channel)
2.3.1管道大体成员以及函数
class Channel
{
public:
Channel(int fd, pid_t id) : _wfd(fd), _subid(id)
{
//实例化对象中包含写入管道的接口以及子进程pid
}
~Channel()
{}
void Send(int code)
{
//向管道写入数据
int n = write(_wfd, &code, sizeof(code));
(void)n; // ?
}
void Close()
{
//关闭管道
close(_wfd);
}
void Wait()
{
//进程等待
pid_t rid = waitpid(_subid, nullptr, 0);
(void)rid;
}
private:
int _wfd;
pid_t _subid;
};
2.4管道管理(ChannelManager)
2.4.1管道管理大体成员以及函数
class ChannelManager
{
public:
ChannelManager() : _next(0)
{
}
void Insert(int wfd, pid_t subid)
{
//
_channels.emplace_back(wfd, subid);
}
//管道选取
Channel &Select()
{
auto &c = _channels[_next];
_next++;
_next %= _channels.size();
return c;
}
//管道关闭
//void StopSubProcess()
//{
// for (auto &channel : _channels)
// {
// channel.Close();
// std::cout << "关闭: " << channel.Name() << std::endl;
// }
//}
//回收子进程
//void WaitSubProcess()
//{
// for (auto &channel : _channels)
// {
// channel.Wait();
// std::cout << "回收: " << channel.Name() << std::endl;
// }
//}
//关闭子进程中多余的写窗口
void CloseAll()
{
for(auto& C : _channel)
{
C.Close();
}
{
//从后往前关
void CloseAndWait()
{
for (int i = _channel.size()-1; i>=0; i--)
{
channel.Close();
std::cout << "关闭: " << channel.Name() << std::endl;
channel.Wait();
std::cout << "回收: " << channel.Name() << std::endl;
}
}
~ChannelManager() {}
private:
std::vector<Channel> _channels;//通过vector<Channel> 将实例化的管道管理起来
int _next;
};
3.命名管道
匿名管道通过父子进程之间的继承关系,能够使得父子进程间看到同一份资源。
问:那么对于两个没有任何血缘关系的进程而言,如何能看到同一份资源?
答:通过命名管道——打开同一个路径下的同一个文件,文件有路径,路径具有唯一性
3.1命名管道的创建
int mkfifo ( const char *filename, mode_t mode);例: mkfifo( "fifo" , 0644 );
3.2匿名管道和命名管道的区别
①:匿名管道由pipe函数创建并打开
②:命名管道由mkfifo创建,并由open打开
注:这样一看,命名管道是不是和文件很相似?