当前位置: 首页 > article >正文

【Linux后端服务器开发】管道设计

目录

一、管道通信

二、匿名管道

1. 匿名管道通信

2. 匿名管道设计

三、命名管道

comm.hpp

client.cc

serve.cc


一、管道通信

进程通信

数据传输:一个进程需要将它的数据发送给另一个进程

资源共享:多个进程之间共享同样的资源

通知事件:一个进程向另一个(一组)进程发送信息,通知它们发生了某种事件

进程控制:一个进程完全控制另一个进程的执行,如debug

通信本质

OS直接或间接给通信双方提供内存空间

通信的进程双方,能够读取到一份公共资源

管道文件是内存级文件

管道创建

 

管道读写端

  • 如果管道没有了数据,读端在读,默认会阻塞等待正在读取的进程
  • 管道是固定大小的空间,写端写满的时候,会阻塞等待读端读取
  • 写端关闭,读端在读,则读端读完管道数据也关闭
  • 读端关闭,写端在写,OS终止写端

int main() 
{
    int fds[2];
    int n = pipe(fds);
    assert(n == 0);

    cout << "fds[0]: " << fds[0] << endl;
    cout << "fds[1]: " << fds[1] << endl;

    return 0;
}

管道可用于父子、兄弟、祖孙进程之间通信

二、匿名管道

1. 匿名管道通信

#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <cassert>
#include <cstring>
#include <sys/types.h>
#include <sys/wait.h>
using namespace std;

int main() 
{
    // 1. 创建管道文件,打开读写端
    int fds[2];
    int n = pipe(fds);
    assert(n == 0);

    // 2. fork()
    pid_t id = fork();
    if (id == 0) 
    {
        //子进程通信代码
        close(fds[0]);  //关闭读端
        const char* s = "我是子进程,我正在给你发消息";
        int cnt = 0;
        while (true) 
        {
            char buffer[1024];
            snprintf(buffer, sizeof(buffer), "child->parent say: %s[%d][%d]", s, cnt++, getpid());
            write(fds[1], buffer, strlen(buffer));
            sleep(1);
        }
        exit(0);
    }

    //父进程通信代码
    close(fds[1]);  //关闭写端
    while (true) 
    {
        char buffer[1024];
        ssize_t s = read(fds[0], buffer, sizeof(buffer) - 1);
        if (s > 0) 
            buffer[s] = 0;
        cout << "Get Message# " << getpid() << buffer  << " | my pid: " << getpid() << endl;
        //父进程没有sleep
    }
    n = waitpid(id, nullptr, 0);
    assert(n == id);

    return 0;
}

2. 匿名管道设计

  • 将任务存入存入任务表vector中,将子进程存入信息存入subs容器中
  • 父进程随机将任务码发给5个子进程,子进程读取任务码完成任务(父进程为写端,子进程为读端),若子进程未读取任务,则进行阻塞等待
  • 通过随机数分配任务给随机子进程,让子进程负载均衡

代码设计流程:

  1. 建立任务表,建立子进程及和子进程通信的信道
  2. 父进程控制子进程,进行任务分配
  3. 回收子进程信息
#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <cassert>
#include <cstring>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <vector>
using namespace std;

#define MakeSeed() srand((unsigned int)time(nullptr) ^ getpid())
#define PROCESS_NUM 5
#define TASK_NUM 10

typedef void(*func_t)();

void DownLoad() 
{
    cout << getpid() << " DownLoad Task" << endl;
    sleep(1);
}

void IOTask() 
{
    cout << getpid() << " IO Task" << endl;
    sleep(1);
}

void FlushTask() 
{
    cout << getpid() << " Flush Task" << endl;
}

void LoadTaskFunc(vector<func_t>& funcMap) 
{
    funcMap.push_back(DownLoad);
    funcMap.push_back(IOTask);
    funcMap.push_back(FlushTask);
}

/

class subEp 
{
public:
    subEp(pid_t subID, int writeFd)
        : _subID(subID), _writeFd(writeFd) 
    {
        char nameBuffer[1024];
        snprintf(nameBuffer, sizeof(nameBuffer), "process-%d[pid(%d)-fd(%d)]", num++, subID, _writeFd);
        _name = nameBuffer;
    }

    static int num;
    string _name;
    pid_t _subID;
    int _writeFd;
};

int subEp::num = 0;

// 读取
int RecvTask(int readFd) 
{
    int code = 0;
    ssize_t s = read(readFd, &code, sizeof(code));
    if (s == sizeof(int)) 
        return code;
    else if (s <= 0) 
        return -1;
    else 
        return 0;
}

void CreateSubProcess(vector<subEp>& subs, vector<func_t>& funcMap) 
{
    vector<int> deleteFd;
    for (int i = 0; i < PROCESS_NUM; ++i) 
    {
        int fds[2];
        int n = pipe(fds);
        assert(n == 0);
        (void) n;

        pid_t id = fork();
        if (id == 0) 
        {
            //建立一对一管道
            for (int i = 0; i < deleteFd.size(); ++i) 
                close(deleteFd[i]);
            //子进程处理任务
            close(fds[1]);
            while (true) 
            {
                // 1. 获取任务码,如果没收到任务码,则阻塞等待
                int commandCode = RecvTask(fds[0]);
                // 2. 执行任务
                if (commandCode >= 0 && commandCode < funcMap.size()) 
                    funcMap[commandCode]();
                else 
                    break;
            }
            exit(0);
        }
        close(fds[0]);
        subEp sub(id, fds[1]);
        subs.push_back(sub);
        deleteFd.push_back(fds[1]);
    }
}

