当前位置: 首页 > article >正文

【Linux-进程间通信】匿名管道的应用-进程池实现+命名管道的应用ClientServer通信

匿名管道的应用--进程池/C++实现

当系统中需要处理众多任务时,可以将这些任务分配给多个子进程来分担工作。然而,频繁地创建和销毁进程会导致较高的时间成本。为减少这种开销,可以采取预先创建一组子进程的策略(以避免在任务分配时进行即时创建),并在子进程空闲时让它们处于阻塞状态(以防止子进程完成任务后被立即终止,从而在下一次任务分配时无需重新创建进程)。

为了实现这一目标,我们需要使用匿名管道这一技术。通过在父进程与子进程之间创建匿名管道,我们可以实现父进程向特定的子进程发送任务指令。

但需要注意的是,父进程不应该集中大量的将某个任务派发给同一个子进程,让其它进程处于空闲状态,这样会使得整机效率低下。我们可以通过

轮询或者随机派发的方式给子进程派发任务,实现负载均衡

我们要实现子进程对不同任务的管理,怎么做到呢??先描述再组织

既然随机派发任务,首先要有任务

#pragma once

#include <iostream>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>

#define TaskNum 3

typedef void (*task_t)(); // task_t 函数指针类型

void Print()
{
    std::cout << "I am print task" << std::endl;
}
void DownLoad()
{
    std::cout << "I am a download task" << std::endl;
}
void Flush()
{
    std::cout << "I am a flush task" << std::endl;
}

task_t tasks[TaskNum];

void LoadTask()
{
    srand(time(nullptr) ^ getpid() ^ 17777);
    tasks[0] = Print;
    tasks[1] = DownLoad;
    tasks[2] = Flush;
}

void ExcuteTask(int number)
{
    if (number < 0 || number > 2)
        return;
    tasks[number]();
}

int SelectTask()
{
    return rand() % TaskNum;
}
void work()
{
    while (true)
    {
        int command = 0;
        int n = read(0, &command, sizeof(command));
        if (n == sizeof(int))
        {
            std::cout << "pid is : " << getpid() << " handler task" << std::endl;
            ExcuteTask(command);
        }
        else if (n == 0)
        {
            std::cout << "sub process : " << getpid() << " quit" << std::endl;
            break;
        }
    }
}

先描述


class Channel
{
public:
    Channel(int wfd, pid_t id, const std::string &name)
        : _wfd(wfd), _subprocessid(id), _name(name)
    {
    }
    int GetWfd() { return _wfd; }
    pid_t GetProcessId() { return _subprocessid; }
    std::string GetName() { return _name; }
    void CloseChannel()
    {
        close(_wfd);
    }
    void Wait()
    {
        pid_t rid = waitpid(_subprocessid, nullptr, 0);
        if (rid > 0)
        {
            std::cout << "wait " << rid << " success" << std::endl;
        }
    }
    ~Channel()
    {
    }

private:
    int _wfd;
    pid_t _subprocessid;
    std::string _name;
};

创建信道和子进程

// 形参类型和命名规范
// const &: 输出
// & : 输入输出型参数
// * : 输出型参数
//  task_t task: 回调函数
void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
{
    for (int i = 0; i < num; i++)
    {
        // 1.创建管道
        int pipefd[2] = {0};
        int n = pipe(pipefd);
        if (n < 0)
            exit(1);

        // 2.创建子进程
        pid_t id = fork();
        if (id == 0)
        {
            // child - read
            close(pipefd[1]);
            dup2(pipefd[0], 0); // 将管道的读端,重定向到标准输入
            task();             // 执行任务
            close(pipefd[0]);
            exit(0);
        }

        // 3.构建一个channel的名称
        std::string channel_name = "Channel-" + std::to_string(i);
        // 父进程 -- write
        close(pipefd[0]);
        // a. 子进程的pid b. 父进程关心的管道的w端
        channels->push_back(Channel(pipefd[1], id, channel_name));
    }
}

通过channel控制子进程

void ctrlProcessOnce(std::vector<Channel> &channels)
{
    sleep(1);
    // a.选择一个任务
    int taskcommand = SelectTask();
    // b.选择一个信道和进程
    int channel_index = NextChannel(channels.size());
    // c.发送任务
    SendTaskCommand(channels[channel_index], taskcommand);
    std::cout<< std::endl;
    std::cout << "taskcommand: " << taskcommand << " channel: "
              << channels[channel_index].GetName() << " sub process: " << channels[channel_index].GetProcessId() << std::endl;
}

