当前位置: 首页 > article >正文

【Linux】深度解析Linux进程间通信:匿名管道原理、实战进程池与高频问题排查。

命名管道在下一篇进行讲解:

【Linux】深入解析Linux命名管道(FIFO):原理、实现与实战应用-CSDN博客

deepseek总结全文

1. 进程间通信(IPC)的核心目的

  • 数据传输:进程间交换数据。

  • 资源共享:多进程共享同一资源(如内存、文件)。

  • 事件通知:进程间发送事件信号(如进程终止通知)。

  • 进程控制:控制其他进程的执行(如调试进程)。

2. 进程间通信的实现前提

  • 共享OS资源:通过内核提供的共享资源(如管道、共享内存)实现通信。

  • 系统调用支持:依赖操作系统提供的接口(如pipe()fork())。

3. 管道的核心机制

  • 匿名管道:仅用于有血缘关系的进程(如父子进程),单向通信。

  • 命名管道(FIFO):允许无血缘关系的进程通过文件系统路径通信。

  • 实现原理

    • 内核维护固定大小的缓冲区(默认4KB),数据以字节流形式传输。

    • 通过文件描述符管理读写端,pipe()返回读端fd[0]和写端fd[1]

    • 父子进程继承文件描述符表,通过关闭不需要的端实现单向通信。

4. 管道的四种关键情况

  • 读阻塞:管道空时,读进程等待数据写入。

  • 写阻塞:管道满时,写进程等待读取释放空间。

  • 读端关闭:写端继续写入会触发SIGPIPE信号(进程终止)。

  • 写端关闭:读端读到EOF(返回0)。

5. 管道的五大特征

  • 血缘限制:匿名管道仅用于父子或兄弟进程。

  • 同步机制:读写顺序性保障数据一致性。

  • 生命周期随进程:管道随进程终止自动销毁。

  • 面向字节流:数据无明确边界,需应用层处理。

  • 半双工模式:单向通信,双向需两个管道。

6. 实战应用与代码示例

  • 进程池:父进程(Master)通过管道向子进程(Worker)分发任务,实现负载均衡。

  • 命令行管道cmd1 | cmd2通过匿名管道连接进程,实现数据流传递。

  • 代码演示

    • 管道创建、父子进程通信、读写端管理。

    • 处理管道阻塞、满缓冲、关闭信号等场景。

7. 关键技术细节

  • PIPE_BUF:单次原子写入的最大字节数(默认4KB),避免数据交错。

  • 引用计数:文件描述符关闭时,内核通过引用计数决定是否释放资源。

  • 写时拷贝(Copy-On-Write):父子进程共享只读数据,修改时触发独立拷贝。

8. 常见问题与解决方案

  • 僵尸进程:父进程需通过waitpid()回收子进程资源。

  • 管道泄漏:未关闭文件描述符导致资源耗尽,需显式关闭不需要的端。

  • 负载均衡:通过轮询或随机选择子进程分配任务。

进程间通信介绍
 

进程间通信目的

进程也是需要某种协同的,所以如何协同的前提条件是通信 --- 数据是有类别的  --  通知就绪单纯的要传递给我的数据与控制相关的信息。

数据传输:一个进程需要将它的数据发送给另一个进程

资源共享:多个进程之间共享同样的资源。

通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。

进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。

进程如何通信

a.进程间通信,成本可能会稍微高一些

b.进程间通信的前提:

  • 1.先让不同的进程,看到同一份(OS)资源(“一段内存”)
  • 2.一定是由某一个进程先需要通信,让OS创建一个共享资源
  • 3.OS必须提供很多的系统调用

进程间通信为什么有多个种类:

  • 1.OS创建的共享资源不同
  • 2.系统调用的接口不同

进程间通信发展

  • 管道
  • System V进程间通信

             System V,曾经也被称为AT&T System V,是Unix操作系统众多版本中的一支。它最初由AT&T开发,在1983年第一次发布。一共发行了4个System V的主要版本:版本1、2、3和4。System V Release 4,或者称为SVR4,是最成功的版本,成为一些UNIX共同特性的源头,例如 ”SysV 初始化脚本“ (/etc/init.d),用来控制系统启动和关闭,System V Interface Definition (SVID) 是一个System V 如何工作的标准定义。    

       

  • POSIX进程间通信

进程间通信分类

管道

  • 匿名管道pipe
  • 命名管道

System V IPC

  • System V 消息队列
  • System V 共享内存(重点讲,和地址空间有关)
  • System V 信号量

POSIX IPC        (网络通信)

  • 消息队列
  • 共享内存
  • 信号量
  • 互斥量
  • 条件变量
  • 读写锁

管道

什么是管道

  • 管道是Unix中最古老的进程间通信的形式。
  • 我们把从一个进程连接到另一个进程的一个数据流称为一个“管道

当把一个文件打开了两次(这里并不是指以rw的方式,而是分开打开的),一次读文件,一次以写的方式,打开的方式不同。在内核当中会创建两个struct files_struct,其中的文件描述符fd不一样。两个struct file当中都有指向文件所有的属性,操作方法的集合以及内核级文件缓冲区的不同指针。最后所指向的都是同一个。他们的属性和内容都是一样的,struct files_struct会被创建两次。文件只被加载一次。

