当前位置: 首页 > article >正文

Linux - 进程间通信(2)

目录

2、进程池

1)理解进程池

 2)进程池的实现

整体框架:

a. 加载任务

b. 先描述,再组织

I. 先描述

II. 再组织

c. 创建信道和子进程

d. 通过channel控制子进程

e. 回收管道和子进程

问题1:

解答1:

问题2:

解答2:

f. 将进程池本身和任务文件本身进行解耦

3)完整代码

processpool.cc:

Task.hpp:

Makefile:


命令行中的 | ,就是匿名管道

它们的父进程都是bash

2、进程池

1)理解进程池

a. 可以将任务写入管道来给到子进程,从而可以提前创建子进程想让哪个子进程完成任务,我就让写入到哪个子进程相对的管道中

b. 管道里面没有数据,worker进程就在阻塞等待,等待务的到来!!

c. master向哪一个管道进行写入,就是唤醒哪一个子进程来处理任务

d. 均衡的向后端子进程划分任务,称之为负载均衡父进程要进行后端任务的负载均衡

父进程直接向管道里写入固定长度的四字节(int)数组下标(任务码)

函数指针数组中元素分别指向不同的任务,以便master控制worker完成指定工作

 2)进程池的实现
整体框架:
// ./processpool 3
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;
    }

    int num = std::stoi(argv[1]);
    LoadTask(); // 加载任务

    std::vector<Channel> channels; // 将管道组织起来
    // 1.创建信道和子进程
    CreateChannelAndSub(num, &channels);

    // 2.通过channel控制子进程
    CtrlProcess(channels, 10);

    // 3.回收管道和子进程 a.关闭所有的写端 b.回收子进程
    ClearUpChannel(channels);

    return 0;
}
a. 加载任务

我这里用的是打印的方式来模拟任务的分配,通过打印从而了解子进程执行任务的情况,通过种下随机数种子,产生随机数,进而随机的向子进程分配任务,work即为子进程需要做的工作

Task.hpp:

#pragma once

#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>

#define TaskNum 3

typedef void (*task_t)(); // task_t 函数指针

void Print()
{
    std::cout << "I am print task" << std::endl;
}
void DownLoad()
{
    std::cout << "I am a download task" << std::endl;
}
void Flush()
{
    std::cout << "I am a flush task" << std::endl;
}

task_t tasks[TaskNum];

void LoadTask()
{
    srand(time(nullptr) ^ getpid() ^ 177); // 种一个随机种子
    tasks[0] = Print;
    tasks[1] = DownLoad;
    tasks[2] = Flush;
}

void ExcuteTask(int number)
{
    if(number < 0 || number > 2) return;
    tasks[number]();
}

int SelectTask()
{
    return rand() % TaskNum;
}

void work(int rfd)
{
    while (true)
    {
        int command = 0;
        int n = read(rfd, &command, sizeof(command));
        if (n == sizeof(int))
        {
            std::cout << "pid is: " << getpid() << " handler task" << std::endl;
            ExcuteTask(command);
        }
        else if (n == 0)
        {
            std::cout << "sub process: " << getpid() << " quit" << std::endl;
            break;
        }
    }
}
// 命令行规范 --> ./processpool 3
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;
    }

    int num = std::stoi(argv[1]);
    LoadTask(); // 加载任务

    return 0;
}
b. 先描述,再组织
I. 先描述

需要控制的信道(即发送端wfd)数量多且繁琐,需要管理起来从而方便控制给子进程发送任务

class Channel
{
private:
    int _wfd;
    int _subprocessid;
    std::string _name;
};

在信道中,我们需要知道发送的文件描述符wfd,还有知道子进程的pid,以及信道的命名(用来区分信道)

II. 再组织
std::vector<Channel> channels;

我们通过用一个vector数组将所有的Channel存储起来,从而实现对它们的增删查改,以方便管理

