当前位置: 首页 > article >正文

2024-10-26 进程间通信

1  进程间通信的前提

进程具有独立性,通信的前提:先得让不同的进程看到同一份资源某种形式的内存空间),只能由操作系统提供

实现:数据传输、资源共享、通知事件、进程控制。

2  本地通信

同一台主机,同一个OS,不同进程之间通信。

进程通信的标准:System V IPC 标准,POSIX 标准(可移植操作系统接口标准)。

3  进程间通信分类

管道:匿名管道 pipe、命名管道。

System V IPC:System V 消息队列、System V 共享内存、System V 信号量。

4  管道

who | wc -l

一个组合命令,用于统计当前登录系统的用户数量。

1. who : 列出当前登录系统的用户信息。
2.  | : 管道符,将 who 的输出传递给 wc -l 。
3. wc -l : 统计输入的行数。

文件描述符角度理解匿名管道

进程打开磁盘文件,可使用 fopen/open/fstream 等,首先得告诉对应的路径,根据路径确定是在哪一个分区当中,然后再根据路径解析从根目录开始依次解析,找到文件名和 inode 的映射关系,找到文件的 inode,在特定分区内部找到对应的分组,根据 inode 号找到 inode 属性,文件的内容和属性全有了,所以在内核中就可以创建对应的内核级结构struct file,文件所对应的三个关键字段:inode表、文件操作符表、内核级缓冲区。

上图管道(内存级,特定文件的内核级缓冲区)只能单向通信(父➡️子或子➡️父)

1. 如果不关闭呢?资源浪费、fd 泄露、误操作等问题。

2. 先创建管道,在创建子进程。
3. 为何 rw(读写)同时打开?
        管道的数据流动是单向的,数据从写端流入,从读端流出。因此,管道的两端必须同时存在,才能实现数据的传递。先创建子进程继承权限,然后父进程和子进程分别关闭不需要的一端。

4. 先有简单通信的需求,只需实现单向通信即可,于是根据特性再将其命名为管道。

5. 此管道不需要路径,不需要名字,故被称为匿名管道

另:标准输出和标准错误是两个不同的文件描述符,例,./myexe >log.txt 重定向的完整写法是 ./myexe 1>log.txt,如果想要1、2都重定向,则有 ./myexe >log.txt 2>&1 。为方便 debug,还可以 ./myexe 1>normal.txt 2>error.txt 

 (1)pipe()

#include <unistd.h>
int pipe(int pipefd[2]);

RETURN VALUE
    On success, zero is returned.  On error, -1 is returned, 
    and errno is set appropriately.

pipefd[2] 是输出型参数

(2)匿名管道的4种情况和5个特性

临界资源(Critical Resource)是指在操作系统中,同一时刻只能被一个进程访问的资源。当多个进程并发执行时,如果它们试图同时访问同一个临界资源,可能会导致数据不一致或系统状态错误。

管道的4种场景:
①管道为空且管道正常,read(系统调用)会阻塞;
②管道为满且管道正常,write(系统调用)会阻塞。
注:管道有上限,Ubuntu 是 64KB 。
管道的特性之一是“面向字节流”
③管道写端关闭且读端继续,读端读到0,表示读到文件结尾。
如果写端关闭了,读端读完管道内数据,再读取的时候就会读到返回值0,表示对端关闭(管道角度),同时也表示读到文件结尾(文件角度)。
④管道读端关闭但写端正常,OS会直接杀掉进行写入的进程。
发送 13)SIGPIPE 信号给目标进程)。

匿名管道特性:
①面向字节流;
②具有血缘关系的进程进行IPC,常见于父子;
③文件的生命周期随进程,管道也是;
④单向数据通信;
⑤管道自带同步互斥等保护机制(对共享资源的保护)。

*
在使用 write(2) 系统调用向管道(pipe)写入数据时,如果写入的数据量小于 PIPE_BUF 字节,那么这个写入操作必须是原子的(atomic)。
On Linux, PIPE_BUF is 4096 bytes.
原子操作是指在执行过程中不会被中断的操作。对于 write(2) 来说,原子性意味着如果多个进程同时向同一个管道写入数据,且每个写入的数据量都小于 PIPE_BUF,那么这些写入操作不会相互干扰,每个进程的写入数据都会完整地写入管道,而不会被其他进程的数据打断或混合。

 管道通信的场景:进程池

「Makefile 示例」

BIN=procpool
CC=g++
FLAGS=-c -Wall -std=c++11
LDFLAGS=-o
#SRC=$(shell ls *.cc)
SRC=$(wildcard *.cc)
OBJ=$(SRC:.cc=.o)