void ctrlProcess(std::vector<Channel> &channels, int times = -1)
{
    if (times > 0)
    {
        while (times--)
        {
            ctrlProcessOnce(channels);
        }
    }
    else
    {
        while (true)
        {
            ctrlProcessOnce(channels);
        }
    }
}

回收管道和子进程--->关闭所有的写端、回收子进程

void CleanUpChannel(std::vector<Channel> &channels)
{
    for (auto &channel : channels)
    {
        channel.CloseChannel();

    }
    // 注意
    for (auto &channel : channels)
    {
        channel.Wait();
    }
}

设计主函数

// ./processpool 5
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        std::cerr << "Usage: " << argv[0] << " processnum " << std::endl;
    }

    int num = std::stoi(argv[1]);
    // 加载任务
    LoadTask();

    // 再组织
    std::vector<Channel> channels;

    // 1. 创建信道和子进程
    CreateChannelAndSub(num, &channels, work1);

    // 2. 通过channel控制子进程
    ctrlProcess(channels, num);

    // 3. 回收管道和子进程. a. 关闭所有的写端 b. 回收子进程
    CleanUpChannel(channels);
    return 0;
}

测试结果

 

测试成功!但是我们发现在回收管道和子进程的过程中我们是先把所有的管道关闭结束后,再进行等待;那我们能不能关闭一个等待一个呢?

void CleanUpChannel(std::vector<Channel> &channels)
{
    for (auto &channel : channels)
    {
        channel.CloseChannel();
        channel.Wait();
    }
}

 

原理看下图:

 

 

 

随着子进程越来越多,那么前面管道的写端就会越来越多;

所以,如果我们关闭一个文件描述符,仅仅只是关闭了父进程的一个,但子进程继承的写端都没有关闭;所以此时这种情况,不能关一个,退一个;在管道内有一个引用计数属性,只要引用计数不为0,不会真正关闭管道,这样子进程也不会真正退出,进程就阻塞了;

  • 为什么分开关闭,先关闭完,再等待就可以了?

因为我们关闭是从上往下的,最后一个管道先释放,最后一个管道释放了,那么它曾经对应指向上一个写端也就自动关闭了,类似于递归,从上往下关,然后从下往上不断读到0的

  • 所以我们也有了新的方式关闭,我们倒着关闭就可以了

  • void CleanUpChannel(std::vector<Channel> &channels)
    {
        int num = channels.size() -1;
        while(num >= 0)
        {
            channels[num].CloseChannel();
            channels[num--].Wait();
        }
    }

  • 我就想一个一个关闭呢?

  • 因为我们是在创建子进程的时候出现了问题,所以我们要修改一下创建子进程的逻辑

    因为第二次创建管道开始,第一个管道就会多出写端,因此我们只需要在第二次创建时作出修改即可;

    因为第一次创建后已经被push_back了,所以在第二次创建的时候,可以把第一次的关闭了;同样后面创建依次如此

  • void CreateChannelAndSub(int num, std::vector<Channel> *channels, task_t task)
    {
        // BUG? --> fix bug
        for (int i = 0; i < num; i++)
        {
            // 1. 创建管道
            int pipefd[2] = {0};
            int n = pipe(pipefd);
            if (n < 0)
                exit(1);
    
            // 2. 创建子进程
            pid_t id = fork();
            if (id == 0)
            {
                if (!channels->empty())
                {
                    // 第二次之后,开始创建的管道
                    for(auto &channel : *channels) channel.CloseChannel();
                }
                // child - read
                close(pipefd[1]);
                dup2(pipefd[0], 0); // 将管道的读端,重定向到标准输入
                task();
                close(pipefd[0]);
                exit(0);
            }
    
            // 3.构建一个channel名称
            std::string channel_name = "Channel-" + std::to_string(i);
            // 父进程
            close(pipefd[0]);
            // a. 子进程的pid b. 父进程关心的管道的w端
            channels->push_back(Channel(pipefd[1], id, channel_name));
        }
    }

 

命名管道的应用——使用Client&Server通信

namedPipe.hpp

#pragma once

#include <iostream>
#include <cstdio>
#include <cerrno>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

const std::string comm_path = "./myfifo";
#define DefaultFd -1
#define Creater 1
#define User 2
#define Read O_RDONLY
#define Write O_WRONLY
#define BaseSize 4096

class NamePiped
{
private:
    const std::string _fifo_path;
    int _id;
    int _fd;