c. 创建信道和子进程
void CreateChannelAndSub(int num, std::vector<Channel> *channels)
{
    for (int i = 0; i < num; ++i)
    {
        // 1.创建管道
        int pipefd[2] = {0};
        int n = pipe(pipefd);
        if (n < 0) exit(1); // 创建管道失败

        // 2.创建子进程
        pid_t id = fork();
        if (id == 0)
        {
            // Child
            close(pipefd[1]);

            work();

            close(pipefd[0]);
            exit(0);
        }

        // 3.构建一个名字
        std::string channel_name = "Channel-" + std::to_string(i);
        // Father
        close(pipefd[0]);
        // 拿到了 a.子进程的pid b.父进程需要的管道的w端
        channels->push_back(Channel(pipefd[1], id, channel_name));
    }
}

用命令行参数的方式传入得到的argv[1]即为输入命令需要的子进程和管道个数

通过for循环,创建 num个 pipe管道以及子进程,当创建完子进程时,需要关闭掉不需要的文件描述符(即wfd -- pipefd[1])(当然,父进程也需要关闭不需要的fd -- rfd),在执行完work(子进程的工作)之后关闭掉rfd(即工作完成了,关闭其管道),然后exit(0)退出进程等待父进程回收

d. 通过channel控制子进程
// 轮询方案 -- 负载均衡
int NextChannel(int channelnum) 
{
    static int next = 0;
    int channel = next;
    next++;
    next %= channelnum;

    return channel;
}

// 发送相应的任务码到对应管道内
void SendTaskCommand(Channel &channel, int taskCommand)
{
    write(channel.GetWfd(), &taskCommand, sizeof(taskCommand));
}
void CtrlProcessOnce(std::vector<Channel> &channels)
{
    sleep(1);
    // a. 选择一个任务
    int taskcommand = SelectTask();
    // b. 选择一个信道和进程
    int channel_index = NextChannel(channels.size());
    // c. 发送任务
    SendTaskCommand(channels[channel_index], taskcommand);
    std::cout << "=================================" << std::endl;
    std::cout << "taskcommand: " << taskcommand << " channel: " 
              << channels[channel_index].GetName() << " sub process: " 
              << channels[channel_index].GetProcessId() << std::endl;
}

void CtrlProcess(std::vector<Channel> &channels, int times = -1)
{
    if (times > 0)
    {
        while (times--)
        {
            CtrlProcessOnce(channels);
        }
    }
    else
    {
        while (true)
        {
            CtrlProcessOnce(channels);
        }
    }
}

向其发送任务之前,我们需要先选择一个任务,通过随机种子随机数的方式,随机选择我们的一个任务,拿到其任务码(即指针数组下标),然后选择相应的信道和进程(信道和进程一体的),从而向管道发送任务码给子进程

e. 回收管道和子进程
void ClearUpChannel(std::vector<Channel> &channels)
{
    for (auto &channel : channels)
    {
        channel.CloseChannel();
    }
    
    for (auto &channel : channels)
    {
        channel.Wait();
    }
}

我们先将所有的信道关闭,然后在逐个将子进程等待回收

问题1:

那为什么不能边关闭信道边回收呢??

解答1:

在我们创建子进程的过程中,由于父子进程之间的继承,从而导致子进程会拥有父进程的文件描述符内容(即指向同一地方),如果我们边关闭边回收的话,如上图所示,当我们关闭父进程的第一个管道的wfd时,这时候第一个管道的读端的引用计数并未清0,因为子进程2它继承了父进程指向第一个管道的wfd(读端),从而使得读端阻塞,进程不退出,然后wait子进程的时候就会阻塞

在work结束后,才会到下一步close和exit退出子进程;

work结束需要的条件是 n == 0,即读端返回值为0,即

因此上述那种边关闭信道,边wait子进程的方法会阻塞

问题2:

为什么这种方法又能成功回收呢??

解答2:

因为当我们将所有信道关闭时,关闭到最后一个子进程对应的管道的wfd的时候,该管道的读端的引用计数就会为0,从而读端读到0,该子进程退出,随子进程退出就会使得该子进程指向的前面管道的读端回收,就不会造成前面那种情况

f. 将进程池本身和任务文件本身进行解耦

用回调函数可以很好的改善代码的耦合性

通过文件描述符重定向 dup2将标准输入(文件描述符 0)重定向到 rfd 所代表的文件,然后再回调task()函数

// 重定向

这样做可以彻底的让我们的子进程执行对应的work时,再也不需要知道有什么管道的读端