进程要保证独立性,与文件没有关系。在创建子进程时,各自维护自己的内核数据结构的独立性以及和地址空间代码和数据的独立性。在父进程拷贝给子进程的struct files_struct 文件描述符表当中全都是 struct file *fd_array[] 指向被打开文件的指针。因此文件系统只用被创建一次,不需要再给子进程拷贝一份,子进程的中的fd都和父进程中的一样,指向着同一个文件。

由此做到父进程打开的文件,子进程也能看到

1.为什么父子进程总是会向同一个显示器终端打印数据。

因为子进程会继承父进程的文件描述符表,进而父子会指向同一个文件,他们会将各自写的数据写入同一个内核级缓冲区当中,当OS刷新时,内容被刷新到到同一个显示器。

2.进程默认会打开三个标准输入输出:0 1 2 ,是如何做到的?

因为所有的进程都是bash的子进程,所以只要bash打开了,未来的所有子进程都会打开三个标准输入输出。

3.为什么子进程主动close(0/1/2),不会影响父进程继续使用显示器文件呢?

close(0/1/2)在重定向的时候讲过:【Linux】从文件操作到重定向与缓冲区的底层实现与实战-CSDN博客

因为在struct file内部会包含引用计数(类似硬链接的引用计数的原理)【Linux】文件查找、软硬链接、动静态库-CSDN博客

这里的引用计数是内存级

当子进程close1个的时候,引用计数-1(file->ref_count--;if(ref_count == 0)释放资源)

管道文件

多个进程所看到的同一份资源就称做管道文件

如何实现进程之间通信

通过父子进程看到公共资源,父进程在该缓冲区中写入,子进程从该缓冲器中读,实现进程之间通信。

但是父和子共同通信的时候为了保证合理合法性以及正常通信:

1.管道只允许单向通信(父子同时进行读写会出现数据紊乱)

        a.只有父进程在发消息

        b.只有子进程在发消息

当父进程以读的方式和写的方式分别打开了同一个文件,会出现两个文件描述符表与内核数据结构。假如一个的fd = 3是以读打开的,一个的fd = 4是以写的方式打开的。并且父进程现在只想读取fd = 3文件中的内容,就需要关闭fd = 4。当子进程现在只想在fd = 4文件中写内容,就需要关闭fd = 3。实现父子关闭掉不需要的文件描述符。从而实现父子进程看到同一个内核级缓冲区,子进程向fd = 4文件写内容,父进程从fd = 3的文件中读取到内容。

2.因为通信只需要依靠数据在内核级文件缓冲区中进行读写,不需要刷新到磁盘:

因此需要保证管道不需要刷新到磁盘,管道的接口会跟文件的接口有一些不同。

3.既然父子在通信时,需要关闭掉不需要的文件描述符,在起初为什么还打开?这个关闭是必须的吗?

如果父进程以读|写方式打开了一个fd = 3的文件,那么子进程继承也只会继承以读|写方式打开的fd = 3的文件,管道是不允许存在两个读|写的文件。父进程也不能以rw的方式打开一个文件,子进程也就有rw,可能会产生同时读写操作,虽然是允许的,但是最好不能使用。因此为了让子进程继承下去才都打开,以读的方式和写的方式分别打开。是可以不关闭的,但是要保证不能使用,因此建议关闭不使用的。一个进程打开的文件存在文件描述符表中以数组的方式,也就注定fd的存在是有上限的(一个进程能打开的文件是有上限的),因此避免浪费系统资源,最好关闭不使用的文件。

匿名管道

因为使用管道根本不需要访问到磁盘,因此不需要文件路径和文件名

创建管道的方法

#include <unistd.h>功能:创建一无名管道原型

int pipe(int pipefd[2]);参数

pipefd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端返回值:成功返回0,失败返回错误代码。

双向通信的实现方法:使用两个管道。

管道为什么要单向通信:

为了简单;        

实现原理

  • 单向通信:数据只能在一个方向流动(一端写,另一端读)。

  • 基于内存的缓冲区:内核维护一个固定大小的缓冲区(通常为 4KB)。

  • 依赖文件描述符:通过 pipe() 系统调用创建一对文件描述符(fd[0] 用于读,fd[1] 用于写)。

  • 进程继承:通常由父进程创建管道,再通过 fork() 创建子进程,子进程继承父进程的文件描述符。

创建与使用

步骤:

  1. 调用 pipe():创建管道,获取两个文件描述符。

  2. 调用 fork():创建子进程。

  3. 关闭不需要的描述符

    • 父进程关闭读端(fd[0]),向写端(fd[1])写入数据。

    • 子进程关闭写端(fd[1]),从读端(fd[0])读取数据。

  4. 读写操作:通过 read() 和 write() 函数进行数据传输。

进程通信的准备工作:

#include<iostream>
#include<unistd.h>
#include<cerrno>
#include<cstring>
int main()
{
    //close(0);用于验证fd的分配
    //1.创建管道
    int pipefd[2];
    int n = pipe(pipefd);  //输出型参数,rfd,wfd
    if(n != 0)
    {
        std::cerr << "errno" << errno << ":" << "errstring" << strerror(errno);
        return 1;
    }
    //注意:pipefd[0]->下标为0 -> r(嘴巴 -> 读),pipefd[1]->下标为1 -> w(笔->写)
    std::cout << "pipefd[0]: " << pipefd[0] << ", pipefd[1]: " << pipefd[1] << std::endl;

    //2.创建子进程
    pid_t id = fork();
    if(id == 0)
    {
        //子进程
        //3.关闭不需要的描述符
        close(pipefd[0]);


        close(pipefd[1]);
        exit(0);
    }
    //父进程
    //3.关闭不需要的描述符
    close(pipefd[1]);


    
    close(pipefd[0]);
    return 0;
}

