深入理解 Linux 管道:创建与应用详解(匿名管道进程池)
在现代操作系统中,进程间通信(IPC)是实现多任务、多进程协作的关键技术之一。Linux 提供了多种 IPC 机制,本博客将帮助您详细的理解进程间通信的原理
首先,在学习管道之前,我们先理解一下管道的存在是为了什么,顾名思义,是为了实现进程间的通信。而进程间的通信难道随随便便就可以进行某种交流了吗?显然是不可能的,所以我们讲
进程的通信也是需要某些协同的!所以协同的前提条件是什么呢? 是不是要先能够通信,通信的内容就是数据,数据间又是有类别的,所以要传递的不可能仅仅是数据还有它的相关属性!
那这就难办了,难办怎么办,wc那就别办,当然了进程可不像我们这么任性,难办归难办,还是得办,由于进程间是具有独立性的,进程 = 内核数据结构pcb + 代码数据
所以我们的第一个条件就来了,要想通信,我们是不是需要先让这些独立的进程之间可以看到一份共同的数据啊,你能看到我也能看到,你改动我也能看到,这不就完成初步通信了吗,可我们怎么能看到同一份资源呢,因为进程之间是独立的啊,他俩是不可能干这个事情了,这个时候我们的老大哥OS就来了,操作系统大哥说了:你们俩先别独立了,我这有个桥正好把你两家连上,你俩以后通信就在这通信,这OS发话谁敢不听,没办法,两个进程就这样完成了初步通信,
由上图可以看出,两个进程的通信都在同一个内核级文件缓冲区内,可是现在问题是,进程间的通信是需要成本的,而且成本还很高,那我操作系统的目的是什么,不就是让系统变的简单吗,那如何能缩减工作量呢?很简单--复用内核代码,所有就有了我们的两种管道--命名管道和匿名管道
1.匿名管道,也就是适用于具有血缘关系的进程,什么意思,父进程和子进程还有孙子进程,在我们之前讲子进程的时候知道父子进程之间是应用了写时拷贝技术的,大部分内容是浅拷贝的,因此这时我们采用的是匿名管道,那什么是管道呢?
管道顾名思义就是一端流入,一端流出,也就是单向通信,向我们平常打电话是不是你可以给对方说话,你说话的同时对方也能说话,这叫做双工通信,我们管道是一种特殊的半双工通信,什么意思呢,半双工通信就是你们之间是不能一起沟通的,这样会冲突,双方会听不清对方说的什么,可是它本质上还是支持你们互相说话的,我们特殊就特殊在它不支持互相说话,所以只能一端写,一端读,这样是不是就可以理解管道的通信了,那我们接下来了解一下管道的用法
#include<unistd.h>
int pipe(int fd[2]);
这个fd[2]是输出型参数,
pipe函数定义中的fd参数是一个大小为2的一个数组类型的指针。该函数成功时返回0,并将一对打开的文件描述符值填入fd参数指向的数组。失败时返回 -1并设置errno。
通过pipe函数创建的这两个文件描述符 fd[0] 和 fd[1] 分别构成管道的两端,往 fd[1] 写入的数据可以从 fd[0] 读出。并且 fd[1] 一端只能进行写操作,fd[0] 一端只能进行读操作,不能反过来使用。要实现双向数据传输,可以使用两个管道。
也就是说我们的文件描述符数组中的文件描述符会作为输出参数,默认pipefd[0]是读,pipefd[1]是写端,此时我们的匿名管道也就完成了基础的构建,所以如何使用管道通信呢,就涉及到我们的代码构建
#include <iostream>
#include <unistd.h>
#include <cerrno>
#include <cstring>
#include <string>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include "task.hpp"
class Channel
{
public:
Channel(int wfd, pid_t subprocesspid, std::string name)
: _wfd(wfd), _subprocesspid(subprocesspid), _name(name)
{
}
int getWfd() const
{
return _wfd;
}
pid_t getSubprocesspid() const
{
return _subprocesspid;
}
std::string getName() const
{
return _name;
}
// 形参类型和命名规范
// const &: 输出
// & : 输入输出型参数
// * : 输出型参数
// task_t task: 回调函数
void Wait()
{
int status;
if (waitpid(_subprocesspid, &status, 0) == -1)
{
std::cerr << "waitpid failed: " << strerror(errno) << std::endl;
}
if(waitpid(_subprocesspid, &status, 0) > 0)
{
std::cout << "waitpid success" << std::endl;
}
if (WIFEXITED(status))
{
std::cout << _name << " exited with status " << WEXITSTATUS(status) << std::endl;
}
else
{
std::cerr << _name << " exited abnormally" << std::endl;
}
}
void CloseChannel()
{
if (close(_wfd) == -1)
{
std::cerr << "close " << _name << " failed: " << strerror(errno) << std::endl;
}
}
private:
int _wfd;
pid_t _subprocesspid;
std::string _name;
};
void CreateChannelsandSubprocesses(int subprocessnum, std::vector<Channel> *channels, task_t task)
{
for (int i = 0; i < subprocessnum; i++)
{
int pipefd[2];
if (pipe(pipefd) == -1)
{
std::cerr << "pipe failed: " << strerror(errno) << std::endl;
exit(1);
}
pid_t id = fork();
if (id == -1)
{
std::cerr << "fork failed:" << strerror(errno) << std::endl;
}
if (id == 0)
{
if (i != 0)
{
// 说明当前是第二个子进程
for (int j = 0; j < i; j++)
{
close((*channels)[j].getWfd());
}
}
close(pipefd[1]);
dup2(pipefd[0], STDIN_FILENO);
task();
close(pipefd[0]);
exit(0);
}
close(pipefd[0]);
channels->push_back(Channel(pipefd[1], id, "subprocess" + std::to_string(i)));
}
}
int NextChannelIndex(int size)
{
static int index = 0;
return index++ % size;
}
void SendTask(int wfd, int tasknum)
{
if (write(wfd, &tasknum, sizeof(tasknum)) == -1)
{
std::cerr << "write failed: " << strerror(errno) << std::endl;
}
}
void controlProcessonce(std::vector<Channel>& channels)
{
int tasknum = Selecttask();
int channel_index = NextChannelIndex(channels.size());
SendTask(channels[channel_index].getWfd(), tasknum);
}
void controlProcess(std::vector<Channel>& channels,int times = -1)
{
if(times > 0)
{
while(times--)
{
controlProcessonce(channels);
}
}
else
{
while(1)
{
controlProcessonce(channels);
}
}
}
void cleanupchannels(std::vector<Channel>& channels)
{
for(auto& channel:channels)
{
channel.CloseChannel();
channel.Wait();
}
}
int main(int argc, char *argv[])
{
if (argc < 2)
{
std::cerr << "Usage:" << argv[0] << "subprocessnum" << std::endl;
}
int subprocessnum = atoi(argv[1]);
if (subprocessnum <= 0)
{
std::cerr << "subprocessnum must be greater than 0" << std::endl;
}
Loadtask();
std::vector<Channel> channels;
// 1. create subprocessnum subprocesses and establish communication channels
CreateChannelsandSubprocesses(subprocessnum, &channels, work1);
// 2. control the subprocesses
controlProcess(channels, 10);
// 3. close the channels
cleanupchannels(channels);
return 0;
}
大概看完以上代码的小伙伴就可以跟我进行下一步的验证了,这里时间有限,我们仅将实验过程中的代码出现的情况进行一个总结:
管道的四种情况:
1.如果管道内部是空的并且 write fd没有关闭,读取的条件就不具备,读进程就会堵塞,直到读取的条件具备之后也就是写入了数据才会进行读取
2.管道被写满并且 read fd不读且没有关闭,此时管道被写满了,写进程会被堵塞,直到读取后再写入
3.管道如果一直在读而写端突然关闭了wfd,那么read会返回0代表读到了文件结尾EOF
4.rfd直接关闭,可写端还在写入,那么此时OS会用13号信号关掉子进程,表示遇到异常。
5大特征:
1.匿名管道:只用来进行具有血缘关系的进程之间进行通信,常用于父子进程
2.管道内部,自带进程之间同步的机制
3.管道文件的生命周期是随进程的
4.管道文件在通信的时候是面向字节流的,write的次数和读取的次数不是一一匹配的
5.管道的通信模式是一种特殊的半双工模式
到这我们就初步完成了匿名管道的学习,紧接着就是对于进程池的学习,我们的匿名管道正好是父子进程之间的通信,而如果我们有多个子进程,想让他们同时执行不同的任务,就需要我们的进程池了,既可以完成父子间的通信,还可以完成对不同任务的分发,从而显著减少进程创建和销毁的开销,提升系统性能和响应速度。请看下一篇博客哦