(不用管从哪里接收信息,直接认为从标准输入拿到信息即可)

--- 将管道的逻辑和执行任务的逻辑进一步进行解耦

// task_t task : 回调函数

有了它,我们进程池本身的代码和我们任务本身两个文件就彻底解耦了

--- 即既不关心从哪个文件描述符,直接默认从0里面去读,也不关心将来谁调它,因为子进程会自动回调它

3)完整代码
processpool.cc:
#include <iostream>
#include <string>
#include <unistd.h>
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"

class Channel
{
public:
    Channel(int wfd, pid_t id, const std::string name)
        : _wfd(wfd), _subprocessid(id), _name(name)
    {
    }

    int GetWfd() 
    { 
        return _wfd; 
    }

    pid_t GetProcessId() 
    { 
        return _subprocessid; 
    }

    std::string GetName() 
    { 
        return _name; 
    }

    void CloseChannel() 
    { 
        close(_wfd); 
    }

    void Wait()
    {
        pid_t rid = waitpid(_subprocessid, nullptr, 0);
        if (rid > 0)
        {
            std::cout << "wait " << rid << " success" << std::endl;
        }
    }

    ~Channel()
    {
    }

private:
    int _wfd;
    int _subprocessid;
    std::string _name;
};

// 形参和命名规范
// const & : 输入型参数
// & : 输入输出型参数
// * : 输出型参数

// task_t task : 回调函数
// 有了它,我们进程池本身的代码和我们任务本身两个文件就彻底解耦了
// --- 即既不关心从哪个文件描述符,直接默认从0里面去读,也不关心将来谁调它,因为子进程会自动回调它
void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{
    for (int i = 0; i < num; ++i)
    {
        // 1.创建管道
        int pipefd[2] = {0};
        int n = pipe(pipefd);
        if (n < 0)
            exit(1); // 创建管道失败

        // 2.创建子进程
        pid_t id = fork();
        if (id == 0)
        {
            // Child
            close(pipefd[1]);

            dup2(pipefd[0], 0);
            task();

            close(pipefd[0]);
            exit(0);
        }

        // 3.构建一个名字
        std::string channel_name = "Channel-" + std::to_string(i);
        // Father
        close(pipefd[0]);
        // 拿到了 a.子进程的pid b.父进程需要的管道的w端
        channels->push_back(Channel(pipefd[1], id, channel_name));
    }
}

int NextChannel(int channelnum) // 轮询方案 -- 负载均衡
{
    static int next = 0;
    int channel = next;
    next++;
    next %= channelnum;

    return channel;
}

void SendTaskCommand(Channel &channel, int taskCommand)
{
    write(channel.GetWfd(), &taskCommand, sizeof(taskCommand));
}

void CtrlProcessOnce(std::vector<Channel> &channels)
{
    sleep(1);
    // a. 选择一个任务
    int taskcommand = SelectTask();
    // b. 选择一个信道和进程
    int channel_index = NextChannel(channels.size());
    // c. 发送任务
    SendTaskCommand(channels[channel_index], taskcommand);
    std::cout << "=================================" << std::endl;
    std::cout << "taskcommand: " << taskcommand << " channel: " << channels[channel_index].GetName()
              << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}
void CtrlProcess(std::vector<Channel> &channels, int times = -1)
{
    if (times > 0)
    {
        while (times--)
        {
            CtrlProcessOnce(channels);
        }
    }
    else
    {
        while (true)
        {
            CtrlProcessOnce(channels);
        }
    }
}

void ClearUpChannel(std::vector<Channel> &channels)
{
    // for (auto &channel : channels)
    // {
    //     channel.CloseChannel();
    //     channel.Wait();
    // }

    // int num = channels.size() -1;
    // while(num >= 0)
    // {
    //     channels[num].CloseChannel();
    //     channels[num--].Wait();
    // }

    for (auto &channel : channels)
    {
        channel.CloseChannel();
    }
    
    for (auto &channel : channels)
    {
        channel.Wait();
    }
}