//打开终端:ctrl + shift + `

让父子进程做他们的工作:

//4.让子进程进行写入
void SubProcessWrite(int wfd)
{

}
//4.让父进程来进行读取
void FatherProcessRead(int rfd)
{

}

添加父进程等待waitpid()函数<sys/wait.h>让子进程被等待

    if (rid > 0)
    {
        std::cout << "wait child process success!" << std::endl;
    }

至此完成整个代码结构的编写;

父子进程进行通信:

//7.创建一个缓冲区
const int size = 1024;
//5.用于获取动态信息,获取子进程的pid,以及一个计数器,
std::string getOtherMessage()
{   
    static int cnt = 0;//计数器
    std::string msgid = std::to_string(cnt);//stoi是字符串转整数
    cnt++;
    pid_t self_id = getpid();//#include<unistd.h>,#include<sys/types.h>
    std::string string_pid = std::to_string(self_id);

    std::string message = "msg_id: ";
    message += msgid;
    message += ", my_pid: ";
    message += string_pid;

    return message;
}

//4.让子进程进行写入
void SubProcessWrite(int wfd)
{
    std::string msg = "father, i am your son process!";
    while(true)
    {
        std::string info = msg + getOtherMessage();
        //6.子进程发送信息给父进程 --- write()系统调用接口---><unistd.h>
        write(wfd, info.c_str(), info.size());//write("向谁写入:wfd","写什么: info.c_str()","写入的字符个数: info.size()")
                                    //在这里,写入管道的时候,没有写入\0,没必要
    }
}
//4.让父进程来进行读取
void FatherProcessRead(int rfd)
{
    char inbuffer[size]; //c99 gnu g99标准是可以使用变量做数组大小
    while(true)
    {   
        ssize_t n = read(rfd, inbuffer, sizeof(inbuffer) - 1);//read(“从哪里读取:rfd”, “读取到哪里:inbuffer”, “期望读取多少:不读\0:sizeof(inbuffer)”)        
        if(n > 0)
        {
            inbuffer[n] = 0;//=='\0'
            std::cout << "father get message: " << inbuffer << std::endl;
        }
    }
}

父进程只要能拿到子进程的所发送的消息,就能打印到显示屏终端

测试代码:

1.每隔一秒获取进程头部信息以及我所写的testpipe管道通信的程序进程的信息,并且过滤掉grep进程自己;

while :; do ps ajx | head -1 && ps ajx | grep testpipe | grep -v grep; echo "-----------------------------------------------------------------------------------"; sleep 1;done

2.观测前准备工作:

运行后结果:

fork之后,子进程是能拿到父进程的数据的   ----   算通信,但也不是,因为不能修改,这是写时拷贝【Linux】操作系统内存管理:地址空间、页表、写时拷贝与进程调度原理_地址空间如何划分-CSDN博客

为什么需要特别写一个std::string getOtherMessage() 交给父进程

因为父子共享同一个全局变量,父子进程去修改的时候(由于写时拷贝的问题),对方都看不到,

因此不能定义全局的char buffer[1024];

1.了解管道的4种情况 

在后面有代码验证

情况1. 如果管道内部是空的并且write fd 没有关闭,读取条件不具备,读进程会被阻塞 -- wait -->读取条件具备 <--- 写入数据

情况2:管道被写满 && read fd不读且没有关闭,管道被写满,写进程会被阻塞(管道被写满,写条件不具备) ---- wait  <----读取数据

情况3:管道一直在读&&写端关闭了wrf,读端read返回值为0,表示读到了文件末尾

情况4:rfd直接关闭,写端wfd还会一直进行写入吗?不会,此时写入已经没有意义,此时的管道已经是一个broken pipe:坏的管道。OS不做浪费时间空间的事情,因此会杀掉对应的进程,OS会自动向wfd进程发送13)SIGPIPE信号,来杀掉wfd进程。相当于进程出现了异常。管道自动销毁。

2.管道的5种特征

a.匿名管道:只能用来进行具有血缘关系的进程之间的通信,常用于父子间进程。

b.自带进程之间的同步机制 :多执行流执行代码的时候,具有明显的顺序性。

c.管道文件的生命周期随进程的

d.管道文件在通信的时候,是面向字节流的

e.管道的通信模式,是一种特殊的半双工模式(只能单向通信,不能双向),像是对讲机

a.特征1的验证:

(两个毫不相关的进程无法做到通信,子进程能继承父进程的内核数据结构拷贝父进程的文件描述符表)

能否进行父亲儿子孙子之间的通信:可以

 

        if(fork() > 0)
        {
            exit(0);//往后通信的就是孙子进程
        }

运行结果:

 

b.自带进程之间的同步机制的验证:

多执行流执行代码的时候,具有明显的顺序性。