void SendTask(const subEp& process, int taskNum) 
{
    cout << "send task num " << taskNum << " to " << process._name << endl;
    int n = write(process._writeFd, &taskNum, sizeof(taskNum));
    assert(n == sizeof(int));
    (void)n;
}

void LoadBlanceContrl(const vector<subEp>& subs, vector<func_t>& funcMap, int count) 
{
    int procNum = subs.size();
    int taskNum = funcMap.size();
    while (count--) 
    {
        int subIDx = rand() % procNum;
        int taskIDx = rand() % taskNum;
        SendTask(subs[subIDx], taskIDx);
        sleep(1);
    }
    for (int i = 0; i < procNum; ++i) 
        close(subs[i]._writeFd);
}

void WaitProcess(vector<subEp>& subs) 
{
    int procNum = subs.size();
    for (int i = 0; i < procNum; ++i) 
    {
        waitpid(subs[i]._subID, nullptr, 0);
        cout << "wait sub process success ... " << subs[i]._subID << endl;
    }
}

int main() 
{
    MakeSeed();
    // 1. 建立子进程并建立和子进程通信的信道
    //    [子进程id,wfd]
    vector<func_t> funcMap;
    LoadTaskFunc(funcMap);
    vector<subEp> subs;
    CreateSubProcess(subs, funcMap);

    // 2. 父进程控制子进程
    LoadBlanceContrl(subs, funcMap, TASK_NUM);

    // 3. 回收子进程信息
    WaitProcess(subs);

    return 0;
}

三、命名管道

让不同进程打开指定名称(路径+文件名)的同一个文件

comm.hpp

#pragma once

#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <string>
#include <cstring>
#include <cerrno>
#include <cassert>
#include <unistd.h>
#include <fcntl.h>
#include <fstream>

#define NAMED_PIPE "/root/test/pipe/named_pipe"

bool CreateFilo(const std::string& path) 
{
    umask(0);
    int n = mkfifo(path.c_str(), 0666);
    if (n == 0) 
    {
        return true;
    }
    else 
    {
        std::cout << "errno: " << errno << "err string: " << strerror(errno) << std::endl;
        return false;
    }
}

void RemoveFifo(const std::string& path) 
{
    int n = unlink(path.c_str());
    assert(n == 0);     //意料之中用assert判断,意料之外用 if else
    (void)n;
}

client.cc

#include "comm.hpp"

int main() 
{
    std::cout << "client begin" << std::endl;
    int wfd = open(NAMED_PIPE, O_WRONLY, 0666);
    std::cout << "client end" << std::endl;
    if (wfd < 0) 
        exit(1);

    //write
    char buffer[1024];
    while (true) 
    {
        std::cout << "please say# ";
        fgets(buffer, sizeof(buffer), stdin);
        if (strlen(buffer) > 0) 
            buffer[strlen(buffer) - 1] = 0;        
        ssize_t n = write(wfd, buffer, strlen(buffer));
        assert(n == strlen(buffer));
        (void)n;
    }

    close(wfd);
    return 0;
}

serve.cc

#include "comm.hpp"

int main() 
{
    bool r = CreateFilo(NAMED_PIPE);
    assert(r);
    (void)r;

    std::cout << "serve begin" << std::endl;
    int rfd = open(NAMED_PIPE, O_RDONLY);
    std::cout << "serve end" << std::endl;
    if (rfd < 0) 
        exit(1);

    //read
    char buffer[1024];
    while (true) 
    {
        ssize_t s = read(rfd, buffer, sizeof(buffer) - 1);
        if (s > 0) 
        {
            std::cout << "client->serve# " << buffer << std::endl;
        }
        else 
        {
            perror("read");
            exit(1);
        }
    }

    close(rfd);

    RemoveFifo(NAMED_PIPE);
    return 0;
}


http://www.kler.cn/a/37277.html

相关文章:

  • 2023大数据面试总结
  • 【C#】并行编程实战:同步原语(1)
  • jquery html特殊字符反转义,JS - 实现HTML标签的转义、反转义的几种方法
  • Boundless Hackathon @Stanford 主题黑客松活动闭幕,一文回顾
  • 解决小程序 scroll-view 里面的image有间距、小程序里面的图片之间有空隙的问题。
  • 自然语言处理从入门到应用——LangChain:快速入门-[链(Chains)、代理(Agent:)和内存(Memory)]
  • 二维码识别 OCR 原理及如何应用于物流和仓储管理中
  • “简单易懂的排序:深入了解直接选择排序“
  • msvcp140.dll是什么文件?如何修复丢失的msvcp140.dll文件
  • 【C++】STL之string功能及模拟实现
  • vue中给数字新增四舍五入属性
  • docker-compose常用模板
  • 从小白开始学习CAD(一)
  • MySQL常用命令1
  • vscode 端口转发实现端口映射,实现端口自由
  • Leetcode 43 字符串相乘
  • xShell中使用vim编辑时,无法粘贴外来文本
  • [C++] C++特殊类设计 以及 单例模式:设计无法拷贝、只能在堆上创建、只能在栈上创建、不能继承的类, 单例模式以及饿汉与懒汉的场景...
  • VMware ESXi 7.0 U3n macOS Unlocker OEM BIOS 集成网卡驱动和 NVMe 驱动 (集成驱动版)
  • Qt5.15.2 Webassembly源码裁剪编译