6.【Linux】进程间通信(管道命名管道||简易进程池||简易客户端服务端通信)
介绍
进程间通信的方式
1.Linux原生支持的管道----匿名和命名管道
2.System V-----共享内存、消息队列、信号量
3.Posix------多线程、网路通信
进程间通信目的
数据传输:一个进程需要将它的数据发送给另一个进程
资源共享:多个进程之间共享同样的资源。
通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变
管道
原理
管道是单向通信的,常用于有亲缘关系的父子间通信,父进程调用pipe打开管道文件,操作系统创建struct file结构体(存有inode等文件信息),父进程fd指向该文件,父进程fork出子进程,子进程也拷贝父进程的代码和fd,同时fd指向管道,分别关闭读端和写端,从而实现单向通信。
创建管道
调用成功返回0,失败返回-1。pipefd是输出型参数,pipefd【0】为3,pipefd【1】为4.
构建单向通信的读端(子进程关闭写端)
//2.create child process
pid_t id=fork();
assert(id!=-1);
if(id==0)
{
//child process
//3构建单向通行的信道pipe[0]:read,pipe[1]:write
//3.1关闭子进程不需要的fd
close(pipefd[1]);
char buffer[1025];
while(true)
{
ssize_t s=read(pipefd[0],buffer,sizeof(buffer)-1);
if(s>0)
{
buffer[s]=0;
cout<<"child get a message["<<getpid()<<"]father"<<buffer<<endl;
}
}
exit(0);
}
写端(父进程关闭读端)
close(pipefd[0]);
string message="我是父进程,我正在给你发消息";
int count=0;
char send_buffer[1024];
while(true)
{
//3.2构建一个变化的字符串
snprintf(send_buffer,sizeof(send_buffer),"%s:%d",message.c_str(),count++);
//3.3写入
write(pipefd[1],send_buffer,strlen(send_buffer));
//3.4故意睡一会
sleep(1);
}
pid_t ret=waitpid(id,nullptr,0);
assert(ret<0);
(void)ret;
close(pipefd[1]);
管道特点
1.常用于亲缘关系的父子间通信
2.提供访问控制(例如管道无数据时读端就必须等数据写入)
3.管道本质是内核中的一块缓冲区,多个进程通过访问同一块缓冲区实现通信。
4. 管道提供的是面向流式的通信服务(面向字节流),需要定制协议来进行数据区分。
5.管道是基于文件的,文件的生命周期是随进程的,那么管道的生命周期也是随进程的。
6.管道是单向通信的,就是半双工通信的一种特殊情况,数据只能向一个方向流动。需要双方通信时,需要建立起两个管道。半双工通信就是要么在收数据,要么在发数据,不能同时在收数据和发数据(比如两个人在交流时,一个人在说,另一个人在听);而全双工通信是同时进行收数据和发数据(比如两个人吵架的时候,相互问候对方,一个人既在问候对方又在听对方的问候)。
实现一个简易进程池
process.cc
#include<iostream>
#include<unistd.h>
#include<stdlib.h>
#include<sys/wait.h>
#include<sys/types.h>
#include<assert.h>
#include<vector>
#include<cstdlib>
#include<time.h>
#include"Task.hpp"
#define PROCESS_NUM 5
using namespace std;
int waitCommand(int waitFd,bool &quit)
{
uint32_t command=0;
ssize_t s=read(waitFd,&command,sizeof(command));
if(s==0)
{
quit=true;
return -1;
}
assert(s==sizeof(uint32_t));
return command;
}
//给哪一个进程通过什么文件描述符发送什么命令
void sendAndWakeUp(pid_t who,int fd,uint32_t command)
{
write(fd,&command,sizeof(command));
cout<<"call process"<<who<<"execute"<<desc[command]<<"through"<<fd<<endl;
}
int main()
{
Load();
vector<pair<pid_t,int>> slots;
//先创建多个进程
for(int i=0;i<PROCESS_NUM;i++)
{
int pipefd[2]={0};
int n=pipe(pipefd);
assert(n==0);
(void)n;
pid_t id=fork();
assert(id!=-1);
if(id==0)
{
//child读取,关闭写端
close(pipefd[1]);
while(true)
{
bool quit=false;
//pipefd[0]
int command=waitCommand(pipefd[0],quit);//如果不发,则阻塞
if(quit)
break;
if(command>=0&&command<handlerSize())
{
callbacks[command]();
}
else
{
cout<<"非法command"<<endl;
}
}
exit(1);
}
close(pipefd[0]);
slots.push_back(pair<pid_t,int>(id,pipefd[1]));
}
//开始任务
srand((unsigned long)time(nullptr));
while(true)
{
int select,command;
cout<<"##############"<<endl;
cout<<"1.show functions"<<endl;
cout<<"2.send command"<<endl;
cout<<"Please select";
cin>>select;
if(1==select)
{
showHandler();
}
else if(select==2)
{
cout<<"Enter Your Command:";
cin>>command;
//选择进程
int choice_procID=rand()%slots.size();
//布置任务给指定进程
sendAndWakeUp(slots[choice_procID].first,slots[choice_procID].second,command);
}
}
//关闭fd,所有的子进程都会退出
for(const auto& slot:slots)
{
close(slot.second);
}
//回收所有的子进程信息
for(const auto& slot: slots)
{
waitpid(slot.first,nullptr,0);
}
return 0;
}
task.hpp
#pragma once
#include<iostream>
#include<string>
#include<unistd.h>
#include<functional>
#include<vector>
#include<unordered_map>
typedef std::function<void()> func;
std::vector<func> callbacks;
std::unordered_map<int,std::string> desc;
void readMySQL()
{
std::cout<<"process["<<getpid()<<"]执行访问数据库的任务"<<std::endl;
}
void execuleURL()
{
std::cout<<"process["<<getpid()<<"]执行url解析"<<std::endl;
}
void cal()
{
std::cout<<"process["<<getpid()<<"]执行加密任务"<<std::endl;
}
void save()
{
std::cout<<"process"<<getpid()<<"执行数据持久化任务"<<std::endl;
}
void Load()
{
desc.insert({callbacks.size(),"readMySQL"});
callbacks.push_back(readMySQL);
desc.insert({callbacks.size(),"execul"});
callbacks.push_back(execuleURL);
desc.insert({callbacks.size(),"cal"});
callbacks.push_back(cal);
desc.insert({callbacks.size(),"save"});
callbacks.push_back(save);
}
void showHandler()
{
for(const auto& iter:desc)
{
std::cout<<iter.first<<"\t"<<iter.second<<std::endl;
}
}
int handlerSize()
{
return callbacks.size();
}
命名管道
引入
管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,它经常被称为命名管道。命名管道是一种特殊类型的文件。
区别(打开方式,是否存在于文件系统,血缘)
匿名管道不属于文件系统,是一种特殊的文件,只存在于内存中,只能进行血缘关系间的通信。命名管道可用于无关联的进程间通信,mkfifo函数调用后需用open打开,因为它以FIFO文件的形式存在于文件系统中。
实现一个简易的客户端和服务端
//comm.hpp
#ifndef _COMM_H
#define _COMM_H
#include<iostream>
#include<string>
#include<sys/types.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<unistd.h>
#include<cstdio>
#include<cstring>
#include"log.hpp"
using namespace std;
string ipcPath="./fifo.ipc";
const int MODE=0666;
const int SIZE=128;
#endif
//server.cc
#include"comm.hpp"
int main()
{
//1.创建命名管道
if(mkfifo(ipcPath.c_str(),MODE)<0)
{
perror("mkfifo");
exit(1);
}
Log("创建管道文件成功",Debug)<<"step1"<<endl;
//2.打开命名管道
int fd=open(ipcPath.c_str(),O_RDONLY);
if(fd<0)
{
perror("open");
exit(2);
}
Log("打开成功",Debug)<<"step2"<<endl;
//3.读取数据
char buf[SIZE];
while(true)
{
memset(buf,'\0',sizeof(buf));
ssize_t s=read(fd,buf,sizeof(buf)-1);//'\0'不读
if(s>0)
{
cout<<"client say:"<<buf<<endl;
}
else if(s==0)
{
//EOF
cerr<<"client quit!"<<endl;
break;
}
else{
//error
perror("read");
break;
}
}
//4.关闭文件
close(fd);
unlink(ipcPath.c_str());
Log("关闭成功",Debug)<<"step 3"<<endl;
return 0;
}
//client.cc
#include"comm.hpp"
int main()
{
//获取管道文件
int fd=open(ipcPath.c_str(),O_WRONLY);
if(fd<0)
{
perror("open");
exit(1);
}
//ipc通信过程
string buffer;
while(true)
{
cout<<"please input:"<<endl;
getline(cin,buffer);
write(fd,buffer.c_str(),buffer.size());
}
close(fd);
return 0;
}