$(BIN):$(OBJ)
	$(CC) $(LDFLAGS) $@ $^
%.o:%.cc
	$(CC) $(FLAGS) $<

.PHONY:clean
clean:
	rm -f $(BIN) $(OBJ)

.PHONY:test
test:
	@echo $(SRC)
	@echo $(OBJ)

master —— 管道 —→ worker/slaver (一对多)

Channel.hpp

#ifndef __CHANNEL_HPP__
#define __CHANNEL_HPP__

#include <iostream>
#include <string>
#include <unistd.h>
using namespace std;

class Channel
{
public:
    Channel(int wfd, pid_t pid) : _wfd(wfd), _pid(pid)
    {
        _name = "Channel-" + to_string(wfd) + " : " + to_string(pid);
    }
    string name()
    {
        return _name;
    }
    void Send(int cmd)
    {
        ::write(_wfd, &cmd, sizeof(cmd));
    }
    void Close()
    {
        ::close(_wfd);
    }
    int ChildID()
    {
        return _pid;
    }
    int WFD()
    {
        return _wfd;
    }

private:
    int _wfd;
    string _name;
    pid_t _pid;
};

Task.hpp

#pragma once
#include <iostream>
#include <unordered_map>
#include <functional>
using std::cout;
using std::endl;
using task_t = std::function<void()>;

void Download()
{
    cout << "Downloading..., pid = " << getpid() << endl;
}

void Log()
{
    cout << "Logging..., pid = " << getpid() << endl;
}

void Sync()
{
    cout << "Syncing..., pid = " << getpid() << endl;
}
static int num = 0;

class Taskmanage
{
public:
    Taskmanage()
    {
        Inserttask(Download);
        Inserttask(Log);
        Inserttask(Sync);
    }

    void Inserttask(task_t t)
    {
        tasks[num++] = t;
    }

    void Exe(int num)
    {
        if (tasks.find(num) == tasks.end())
            return;
        tasks[num]();
    }

    int Select()
    {
        return rand() % num;
    }

    ~Taskmanage() {}

private:
    std::unordered_map<int, task_t> tasks;
};

Taskmanage tm;

void Worker()
{
    while (true)
    {
        int cmd = 0;
        int n = ::read(0, &cmd, sizeof(cmd));
        if (n == sizeof(cmd))
        {
            tm.Exe(cmd);
        }
        else if (n == 0)
        {
            cout << getpid() << " quit..." << endl;
            break;
        }
        else if (n > 0)
        {
        }
    }
}

 Procpool.hpp

#include <iostream>
#include <string>
#include <unistd.h>
#include <cstdlib>
#include <vector>
#include <functional>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"
#include "Channel.hpp"

using namespace std;
using work_t = function<void()>; // typedef function<void()> work_t;

enum
{
    NonErr = 0,
    UsageErr,
    PipeErr,
    ForkErr
};

class Procpool
{
public:
    Procpool(int n, work_t w) : num(n), work(w)
    {
    }

    // channels是输出型参数
    // work_t work: 回调
    // 创建子进程和子进程的任务是解耦的
    int Initprocpool()
    {
        for (int i = 0; i < num; i++)
        {
            // 构建管道
            int pipefds[2] = {0};
            int n = pipe(pipefds);
            if (n < 0)
            {
                return PipeErr;
            }
            // 创建进程
            pid_t id = fork();
            if (id < 0)
            {
                return ForkErr;
            }
            // 进程间通信
            if (id == 0)
            {
                // 子进程
                // 关闭历史fd
                cout << "Child " << getpid() << " close fd history: ";
                for (auto &e : channels)
                {
                    cout << e.WFD() << ", ";
                    e.Close();
                }
                cout << " END" << endl;

                ::close(pipefds[1]); // 读,关闭写(1)
                cout << "Child read fd: " << pipefds[0] << endl;
                dup2(pipefds[0], 0);
                work();
                ::exit(0); // 子进程退出
            }
            // 父进程
            ::close(pipefds[0]); // 写,关闭读(0)
            // Channel ch(pipefds[1], id);
            // channels.push_back(ch);
            channels.emplace_back(pipefds[1], id);
        }
        return NonErr;
    }

    void Dispitch()
    {
        int pid = 0;
        // 派发任务
        int num = 20;
        while (num--)
        {
            // 确定任务码
            int task = tm.Select();
            // 选择一个子进程channel
            Channel &cur = channels[pid++];
            pid %= channels.size();
            cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;
            cout << "send " << task << " to " << cur.name() << " | " << num << " left." << endl;
            cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl;

            // 派发任务
            cur.Send(task);

            sleep(1);
        }
    }

