Linux 管道
1. 管道的基本概念和原理
在Linux中,管道(pipe)是一种进程间通信机制,它允许将一个命令的输出直接作为另一个命令的输入。管道使用 |
符号来连接命令,实现数据的流向。
创建管道的系统调用是 pipe,其包含在头文件<unistd.h>
中,函数原型如下:
int pipe(int pipefd[2]);
参数 int pipefd[2]是一个输出型参数,是一个长度为2
的数组,其中 pipefd[0]是管道的读端fd,而pipefd[1]是管道的写端fd。
返回值:
- 返回 0 表示开辟管道成功
- 返回 -1 表示开辟管道失败,并且会设置错误码
管道的工作原理是在内核中创建一个缓冲区,用于存储一个命令的输出和另一个命令的输入。当使用管道连接命令时,前一个命令的标准输出(stdout)会被重定向到管道的写端,而后一个命令的标准输入(stdin)会被连接到管道的读端。 而这个缓冲区我们可以直接使用文件的缓冲区(但是操作系统不会在磁盘上创造文件,而是在内存中临时开辟一个缓冲区,我们以文件的缓冲区作例子易理解),实现如下:
由于子进程会继承父进程的PCB,同时也会继承父进程的struct files_struct
,相当于同时打开了一份文件。那么现在父子进程就都得到同样的资源,即可以看到同一份文件了。
管道是半双工的,即在同一时刻只能进行单向通信,必须是一端读,一端写,需要关闭相应的文件描述符来实现数据的发送和接收。上图中,父进程保留了写端,子进程保留了读端,即父进程向管道写入,子进程从管道读取。
2. 匿名管道
匿名管道是Linux中一种进程间通信(IPC)机制,它允许在具有亲缘关系的进程之间传递数据。匿名管道没有名字,通常用于父子进程或兄弟进程之间的通信。在创建管道后,父进程和子进程会继承管道的文件描述符,通过这些文件描述符可以进行读写操作,实现数据的传递。上述原理所使用的管道即为匿名管道。
int pipe(int pipefd[2]);
匿名管道的工作原理是通过内核维护的一个缓冲区来实现的。当使用 pipe()系统调用创建管道时,内核会在内存中创建一个缓冲区,并返回两个文件描述符,pipefd[0]用于读取(读端),而pipefd[1]用于写入(写端)。父进程和子进程通过这些文件描述符可以进行通信,数据从写端流向读端。使用如下,test.cpp文件如下:
#include<stdio.h>
#include<iostream>
#include<unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
using namespace std;
//往管道中读数据
void Read(int rfd)
{
char buffer[1024];
while(1)
{
read(rfd,buffer,sizeof(buffer));
cout<<"mypid="<<getpid()<<",get a message: "<<buffer<<endl;
}
}
//往管道中写数据
void Write(int wfd)
{
char buffer[1024];
int count = 0;
while(1)
{
snprintf(buffer,sizeof(buffer),"pid=%d,count=%d",getpid(),count++);
write(wfd,buffer,sizeof(buffer));
sleep(1);
}
}
int main()
{
int pipefd[2];
pipe(pipefd);
int rfd = pipefd[0],wfd = pipefd[1];
pid_t pid = fork();
if(pid == 0) //child process
{
close(wfd);//关闭写端
Read(rfd);//读数据
exit(0);
}
else
{
//parent process
close(rfd);//关闭读端
Write(wfd);//写数据
waitpid(pid,NULL,0);
}
return 0;
}
上述代码通过pipe
创建了管道,此时读端rfd
就是pipe[0]
,写端wfd
就是pipe[1]
。随后通过fork
创建子进程,子进程关闭rfd
,进行写入,父进程关闭wfd
,进行读取。编译生成 test.exe 文件并运行
可以看到父进程读取到了管道的内容。
3. 管道读写规则
1. 当没有数据可读时,read调用阻塞,即进程暂停执行,一直等到有数据来到为止。
void Read(int rfd)
{
char buffer[1024];
while(1)
{
read(rfd,buffer,sizeof(buffer));
cout<<"mypid="<<getpid()<<",get a message: "<<buffer<<endl;
}
}
void Write(int wfd)
{
char buffer[1024];
int count = 0;
while(1)
{
write(wfd,"c",1);
count++;
cout<<count<<endl;
}
}
而读端函数新增一行代码,如下:
void Read(int rfd)
{
char buffer[1024];
while(1)
{
read(rfd,buffer,sizeof(buffer));
cout<<"mypid="<<getpid()<<",get a message: "<<buffer<<endl;
sleep(1000);//暂停读端
}
}
我们重新编译生成 test.exe 文件并运行
最后输出 65984
就不再写入数据了,此时写端停止写入数据,因为管道满了。而65984 byte
也大概是64 kb
。
void Read(int rfd)
{
char buffer[1024];
while(1)
{
int ret=read(rfd,buffer,sizeof(buffer));
if(ret==0)
cout<<"write end!"<<endl;
else
cout<<"mypid="<<getpid()<<",get a message: "<<buffer<<endl;
sleep(1);
}
}
//往管道中写数据
void Write(int wfd)
{
char buffer[1024];
int count = 0;
while(1)
{
write(wfd,"c",1);
count++;
if(count == 20)
break;
}
close(wfd);
}
我们重新编译生成 test.exe 文件并运行
我们可以看到写端关闭了管道,读端依然可以读取数据,把字符 c读取走后,随后的所有读取都是write end!。
通过上述的读写机制,我们可以了解到管道的特性包括:
-
单向性:数据只能沿着一个方向流动,即从管道的写端流向读端。
-
生命周期与进程一致:一般而言,进程退出,管道释放,所以管道的生命周期随进程
-
阻塞与非阻塞:如果管道中没有数据可读,读操作会阻塞;如果管道已满,写操作会阻塞。如果管道的所有读端被关闭,继续写入会导致写进程异常退出;如果所有写端被关闭,读操作在读取完数据后不会阻塞,而是返回0。
-
同步与互斥:管道提供了同步机制,确保数据的安全传输,避免数据竞争和不一致性。
-
原子性:对于不大于
PIPE_BUF
(通常是512或4096字节)的数据操作,管道保证原子性,也可以称之为面向字节流,即一次性完成,不会被中断。例如在上诉第三个案例中,Write函数写下21个c后就停止,而读函数一直在读,但是最终读函数只打印一条message。
4. 命名管道
命名管道即充当了上图中的struct file。只要进程A
和B
都打开了命名管道文件,此时两者就可以通过命名管道文件通信了。
mkfifo系统调用指令
我们使用 mkfifo log.txt 建立名字为 log.txt 的管道如下:
我们成功在另一个 bash 命令行读取到 log.txt 管道中的内容,读取结束后原命令行的阻塞状态也终止了。
mkfifo函数调用接口
我们也可以在进程中通过函数调用接口来创建管道,其包含在头文件<sys/types.h>和<sys/stat.h>中,函数原型如下:
参数:
pathname
:即管道文件创建的路径mode
:管道文件的初始权限
返回值为0则管道创建成功,否则创建失败。
使用如下,我们通过mkfifo
接口在当前路径下创建了 log.txt
管道文件,将初始权限设为0666(与掩码umask作用后最终为0664)
。
#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
using namespace std;
int main()
{
int ret = mkfifo("./log.txt", 0666);
if (ret == 0)
cout << "mkfifo success" << endl;
else
cout << "mkfifo fail" << endl;
return 0;
}
我们可以看到已经创建出了 log.txt 管道文件,后续只需要通过在不同进程中通过open
接口打开这个文件,就可以进行进程间通信了。
如果想在程序中删除管道文件,则可以使用函数 unlink(文件路径)
5.命名管道实现通信
comm.hpp
#include<iostream>
#include<sys/types.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<string>
#include<unistd.h>
using namespace std;
#define FIFO_NAME "./myfifo"
#define FIFO_MODE 0666
enum
{
FIFO_CREATE_ERROR=1,
FIFO_DELETE_ERROR=2,
FIFO_OPEN_ERROR=3,
};
class Init
{
public:
Init()
{
int ret=mkfifo(FIFO_NAME,FIFO_MODE);
if(ret!=0)
{
perror("mkfifo error");
exit(FIFO_CREATE_ERROR);
}
}
~Init()
{
int ret=unlink(FIFO_NAME);
if(ret!=0)
{
perror("unlink error");
exit(FIFO_DELETE_ERROR);
}
}
};
server.cpp
#include "comm.hpp"
int main()
{
Init init;
int fd=open(FIFO_NAME,O_WRONLY);
if(fd<0)
{
perror("open error");
exit(FIFO_OPEN_ERROR);
}
cout<<"server start"<<endl;
string message;
while(true)
{
cout<<"Please enter message@ ";
getline(cin,message);
write(fd,message.c_str(),message.size());
}
close(fd);
return 0;
}
client.cpp
#include "comm.hpp"
int main()
{
int fd = open(FIFO_NAME, O_RDONLY);
if (fd < 0)
{
perror("open error");
exit(FIFO_OPEN_ERROR);
}
while (true)
{
char buf[1024] = {0};
int len = read(fd, buf, sizeof(buf));
if (len == 0)
break;
cout << "server say#" << buf << endl;
}
close(fd);
return 0;
}
makefile
.PHONY:all
all:server client
server:server.cpp
g++ -o $@ $^
client:client.cpp
g++ -o $@ $^
.PHONY:clean
clean:
rm -f server client myfifo