    bool OpenNamedPipe(int mode)
    {
        _fd = open(_fifo_path.c_str(), mode);
        if (_fd < 0)
            return false;
        return true;
    }

public:
    NamePiped(const std::string &path, int who)
        : _fifo_path(path), _id(who), _fd(DefaultFd)
    {
        if (_id == Creater)
        {
            int res = mkfifo(_fifo_path.c_str(), 0666);
            if (res != 0)
            {
                perror("mkfifo");
            }
            std::cout << "creater create named pipe" << std::endl;
        }
    }
    bool OpenForRead()
    {
        return OpenNamedPipe(Read);
    }
    bool OpenForWrite()
    {
        return OpenNamedPipe(Write);
    }

    int ReadNamedPipe(std::string *out)
    {
        char buffer[BaseSize];
        int n = read(_fd, buffer, sizeof(buffer));
        if (n > 0)
        {
            buffer[n] = 0;
            *out = buffer;
        }
        return n;
    }

    int WriteNamedPipe(const std::string &in)
    {
        return write(_fd, in.c_str(), in.size());
    }
    
    ~NamePiped()
    {
        if (_id == Creater)
        {
            int res = unlink(_fifo_path.c_str());
            if (res != 0)
            {
                perror("unlink");
            }
            std::cout << "creater free named pipe" << std::endl;
        }
        if (_fd != DefaultFd)
            close(_fd);
    }
};
  • client.cc

#include "namedPipe.hpp"

// write
int main()
{
    NamePiped fifo(comm_path, User);
    if (fifo.OpenForWrite())
    {
        std::cout << "client open namd pipe done" << std::endl;
        while (true)
        {
            std::cout << "Please Enter> ";
            std::string message;
            std::getline(std::cin, message);
            fifo.WriteNamedPipe(message);
        }
    }

    return 0;
}

server.cc

#include "namedPipe.hpp"

// server read: 管理命名管道的整个生命周期
int main()
{
    NamePiped fifo(comm_path, Creater);
    // 对于读端而言,如果我们打开文件,但是写还没来,我会阻塞在open调用中,直到对方打开
    // 进程同步
    if (fifo.OpenForRead())
    {
        std::cout << "server open named pipe done" << std::endl;

        sleep(3);
        while (true)
        {
            std::string message;
            int n = fifo.ReadNamedPipe(&message);
            if (n > 0)
            {
                std::cout << "Client Say> " << message << std::endl;
            }
            else if(n == 0)
            {
                std::cout << "Client quit, Server Too!" << std::endl;
                break;
            }
            else
            {
                std::cout << "fifo.ReadNamedPipe Error" << std::endl;
                break;
            }
        }
    }

    return 0;
}


http://www.kler.cn/a/374128.html

相关文章:

  • QTreeWidget使用记录(2)
  • Redis特性和应用场景以及安装
  • 新材料产业数据管理:KPaaS平台的创新驱动
  • 【问题解决】pnpm : 无法将“pnpm”项识别为 cmdlet、函数、脚本文件或可运行程序的名称。
  • docker Desktop开启远程访问端口
  • 社区交流系统设计与实现
  • 手机收银云进销存管理软件,商品档案Excel格式批量导入导出,一键导入Excel的商品档案
  • 跨可用区的集群k8s的基本操作和配置理解
  • 【开源免费】基于SpringBoot+Vue.JS网上订餐系统(JAVA毕业设计)
  • SQL 通用数据类型
  • 【数据库设计】规范设计理论之数据依赖的公理系统(1)
  • 百数功能更新——表单提交支持跳转到外部链接并支持传参
  • ssm基于WEB的人事档案管理系统的设计与实现+jsp
  • 【c++ gtest】使用谷歌提供的gtest和抖音豆包提供的AI大模型来对代码中的函数进行测试
  • WPF自定义日历控件Calendar 的方法
  • STM32F103C8T6学习笔记2--LED流水灯与蜂鸣器
  • 树莓派5调取本地视频
  • Spring Boot框架:校园社团信息管理的新选择
  • cmake编译时arch=compute_32,code=sm_32 -gencode 的含义
  • Java面试经典 150 题.P274. H 指数(011)
  • 【Hive sql面试题】找出连续活跃3天及以上的用户
  • 用示波器如何调方波?
  • GitHub个人主页美化
  • 【Paper Note】利用Boundary-aware Attention边界感知注意力机制增强部分伪造音频定位
  • Java | Leetcode Java题解之第523题连续的子数组和
  • linux之netlink 内核源码分析