    void Cleanprocpool()
    {
        // 关一个收一个(需要每次子进程关闭历史写端fd)
        for (auto &e : channels)
        {
            e.Close();
        }
        for (auto &e : channels)
        {
            pid_t rid = ::waitpid(e.ChildID(), nullptr, 0);
            if (rid > 0)
            {
                cout << "Child " << rid << " wait success" << endl;
            }
        }

        // 逆序关闭
        // for(int i = channels.size() - 1; i >= 0; i--){
        //     channels[i].Close();
        //     pid_t rid = ::waitpid(channels[i].ChildID(), nullptr, 0);
        //     if (rid > 0)
        //     {
        //         cout << "Child " << rid << " wait success" << endl;
        //     }
        // }

        // for (auto &e : channels)
        // {
        //     e.Close();
        // }
        // for (auto &e : channels)
        // {
        //     pid_t rid = ::waitpid(e.ChildID(), nullptr, 0);
        //     if (rid > 0)
        //     {
        //         cout << "Child " << rid << " wait success" << endl;
        //     }
        // }
    }

    void Debugprint()
    {
        for (auto &e : channels)
        {
            cout << e.name() << endl;
        }
    }

private:
    vector<Channel> channels;
    int num;
    work_t work;
};

Main.cc

#include "Procpool.hpp"
#include "Task.hpp"

void Usage(string proc)
{
    cout << "Usage: " << proc << "proc_num" << endl;
}

int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        Usage(argv[0]);
        return UsageErr;
    }
    int num = stoi(argv[1]);
    vector<Channel> channels;

    Procpool *pp = new Procpool(num, Worker);
    // 初始化进程池
    pp->Initprocpool();
    // Debugprint(channels);
    // 派发任务
    pp->Dispitch();
    // 退出进程池
    pp->Cleanprocpool();


    // 初始化进程池
    //Initprocpool(num, channels, Worker);
    // Debugprint(channels);
    // 派发任务
    //Dispitch(channels);
    // 退出进程池
    //Cleanprocpool(channels);
    delete pp;
    return 0;
}

让所有子进程都从标准输入里读取命令 dup2(pipefd[0], 0)

任务码通常用于进程间通信(IPC)中,帮助进程识别和处理不同的任务。

 派发任务原则:负载均衡(任务量差不多),实现方法:轮询、随机(长维度)、历史任务数。

如何退出?父进程将 master 写端描述符关闭,所有子进程读端读到0后break,父进程wait进行回收。

如果不是两次遍历,而是在一次遍历中先关闭写端,再对该子进程wait的话,会阻塞,因为当初往下创建子进程时,会导致第一个管道的写端不断增多,从前到后创建的每个管道对应的写端数量依次递减。

法一:逆序关闭子进程

法二:子进程关闭历史上master的所有写端(channels 保存的就是历史上master打开的所有写端)


http://www.kler.cn/a/526462.html

相关文章:

  • 996引擎 - NPC-添加NPC引擎自带形象
  • AI学习指南HuggingFace篇-Hugging Face 的环境搭建
  • 智慧园区系统助力企业智能化升级实现管理效率与安全性全方位提升
  • ROS2---基础操作
  • 20个整流电路及仿真实验汇总
  • (笔记+作业)书生大模型实战营春节卷王班---L0G2000 Python 基础知识
  • Python 梯度下降法(三):Adagrad Optimize
  • 第27章 苏睿所长的关键沟通
  • CS1.5在Win10下有声音黑屏无图像如何设置
  • dify实现原理分析-rag-数据检索的实现
  • 基于强化学习的机器人自主导航与避障
  • 初阶数据结构:链表(二)
  • 电梯系统的UML文档14
  • 10.共享内存 信号量集 消息队列
  • 【2】阿里面试题整理
  • windows怎么查看进程运行时的参数?
  • 【Numpy核心编程攻略:Python数据处理、分析详解与科学计算】1.22 形状操控者:转置与轴交换的奥秘
  • (●ˇ∀ˇ●)思维导图计划~~~
  • 进阶数据结构——高精度运算
  • 探索性测试与自动化测试的结合
  • android 音视频系列引导
  • Python3 【集合】项目实战:3 个新颖的学习案例
  • 【笑着写算法系列】二分查找
  • 在 WSL2 中重启 Ubuntu 实例
  • 特殊Token区域与共享区域
  • 分享|借鉴传统操作系统中分层内存系统的理念(虚拟上下文管理技术)提升LLMs在长上下文中的表现