【Linux】进程间通信(匿/命名管道、共享内存、消息队列、信号量)
文章目录
- 1. 进程通信的目的
- 2. 管道
- 2.1 原理
- 2.2 匿名管道
- 2.3 管道通信场景:进程池
- 2.4 命名管道
- 3. System V共享内存
- 3.1 操作共享内存
- 3.2 使用共享内存通信
- 4. System V 消息队列(了解)
- 5. System V 信号量(了解)
- 5.1 信号量
- 6. IPC的理解
- 6.1 用户角度
- 6.2 内核角度
1. 进程通信的目的
在前面的学习中,我们学到的进程都是孤立的(进程具有独立性),最密切的关系就是父子关系;但是在实际中,并不是单个进程去完成某些任务,进程间也需要协作。
- 数据传输:一个进程需要将它的数据发送给另一个进程
- 资源共享:多个进程之间共享同样的资源。
- 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
- 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另 一个进程的所有陷入和异常,并能够及时知道它的状态改变。
由于进程具有独立性,那么进程间直接通信是不可能的,除非有专门的进程通信技术。
为了解决进程间通信的问题,就引入了进程间通信。
那进程间是如何通信的呢?
本质是:
要先让不同的进程看到同一份资源
,并且该资源不能是进程的(由于存在写时拷贝),只能是操作系统提供的公共资源。
- 如果公共资源是文件的形式:管道
- 如果公共资源是内存块的形式:共享内存
- 如果公共资源是队列的形式:消息队列
- 如果是个计数器:信号量
进程间通信有三个标准:
- 管道
- SystemV
- POSIX
2. 管道
2.1 原理
首先我们要知道,管道是Unix系统中比较古老的通信方式。
有了文件系统的知识后,我们知道文件加载到内存中的过程入下图:
如果此时父进程创建子进程,那子进程就需要以父进程的task_struct、file_struct、file等为模板
创建自己的;但是文件相关的内容(inode、内核缓冲区)子进程也会创建吗?- - 不会,因为操作系统内不会重复的打开一个文件。
如果内核缓冲区父子进程共用一份,这不就是OS提供的公共资源了吗?
但是,现在如果让父子进程之间通信,就不需要将内核缓冲区中的内容刷新到磁盘中了;因为不仅逻辑上讲不通,而且还浪费资源。
如果将内核缓冲区设计成一种纯内存级别的,然后用它来专门负责进程间的通信不就可以了吗?- - 是的,我们的管道就是这样产生的。
管道特点:只能进行单向通信(需要关掉相应的文件描述符)
2.2 匿名管道
内存级别的内核缓冲区就是匿名管道(匿名:无需指定路径、没有名字)。
创建管道的系统调用: pipe
- 头文件: #include <unistd.h>
- 功能:创建一无名管道 原型 int pipe(int fd[2]);
- 参数 fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
- 返回值:成功返回0,失败返回错误代码
使用pipe实现通信:
#include <iostream>
#include <string>
using namespace std;
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
int main()
{
int fds[2] = {0};
// 1. 创建管道
int n = pipe(fds); // fds输出形参数
if (n == -1)
{
cerr << "pipe fail" << endl;
return 1;
}
// 2. 创建子进程
pid_t id = fork();
if (id < 0)
{
return 2;
}
if (id == 0)
{
// child
// 3. 关闭read
::close(fds[0]);
int count = 0;
while (true)
{
string messages = "hello pipe ";
messages += to_string(getpid());
messages += ",";
messages += to_string(count);
// 开始通信
::write(fds[1], messages.c_str(), messages.size());
count++;
sleep(1);
}
exit(0);
}
else
{
// father
// 3.关闭write
::close(fds[1]);
char buffer[1024];
while(true)
{
size_t n = ::read(fds[0],buffer,1024);
if(n > 0)
{
buffer[n] = '\0';
cout << "child->father,message:" << buffer << endl;
}
}
pid_t rid = waitpid(id, nullptr, 0);
cout << "father wait child success,rid:" << rid << endl;
}
return 0;
}
管道的四种场景:
- 管道正常 && 管道为空,read(系统调用)会阻塞
- 管道正常 && 管道为满,write(系统调用)会阻塞
- 管道写端关闭,读端继续,读端无内容可读,表示到达文件结尾
- 管道读端关闭,写端继续,写端无需再写,OS会直接杀掉写端进程( SIGKILL )
匿名管道的特性:
- 面向字节流(不关心对方写的多少)
- 主要用于具有“血缘关系”的进程进行IPC(Inter-Process Communication),常用于父子
- 文件的生命周期随进程,管道也是
- 管道只能进行单向通信,如若进行双向,那就建立两个管道
- 管道自带同步互斥等保护机制(对共享资源的)
2.3 管道通信场景:进程池
在我们之前所写的代码中,进程都是各忙各的;那我们能不能将多个进程协同起来,让一个进程向其它进程派发任务呢?
根据管道的场景:管道正常 && 管道为空,read(系统调用)会阻塞。那么当父进程不向指定的管道中写数据,那与管道相连的进程就必须阻塞。
此时,父进程就可以通过向管道中写数据来控制子进程,此时多进程之间就进行了协同,这种模式就叫做进程池
下面我们就来简单实现一个进程池
先准备一个通用的makefile
BIN = ProcessPool ## binary executable files
##---------1. 编译选项---------
CC = g++
#-Wall:warning all 显示所有警告
FLAGS = -c -Wall -std=c++11
##---------2. 链接选项---------
LDFLAGS = -o
##---------3. 获得相关依赖文件---------
#罗列所有的.cc
SRC = $(shell ls *.cc)
#wildcard是make的函数
#SRC = $(wildcard *.cc)
#形成同名.o
OBJ = $(SRC:.cc=.o)
##--------- 4. 执行编译、链接操作---------
$(BIN):$(OBJ)
$(CC) $(LDFLAGS) $@ $^
%.o:%.cc
$(CC) $(FLAGS) $<
.PHONY:clean
clean:
rm -f $(BIN) $(OBJ)
接下来,我们要按照下面的步骤书写代码:
- 先有num个管道,父进程同时对多个管道进行管理
- 创建num个子进程
- 关闭相应的读写端
- 建立信道
- 父进程向信道中写任务,子进程读任务
Channel.hpp
#include <iostream>
using namespace std;
#include<string>
class Channel
{
public:
Channel(int wfd,int who)
:_wfd(wfd)
,_who(who)
{
_name = "Channel-" + to_string(wfd) + "-" + to_string(who);
}
~Channel()
{}
int Get_wfd()
{
return _wfd;
}
void Close()
{
close(_wfd);
}
string GetName()
{
return _name;
}
private:
int _wfd; //标记每个管道的写端
string _name; //管道名
int _who;
};
ProcessPoll.cc
#include "Channel.hpp"
#include <vector>
#include <unistd.h>
#include <sys/types.h>
enum
{
USEERR = 0,
PIPEFAIL,
FORKFAIL
};
void Usage(string ProcessName)
{
cout << "Usage:" << ProcessName << " process num" << endl;
}
void ExecuteTask()
{
}
int main(int argc, char *argv[])
{
// 检测是否正确使用
if (argc != 2)
{
Usage(argv[0]);
return USEERR;
}
int num = stoi(argv[1]); // 获得进程个数
vector<Channel> channels(num);
// 2. 创建指定个进程
for (int i = 0; i < num; i++)
{
// 1.先创建管道
int pipefd[2] = {0}; // 返回的两个文件描述符,指向同一个管道
int n = ::pipe(pipefd);
if (n == -1)
return PIPEFAIL;
pid_t id = fork();
if (id < 0)
return FORKFAIL;
if (id == 0)
{
// 子进程关闭写端
::close(pipefd[1]);
// 子进程要关闭 从父进程继承下来的之前的管道
if (i > 0)
{
cout << "child " << getpid() << " shut history fd" << endl;
for (auto &c : channels)
{
cout << c.Get_wfd() << " ";
c.Close();
}
cout << endl;
}
// 执行任务
dup2(pipefd[0],0);//让所有的子进程都从0里读,无需再传递管道的fd
ExecuteTask();
exit(0);
}
// 父进程关闭读端
::close(pipefd[0]);
channels.emplace_back(pipefd[1], id);
}
return 0;
}
在创建管道和进程时,注意下方情况,需要为子进程关闭不属于它管道的读端,否则在等待子进程时,子进程未到文件尾,不退出,父进程会阻塞式等待。
此时就会将先前进程的管道的读端在当前进程关闭了
下面使用面向对象的思想将代码拆分一下:
Channel.hpp
#pragma once
#include <iostream>
#include <unistd.h>
using namespace std;
#include <string>
class Channel
{
public:
Channel() = default;
Channel(int wfd, pid_t who)
: _wfd(wfd), _who(who)
{
_name = "Channel-" + to_string(wfd) + "-" + to_string(who);
}
~Channel()
{
}
int Get_wfd()
{
return _wfd;
}
void Close()
{
close(_wfd);
}
void Send(int task)
{
write(_wfd,&task,sizeof(task));
}
pid_t GetPid()
{
return _who;
}
string GetName()
{
return _name;
}
private:
int _wfd; // 标记每个管道的写端
string _name; // 管道名
pid_t _who;
};
ProcessPool.hpp
#include "Channel.hpp"
#include "Task.hpp"
#include <vector>
#include <functional>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
// 包装器
using work_t = function<void()>;
enum
{
OK = 0,
USEERROR,
PIPEERROR,
FORKERROR
};
class ProcessPool
{
public:
ProcessPool(int processNum, work_t work)
: num(processNum), Worker(work)
{}
~ProcessPool()
{}
int InitProcessPoll()
{
// 2. 创建指定个进程
for (int i = 0; i < num; i++)
{
// 1.先创建管道
int pipefd[2] = {0}; // 返回的两个文件描述符,指向同一个管道
int n = ::pipe(pipefd);
if (n == -1)
return PIPEERROR;
pid_t id = fork();
if (id < 0)
return FORKERROR;
if (id == 0)
{
// 子进程关闭写端
::close(pipefd[1]);
// 子进程要关闭 从父进程继承下来的之前的管道
if (i > 0)
{
// cout << "child " << getpid() << " shut history fd" << endl;
for (auto &c : channels)
{
// cout << c.Get_wfd() << " ";
c.Close();
}
// cout << endl;
}
// 执行任务
dup2(pipefd[0], 0); // 让所有的子进程都从0里读,无需再传递管道的fd
Worker();
exit(0);
}
// 父进程关闭读端
::close(pipefd[0]);
channels.emplace_back(pipefd[1], id);
}
return OK;
}
void DispatchTask()
{
int process = 0;
int sum = 20;
while (sum--)
{
// a. 选择任务,一个整数
int task = taskManeger.Select();
// b. 轮循选择一个子进程/管道
Channel &cur = channels[process++];
process %= channels.size();
cout << "-------------------------------" << endl;
cout << "send " << task << "to " << cur.GetName() << " 剩余任务:" << sum << endl;
cout << "-------------------------------" << endl;
// c. 给子进程 派发任务(向相应的管道中写)
cur.Send(task);
sleep(1);
}
}
void ClearProcessPool()
{
for (auto &c : channels)
{
c.Close();
pid_t rid = waitpid(c.GetPid(), nullptr, 0);
if (rid > 0)
{
cout << "father wait pid:" << rid << " success" << endl;
}
}
}
// void Debug(vector<Channel> &channels)
// {
// for (auto &c : channels)
// {
// cout << "pipe name:" << c.GetName() << endl;
// }
// }
private:
vector<Channel> channels;
int num;
work_t Worker;
};
Task.hpp
#pragma once
#include <unordered_map>
#include <functional>
#include <iostream>
#include <time.h>
#include <stdlib.h>
#include <unistd.h>
using task_t = std::function<void()>;
static int number = 0;
void Print()
{
std::cout << "打印任务... pid: " << getpid() << std::endl << std::endl;
}
void DownLoad()
{
std::cout << "下载任务...pid: " << getpid() << std::endl<< std::endl;
}
void SQL()
{
std::cout << "数据库任务...pid: " << getpid() << std::endl<< std::endl;
}
void Install()
{
std::cout << "安装任务...pid: " << getpid() << std::endl<< std::endl;
}
class TaskManager
{
public:
TaskManager()
{
srand(time(nullptr));
this->Insert(Print);
this->Insert(DownLoad);
Insert(SQL);
Insert(Install);
}
~TaskManager()
{
}
void Insert(task_t task)
{
tasks[number++] = task;
}
int Select()
{
return rand() % number;
}
void Execute(int num)
{
if (tasks.find(num) == tasks.end())
return;
tasks[num]();
}
private:
std::unordered_map<int, task_t> tasks;
};
TaskManager taskManeger; //创建全局的任务管理对象
void Work()
{
while (true)
{
int task = 0;
int n = read(0, &task, sizeof(task));
if (n == 0)
{
cout << "child pid :" << getpid() << " quit success" << endl;
break; // 写端关闭,读到了文件尾
}
else if (n == sizeof(task))
{
taskManeger.Execute(task);
}
else
{
// to dot
}
}
}
main.cc
#include "ProcessPool.hpp"
void Usage(string ProcessName)
{
cout << "Usage:" << ProcessName << " process num" << endl;
}
int main(int argc, char *argv[])
{
// 检测是否正确使用
if (argc != 2)
{
Usage(argv[0]);
return USEERROR;
}
int num = stoi(argv[1]); // 获得进程个数
ProcessPool* pp = new ProcessPool(num,Work);
// 1. 创建进程池
pp->InitProcessPoll();
// 2. 派发任务
pp->DispatchTask();
// 3. 清理进程池
pp->ClearProcessPool();
delete pp;
return 0;
}
2.4 命名管道
匿名管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,它经常被称为命名管道。
命名管道是一种特殊类型的文件
,创建命名管道的系统命令:mkfifo
此时,我们就能真正的“看见”一个管道文件了。有了路径和文件名,就能唯一确定一个文件,这样不同的进程就看到了同一份资源了。
它有自己的inode,文件类型为p。
p类型文件:只使用内核文件缓冲区,不做刷新。
如果我让两个进程一个去命名管道中写,一个读,这不就完成通信了么?
在一个进程中,也可以使用函数调用:mkfilo
,来创建命名管道。
删除一个命名管道使用:unlink
由于命名管道有路径和名字,那么不同的进程就可以像打开文件一样使用不同的权限打开命名管道,从而实现进程间通信。
3. System V共享内存
3.1 操作共享内存
共享内存是最快的IPC形式
一旦这样的内存映射到它的进程的地址空间的共享区,这些进程间数据传递不再涉及到内核。换句话说是进程不再通过执行进入内核的系统调用来传递彼此的数据。
类似于下图:
那如何在物理内存中得到一块共享内存呢?
- 创建一个共享内存的系统调用:
shmget
这个函数有三个参数:
- key:用来唯一标识该共享内存
- size:所创建的共享内存的大小
- shmflg:创建共享内存时的选项,常用IPC_CREAT,IPC_EXCL
- IPC_CREAT:如果当前key所对应共享内存不存在,则创建;若存在,返回。保证能返回一个共享内存
- IPC_EXCL:单独使用没有意义。通常IPC_CREAT | IPC_EXCL,如果共享内存不存在,创建它;如果已经存在,则出错返回。保证能返回一个全新的共享内存
返回值:
- 成功,返回共享内存的id值,该值与key有关
- 失败,返回-1,错误码被设置
- 到这里一直有一个疑问,为什么共享内存的key值要用户传递?内核自动生成不香吗?
- 既然不能让内核生成,那就只能自己创建,并且让这两个进程都能看到。
但是让用户自己设定一个又不好,因为既没有一定的规律,又可能出现大量重复的key,然后导致创建shm失败。- 为了解决上述问题,系统提供了一个专门用来生成key的函数:
ftok
- 使用
此时,我们就可以使用shmget创建共享内存了:
在这里,补充几条关于共享内存的指令:
ipcs -m
:查共享内存ipcrm -m + shmid
:删除shmid对应的共享内存
既然有了一个共享内存了,那进程应该怎么使用呢?
- 挂接一个共享内存的系统调用:
shmat
(at:attach)
返回值:
- 成功:返回被挂接的 当前进程地址空间的地址
- 失败:返回(void *) - 1
为什么挂接失败了呢?
在共享内存的属性中,有一个perms的属性,表示当前shm的权限;没有权限,你怎么挂接呢?
所以,我们需要在创建共享内存时设置权限,可直接在shmflg参数上直接按位与上相应的权限位
- 去关联(取消挂接)一个共享内存的系统调用:
shmdt
(dt:delete attach)
参数:shmat所返回的地址。
- 删除一个共享内存的系统调用:
shmctl
一旦失败,-1会被返回。
此时,我们共享内存的创建、连接、去连接、删除就搞定了。
3.2 使用共享内存通信
要想使用共享内存通信,两个进程,进程1先创建shm && 使用;进程2 获取shm && 使用。然后一个进程向所挂接的内存中写,另一个读即可完成通信。
因此,可以将共享内存专门抽离作为一个类。
然后创建全局共享内存的对象,以便进程都能看到
#pragma once
#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <unistd.h>
const std::string gpathName = "/home/wk/linux"; // 路径名可随意指定
const int gpro_id = 101; // 项目id随便指定
const int gSize = 4096;
const mode_t gmode = 0600;
class ShareMemary
{
void CreateHelper(int flag)
{
// 1.获取一个key
_key = ftok(gpathName.c_str(), gpro_id);
// 2. 创建共享内存
_shmid = shmget(_key, gSize, flag);
if (_shmid == -1)
std::cout << "创建shm失败" << std::endl;
else
std::cout << "创建成功:shmid: " << _shmid << std::endl;
}
public:
ShareMemary()
: _shmid(-1), _key(0), _addr(nullptr)
{
}
~ShareMemary()
{
}
void CreateSHM()
{
if (_shmid == -1)
CreateHelper(IPC_CREAT | IPC_EXCL | gmode);
}
void GetSHM()
{
CreateHelper(IPC_CREAT);
}
void Attach()
{
_addr = shmat(_shmid, nullptr, 0);
if ((long long)_addr == -1)
std::cout << "挂接失败" << std::endl;
else
printf("挂接成功:%p\n", _addr);
}
void DeAttach()
{
if (_addr != nullptr)
shmdt(_addr);
}
void DeleteSHM()
{
int del = shmctl(_shmid, IPC_RMID, nullptr);
if (del == -1)
std::cout << "删除失败" << std::endl;
else
std::cout << "删除成功" << std::endl;
}
void* GetAddr()
{
return _addr;
}
private:
int _shmid;
key_t _key;
void *_addr;
};
ShareMemary shm; //创建shm对象,以便其余进程都能看到
此时,即可完成通信工作。
但是,此时我们的Server不会像管道那样,没东西可读就阻塞,它反而一直在读;这就有可能造成Client还没写完,Server就读了,也就是没有对资源加保护,有可能造成错误。
我们对加了保护的资源叫做临界资源。
访问公共资源的代码,叫做临界区;未访问公共资源的叫做非临界区。
所以,我们目前可使用管道的特性:管道正常&&管道为空,则阻塞,控制Server读数据的时刻。
因此,可在Server与Client间维护一个管道,Server读,Client写。当Client讲资源完整写道共享内存中,然后向管道发送一个信号,Server从管道中读到该信号就不再read那里阻塞了,此时读共享内存中的资源就是正确的了。
下面的代码模拟了进程间同步的过程。
4. System V 消息队列(了解)
消息队列提供了进程间发送数据块的方法,每个数据块都有一个类型标识。
消息队列基于消息,而管道则基于字节流。
一个或多个进程可以向消息队列写入消息,而一个或多个进程可以从消息队列中读取消息。
认识消息队列相关的方法:
msgget
:获取消息队列
ipcs -q
:查看消息队列的指令ipcrm -q + id
:删除消息队列指令
msgctl
:消息队列删除的系统调用
msgsnd
:发送消息msgrcv
:接收消息
由于消息具有类型,那么在接收的时候就可以接收指定类型的消息了。
经过上述的学习,我们发现它的接口与共享内存非常的相似,因为它们都遵循System V标准。
5. System V 信号量(了解)
由于信号量也是遵循System V标准的,所以它的常用方法和前面的类似。
信号量相关的操作:
- semget:获取
- semctl:删除
- ipcs -s :查看信号量的指令
- ipcrm -s + id :删除信号量的指令
但是,信号量主要是用与同步和互斥的。
在多个执行流能够访问同一份资源的时候,被保护起来的资源叫做临界资源(一次只允许一个进程使用)。
保护的常见方式:
- 互斥:任何时刻,只允许一个执行流(进程)访问资源。
- 同步:多个执行流,访问临界资源的时候,具有一定的顺序性。
因此,我们所写的代码 = 访问临界资源的代码(临界区) + 不访问临界资源的代码(非临界区)
。所谓的对共享资源的保护,本质是对访问共享资源的代码进行保护。
5.1 信号量
信号量:本质是一个对资源进行预订的计数器。
那么在访问临界资源的时候,就需要先抢占申请信号量;如果是这样,那么多个进程就会转头区抢占申请信号量了,可是谁来保护信号量呢?
因此信号量必须解决下面两个问题:
- 信号量必须能被多个进程看到 。
- 信号量的- -与++操作(PV操作)必须具有原子性(原子性是指一个操作是不可中断的,即该操作要么全部执行成功,要么全部执行失败,不存在执行到中间某个状态的情况。)
由于资源可能有多份,所以信号量可以有多个。因此System V申请信号量,是以信号量集的方式,提供给使用者的。
信号量的操作:
6. IPC的理解
System V是如何实现IPC的,和管道为什么不同呢?
6.1 用户角度
首先我们要知道操作系统是如何管理IPC的:先描述,再组织。IPC有哪些属性呢?
根据上面我们可以发现,它们内部都有一个ipc_perm的东西。我们可以推测一下,在OS层面,IPC是同类资源。
我们也可以获取IPC对应的属性
6.2 内核角度
我们知道IPC资源要被所有进程看到,它一定是全局的。所以IPC资源在内核中一定是一个全局变量
下面我们来看内核源代码:
我们发现在消息队列、信号量与共享内存的源码中,结构体开头位置都是 kern_ipc_perm,这点和我们上面从用户层看到的是一样的。
此时,所有的IPC资源都可以直接被柔性数组直接指向。
例如
- p[0] = (struct kern_ipc_perm) &(shmid_kernel)
- p[1] = (struct kern_ipc_perm) &(msg_queue)
- p[2] = (struct kern_ipc_perm) &(sem_array)
…
那么不就可以使用柔性数组,管理所有的IPC资源了吗?数组下标就是之前的xxxid,即xxxget的返回值!这也就是为什么,之前我们见到的各种IPC资源的id是连续的了。
所以,所有的IPC资源,区分IPC的唯一性,都是通过key,各类型的IPC资源之间的key也可能会冲突。
此时怎么访问IPC资源的其它属性呢?
直接强转,(struct msg_queue*) p[1] ->其它属性
那么一个指针,指向结构体的第一个元素,其实就是指向了整个结构体。访问头部,直接访问;访问其它属性,做强转,这种结构不就是C++中的多态吗?
这时,我们所看到的kern_ipc_perm就是基类,与之相关的三个就是子类,继承了基类
,此时就可以使用基类来管理所有的子类了,这是C语言实现多态的另一种方式。
那具体是怎么识别是哪一种子类的呢?
实际在内核中,会定义各种的ipc_ids,但是它们的entries指针都指向同一个kern_ipc_perm数组。