进程间通信-进程池
目录
理解
完整代码
完善代码
回收子进程:
不回收子进程:
子进程使用重定向优化
理解
#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include <sys/types.h>
void work(int rfd)
{
}
// master
class Channel
{
private:
int _wfd;
pid_t _subprocessid;
std::string _name; // 信道名字
public:
Channel(int wfd, pid_t id, const std::string name) // 构造函数
: _wfd(wfd), _subprocessid(id), _name(name) // 构造函数初始化列表
{
}
int getfd() { return _wfd; }
int getid() { return _subprocessid; }
std::string getname() { return _name; }
~Channel() // 析构函数
{
}
};
// ./processpool 5
int main(int argc, char *argv[])
{
std::vector<Channel> channels;
if (argc != 2) // 说明不用创建子进程,不会用
{
std::cerr << "usage: " << argv[0] << "processnum" << std::endl;
return 1;
}
int num = std::stoi(argv[1]); // 表明需要创建几个子进程,stoi转化整数
for(int i=0;i<num;i++)
{
int pipefd[2];
int n=pipe(pipefd);//创建管道
if(n<0)exit(1);//创建管道失败,那么就无需与子进程通信了
pid_t id=fork();//创建子进程
if(id==0){//子进程不用创建子进程,只需父进程即可
//child --r
close(pipefd[1]);
work(pipefd[0]);//进行工作
exit(0);
}
//farther --w
close(pipefd[0]);
//a.此时父进程已经有了子进程的pid b.父进程的w端
std::string channel_name="channel-"+std::to_string(i);//构建一个channel名称
channels.push_back(Channel(pipefd[1],id,channel_name));
}
// test
for (auto &channel : channels)
{
std::cout << "***************************************" << std::endl;
std::cout << channel.getname() << std::endl; // 取出
std::cout << channel.getid() << std::endl; // 取出
std::cout << channel.getfd() << std::endl; // 取出
}
return 0;
}
完整代码
#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include <sys/types.h>
#include "task.hpp"
void work(int rfd)
{
while (true)
{
int command = 0;
int n = read(rfd, &command, sizeof(command)); // 父进程写了一个整数,那么我也读一个整数
if (n == sizeof(int))
{
std::cout<<"pid is "<<getpid()<<"chuli task"<<std::endl;
excuttask(command); // 执行
}
}
}
// master
class Channel
{
private:
int _wfd;
pid_t _subprocessid;
std::string _name; // 信道名字
public:
Channel(int wfd, pid_t id, const std::string name) // 构造函数
: _wfd(wfd), _subprocessid(id), _name(name) // 构造函数初始化列表
{
}
int getfd() { return _wfd; }
int getid() { return _subprocessid; }
std::string getname() { return _name; }
~Channel() // 析构函数
{
}
};
// 形参类型和命名规范
// const & :输出
// & :输入输出型参数
// * :输出型参数
void creatchannelandsun(int num, std::vector<Channel> *channels)
{
for (int i = 0; i < num; i++)
{
int pipefd[2];
int n = pipe(pipefd); // 创建管道
if (n < 0)
exit(1); // 创建管道失败,那么就无需与子进程通信了
pid_t id = fork(); // 创建子进程
if (id == 0)
{ // 子进程不用创建子进程,只需父进程即可
// child --r
close(pipefd[1]);
work(pipefd[0]); // 进行工作
exit(0);
}
// farther --w
close(pipefd[0]);
// a.此时父进程已经有了子进程的pid b.父进程的w端
std::string channel_name = "channel-" + std::to_string(i); // 构建一个channel名称
channels->push_back(Channel(pipefd[1], id, channel_name));
}
}
int nextchannel(int channelnum)
{ // 形成一个0 1 ...到channelnum的编号
static int next = 0;
int channel = next;
next++;
next %= channelnum;
return channel;
}
void sendtask(Channel &channel, int taskcommand)
{
write(channel.getfd(), &taskcommand, sizeof(taskcommand)); // 写
}
// ./processpool 5
int main(int argc, char *argv[])
{
if (argc != 2) // 说明不用创建子进程,不会用
{
std::cerr << "usage: " << argv[0] << "processnum" << std::endl;
return 1;
}
int num = std::stoi(argv[1]); // 表明需要创建几个子进程,stoi转化整数
inittask(); // 装载任务
std::vector<Channel> channels;
creatchannelandsun(num, &channels); // 创建信道和子进程
// 通过channel控制子进程
while (true)
{
sleep(1);//每隔一秒发布一个
// 第一步选择一个任务
int taskcommand = selecttask();
// 第二步选择一个信道和进程,其实是在vector中选择
int index_channel = nextchannel(channels.size());
// 第三步发送任务
sendtask(channels[index_channel], taskcommand);
std::cout<<std::endl;
std::cout<<"taskcommand: "<<taskcommand<<" channel: "<<channels[index_channel].getname()<<" subprocess: "<<channels[index_channel].getid()<<std::endl;
}
return 0;
}
//以前我们都是用.h表示头文件声明,.cpp表示实现
//那么我们用.hpp也是c++的一种头文件,他允许将声明和实现和在一个文件里,那么就有一个好处,像这种代码无法形成库,即使形成库也是开源形成的
#pragma once
#include<iostream>
#include<ctime>
#include<stdlib.h>
#include<unistd.h>
#include <sys/types.h>
#define tasknum 3
typedef void (*task_t)();//task_t 返回值为void,参数为空的函数指针
void print(){//三个任务列表
std::cout<<"i am a printf task"<<std::endl;
}
void download(){
std::cout<<"i am a download task"<<std::endl;
}
void flush(){
std::cout<<"i am a flush task"<<std::endl;
}
task_t tasks[tasknum];//函数指针数组
void inittask(){//初始化任务
srand(time(nullptr)^getpid());//种时间和pid随机种子
//srand(seed): 这个函数用于用指定的 seed(种子)初始化随机数生成器。
//seed 的值决定了 rand() 生成的随机数序列。如果使用相同的种子值,每次生成的随机数序列都是一样的。
//time(nullptr) 提供了一个基于当前时间的种子值。getpid() 提供了一个进程的唯一标识符。
//用异或操作符 ^ 将这两个值混合在一起,产生一个更为变化的种子值。
tasks[0]=print;
tasks[1]=download;
tasks[2]=flush;
}
void excuttask(int n){//执行任务
if(n<0||n>2)return;
tasks[n]();//调用
}
int selecttask(){//随机选择任务
return rand()%tasknum;
}
完善代码
#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include<sys/wait.h>
#include <sys/types.h>
#include "task.hpp"
void work(int rfd)
{
while (true)
{
int command = 0;
int n = read(rfd, &command, sizeof(command)); // 父进程写了一个整数,那么我也读一个整数
if (n == sizeof(int))
{
std::cout<<"pid is "<<getpid()<<"chuli task"<<std::endl;
excuttask(command); // 执行
}
else if(n==0){
std::cout<<"sub process: "<<getpid()<<" quit"<<std::endl;
break;
}
}
}
// master
class Channel
{
private:
int _wfd;
pid_t _subprocessid;
std::string _name; // 信道名字
public:
Channel(int wfd, pid_t id, const std::string name) // 构造函数
: _wfd(wfd), _subprocessid(id), _name(name) // 构造函数初始化列表
{
}
int getfd() { return _wfd; }
int getid() { return _subprocessid; }
std::string getname() { return _name; }
void closechannel(){//关闭文件描述符
close(_wfd);
}
void wait(){
pid_t n=waitpid(_subprocessid,nullptr,0);
if(n>0){
std::cout<<"wait "<<n<<" success"<<std::endl;
}
}
~Channel() // 析构函数
{
}
};
// 形参类型和命名规范
// const & :输出
// & :输入输出型参数
// * :输出型参数
void creatchannelandsun(int num, std::vector<Channel> *channels)
{
for (int i = 0; i < num; i++)
{
int pipefd[2];
int n = pipe(pipefd); // 创建管道
if (n < 0)
exit(1); // 创建管道失败,那么就无需与子进程通信了
pid_t id = fork(); // 创建子进程
if (id == 0)
{ // 子进程不用创建子进程,只需父进程即可
// child --r
close(pipefd[1]);
work(pipefd[0]); // 进行工作
exit(0);
}
// farther --w
close(pipefd[0]);
// a.此时父进程已经有了子进程的pid b.父进程的w端
std::string channel_name = "channel-" + std::to_string(i); // 构建一个channel名称
channels->push_back(Channel(pipefd[1], id, channel_name));
}
}
int nextchannel(int channelnum)
{ // 形成一个0 1 ...到channelnum的编号
static int next = 0;
int channel = next;
next++;
next %= channelnum;
return channel;
}
void sendtask(Channel &channel, int taskcommand)
{
write(channel.getfd(), &taskcommand, sizeof(taskcommand)); // 写
}
void ctrlprocessonce(std::vector<Channel> &channels){//只做一次任务
sleep(1);//每隔一秒发布一个
// 第一步选择一个任务
int taskcommand = selecttask();
// 第二步选择一个信道和进程,其实是在vector中选择
int index_channel = nextchannel(channels.size());
// 第三步发送任务
sendtask(channels[index_channel], taskcommand);
std::cout<<std::endl;
std::cout<<"taskcommand: "<<taskcommand<<" channel: "<<channels[index_channel].getname()<<" subprocess: "<<channels[index_channel].getid()<<std::endl;
}
void ctrlprocess(std::vector<Channel> &channels,int times=-1){
if(times>0){//固定次数
while(times--){//根据times控制
ctrlprocessonce(channels);
}
}
else{//缺省一直
while(true){//一直控制
ctrlprocessonce(channels);
}
}
}
// ./processpool 5
int main(int argc, char *argv[])
{
if (argc != 2) // 说明不用创建子进程,不会用
{
std::cerr << "usage: " << argv[0] << "processnum" << std::endl;
return 1;
}
int num = std::stoi(argv[1]); // 表明需要创建几个子进程,stoi转化整数
inittask(); // 装载任务
std::vector<Channel> channels;
creatchannelandsun(num, &channels); // 创建信道和子进程
// 通过channel控制子进程
ctrlprocess(channels,10);//控制10次退出
//回收进程,把写端关闭那么所有子进程读到0就break退出了
for(auto &channel:channels){
channel.closechannel();
}
//注意进程回收,则遍历关闭
for(auto &channel:channels){
channel.wait();
}//如果不等待那么子进程就是僵尸进程了
return 0;
}
//以前我们都是用.h表示头文件声明,.cpp表示实现
//那么我们用.hpp也是c++的一种头文件,他允许将声明和实现和在一个文件里,那么就有一个好处,像这种代码无法形成库,即使形成库也是开源形成的
#pragma once
#include<iostream>
#include<ctime>
#include<stdlib.h>
#include<unistd.h>
#include <sys/types.h>
#define tasknum 3
typedef void (*task_t)();//task_t 返回值为void,参数为空的函数指针
void print(){//三个任务列表
std::cout<<"i am a printf task"<<std::endl;
}
void download(){
std::cout<<"i am a download task"<<std::endl;
}
void flush(){
std::cout<<"i am a flush task"<<std::endl;
}
task_t tasks[tasknum];//函数指针数组
void inittask(){//初始化任务
srand(time(nullptr)^getpid());//种时间和pid随机种子
//srand(seed): 这个函数用于用指定的 seed(种子)初始化随机数生成器。
//seed 的值决定了 rand() 生成的随机数序列。如果使用相同的种子值,每次生成的随机数序列都是一样的。
//time(nullptr) 提供了一个基于当前时间的种子值。getpid() 提供了一个进程的唯一标识符。
//用异或操作符 ^ 将这两个值混合在一起,产生一个更为变化的种子值。
tasks[0]=print;
tasks[1]=download;
tasks[2]=flush;
}
void excuttask(int n){//执行任务
if(n<0||n>2)return;
tasks[n]();//调用
}
int selecttask(){//随机选择任务
return rand()%tasknum;
}
回收子进程:
不回收子进程:
子进程使用重定向优化
#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include<sys/wait.h>
#include <sys/types.h>
#include "task.hpp"
// void work(int rfd)//执行任务工作
// {
// while (true)
// {
// int command = 0;
// int n = read(rfd, &command, sizeof(command)); // 父进程写了一个整数,那么我也读一个整数
// if (n == sizeof(int))
// {
// std::cout<<"pid is "<<getpid()<<"chuli task"<<std::endl;
// excuttask(command); // 执行
// }
// else if(n==0){
// std::cout<<"sub process: "<<getpid()<<" quit"<<std::endl;
// break;
// }
// }
// }
void work()//执行任务工作
{
while (true)
{
int command = 0;
int n = read(0, &command, sizeof(command)); // 父进程写了一个整数,那么我也读一个整数
//此时从标准输入去读,没有管道的概念了,对于子进程来说有人通过标准输入将任务给你
if (n == sizeof(int))
{
std::cout<<"pid is "<<getpid()<<"chuli task"<<std::endl;
excuttask(command); // 执行
}
else if(n==0){
std::cout<<"sub process: "<<getpid()<<" quit"<<std::endl;
break;
}
}
}
// master
class Channel
{
private:
int _wfd;
pid_t _subprocessid;
std::string _name; // 信道名字
public:
Channel(int wfd, pid_t id, const std::string name) // 构造函数
: _wfd(wfd), _subprocessid(id), _name(name) // 构造函数初始化列表
{
}
int getfd() { return _wfd; }
int getid() { return _subprocessid; }
std::string getname() { return _name; }
void closechannel(){//关闭文件描述符
close(_wfd);
}
void wait(){
pid_t n=waitpid(_subprocessid,nullptr,0);
if(n>0){
std::cout<<"wait "<<n<<" success"<<std::endl;
}
}
~Channel() // 析构函数
{
}
};
// 形参类型和命名规范
// const & :输出
// & :输入输出型参数
// * :输出型参数
void creatchannelandsun(int num, std::vector<Channel> *channels)
{
for (int i = 0; i < num; i++)
{
int pipefd[2];
int n = pipe(pipefd); // 创建管道
if (n < 0)
exit(1); // 创建管道失败,那么就无需与子进程通信了
pid_t id = fork(); // 创建子进程
if (id == 0)
{ // 子进程不用创建子进程,只需父进程即可
// child --r
close(pipefd[1]);
dup2(pipefd[0],0);//将管道的读端,重定向到标准输入
//本来应该在管道的读端读任务,现在做重定向,把0号文件描述符指向管道的读端
work(); // 进行工作,此时不用传参
exit(0);
}
// farther --w
close(pipefd[0]);
// a.此时父进程已经有了子进程的pid b.父进程的w端
std::string channel_name = "channel-" + std::to_string(i); // 构建一个channel名称
channels->push_back(Channel(pipefd[1], id, channel_name));
}
}
int nextchannel(int channelnum)//选定一个管道
{ // 形成一个0 1 ...到channelnum的编号
static int next = 0;
int channel = next;
next++;
next %= channelnum;
return channel;
}
void sendtask(Channel &channel, int taskcommand)//派发什么任务
{
write(channel.getfd(), &taskcommand, sizeof(taskcommand)); // 写
}
void ctrlprocessonce(std::vector<Channel> &channels){//只做一次任务
sleep(1);//每隔一秒发布一个
// 第一步选择一个任务
int taskcommand = selecttask();
// 第二步选择一个信道和进程,其实是在vector中选择
int index_channel = nextchannel(channels.size());
// 第三步发送任务
sendtask(channels[index_channel], taskcommand);
std::cout<<std::endl;
std::cout<<"taskcommand: "<<taskcommand<<" channel: "<<channels[index_channel].getname()<<" subprocess: "<<channels[index_channel].getid()<<std::endl;
}
void ctrlprocess(std::vector<Channel> &channels,int times=-1){//控制派发任务次数
if(times>0){//固定次数
while(times--){//根据times控制
ctrlprocessonce(channels);
}
}
else{//缺省一直
while(true){//一直控制
ctrlprocessonce(channels);
}
}
}
// ./processpool 5
int main(int argc, char *argv[])
{
if (argc != 2) // 说明不用创建子进程,不会用
{
std::cerr << "usage: " << argv[0] << "processnum" << std::endl;
return 1;
}
int num = std::stoi(argv[1]); // 表明需要创建几个子进程,stoi转化整数
inittask(); // 装载任务
std::vector<Channel> channels;
creatchannelandsun(num, &channels); // 创建信道和子进程
// 通过channel控制子进程
ctrlprocess(channels,10);//控制10次退出
//或者ctrlprocess(channels);//使用缺省参数一直控制
//回收进程,把写端关闭那么所有子进程读到0就break退出了
for(auto &channel:channels){
channel.closechannel();
}
//注意进程回收,则遍历关闭
for(auto &channel:channels){
channel.wait();
}//如果不等待那么子进程就是僵尸进程了
return 0;
}
//以前我们都是用.h表示头文件声明,.cpp表示实现
//那么我们用.hpp也是c++的一种头文件,他允许将声明和实现和在一个文件里,那么就有一个好处,像这种代码无法形成库,即使形成库也是开源形成的
#pragma once
#include<iostream>
#include<ctime>
#include<stdlib.h>
#include<unistd.h>
#include <sys/types.h>
#define tasknum 3
typedef void (*task_t)();//task_t 返回值为void,参数为空的函数指针
void print(){//三个任务列表
std::cout<<"i am a printf task"<<std::endl;
}
void download(){
std::cout<<"i am a download task"<<std::endl;
}
void flush(){
std::cout<<"i am a flush task"<<std::endl;
}
task_t tasks[tasknum];//函数指针数组
void inittask(){//初始化任务
srand(time(nullptr)^getpid());//种时间和pid随机种子
//srand(seed): 这个函数用于用指定的 seed(种子)初始化随机数生成器。
//seed 的值决定了 rand() 生成的随机数序列。如果使用相同的种子值,每次生成的随机数序列都是一样的。
//time(nullptr) 提供了一个基于当前时间的种子值。getpid() 提供了一个进程的唯一标识符。
//用异或操作符 ^ 将这两个值混合在一起,产生一个更为变化的种子值。
tasks[0]=print;
tasks[1]=download;
tasks[2]=flush;
}
void excuttask(int n){//执行任务
if(n<0||n>2)return;
tasks[n]();//调用
}
int selecttask(){//随机选择任务
return rand()%tasknum;
}
那么有个问题?为什么不能关闭一个再等待
为什么呢?这是一个bug
为什么全部关闭后再挨个等待就可以呢?
假如有10个子进程,那么第一个子进程有一个读端10个写端,第二个子进程9个写端1个读端,最后一个进程1个读端1个写端;
如果我们把它全部关完了,那么此时他从上往下遍历,管道最后只有最后一个会释放,那么它对应的上一个管道的写端也会释放,所以递归式的逆向关闭;
需要注意的是函数名就是地址;