Linux 匿名管道实现进程池
文章目录
- 🚀 基于Linux管道通信的进程池设计与实现
- 一、项目概述
- 二、完整代码实现
- 1. 📁 任务定义头文件(Task.h)
- 2. 💻 主程序(ProcessPool.cc)
- 三、🔧 关键技术解析
- 1. 管道生命周期管理
- 2. 进程池初始化流程
- 3. 任务分发机制
🚀 基于Linux管道通信的进程池设计与实现
一、项目概述
本文实现了一个基于匿名管道的多进程任务处理系统,核心功能包括:
- 进程池管理:动态创建/销毁子进程 🛠️
- 任务分发:通过管道发送控制指令 📨
- 资源回收:自动清理僵尸进程 ♻️
- 错误处理:管道断裂信号处理 🛑
二、完整代码实现
1. 📁 任务定义头文件(Task.h)
#pragma once
#include <iostream>
#include <functional>
#include <vector>
using func = std::function<void()>;
// 🏭 任务工厂函数
func createLogTask() {
return []{
std::cout << "🪵 [LOG] Updating system logs" << std::endl;
};
}
func createRenderTask() {
return []{
std::cout << "🖥️ [RENDER] Refreshing display" << std::endl;
};
}
// 📥 加载任务到全局容器
void loadTasks(std::vector<func>* tasks) {
tasks->push_back(createLogTask());
tasks->push_back(createRenderTask());
}
2. 💻 主程序(ProcessPool.cc)
#include "Task.h"
#include <unistd.h>
#include <vector>
#include <sys/wait.h>
#include <signal.h>
const int WORKER_NUM = 4;
std::vector<func> g_tasks;
// 📡 通信管道封装
struct Channel {
int write_fd; // 父进程写端
pid_t pid; // 子进程PID
std::string name;
Channel(int fd, pid_t p, const std::string& n)
: write_fd(fd), pid(p), name(n) {}
};
// 🔄 子进程工作循环
void workerMain() {
while(true) {
int cmd = 0;
ssize_t n = read(STDIN_FILENO, &cmd, sizeof(cmd));
if(n <= 0) { // 管道关闭或错误
std::cerr << "🔌 Worker " << getpid() << " exiting" << std::endl;
break;
}
if(cmd >= 0 && cmd < g_tasks.size()) {
std::cout << "👷 Worker " << getpid()
<< " executing task " << cmd << std::endl;
g_tasks[cmd](); // 执行注册任务
}
}
exit(EXIT_SUCCESS);
}
// 🏗️ 初始化进程池
std::vector<Channel> createWorkers() {
std::vector<Channel> workers;
std::vector<int> old_fds;
for(int i=0; i<WORKER_NUM; ++i) {
int pipefd[2];
pipe(pipefd); // 创建新管道
pid_t pid = fork();
if(pid == 0) { // 子进程
for(int fd : old_fds) close(fd); // 关闭旧管道
close(pipefd[1]); // 关闭写端
dup2(pipefd[0], STDIN_FILENO); // 重定向标准输入
close(pipefd[0]);
workerMain(); // 永不返回
}
// 父进程
close(pipefd[0]); // 关闭读端
workers.emplace_back(pipefd[1], pid, "worker-"+std::to_string(i));
old_fds.push_back(pipefd[1]);
}
return workers;
}
// 📨 发送控制指令
void dispatchTasks(const std::vector<Channel>& workers) {
for(const auto& w : workers) {
int cmd = rand() % g_tasks.size();
if(write(w.write_fd, &cmd, sizeof(cmd)) != sizeof(cmd)) {
perror("❌ Write task failed");
}
}
}
// 🧹 清理资源
void cleanupWorkers(const std::vector<Channel>& workers) {
for(const auto& w : workers) {
close(w.write_fd);
waitpid(w.pid, nullptr, 0);
}
}
int main() {
signal(SIGPIPE, SIG_IGN); // 忽略管道断裂信号
loadTasks(&g_tasks);
auto workers = createWorkers();
dispatchTasks(workers);
sleep(1); // 等待任务完成
cleanupWorkers(workers);
return 0;
}
三、🔧 关键技术解析
1. 管道生命周期管理
// 👨💻 父进程
int pipefd[2];
pipe(pipefd);
close(pipefd[0]); // 只保留写端
// 👶 子进程
close(pipefd[1]);
dup2(pipefd[0], STDIN_FILENO);
close(pipefd[0]);
设计要点:
- 每个Worker独占一个管道 🚪
- 父进程持有所有写端文件描述符 🔑
- 子进程通过标准输入读取指令 📥
2. 进程池初始化流程
graph TD
A[主进程] --> B[🛠️ 创建管道1]
B --> C[👶 fork子进程1]
C --> D[🔀 重定向输入]
D --> E[🔄 进入工作循环]
A --> F[🛠️ 创建管道2]
F --> G[👶 fork子进程2]
G --> H[🗑️ 关闭旧管道]
3. 任务分发机制
void dispatchTasks(const std::vector<Channel>& workers) {
for(const auto& w : workers) {
int cmd = rand() % g_tasks.size();
write(w.write_fd, &cmd, sizeof(cmd));
}
}
工作流程:
- 🎲 随机选择任务编号
- 📨 通过管道发送整型指令
- 👷 Worker解析指令执行对应任务