// ./processpool 3
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cerr << "Usage: " << argv[0] << " processnum" << std::endl;
    }

    int num = std::stoi(argv[1]);
    LoadTask(); // 加载任务

    std::vector<Channel> channels; // 将管道组织起来
    // 1.创建信道和子进程
    CreateChannelAndSub(num, &channels, work);

    // 2.通过channel控制子进程
    CtrlProcess(channels, 10);

    // 3.回收管道和子进程 a.关闭所有的写端 b.回收子进程
    ClearUpChannel(channels);

    // // for test
    // for(auto &channel : channels)
    // {
    //     std::cout << "====================" << std::endl;
    //     std::cout << channel.GetName() << std::endl;
    //     std::cout << channel.GetWfd() << std::endl;
    //     std::cout << channel.GetProcessId() << std::endl;
    // }
    // sleep(100);
    return 0;
}
Task.hpp:
#pragma once

#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>

#define TaskNum 3

typedef void (*task_t)(); // task_t 函数指针

void Print()
{
    std::cout << "I am print task" << std::endl;
}
void DownLoad()
{
    std::cout << "I am a download task" << std::endl;
}
void Flush()
{
    std::cout << "I am a flush task" << std::endl;
}

task_t tasks[TaskNum];

void LoadTask()
{
    srand(time(nullptr) ^ getpid() ^ 177); // 种一个随机种子
    tasks[0] = Print;
    tasks[1] = DownLoad;
    tasks[2] = Flush;
}

void ExcuteTask(int number)
{
    if(number < 0 || number > 2) return;
    tasks[number]();
}

int SelectTask()
{
    return rand() % TaskNum;
}

// void work(int rfd)
// {
//     while (true)
//     {
//         int command = 0;
//         int n = read(rfd, &command, sizeof(command));
//         if (n == sizeof(int))
//         {
//             std::cout << "pid is: " << getpid() << " handler task" << std::endl;
//             ExcuteTask(command);
//         }
//         else if (n == 0)
//         {
//             std::cout << "sub process: " << getpid() << " quit" << std::endl;
//             break;
//         }
//     }
// }

// 这样做可以彻底的让我们的子进程执行对应的work时,再也不需要知道有什么管道的读端
// (不用管从哪里接收信息,直接认为从标准输入拿到信息即可)
// 将管道的逻辑和执行任务的逻辑进一步进行解耦
void work()
{
    while (true)
    {
        int command = 0;
        int n = read(0, &command, sizeof(command));
        if (n == sizeof(int))
        {
            std::cout << "pid is: " << getpid() << " handler task" << std::endl;
            ExcuteTask(command);
        }
        else if (n == 0)
        {
            std::cout << "sub process: " << getpid() << " quit" << std::endl;
            break;
        }
    }
}
Makefile:
processpool:processpool.cc
	g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
	rm -f processpool
原文地址:https://blog.csdn.net/cy18779588218/article/details/145386373
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.kler.cn/a/523179.html

相关文章:

  • matlab-simulink
  • java项目之基于推荐算法的图书购物网站源码(ssm+mybatis+mysql)
  • Python中的队列详解
  • 基于Ubuntu2404搭建k8s-1.31集群
  • WPF 设置宽度为 父容器 宽度的一半
  • 2025 年前端开发现状分析:卷疯了还是卷麻了?
  • python flask 使用 redis写一个例子
  • STranslate 中文绿色版即时翻译/ OCR 工具 v1.3.1.120
  • C语言数据类型及取值范围
  • 构建一个有智能体参与的去中心化RWA零售生态系统商业模型
  • go理论知识记录(入门2)
  • 一文大白话讲清楚webpack进阶——4——webpack原理
  • 云计算技术深度解析与代码使用案例
  • Vue.js组件开发-实现下载时暂停恢复下载
  • 团体程序设计天梯赛-练习集——L1-024 后天
  • 自动化运维在云环境中的完整实践指南
  • LeetCode 16. 排列序列
  • 安卓入门四十四 其他动画
  • 每日 Java 面试题分享【第 15 天】
  • Leetcode::119. 杨辉三角 II
  • Golang :用Redis构建高效灵活的应用程序
  • 2024收尾工作
  • Linux环境基础开发工具的使用(apt, vim, gcc, g++, gbd, make/Makefile)
  • 【C语言----数组详解】
  • Gurobi基础语法之 LinExpr 类
  • AI学习指南Ollama篇-Ollama的多模态应用探索