在写的代码中,父进程并没有加任何的sleep(),子进程写的慢了,父进程读取的也会慢。

如果公共资源可能会存在被多个进程同时访问的情况。因为并发读取而导致的数据不一致的问题。

情况1:

但是在管道这里,这种情况不存在,子进程写一条,父进程读一条,子进程未写的时候,父进程处于等待状态。这就叫进程之间的同步机制。

 现在使子进程一直写,父进程500s再读一次:

 让子进程每次写入一个字符‘A’,并且记录管道的大小,再打印:

情况2:

当子进程写到65536个字符的时候,卡住,父进程一直在等待,管道被写满了(ubuntu管道大小:64KB)


情况3:

让子进程动态的向父进程写入A,B,C,D,E.F几个字符后退出,父进程接收到read的返回值为0就

// 4.让子进程进行写入
void SubProcessWrite(int wfd)
{
    int pipesize = 0;
    char c = 'A';

    std::string msg = "father, i am your son process!";
    while (true)
    {
        write(wfd, &c, 1);
        std::cout << "pipesize: " << ++pipesize << ", write charactor is: " << c++ <<  std::endl;
        if(c == 'G')
        {
            break;
        }
        sleep(1);
    }
    std::cout << "child quit ..." << std::endl;
}
// 4.让父进程来进行读取
void FatherProcessRead(int rfd)
{
    char inbuffer[size]; // c99 gnu g99标准是可以使用变量做数组大小
    while (true)
    {
        // sleep(500);
        ssize_t n = read(rfd, inbuffer, sizeof(inbuffer) - 1); // read(“从哪里读取:rfd”, “读取到哪里:inbuffer”, “期望读取多少:不读\0:sizeof(inbuffer)”)
        if (n > 0)
        {
            inbuffer[n] = 0; //=='\0'
            std::cout << "father get message: " << inbuffer << std::endl;
        }
        // 如果read的返回值为0, 表示写端直接关闭了,我们读到了文件的结尾。
        else if(n == 0)
        {
            std::cout << "child quit, father get return val: " << n <<", father quit too!"<<std::endl;
            break; 
        }
        else if(n < 0)
        {
            std::cerr<< "read error " << std::endl;
        }
    }
}

运行结果:

read的返回值为0 


情况4:

rfd直接关闭,写端wfd还会一直进行写入,写端进程会被操作系统直接使用13号信号关掉,相当于进程出现了异常的代码验证:

1.让子进程一直写入每1s写一次,让父进程读取一次后sleep(1)退出,关闭rfd。

3.使用status,在子进程退出后,获取子进程的退出信息

想要获取到子进程的退出信息需要用到status:这篇文章有专门对status的原理以及使用的详细讲解:【Linux】进程的创建、终止、等待与程序替换函数 保姆级讲解-CSDN博客

    int status = 0;//获取子进程的退出信息

    pid_t rid = waitpid(id, &status, 0); // #include<sys/wait.h>
    if (rid > 0)
    {
        std::cout << "wait child process success!, exit sig: " << (status&0x7f) <<std::endl;
        std::cout << "wait child process success!, exit code(ignore): " << (status>>8&0xff) <<std::endl;
    }

运行后(运行此代码之前关掉孙子进程):


管道的特征4:

d.管道文件在通信的时候,是面向字节流的

1.让子进程不休息一直写

2.让父进程sleep(2),再读取

3.让子进程向标准错误中打,父进程向标准输出中打:

此时开启再开一个终端窗口1,将标准错误打印到这个窗口中:testpipe是我的可执行程序

./testpipe 2> /dev/pts/1

运行结果:

写入的次数和读取的次数不是一一匹配的,这就是面向字节流的最典型的特点。

在计算机通信中,管道中的面向字节流指的是一种数据传输方式:数据像水流一样连续、无明确边界地流动,接收方无法直接区分发送方的多次写入操作。生活中的一个典型例子是 通过吸管喝珍珠奶茶

面向字节流例子解析:吸管喝奶茶 vs 字节流

  1. 发送方(你)
    你不断用吸管吸入奶茶和珍珠。每一次“吸”的动作,可能吸到奶茶、珍珠或混合体(类似程序多次写入数据到管道)。

  2. 管道(吸管)
    吸管中的奶茶和珍珠是连续流动的,没有明确的分隔符(比如“这一口是奶茶,下一口是珍珠”)。这就像字节流中的数据:没有固定结构或边界

  3. 接收方(你的嘴)
    你每次喝到的量可能不同:可能一口全是奶茶,或一口混合了珍珠和奶茶(类似接收方读取的数据可能包含多次写入的内容,或需要多次读取才能拼成完整数据)。

  4. 关键问题
    如果吸到一颗珍珠卡在吸管中间,你可能需要多吸几次才能吃到它——这就像接收方需要处理不完整的数据边界(例如:半条消息在第一次读取,剩下半条在第二次)。

通俗的讲:

自来水管道
水从水龙头连续流出,用水桶接水时,每次接的量取决于水桶大小和开关时间(类似接收缓冲区大小和读取频率)。

