Linux·进程间通讯(管道)
从本节开始将使用vscode写代码,语言也切换成C++,同时OS从centOS换成ubentu。
进程之间可能存在数据传输、资源共享、通知事件、进程控制等需求,但是进程又具有独立性,所以就需要专门的进程间通讯技术(ipc)来满足需求。进程间通讯(IPC)的原理很简单,就是让不同的进程能看到同一份资源,但是单个进程申请空间肯定只有自己能看到,因此提公共供资源的人必须是操作系统。
如果这个公共资源是以文件的形式提供就叫管道,如果以内存块的形式提供就叫共享内存,如果以队列的形式提供一个个数据块就叫消息队列,如果是计数器就叫信号量。
进程间通讯有两种方案,本地通讯和网络通讯,网络通讯后面说。而本地通讯就是同一个主机,同一个操作系统不同进程之间的通讯,本地通讯的标准叫systemV,网络和多线程的标准叫posix标准。还有一种管道通讯方式,不属于systemV标准。
1. 管道 pipe
管道是Linux系统的前身Unix中最古老的的进程间通讯的形式,我们把从一个进程连接到另一个进程的一个数据流称为一个"管道"。
绿色行命令结尾的 & 符号表示转到后台运行。
我们可以发现,通过管道连接的多个进程之间是兄弟关系,也就是说在管道加持下两个命令会变成两个进程,也就是说管道帮助了这两个进程间进行通讯了。
1.1 匿名管道
父进程创建子进程时,子进程会把文件描述符表拷贝,把struct file拷贝。再向右属于文件部分,与进程部分解耦,因此父进程和子进程会同时指向同一份inode和文件内核缓冲区。此时不同的进程就看到了同一份资源。
但是现在相当于再磁盘级别的文件中进行读写通讯,它们如果同时向里写一定会出现乱序,同时刷新到磁盘也要有消耗,因此管道的发明者在文件系统的原理上将磁盘断开了,发明一种纯内存级别的文件系统,这个系统就叫管道。
管道的实现原理如下
父进程以读方式打开一次文件,再以写方式打开一次同一个文件,然后fork创建子进程,此时子进程继承了父进程的文件描述符表,因此也同样在读和写同一个文件,之后再关闭父进程的读方式和子进程的写方式,或反过来也行。
这样就能完成一个进程向文件中写,一个文件从文件中读的方法,实现进程间的通讯,但是这种通讯必须只能是单向通讯。
这里我们不能以wr读写方式让父进程打开文件,因为子进程也会继承wr之后没办法关闭单种操作方式,也就形成不了单向通讯了。
最开始进程间通讯的需求只有单项通讯,因此内核设计者就用最简单的方案实现了进程间通讯,并将这种通讯方案命名为pipe管道。
这种内核级文件跟磁盘没关系,不用创建文件名和路径,因此这个文件不需要名字,因此叫它匿名管道。
1.2 管道接口
pipe 创建管道
指令 man 2 pipe 查看
这个系统接口的作用就是创建管道
参数pipefd是一个输出型参数,两位元素的数组,会把打开文件的读写端带出来。其中下标为0的元素是管道的读端,下标为1的元素是管道的写端。
返回值,创建成功返回0,创建失败返回-1并设置错误码
可以看到这里创建管道之前,我们先搞上一个数组用来接收传出来的参数。
同时还可以看到我下面用的打印接口时cerr而不是cout,我们知道cout是打印在fd=1的位置,cerr是打印在fd=2的位置,之后我们可以通过重定向,将正确信息和错误信息分开提取。
现在管道创建好了,下一步就是创建子进程了,今天我们想创建的任务是子进程去写入,父进程去读取。
到这里我们让父子进程同时看到了管道文件,并关闭了子进程的读端,和父进程的写端,下一步就是进行进程间通讯了。
代码逻辑很简单,子进程通过管道的写入口向管道中写不断更新的字符串,父进程只读管道文件中的字符串并打印出来。
可以看到成功实现了父子进程之间的通讯。
仔细看我的代码,可以看到子进程写完一次信息之后会等1秒,但是父进程读信息却是没有休眠的,但是结果却显示父进程确实间隔一秒打印一次,那在这1秒中,父进程在干嘛?
很明显,父进程在阻塞。
当两个进程在同时操作一份资源的时候,有可能这个进程还没写完呢,另一个进程就读走了,这种情况叫数据不一致问题,也叫进程安全问题。因此在数据共享的时候,我们要想办法把共享的资源保护起来,这种被保护起来的资源叫临界资源。
之所以我们没有刻意的保护但是确实能看到父进程在没有管道中数据时阻塞的结果,是因为管道内部自己做了,也就是说这是read和pipe系统调用帮我们把管道文件保护起来了。
管道文件也是有大小的,ubentu系统下管道的大小时64KB,也就是说,当向管道中写入64KB大小的文件后管道就满了,此时子进程就会阻塞等待管道中的内容被读走再写入。
管道四现象
1. 管道为空&&管道正常,read会阻塞
2. 管道为满&&管道正常,write会阻塞
3. 管道写端关闭&&读端继续,读端读到0,表示读到文件结尾
4. 管道写端正常&&读端关闭,OS会直接杀掉写入的进程,杀进程的信号是13
匿名管道五特性
1. 面向字节流,在读管道资源内容的时候不考虑写了多少次,直接需要读多少就直接读走
2. 具有血缘关系的的进程进行IPC,常用于父子
3. 文件的声明周期随进程,管道也是。
4. 管道只进行单向数据通讯,如果有双向需要,建立两个管道
5. 管道自带同步、互斥等对共享资源的保护机制,多个执行流同一时刻只允许一个执行流访问管道资源
1.3 管道的应用 进程池
如上图,一个进程通过管道向多个预先创建好的子进程发送命令,子进程的任务就是获取命令然后命令行解析并执行。这种预先把进程创建出来,然后需要什么任务就让子进程执行什么任务,这种结构就叫进程池,负责进行任务派发的进程又叫master进程,负责工作的进程又叫worker/slaver进程。
像这种管道命令我们就能明白它在干什么了,将前一条命令的输出重定向到管道的写端,把后一条命令的输入重定向到管道的读端
下面我们着手模拟一下内存池。
1.3.1 通用makefile写法
第一步我们先搞一个makefile出来
红框标出的所有内容是我们在定义变量,方便后面的编译链接阶段中不用任何有关本次代码的内容,这样一来这个makefile就是通用的了,之后可以拿去到任何编译链接的场景中用,只需要改一下这些变量就可以了。
所有的变量名都是可以自定义的,这里我用的比较通用的风格来写的。
BIN 就是我们未来要形成的可执行文件名
CC 是选择的编译器
FLAGS 是选择的编译方式,-c编译成.o文件,-Wall warning all 全部错误都报警
LDFLAGS 选择的链接方案 -o
SRC 编译使用的源文件,这里有两种写法,都能达到当前目录的所有 .cc 文件,更推荐第二种,也就是没注释掉的那一种,wildcard是makefile内置的一个函数就是用于罗列当前目录的某个文件。
OBJ 链接依赖的.o文件,直接借用SRC变量的内容,替换一下结尾就好了。
下面就是编译链接的过程,直接可以使用前面定义好的变量
$@ 目标文件 $^依赖文件列表 用依赖文件列表中的所有内容直接连接形成目标文件
%.o:%.c 将当前目录下的所有 .cc 文件依次展开形成同名 .o 文件 $< 的含义就是将依赖列表中的展开后的多个文件一个一个的拿出来 通过编译器去一个一个的处理。
最后clean就不用解释了,清理一下.o和可执行文件。
至此,我们完成了一个通用的makefile文件。
1.3.2 进程池
进程池初始化
对于worker进程来讲,每人都只有一个pipe的读端要看,但是对于master,需要管理多个pipe的写端,因此我们要对pipe进行先描述再组织的管理。
我们将单个管道资源描述为Channel(频道)类,类中记录该对象的写端入口_wfd 和 管道名称,_who中记录这个管道是给哪个子进程用的。
主函数这里我们采用命令行参数列表,让用户自定义要在进程池中先创建出几个进程,如果用户输入的命令行参数不是2个,说明用户不会用这个程序,那我们就打印使用提示Usage()。
接下来我们把框架画出来
跟前面的思路差不多,侠士根据用户需求创建循环次数,每一次循环创建一个管道和子进程,同时子进程关闭写端,父进程关闭读端,同时将子进程中预留出Worker()函数用于子进程接收工作命令。
但是写到这里就出现了一个问题,此时每一个循环都会创建一个pipe但是到下一个循环pipe就会被回收,因此我们需要把管道整合管理起来,防止pipe管道变成一次性的。
我们创建一个顺序表,并在每次循环执行到父进程时将管道的写端和子进程pid记录,防止管道资源被释放。
接下来我们对代码做一个整体的重构,引入枚举错误码,以及包装子进程执行的方法
使用函数包装器重命名出一个类型 work_t 包装一个void()类型函数,将来这个函数方法就是子进程的任务方法。
我们将子进程的0号文件描述符也就是标准输入描述符,dup2重定向到管道的写入端,之后子进程获取任务就可以直接从0号文件描述符获取。
如此做是为了解耦子进程和管道,便于子进程获取各种类型的任务
也想要子进程执行哪种任务就初始化哪种任务的名称,在初始化进程池函数中是不知道子进程要执行什么任务的,只有这里的参数可以指明子进程要执行哪种任务,通过函数包装器和文件描述符替换重定向,完成子进程和管道的完全解耦。
派发任务
现在我们完成了进程池的初始化,接下来我们要考虑如何派发任务。
我们规定,父进程向子进程派发任务码的时候是一个int一个int派发的,也就是说一个整形代表一个任务,同时可以发现一个整形是4个字节,也就是说父进程是4个字节为单位派发任务码的,那子进程也要以4个字节的单位接收任务码来获取任务。这种操作方案从网络的角度讲叫定址协议。
在派发任务的时候要保证每个子进程的任务量差不多,这个工作叫负载均衡。
负载均衡的实现方式可以选择:1.轮询 2.随机 3.Channel中记录历史任务数,下面我们选择轮询方案。
新起一个记录任务的文件,hpp后缀是可以把方法和实现都写在一起的头文件,一般用于开源代码。
此时master就将任务发送给了对应的子进程管道中了,下一步就是子进程获取任务。
此时进程池就可以跑起来了,我在派发任务的模块中新增了一点派发提示打印
这里可以看到master在派发随机任务,并采用轮询调度的方式发给每一个子进程。
退出进程池
管道有一个特性,如果写端关闭,那读端就会认为读到了0(文件结尾),那此时让子进程退出就好了
这个Close函数就是关闭写端入口。
完整代码
Task.hpp
#pragma once
#include <iostream>
#include <unordered_map>
#include <functional>
#include <ctime>
using task_t = std::function<void()>;
static int number = 0;
void DownLoad()
{
std::cout << "下载任务" << std::endl;
}
void Log()
{
std::cout << "日志任务" << std::endl;
}
void Sql()
{
std::cout << "数据库同步任务" << std::endl;
}
class TaskManager
{
public:
TaskManager()
{
srand(time(nullptr));
InsertTask(DownLoad);
InsertTask(Log);
InsertTask(Sql);
}
void InsertTask(task_t t)
{
tasks[number++] = t;
}
int SelectTask()
{
return rand() % number;
}
void Excute(int number)
{
if (tasks.find(number) == tasks.end()) // 没找到方法
return;
tasks[number]();
}
private:
std::unordered_map<int, task_t> tasks;
};
TaskManager TM;
void Worker()
{
// 从标准输入读 read->0
while (true)
{
int cmd = 0;
int n = ::read(0, &cmd, sizeof(cmd));
if (n == sizeof(cmd))
{
TM.Excute(cmd);
}
else if (n == 0)
{
std::cout << "pid: " << getpid() << " quit..." << std::endl;
break;
}
else
{
}
}
}
Channel.hpp
#ifndef ___CHANNEL_HPP__
#define ___CHANNEL_HPP__
#include <iostream>
#include <string>
#include <unistd.h>
class Channel
{
public:
Channel(int wfd, int who)
: _wfd(wfd), _who(who)
{
// Channel-3-1234
_name = "Channel-" + std::to_string(wfd) + "-" + std::to_string(who);
}
std::string Name()
{
return _name;
}
void Send(int cmd)
{
::write(_wfd, &cmd, sizeof(cmd));
}
void Close() // 关闭写端,结束子进程
{
::close(_wfd);
}
pid_t Id()
{
return _who;
}
int Wfd()
{
return _wfd;
}
~Channel()
{
}
private:
int _wfd; // 写端入口
std::string _name;
pid_t _who;
};
#endif
Procpool.hpp
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <functional>
#include "Task.hpp"
#include "Channel.hpp"
using work_t = std::function<void()>;
enum
{
UsageError = 1,
PipeError,
ForkError
};
class ProcPool
{
public:
// 构造进程池
ProcPool(int n, work_t w)
: processnum(n), work(w)
{
}
// channels为输出型参数
// work_t work 回调
int InitprocessPool() // 初始化进程池
{
for (int i = 0; i < processnum; i++)
{
// 先创建管道,连接好父进程之后再创建子进程
int pipefd[2] = {0};
int n = pipe(pipefd);
if (n < 0) // 创建管道失败
return PipeError;
// 创建管道成功创建子进程
pid_t id = fork();
if (id < 0) // 创建子进程失败
return ForkError;
else if (id == 0) // 子进程
{
// 关闭历史wfd
std::cout << getpid() << ", child close history fd: ";
for (auto &c : channels)
{
std::cout << c.Wfd() << " ";
c.Close();
}
std::cout << " over" << std::endl;
::close(pipefd[1]); // 子进程关闭写
// 重定向Worker的标准输入到管道输入端
dup2(pipefd[0], 0);
work(); // 子进程进行工作
exit(0); // 子进程工作做完直接结束
}
else // 父进程
{
::close(pipefd[0]); // 父进程关闭读
// Channel ch(pipefd[1], id); //将写端记录
// channels.push_back(ch);
channels.emplace_back(pipefd[1], id);
}
}
return 0;
}
// 派发任务
void DispatchTask()
{
// 派发任务
int who = 0;
int tasknum = 20;
while (tasknum--)
{
// 选择一个任务(整数)
int task = TM.SelectTask();
// 选择一个子进程chennal
Channel &curr = channels[who++];
who %= channels.size();
std::cout << "###############" << std::endl;
std::cout << "send " << task << " to" << curr.Name() << "任务还剩: " << tasknum << std::endl;
std::cout << "###############" << std::endl;
// 派发任务
curr.Send(task);
sleep(3);
}
}
void CleanProcPool()
{
// 退出进程池
for (auto &c : channels)
{
c.Close();
pid_t rid = ::waitpid(c.Id(), nullptr, 0);
if (rid > 0)
{
std::cout << "child " << rid << " wait...success" << std::endl;
}
}
// // 退出进程池
// for (auto &c : channels)
// {
// c.Close();
// }
// for (auto &c : channels)
// {
// pid_t rid = ::waitpid(c.Id(), nullptr, 0);
// if (rid > 0)
// {
// std::cout << "child " << rid << " wait...success" << std::endl;
// }
// }
}
private:
std::vector<Channel> channels;
int processnum;
work_t work;
};
Main.cc
#include "Procpool.hpp"
// 使用提示
void Usage(std::string proc)
{
std::cout << "Usage:" << proc << "process-num" << std::endl;
}
int main(int argc, char *argv[])
{
if (argc != 2) // 如果用户输入的参数不是2个
{
Usage(argv[0]); // 使用提示
return UsageError;
}
// 创建指定个数个进程
int num = std::stoi(argv[1]);
ProcPool *pp = new ProcPool(num, Worker);
//初始化进程池
pp->InitprocessPool();
//派发任务
pp->DispatchTask();
//退出进程池
pp->CleanProcPool();
// std::vector<Channel> channels; // 管理管道
// // 初始化进程池
// InitprocessPool(num, channels, Worker);
// // 派发任务
// DispatchTask(channels);
// // 退出进程池
// CleanProcPool(channels);
delete pp;
return 0;
}
1.4 命名管道
可以用如下命令看到一个命名管道
可以看到我们创建了一个黄色的文件 fifo(first in first out) 它的标志位是以 p 开头,代表它是一个管道文件
效果是什么呢?
我们打开两个终端,一个终端中向管道写入内容之后会发现这个终端卡住了
之后我们用另一个终端把内容从管道中可以读到显示器上,此时完成两个进程间用命名管道的通讯。
我们还可以用函数mkfifo用代码,在指定路径下创建指定权限的管道文件。此时一个进程以写方式打开命名管道,一个以读方式打开命名管道,此时就能完成两进程之间的通讯了。
创建成功函数返回0,如果错误返回-1,同时设置错误码。
之所以叫命名管道,因为这是一个真正存在的文件,一个文件一定有路径+文件名,它具有唯一性。而进程间通讯的本质就是让不同的进程看见同一份资源,此时如果有了一份唯一的命名管道资源,就可以让不同的进程用同一个文件系统路径标识出这个为一个管道资源。
其实普通文件也可以进行进程间通讯,不过是一个进程向普通文件中写,一个从普通文件中读,只要两个进程能看到同一份资源就完全可以通讯。那命名管道是以管道的形式将通讯方案保护了起来,维护读写顺序和最大读写量,而最重要的是普通文件会向磁盘中刷新信息,但是管道的方案就是在内核中暂存信息,这会节省很大的时间。
我们可以把命名管道看作是普通文件的特殊化处理,虽然它是一个有名有姓的文件,但是当操作系统看见它的时候就会用管道的方案处理信息,同时也是因为它的有名有姓,也让非父子血缘的进程之间可以看到同一份资源,进而完成通讯。
下面我们就直接写代码
首先规划是启动两个进程Client和Server,让他俩之间进行通讯,首先我们需要两个main入口,和一个管理通讯的人。这个管道公共资源肯定是要加在一个main入口中的,也就是说一个进程要完成创建&&使用资源的任务,一个进程要完成获取&&使用资源的任务,在此我们让Server入口创建资源。
当我们创建好通讯时要用的命名管道后,在通讯结束的时候还需要把它删除,也就是要在析构函数中删除管道文件。
使用指令 man 2 unlink 查看
这个unlink就是移除指定路径下的文件,其实相当于rm指令
返回值删除成功就是0,删除失败就是-1并设置错误码。
成功完成命名管道的创建和删除。
让server以只读方式打开和关闭管道文件。
给server提供一个从管道读信息的接口
client的打开和关闭管道的方案于server类似,不过是以只写的方式打开,再提供一个发送信息的接口
我们再在外面包上clinet发送信息server接收信息的逻辑就可以使用了
此时我们就可以看到clinet进程输入信息之后,可以被server收到并打印出来,如果不发消息server就会一直等待。
最后如果读端打开文件时写端还没有打开,读端就会阻塞在open函数
完整代码
Comm.hpp
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
const std::string gpipeFile = "./fifo";
const mode_t gmode = 0600; // 允许拥有者读写
const int gdefualt = -1; // 默认错误文件描述符
const int gsize = 1024; // 约定通讯缓冲区大小
const int gForRead = O_RDONLY;
const int gForWrite = O_WRONLY;
int OpenPipe(int flag)
{
int fd = ::open(gpipeFile.c_str(), flag);
if (fd < 0)
{
std::cerr << "open error" << std::endl;
}
return fd;
}
void ClosePipeHelp(int fd)
{
if (fd >= 0)
::close(fd);
}
Client.hpp
#pragma once
#include <iostream>
#include "Comm.hpp"
class Client
{
public:
Client()
: _fd(gdefualt)
{
}
bool OpenPipeFWrite()
{
_fd = OpenPipe(gForWrite);
if (_fd < 0)
{
return false;
}
return true;
}
int SendPipe(const std::string &in)
{
return ::write(_fd, in.c_str(), in.size());
}
void ClosePipe()
{
ClosePipeHelp(_fd);
}
~Client()
{
}
private:
int _fd;
};
Server.hpp
#pragma once
#include <iostream>
#include "Comm.hpp"
class Server
{
public:
Server()
: _fd(gdefualt)
{
umask(0);
int n = ::mkfifo(gpipeFile.c_str(), gmode);
if (n < 0)
{
std::cerr << "mkfifo error" << std::endl;
return;
}
std::cout << "make fifo success!" << std::endl;
}
bool OpenPipeFRead()
{
_fd = OpenPipe(gForRead);
if(_fd < 0)
{
return false;
}
return true;
}
int RecvPipe(std::string *out)
{
char buffer[gsize];
ssize_t n = ::read(_fd, buffer, sizeof(buffer)-1);
if(n > 0)
{
buffer[n] = '\0';
*out = buffer;
}
return n;
}
void ClosePipe()
{
ClosePipeHelp(_fd);
}
~Server()
{
int n = unlink(gpipeFile.c_str());
if (n < 0)
{
std::cerr << "unlink error" << std::endl;
return;
}
std::cout << "unlink fifo success!" << std::endl;
}
private:
int _fd;
};
CliMain.cc
#include "Client.hpp"
#include <iostream>
#include "Comm.hpp"
int main()
{
Client client;
client.OpenPipeFWrite();
std::string message;
while(true)
{
std::cout << "Please Enter# ";
std::getline(std::cin, message);
client.SendPipe(message);
}
client.ClosePipe();
return 0;
}
SerMain.cc
#include "Server.hpp"
#include <iostream>
int main()
{
Server server;
server.OpenPipeFRead();
std::string message;
while (true)
{
if (server.RecvPipe(&message) > 0)
{
std::cout << "client say# " << message << std::endl;
}
else // 读到文件结尾,退出进程
{
break;
}
}
std::cout << "client quit me too!" << std::endl;
server.ClosePipe();
return 0;
}