当前位置: 首页 > article >正文

Linux 匿名管道实现进程池

文章目录

  • 🚀 基于Linux管道通信的进程池设计与实现
    • 一、项目概述
    • 二、完整代码实现
      • 1. 📁 任务定义头文件(Task.h)
      • 2. 💻 主程序(ProcessPool.cc)
    • 三、🔧 关键技术解析
      • 1. 管道生命周期管理
      • 2. 进程池初始化流程
      • 3. 任务分发机制

在这里插入图片描述

🚀 基于Linux管道通信的进程池设计与实现


一、项目概述

本文实现了一个基于匿名管道的多进程任务处理系统,核心功能包括:

  • 进程池管理:动态创建/销毁子进程 🛠️
  • 任务分发:通过管道发送控制指令 📨
  • 资源回收:自动清理僵尸进程 ♻️
  • 错误处理:管道断裂信号处理 🛑

二、完整代码实现

1. 📁 任务定义头文件(Task.h)

#pragma once
#include <iostream>
#include <functional>
#include <vector>

using func = std::function<void()>;

// 🏭 任务工厂函数
func createLogTask() {
    return []{ 
        std::cout << "🪵 [LOG] Updating system logs" << std::endl;
    };
}

func createRenderTask() {
    return []{ 
        std::cout << "🖥️ [RENDER] Refreshing display" << std::endl;
    };
}

// 📥 加载任务到全局容器
void loadTasks(std::vector<func>* tasks) {
    tasks->push_back(createLogTask());
    tasks->push_back(createRenderTask());
}

2. 💻 主程序(ProcessPool.cc)

#include "Task.h"
#include <unistd.h>
#include <vector>
#include <sys/wait.h>
#include <signal.h>

const int WORKER_NUM = 4;
std::vector<func> g_tasks;

// 📡 通信管道封装
struct Channel {
    int write_fd;       // 父进程写端
    pid_t pid;          // 子进程PID
    std::string name;
    
    Channel(int fd, pid_t p, const std::string& n)
        : write_fd(fd), pid(p), name(n) {}
};

// 🔄 子进程工作循环
void workerMain() {
    while(true) {
        int cmd = 0;
        ssize_t n = read(STDIN_FILENO, &cmd, sizeof(cmd));
        
        if(n <= 0) { // 管道关闭或错误
            std::cerr << "🔌 Worker " << getpid() << " exiting" << std::endl;
            break;
        }
        
        if(cmd >= 0 && cmd < g_tasks.size()) {
            std::cout << "👷 Worker " << getpid() 
                     << " executing task " << cmd << std::endl;
            g_tasks[cmd](); // 执行注册任务
        }
    }
    exit(EXIT_SUCCESS);
}

// 🏗️ 初始化进程池
std::vector<Channel> createWorkers() {
    std::vector<Channel> workers;
    std::vector<int> old_fds;

    for(int i=0; i<WORKER_NUM; ++i) {
        int pipefd[2];
        pipe(pipefd); // 创建新管道

        pid_t pid = fork();
        if(pid == 0) { // 子进程
            for(int fd : old_fds) close(fd); // 关闭旧管道
            close(pipefd[1]); // 关闭写端
            
            dup2(pipefd[0], STDIN_FILENO); // 重定向标准输入
            close(pipefd[0]);
            
            workerMain(); // 永不返回
        }
        
        // 父进程
        close(pipefd[0]); // 关闭读端
        workers.emplace_back(pipefd[1], pid, "worker-"+std::to_string(i));
        old_fds.push_back(pipefd[1]);
    }
    return workers;
}

// 📨 发送控制指令
void dispatchTasks(const std::vector<Channel>& workers) {
    for(const auto& w : workers) {
        int cmd = rand() % g_tasks.size();
        if(write(w.write_fd, &cmd, sizeof(cmd)) != sizeof(cmd)) {
            perror("❌ Write task failed");
        }
    }
}

// 🧹 清理资源
void cleanupWorkers(const std::vector<Channel>& workers) {
    for(const auto& w : workers) {
        close(w.write_fd);
        waitpid(w.pid, nullptr, 0);
    }
}

int main() {
    signal(SIGPIPE, SIG_IGN); // 忽略管道断裂信号
    loadTasks(&g_tasks);
    
    auto workers = createWorkers();
    dispatchTasks(workers);
    
    sleep(1); // 等待任务完成
    cleanupWorkers(workers);
    return 0;
}

三、🔧 关键技术解析

1. 管道生命周期管理

// 👨💻 父进程
int pipefd[2];
pipe(pipefd);
close(pipefd[0]); // 只保留写端

// 👶 子进程
close(pipefd[1]);
dup2(pipefd[0], STDIN_FILENO);
close(pipefd[0]);

设计要点

  • 每个Worker独占一个管道 🚪
  • 父进程持有所有写端文件描述符 🔑
  • 子进程通过标准输入读取指令 📥

2. 进程池初始化流程

graph TD
    A[主进程] --> B[🛠️ 创建管道1]
    B --> C[👶 fork子进程1]
    C --> D[🔀 重定向输入]
    D --> E[🔄 进入工作循环]
    A --> F[🛠️ 创建管道2]
    F --> G[👶 fork子进程2]
    G --> H[🗑️ 关闭旧管道]

3. 任务分发机制

void dispatchTasks(const std::vector<Channel>& workers) {
    for(const auto& w : workers) {
        int cmd = rand() % g_tasks.size();
        write(w.write_fd, &cmd, sizeof(cmd));
    }
}

工作流程

  1. 🎲 随机选择任务编号
  2. 📨 通过管道发送整型指令
  3. 👷 Worker解析指令执行对应任务


http://www.kler.cn/a/585169.html

相关文章:

  • 【webrtc debug tools】 rtc_event_log_to_text
  • 容器技术与Kubernetes概述
  • Ai文章改写出来的文章,怎么过Ai检测?控制指令,测试的一点心得,彻底疯了!
  • Python:面向对象,类和对象,实例方法与实例属性,构造函数
  • ARM:什么是满减栈?为何选择满减栈?
  • 【零基础入门unity游戏开发——unity3D篇】3D物理系统之 —— 3D刚体组件Rigidbody
  • 一些docker命令
  • 微服务全局ID方案汇总
  • LeetCode860☞柠檬水找零
  • 有关Java中的注解和反射
  • 详解数据库范式
  • 《Java对象“比武场“:Comparable与Comparator的巅峰对决》
  • app.config.globalProperties
  • 语言识别模型whisper学习笔记
  • Odoo Http鉴权+调用后端接口
  • 爱普生车规级晶振SG2520CAA智能汽车电子系统的应用
  • JavaScript 函数基础
  • 【区块链】btc
  • HTTP 各版本协议简介
  • 交易所开发:数字市场的核心动力