对比:面向消息 vs 面向字节流

  • 面向消息(如UDP)
    类似用快递寄包裹,每个包裹独立(消息边界明确),快递员不会拆开你的箱子混装货物。
    例子:你寄出三本书,对方收到三个包裹,每包一本书。

  • 面向字节流(如TCP)
    类似用吸管喝奶茶,所有数据是连续的“流”,接收方需要自己判断消息边界(比如通过分隔符或长度标识)。
    例子:你分三次写入数据 A|B|C,接收方可能一次读到 A|B|C,也可能分多次读到 AB|C 或 A|BC


PIPE_BUF

在Linux中,PIPE_BUF 是管道(pipe)的一个关键参数,它定义了单次原子写入操作的最大字节数。以下是核心要点:


1. 什么是 PIPE_BUF

  • 原子性保证
    如果进程向管道单次写入的字节数 ≤ PIPE_BUF,则这次写入是原子的——数据会作为一个连续块传输,不会被其他进程的写入操作打断。
    (例如:多个进程同时写管道时,小数据块不会交错)

  • 默认大小
    在大多数Linux系统中,PIPE_BUF 的值为 4096字节(4KB),但具体数值取决于内核实现(可通过 ulimit -p 查看)。


2. 为什么需要 PIPE_BUF

  • 避免数据混乱
    若多个进程并发写入管道,且每次写入 ≤ PIPE_BUF,则每个进程的数据块在管道中保持完整,不会与其他进程的数据混合。
    (类似多人同时向水管注水,若每人每次倒水不超过桶容量,则各自的水流不会混在一起)

  • 超过 PIPE_BUF 的后果
    若单次写入超过 PIPE_BUF,写入操作可能被拆分成多个非原子块,导致数据在接收端交错(需额外同步机制)。


3. 示例场景

// 进程A写入管道
write(pipe_fd, "Hello", 5);   // ≤ PIPE_BUF → 原子写入
// 进程B同时写入管道
write(pipe_fd, "World", 5);   // ≤ PIPE_BUF → 原子写入

// 读取结果可能是 "HelloWorld" 或 "WorldHello",但不会出现 "HWeolrllod"(交错)

4. 查看 PIPE_BUF 的值

# 命令行查看(单位:字节)
$ ulimit -p
4096  # 典型值(可能因系统不同而异)
# 或通过C代码获取:
#include <unistd.h>
printf("PIPE_BUF = %d\n", PIPE_BUF);

5. 与管道缓冲区的区别

  • 管道总容量
    管道的总缓冲区可能更大(如64KB),但 PIPE_BUF 仅约束单次写入的原子性。

  • 写满管道的处理
    若管道已满,write() 会阻塞(默认行为)或返回 EAGAIN(非阻塞模式),但这与 PIPE_BUF 无关。


总结

  • PIPE_BUF 的作用:确保小数据块的原子写入,简化多进程/线程同步。

  • 关键规则:单次写入 ≤ PIPE_BUF → 数据完整;反之 → 可能被分割。

  • 适用场景:需并发写入管道时,优先控制单次写入量 ≤ PIPE_BUF,避免数据交错。

管道的应用:

1.命令行上的|是匿名管道

命令行上用管道连接起来的指令,形成了几个进程,并且他们的父进程一样,因为都是bash,他们三个是兄弟关系,这些管道是匿名管道

sleep 1000 | sleep 2000 | sleep 3000
ps ajx | grep -v grep | grep sleep

2.进程池:

父进程创建多个子进程,父进程作为写入端,子进程作为读取端,父进程就作为master、子进程就作为worker/slaver。当worker进程对应的管道中没有数据的时候,worker进程就处于阻塞等待状态,等待任务的到来,master向哪一个管道写入,哪一个worker进程就被唤醒来处理任务。

管道可以实现进程间的协同:

由父进程将任务交给对应的子进程,收到任务的子进程就处理,没有收到任务的子进程就阻塞等待。由此来实现多进程之间的协作。

负载均衡:

父进程能够合理协调每个子进程的工作,做到负载均衡的向后端子进程分发任务。

#include<iostream>
#include<string>
#include<vector>
//管道头文件:
#include<unistd.h>


class Channel  //信道
{
private:
    int _wfd;
    pid_t _subprocessid; //子进程id
    std::string name;
};

int main()
{
    std::vector<Channel> channels;  //将对信道的管理,转换成对vector的增删查改
}

做好准备工作:创建好管道,以及子进程,子进程进行读,父进程进行写入。在子进程开始工作前,关闭子进程的写入。关闭父进程的读取。通过命令行参数,使用循环,来创建几个子进程,几个管道。

// 能够做到:命令行参数两个
//./processpool 5 创建5个子进程

int main(int argc, char *argv[])
{
    // 命令行参数不是2个
    if (argc != 2)
    {
        std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;
        return 1;
    }
    //1.通过命令行输出的子进程个数来创建几个子进程
    int num = std::stoi(argv[1]); // 将命令行所记录的第二个参数赋值给num

    std::vector<Channel> channels; // 将对信道的管理,转换成对vector的增删查改

    for (int i = 0; i < num; i++)
    {
        //3.创建管道
        int pipefd[2] = {0};
        int n = pipe(pipefd);
        if(n < 0) exit(1);

        //2.创建子进程
        pid_t id = fork();
        if(id == 0)
        {
            //child --read
            close(pipefd[1]);//关闭子进程的写
            work(pipefd[0]); //让子进程去工作,退出循环
            close(pipefd[0]);
            exit(0);
        }
        //father
        close(pipefd[0]);
        //继续创建子进程
    }
}

初始化各个信道:

 Channel(int wfd, pid_t id, const std::string &name)
        : _wfd(wfd), _subprocessid(id), _name(name)
    { }

构建子进程的名字,方便后续对他们进行管理:

        close(pipefd[0]);
        // a.子进程的pid b.父进程关心的管道写入端
        //构建子进程的名字
        std::string channel_name = "Channel-" + std::to_string(i);
        channels.push_back(Channel(pipefd[1], id, channel_name));
        // 继续创建其他子进程

让每个子进程都不退出:

void work(int fd)
{
    while (true) sleep(1);
}

获取wfd、id、name

1.创建Get方法:

 int GetWfd() { return _wfd; }
 pid_t GetId() { return _subprocessid; }
 std::string GetName { return _name; }

2.打印:

    // for test
    for (auto &channel : channels)
    {
        std::cout << "------------------------------------------" << std::endl;
        std::cout << channel.GetName() << std::endl;
        std::cout << channel.GetWfd() << std::endl;
        std::cout << channel.GetId() << std::endl;
    }

运行:

while :; do ps ajx | head -1 && ps ajx | grep processpool | grep -v grep; echo "-----------------------------------------------------------------------------------"; sleep 1;done

形参类型和命名规范:

1.const& 表示输出型参数

2.& :输入输出型参数

3.* : 输出型参数

重构代码:

1. CreateChannelAndSub() 创建子进程和进程池

// 1.创建子进程和进程池
void CreateChannelAndSub(int num, std::vector<Channel> *channels)
{
    // 创建子进程和进程池
    for (int i = 0; i < num; i++)
    {
        // 3.创建管道
        int pipefd[2] = {0};
        int n = pipe(pipefd);
        if (n < 0)
            exit(1);

        // 2.创建子进程
        pid_t id = fork();
        if (id == 0)
        {
            // child --read
            close(pipefd[1]); // 关闭子进程的写
            work(pipefd[0]);  // 让子进程去工作,退出循环
            close(pipefd[0]);
            exit(0);
        }
        // 构建子进程的名字
        std::string channel_name = "Channel-" + std::to_string(i);

        // father
        close(pipefd[0]);
        // a.子进程的pid b.父进程关心的管道写入端
        channels->push_back(Channel(pipefd[1], id, channel_name));

        // 继续创建其他子进程
    }
}

2.通过信道来控制对应的子进程

   这里的思路是通过函数指针数组来存放每个任务的方法,再将固定长度4字节的数组下标任务码分给子进程。

1.构建任务列表Task.hpp
#pragma once

#include<iostream>

#define TaskNum 3

typedef void (*task_t) ();

void Print()
{
    std::cout << "i'm a print task!" << std::endl;
}

void Download()
{
    std::cout << "i'm a download task!" << std::endl;
}

void FlushStdout()
{
    std::cout << "i'm a flushstdout task!" << std::endl;
}


task_t tasks[TaskNum];

void LoadTask()
{
    tasks[0] = Print;
    tasks[1] = Download;
    tasks[2] = FlushStdout;
}

//执行任务
void ExcuteTask(int number) 
{
    if(number < 0 || number > 2) return ;
    tasks[number]();
}
    a.选择一个任务 -- 随机选择 --- srand --- #include <cstdlib>

    准备随机任务:

void LoadTask()
{
    srand(time(nullptr) ^ getpid() ^ 12457); //用于增加随机数
    tasks[0] = Print;
    tasks[1] = Download;
    tasks[2] = FlushStdout;
}
int SelectTask()
{
    return rand() % TaskNum; //这里就只会有0,1,2几个值
}

做选择:

    int taskcommand = SelectTask();
b.选择一个进程

更换下标:随着函数不断被调用,轮询的使用信道,使用子进程

//选择信道 -- 拿下标
int NextChannel(int channelnum)
{
    static int next = 0; //初始化用next
    int channel = next; //默认channel为0
    next++;
    next %= channelnum;//防止下标越界,随着函数不断被调用,形成一个循环使用子进程
    return channel;
}

获取下标:

    int channel_index = NextChannel(channels.size());
c.发送任务

使用哪一个信道--子进程,来完成哪个任务

    SendTaskCommand(channels[channel_index], taskcommand);
void SendTaskCommend(const Channel &channel,int taskcommend)
{
    //通过write向指定的文件描述符中发送任务函数指针数组的下标
    write(channel.GetWfd(), &taskcommend, sizeof(taskcommend));
}
d. 执行任务:

从rfd中读取到给我写的任务函数指针数组的下标

void work(int rfd)
{
    while (true)
    {
        int command = 0;
        //读上父进程0给我写的整数赋给command
        int n = read(rfd, &command, sizeof(command));
        if(n == sizeof(int))
        {

            ExcuteTask(command);
        }
    }
}

控制子进程的方式

void ControlProOnce(std::vector<Channel> &channels)
{
    sleep(1);
    // a.选择一个任务
    int taskcommand = SelectTask();

    // b.选择一个进程
    int channel_index = NextChannel(channels.size());

    // c.发送任务 -- 使用哪一个信道--子进程,来完成哪个任务
    SendTaskCommand(channels[channel_index], taskcommand);
    std::cout << std::endl;
    std::cout << "taskcommand: " << taskcommand << ", channel: "
              << channels[channel_index].GetName() << ", sub process: " << channels[channel_index].GetId()
              << std::endl;
}

void ControlPro(std::vector<Channel> &channels, int times = -1)
{
    if (times > 0)
    {

        {
            void ControlProOnce(channels);
        }
    }
    else
    {
        while (true)
        {
            void ControlProOnce(channels);
        }
    }
}

再给work添加提示字:

void work(int rfd)
{
    while (true)
    {
        int command = 0;
        // 读上父进程0给我写的整数赋给command
        int n = read(rfd, &command, sizeof(command));
        if (n == sizeof(int))
        {
            std::cout << "pid is : " << getpid() << "header task" << std::endl;
            ExcuteTask(command);
        }
    }
}

 控制5次:

运行代码:

3.回收管道和子进程

将文件描述符的wfd关闭,所有子进程的rfd也会关闭,当 n = 0 的时候就说明wfd已经关闭,就退出执行任务:

void work(int rfd)
{
    while (true)
    {
        int command = 0;
        // 读上父进程0给我写的整数赋给command
        int n = read(rfd, &command, sizeof(command));
        if (n == sizeof(int))
        {
            std::cout << "pid is : " << getpid() << ", header task" << std::endl;
            ExcuteTask(command);
        }
        else if(n == 0)
        {
            break;
        }
    }
}

 a.关闭所有的写端 b.回收子进程

    // 3.回收管道和子进程  a.关闭所有的写端 b.回收子进程
    ClearUpChannel(channels);

b.关闭一个对象的写端,获得父进程等待每个子进程成功的信号。

    void CloseChannel()
    {
        if (_wfd >= 0)
            close(_wfd);
    }
    void Wait()
    {
        pid_t rid = waitpid(_subprocessid, nullptr, 0); // 不关心退出码
        if (rid > 0)
        {
            std::cout << "father wait: " << rid << " success!" << std::endl;
        }
    }

c. 遍历以关闭所有的信道,父进程等待所有子进程,避免成为僵尸进程。

void ClearUpChannel(std::vector<Channel> &channels)
{
    for(auto &channel: channels)
    {
        channel.CloseChannel();
    }
    for(auto &channel: channels)
    {
        channel.Wait();
    }
}

改进 --- 解耦

1. 通过重定向,读取管道直接从标准输入读取

void work()
{
    while (true)
    {
        int command = 0;
        // 读上父进程0给我写的整数赋给command
        int n = read(0, &command, sizeof(command));
        if (n == sizeof(int))
        {
            std::cout << "pid is : " << getpid() << ", header task" << std::endl;
            ExcuteTask(command);
        }
        else if (n == 0)
        {
            std::cout << "sub process : " << getpid() << " quit" << std::endl;
            break;
        }
    }
}

这样做的好处:让子进程工作时,不需要知道管道的读端,读取所有任务直接从标准输入读取。子进程直接认为任务是从标准输入来的,读取到0,就直接退出。这样做就能让管道的逻辑,和子进程要执行的任务的逻辑解耦。

2.所有的子进程都会做一些任务

将work()直接作为创建管道和进程的的参数。并将其放入task.hpp文件当中,作为任务列表的一员,因为work本身也是一个任务,作为进程执行的入口函数,这样做能让进程池本身代码与要执行的任务的代码彻底解耦

 CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)

task_t 是我定义的函数指针:创建任务列表的时候定义的:

task_t task ----> 回调函数

void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{
    // 创建子进程和进程池
    for (int i = 0; i < num; i++)
    {
        // 3.创建管道
        int pipefd[2] = {0};
        int n = pipe(pipefd);
        if (n < 0)
            exit(1);

        // 2.创建子进程
        pid_t id = fork();
        if (id == 0)
        {
            // child --read
            close(pipefd[1]); // 关闭子进程的写
            dup2(pipefd[0], 0); //将管道的读端,重定向到标准输入
            task();  // 让子进程去工作,退出循环
            close(pipefd[0]);
            exit(0);
        }
        // 构建子进程的名字
        std::string channel_name = "Channel-" + std::to_string(i);

        // father
        close(pipefd[0]);
        // a.子进程的pid b.父进程关心的管道写入端
        channels->push_back(Channel(pipefd[1], id, channel_name));

        // 继续创建其他子进程
    }

}

这样做的好处是,一旦传入了多个执行流,主进程会继续向后执行,新创建的进程就会回调式的执行我们所传入的task()。

传入不同的work,执行不同的任务。就不用在原work上修改。

    CreateChannelAndSub(num, &channels, work);

修复bug***

bug在哪,怎么造成的

实际上现在的代码还有一个隐藏很深的bug

在回收channel的时候,子进程就退出了,那么直接在一个循环当中让父进程等待回收不就行了:

void ClearUpChannel(std::vector<Channel> &channels)
{
    // for (auto &channel : channels)
    // {
    //     channel.CloseChannel();
    // }
    // for (auto &channel : channels)
    // {
    //     channel.Wait();
    // }

    for (auto &channel : channels)
    {
        channel.CloseChannel();
        channel.Wait();
    }
}

运行程序后会出现:进程阻塞住无法退出

这是因为:

在创建子进程和管道时有一个隐藏很深的bug,随着子进程的创建,由于父进程拷贝给每一个子进程写端。每一个被创建的子进程的写端越来越多。

在刚开始,父进程创建管道的读端3号和写端4号,文件3号4号都是打开的的,第一个子进程1号继承后,关闭不需要的文件描述符,子进程1号关闭的4(写入端),父进程也关闭了3(读取端),保留4号(用于写入)。子进程1号现在通过3号读取父进程从4号文件写入同一个管道中的内容。

父进程继续创建新的管道,由于父进程在与1子进程进行通信时已经关闭3号文件(用于读取内容),而文件描述符的分配规则是,分配现有的最小的号数,也就是3号,3号又作为读取被父进程创建,同时父进程也创建了5号文件(用于写入),将他们一起拷贝给了2号子进程。2号子进程关闭5号文件(用于写入),保留3号文件(用于读取)。父进程关闭了3号。但是,父进程并未关闭在和1号子进程通信的4号(写入端),现在在拷贝给2号子进程的时候将4号(写入端)一起拷贝给他了。

在未来继续创建子进程和管道的时候由于父进程拷贝给每一个子进程上一个子进程未关闭的写端。每一个被创建的子进程的写端累积下来越来越多。如果创建了10个子进程,那么1号子进程将会有10个写端。

当我们关闭1个父进程写入端的文件描述符:但是其他子进程的写入端并未被关闭,第一个子进程的指向管道文件的9个写入端。依旧存在,第一个管道文件还存在着9个引用计数。读端想要继续读,却无法读入内容,子进程就不会退出,在wait的时候父进程根本等不到子进程,就阻塞了,代码无法继续执行。

    for (auto &channel : channels)
    {
        channel.CloseChannel();
        channel.Wait();
    }

那为什么:分开写,就能解决问题?

void ClearUpChannel(std::vector<Channel> &channels)
{
    for (auto &channel : channels)
    {
        channel.CloseChannel();
    }
    for (auto &channel : channels)
    {
        channel.Wait();
    }

    for (auto &channel : channels)
    {
        channel.CloseChannel();
        channel.Wait();
    }
}

这样写的思路是,先关闭所有的文件描述符,再遍历一遍去等待,就能解决。因为关闭完所有的写入端文件描述符,到最后一个子进程时的管道时,他的写入端只有父进程的一个,这个管道的写入端被关闭,那么他的读取端也会被关闭,这个子进程就结束,他所拷贝的之前父进程与其他子进程通信的写入端也都被释放,那么他的上一个子进程的写入端(也就是最后一个子进程的写入)被关闭。他的上一个子进程的写入端也为0,因此上一个子进程也结束,并且关闭掉指向前面的与其他子进程写入端,这样逆向的类似于递归,所有的写入端都被释放,子进程们也就都结束,再遍历一遍等待也就能成功。

解决方法:

1.在创建子进程之前先判断channels信道是否为空,判断是否是第一次创建子进程,不是就关闭其他子进程的写入端,这只是关闭了父进程拷贝给子进程的写入端,对父进程的写入不会产生影响

修改代码:此时关闭父进程的写端,后直接等待子进程的退出

void ClearUpChannel(std::vector<Channel> &channels)
{
    for (auto &channel : channels)
    {
        channel.CloseChannel();
        channel.Wait();
    }
}

运行结果:就成功了

结语:

       随着这篇关于题目解析的博客接近尾声,我衷心希望我所分享的内容能为你带来一些启发和帮助。学习和理解的过程往往充满挑战,但正是这些挑战让我们不断成长和进步。我在准备这篇文章时,也深刻体会到了学习与分享的乐趣。  

         在此,我要特别感谢每一位阅读到这里的你。是你的关注和支持,给予了我持续写作和分享的动力。我深知,无论我在某个领域有多少见解,都离不开大家的鼓励与指正。因此,如果你在阅读过程中有任何疑问、建议或是发现了文章中的不足之处,都欢迎你慷慨赐教。               

        你的每一条反馈都是我前进路上的宝贵财富。同时,我也非常期待能够得到你的点赞、收藏,关注这将是对我莫大的支持和鼓励。当然,我更期待的是能够持续为你带来有价值的内容,让我们在知识的道路上共同前行。


http://www.kler.cn/a/612983.html

相关文章:

  • vue3为什么不需要时间切片
  • 将任何网站变成独立的桌面应用 开源免费 Tuboshu
  • IvorySQL 初始化(initdb)过程深度解析
  • UniApp开发多端应用——流式语音交互场景优化
  • 分布式ID生成器:雪花算法原理与应用解析
  • Spring-CacheKey 设置注意事项
  • 正则表达式-万能表达式
  • PDF 文本提取为何如此困难?– 原因和解决方案
  • string模拟实现-C++
  • 数据结构之约瑟夫环的问题
  • Causal Effect Inference withDeep Latent-Variable Models
  • 聚焦交易能力提升!EagleTrader 模拟交易系统打造交易成长新路径
  • LabVIEW从需求快速开发的方法
  • 5-管理员-维护权限
  • 【PyTorch】
  • JAVASE知识梳理(一)
  • Pyside6 开发 使用Qt Designer
  • G i t
  • 硬件测试工装设计不合理的补救措施
  • 剑指Offer26 -- 树