线程通信和进程通信方法
线程通信和进程通信方法
进程间
进程间通信(英语:Inter-Process Communication,简称IPC),指至少两个进程或线程间传送数据或信号的一些技术或方法
管道
单向通信,只能在具有亲缘关系的进程之间使用
python
可以注意到,用fdopen就可以使用和普通文件一样的读写方法
os.write, os.read都要求btyestring,也就是b’xxx’
下面的是父进程写给子进程
import os
r, w = os.pipe()
pid = os.fork()
if pid > 0:
os.close(r)
print("Parent process is writing")
text = "Hello child process"
with os.fdopen(w, 'w') as f:
f.write(text)
print("Written text:", text)
# text = b"Hello child process"
# os.write(w, text) # write bytestring
# print("Written text:", text.decode())
else:
os.close(w)
print("\nChild Process is reading")
# f = os.fdopen(r)
# print("Read text:", f.read())
# f.close()
with os.fdopen(r, 'r') as f:
print("Read text:", f.read())
# data = os.read(r, 19)
# print(data)
# print(data.decode())
子进程写父进程读也是可以的
import os
r, w = os.pipe()
pid = os.fork()
if pid > 0:
os.close(w)
print("\nParent Process is reading")
# f = os.fdopen(r)
# print("Read text:", f.read())
# f.close()
with os.fdopen(r, 'r') as f:
print("Read text:", f.read())
# data = os.read(r, 19)
# print(data)
# print(data.decode())
else:
os.close(r)
print("Child process is writing")
text = "Hello parent process"
with os.fdopen(w, 'w') as f:
f.write(text)
print("Written text:", text)
# text = b"Hello parent process"
# os.write(w, text) # write bytestring
# print("Written text:", text.decode())
开两个管道也可以双向
//虽然我不知道这么写是不是对的
import os
parent2child_r, parent2child_w = os.pipe()
child2parent_r, child2parent_w = os.pipe()
pid = os.fork()
if pid > 0:
os.close(parent2child_r)
print("Parent process is writing")
text = "Hello child process"
with os.fdopen(parent2child_w, 'w') as f:
f.write(text)
print("Parent process Written text:", text)
# text = b"Hello child process"
# os.write(w, text) # write bytestring
# print("Written text:", text.decode())
os.close(child2parent_w)
print("\nParent Process is reading")
with os.fdopen(child2parent_r, 'r') as f:
text = f.read()
print("Parent Process Read text:", text)
else:
os.close(parent2child_w)
print("\nChild Process is reading")
# f = os.fdopen(r)
# print("Read text:", f.read())
# f.close()
text = ''
with os.fdopen(parent2child_r, 'r') as f:
text = f.read()
print("Child Process Read text:", text)
# data = os.read(r, 19)
# print(data)
# print(data.decode())
text += ', faQ'
os.close(child2parent_r)
print("\nChild Process is writing")
with os.fdopen(child2parent_w, 'w') as f:
f.write(text)
print("Child ProcessWritten text:", text)
也可以直接重定向标准输入输出
import os
import sys
import random
low, high = 1, 100
parent2child_r, parent2child_w = os.pipe()
child2parent_r, child2parent_w = os.pipe()
pid = os.fork()
if pid > 0:
os.close(parent2child_r)
os.close(child2parent_w)
ans = random.randint(low, high)
print(f'ans: {ans}')
os.dup2(parent2child_w, sys.stdout.fileno())
os.dup2(child2parent_r, sys.stdin.fileno())
with open('parent.log', 'w') as f:
while True:
guess = int(str.strip(input()))
f.write(f'{guess}\n')
if guess == ans:
print(0)
break
elif guess < ans:
print(-1)
else:
print(1)
else:
os.close(parent2child_w)
os.close(child2parent_r)
os.dup2(child2parent_w, sys.stdout.fileno())
os.dup2(parent2child_r, sys.stdin.fileno())
l, r = 1, 100
with open('child.log', 'w') as f:
while l <= r:
mid = l + (r - l) // 2
print(mid)
result = int(str.strip(input()))
f.write(f'{result}\n')
if result == 0:
break
elif result == -1:
l = mid + 1
else:
r = mid - 1
C/C++
#include<iostream>
#include<string>
#include<unistd.h>
int main(){
int fd[2];
pid_t pid;
if(pipe(fd) == -1){
std::cerr<< "Error: pipe creation failed" << std::endl;
return -1;
}
pid = fork();
if(pid < 0){
std::cerr<< "Error: fork failed" << std::endl;
return 1;
}
else if(pid > 0){ //parent process
close(fd[0]);
std::string msg = "Hello child process";
if(write(fd[1], msg.c_str(), msg.size()) == -1){
std::cerr<< "Error: write to pipe failed" << std::endl;
return -2;
}
close(fd[1]);
}
else{ // child process
close(fd[1]);
char buffer[256];
int len = read(fd[0], buffer, sizeof(buffer));
if(len == -1){
std::cerr<< "Error: read from pipe failed" << std::endl;
return -3;
}
buffer[len] = '\0';
std::cout<< "Received message: \" " << buffer << '"' << std::endl;
close(fd[0]);
}
return 0;
}
重定向也是可以的
#include<iostream>
#include<string>
#include<unistd.h>
int main(){
int fd[2];
pid_t pid;
if(pipe(fd) == -1){
std::cerr<< "Error: pipe creation failed" << std::endl;
return -1;
}
pid = fork();
if(pid < 0){
std::cerr<< "Error: fork failed" << std::endl;
return 1;
}
else if(pid > 0){ //parent process
close(fd[0]);
std::string msg = "Hello child process";
if(dup2(fd[1], STDOUT_FILENO) != STDOUT_FILENO){
std::cerr<< "dup2 error to stdout"<< std::endl;
return -2;
}
std::cout<< msg << std::endl;
close(fd[1]);
}
else{ // child process
close(fd[1]);
if(dup2(fd[0], STDIN_FILENO) != STDIN_FILENO){
std::cerr<< "dup2 error to stdin"<< std::endl;
return -4;
};
std::string msg;
std::cin >> msg;
//use getline to get "Hello child process"
std::cout<< msg.size()<< std::endl;
std::cout<< msg<< std::endl;
close(fd[0]);
}
return 0;
}
命名管道
命名管道(Named Pipe):允许无亲缘关系的进程间进行通信。
python
import os
import sys
import time
import random
pipe_name = 'pipe_test'
if not os.path.exists(pipe_name):
os.mkfifo(pipe_name)
pid = os.fork()
if pid > 0:
pipein = open(pipe_name, 'r')
with open(pipe_name, 'r') as f:
while True:
# time.sleep(1)
line = f.readline()[:-1]
print(f'Parent got {line} at {time.time()}')
else:
cnt = 0
pipeout = os.open(pipe_name, os.O_WRONLY)
# os.O_WRONLY <=> 'w'
while cnt < 20:
time.sleep(1)
os.write(pipeout, f'Number {cnt}\n'.encode())
cnt += 1
C/C++
client输出,server接收
server.cpp
#include<iostream>
#include<string>
#include<cstring>
#include<sys/stat.h>
#include<unistd.h>
#include<fcntl.h>
std::string fifoname = "./fifo";
mode_t mode = 0666;
int main(){
umask(0);
unlink(fifoname.c_str());
if(mkfifo(fifoname.c_str(), mode) != 0){
std::cerr<< errno<< ':' << strerror(errno)<< std::endl;
return -1;
}
int fd = open(fifoname.c_str(), O_RDONLY);
if(fd < 0){
std::cerr<< errno<< ':'<< strerror(errno)<< std::endl;
return -2;
}
if(dup2(fd, STDIN_FILENO) != STDIN_FILENO){
std::cerr<< "dup2 error to stdin"<< std::endl;
return -3;
}
std::string data;
while(std::cin>> data){
std::cout<< ""<< data<< std::endl;
}
close(fd);
unlink(fifoname.c_str());
return 0;
}
client.cpp
#include<iostream>
#include<string>
#include<cstring>
#include<sys/stat.h>
#include<unistd.h>
#include<fcntl.h>
std::string fifoname = "./fifo";
mode_t mode = 0666;
int main(){
int fd = open(fifoname.c_str(), O_WRONLY);
if(fd < 0){
std::cerr<< errno<< ':'<< strerror(errno)<< std::endl;
return -1;
}
if(dup2(fd, STDOUT_FILENO) != STDOUT_FILENO){
std::cerr<< "dup2 error to stdout"<< std::endl;
return -2;
}
std::string msg;
while(std::cin>> msg){
std::cout<< msg<< std::endl;
}
close(fd);
return 0;
}
共享内存
使得多个进程可以访问同一块内存空间,是最快的可用IPC形式。是针对其他通信机制运行效率较低而设计的。往往与其它通信机制,如信号量结合使用,来达到进程间的同步及互斥。
shmget
先启动writer.cpp,输入想要写入共享内存的内容
再启动reader.cpp,读取共享内存
writer.cpp
#include<iostream>
#include<cstdio>
#include<sys/ipc.h>
#include<sys/shm.h>
int main(){
key_t key = ftok("shmfile", 65);
int shmid = shmget(key, 1024, 0666|IPC_CREAT);
char* str = (char*)shmat(shmid, nullptr, 0);
std::cout<< "writing data: ";
std::cin.getline(str, 1024);
std::cout<< "Data written in memory:" << str<< std::endl;
shmdt(str);
return 0;
}
reader.cpp
#include<iostream>
#include<cstdio>
#include<sys/ipc.h>
#include<sys/shm.h>
int main(){
key_t key = ftok("shmfile", 65);
int shmid = shmget(key, 1024, 0666|IPC_CREAT);
char* str = (char*)shmat(shmid, nullptr, 0);
std::cout<< "Data read from memory:"<< str<< std::endl;
shmdt(str);
shmctl(shmid, IPC_RMID, nullptr);
return 0;
}
mmap
信号量
信号量(Semaphore):用于控制对共享资源的访问,防止出现竞态条件。
python
C/C++
#include<iostream>
#include<cstring>
#include<sys/sem.h>
#include<sys/ipc.h>
#include<unistd.h>
/* semop system calls takes an array of these. */
//struct sembuf {
// unsigned short sem_num; /* semaphore index in array */
// short sem_op; /* semaphore operation */
// short sem_flg; /* operation flags */
//};
/* arg for semctl system calls. */
union SEMUN {
int val; /* value for SETVAL */
struct semid_ds *buf; /* buffer for IPC_STAT & IPC_SET */
unsigned short *array; /* array for GETALL & SETALL */
struct seminfo *__buf; /* buffer for IPC_INFO */
void *__pad;
};
int init_sem(int sem_id, int value){
union SEMUN tmp;
tmp.val = value;
if(semctl(sem_id, 0, SETVAL, tmp) == -1){
std::cerr<< "semctl error"<< errno<< ':' << strerror(errno)<< std::endl;
return -1;
}
return 0;
}
/*
p operation
if semaphore is 1, -1
if semaphore is 0, wait
*/
int sem_p(int sem_id){
struct sembuf sbuf;
sbuf.sem_num = 0;
sbuf.sem_op = -1;
sbuf.sem_flg = SEM_UNDO;
if(semop(sem_id, &sbuf, 1) == -1){
std::cerr<< "p operation error"<< errno<< ':' << strerror(errno)<< std::endl;
return -1;
}
return 0;
}
int sem_v(int sem_id){
struct sembuf sbuf;
sbuf.sem_num = 0;
sbuf.sem_op = 1;
sbuf.sem_flg = SEM_UNDO;
if(semop(sem_id, &sbuf, 1) == -1){
std::cerr<< "v operation error"<< errno<< ':' << strerror(errno)<< std::endl;
return -1;
}
return 0;
}
int del_sem(int sem_id){
union SEMUN tmp;
if(semctl(sem_id, 0, IPC_RMID, tmp) == -1){
std::cerr<< "delete semaphore error"<< errno<< ':' << strerror(errno)<< std::endl;
return -1;
}
return 0;
}
int main(){
key_t key = ftok(".", 65);
if(key == -1){
std::cerr<< "ftok error"<< errno<< ':' << strerror(errno)<< std::endl;
return -1;
}
int sem_id = semget(key, 1, IPC_CREAT|0666);
if(sem_id == -1){
std::cerr<< "semget error"<< errno<< ':' << strerror(errno)<< std::endl;
return -2;
}
init_sem(sem_id, 0);
pid_t pid = fork();
if(pid == -1){
std::cerr<< "fork error"<< errno<< ':' << strerror(errno)<< std::endl;
return -3;
}
else if(pid == 0){
sleep(2);
std::cout<< "Process child: pid = "<< getpid()<< std::endl;
sem_v(sem_id);
}
else{
sem_p(sem_id);
std::cout<< "Process father: pid = "<< getpid()<< std::endl;
sem_v(sem_id);
del_sem(sem_id);
}
return 0;
}
消息队列
消息发送者将消息发送到队列中,接收者从队列中取出消息
python
C/C++
函数msgrcv在读取消息队列时,type参数有下面几种情况:
- type == 0,返回队列中的第一个消息;
- type > 0,返回队列中消息类型为 type 的第一个消息;
- type < 0,返回队列中消息类型值小于或等于 type 绝对值的消息,如果有多个,则取类型值最小的消息。
先启动server
再启动client
server.cpp
#include<iostream>
#include<cstdio>
#include<string>
#include<cstring>
#include<unistd.h>
#include<sys/msg.h>
#include<sys/ipc.h>
const std::string MSG_FILE = "/tmp"; //must exists
const int MSGSZ = 256;
struct MSG_FORM {
long mtype; /* message type, must be > 0 */
char mtext[MSGSZ]; /* message data */
};
int main(){
key_t key = ftok(MSG_FILE.c_str(), 65);
if(key == -1){
std::cerr<< "ftok error"<< errno<< ':' << strerror(errno)<< std::endl;
return -1;
}
std::cout<< "Message Queue - Server key is: "<< key<< std::endl;
struct MSG_FORM msg;
int msqid = msgget(key, IPC_CREAT|0777);
if(msqid == -1){
std::cerr<< "msgget error"<< errno<< ':' << strerror(errno)<< std::endl;
return -2;
}
std::cout<< "My msqid is: "<< msqid<< std::endl;
std::cout<< "My pid is: "<< getpid()<< std::endl;
for(;;){
msgrcv(msqid, &msg, MSGSZ, 888, 0); // return the first msg which has the type of 888
std::cout<< "Server: receive msg.mtext is: "<< msg.mtext<< std::endl;
std::cout<< "Server: receive msg.mtype is: "<< msg.mtype<< std::endl;
msg.mtype = 999;
sprintf(msg.mtext, "hello, I'm server %d", getpid());
msgsnd(msqid, &msg, sizeof(msg.mtext), 0);
}
return 0;
}
client.cpp
#include<iostream>
#include<cstdio>
#include<string>
#include<cstring>
#include<unistd.h>
#include<sys/msg.h>
#include<sys/ipc.h>
const std::string MSG_FILE = "/tmp";
const int MSGSZ = 256;
struct MSG_FORM {
long mtype; /* message type, must be > 0 */
char mtext[MSGSZ]; /* message data */
};
int main(){
key_t key = ftok(MSG_FILE.c_str(), 65);
if(key < 0){
std::cerr<< "ftok error"<< errno<< ':' << strerror(errno)<< std::endl;
return -1;
}
std::cout<< "Message Queue - Client key is: "<< key<< std::endl;
struct MSG_FORM msg;
int msqid = msgget(key, IPC_CREAT|0777);
if(msqid == -1){
std::cerr<< "msgget error"<< errno<< ':' << strerror(errno)<< std::endl;
return -2;
}
std::cout<< "My msqid is: "<< msqid<< std::endl;
std::cout<< "My pid is: "<< getpid()<< std::endl;
msg.mtype = 888;
sprintf(msg.mtext, "hello I'm client %d", getpid());
msgsnd(msqid, &msg, sizeof(msg.mtext), 0);
msgrcv(msqid, &msg, MSGSZ, 999, 0);
std::cout<< "Client: receive msg.mtext is: "<< msg.mtext<< std::endl;
std::cout<< "Client: receive msg.mtype is: "<< msg.mtype<< std::endl;
return 0;
}
套接字
套接字(Socket):适用于不同计算机或操作系统之间的进程通信。
python
C/C++
先启动server
再启动client
server.cpp
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
//struct sockaddr_in {
// sa_family_t sin_family; /* AF_INET */
// in_port_t sin_port; /* Port number */
// struct in_addr sin_addr; /* IPv4 address */
//};
//struct in_addr {
// in_addr_t s_addr;
//};
const int PORT = 8090;
const int BUFSZ = 1024;
int main(){
int server_fd = socket(AF_INET, SOCK_STREAM, 0);
if(server_fd < 0){
std::cerr<< "socket error"<< errno<< ':' << strerror(errno)<< std::endl;
return -1;
}
int opt = 1;
if(setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR|SO_REUSEPORT, &opt, sizeof(opt))){
std::cerr<< "setsockopt error"<< errno<< ':' << strerror(errno)<< std::endl;
return -2;
}
struct sockaddr_in address;
address.sin_family = AF_INET;
address.sin_addr.s_addr = INADDR_ANY;
address.sin_port = htons(PORT);
if(bind(server_fd, (struct sockaddr*)&address, sizeof(address)) < 0){
std::cerr<< "bind error"<< errno<< ':' << strerror(errno)<< std::endl;
return -3;
}
if(listen(server_fd, 128) < 0){
std::cerr<< "listen error"<< errno<< ':' << strerror(errno)<< std::endl;
return -4;
}
struct sockaddr_in client_address;
socklen_t client_address_len = sizeof(client_address);
int cfd = accept(server_fd, (struct sockaddr*)&client_address, &client_address_len);
if(cfd < 0){
std::cerr<< "accept error"<< errno<< ':' << strerror(errno)<< std::endl;
return -5;
}
char ip[24] = {};
inet_ntop(AF_INET, &client_address.sin_addr.s_addr, ip, sizeof(ip));
int client_port = ntohs(client_address.sin_port);
std::cout<< "client's ip: "<< ip<< ":"<< client_port<< std::endl;
char data[BUFSZ];
for(;;){
int len = recv(cfd, data, BUFSZ - 1, 0);
if(len > 0){
data[len] = 0;
std::cout<< "client: "<< data<< std::endl;
strcat(data, " received");
std::cout<< "send: "<< data<< std::endl;
send(cfd, data, strlen(data), 0);
}
else if(len == 0){
std::cout<< "client end"<< std::endl;
break;
}
else{
std::cout<< "error"<< std::endl;
break;
}
}
close(cfd);
close(server_fd);
return 0;
}
client.cpp
#include <iostream>
#include <string>
#include <cstring>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
//struct sockaddr_in {
// sa_family_t sin_family; /* AF_INET */
// in_port_t sin_port; /* Port number */
// struct in_addr sin_addr; /* IPv4 address */
//};
//struct in_addr {
// in_addr_t s_addr;
//};
const int PORT = 8090;
const std::string SERVER_IP = "127.0.0.1";
const int BUFSZ = 1024;
int main(){
int client_fd = socket(AF_INET, SOCK_STREAM, 0);
if(client_fd < 0){
std::cerr<< "socket error"<< errno<< ':' << strerror(errno)<< std::endl;
return -1;
}
struct sockaddr_in serv_addr;
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(PORT);
if(inet_pton(AF_INET, SERVER_IP.c_str(), &serv_addr.sin_addr) <= 0){
std::cerr<< "Invalid address/ address not supported"<< std::endl;
return -2;
}
int status = connect(client_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
if(status < 0){
std::cerr<< "connection failed"<< std::endl;
return -3;
}
char data[BUFSZ];
for(;;){
std::string msg = "faQ 2";
send(client_fd, msg.c_str(), msg.size(), 0);
std::cout<< "send: "<< msg<< std::endl;
int len = recv(client_fd, data, BUFSZ - 1, 0);
if(len > 0){
data[len] = 0;
std::cout<< "server: "<< data<< std::endl;
}
else if(len == 0){
std::cout<< "server end"<< std::endl;
break;
}
else{
std::cout<< "error"<< std::endl;
break;
}
sleep(5);
}
close(client_fd);
return 0;
}
信号
信号(Signal):进程会收到一个信号,可以用来通知进程发生了某个事件。
python
C/C++
#include<iostream>
#include<cstring>
#include<unistd.h>
#include<signal.h>
void sighup(int sig){
std::cout<< "sig = "<< sig<< std::endl;
std::cout<< "CHILD: I have received a SIGHUP"<< std::endl;
}
void sigint(int sig){
std::cout<< "sig = "<< sig<< std::endl;
std::cout<< "CHILD: I have received a SIGINT"<< std::endl;
}
void sigquit(int sig){
std::cout<< "sig = "<< sig<< std::endl;
std::cout<< "My DADDY has killed me!!!"<< std::endl;
exit(0);
}
int main(){
int pid = fork();
if(pid < 0){
return -1;
}
else if(pid == 0){
signal(SIGHUP, sighup);
signal(SIGINT, sigint);
signal(SIGQUIT, sigquit);
for(;;);
}
else{
std::cout<< "PARENT: sending SIGHUP"<< std::endl;
kill(pid, SIGHUP);
sleep(3);
std::cout<< "PARENT: sending SIGINT"<< std::endl;
kill(pid, SIGINT);
sleep(3);
std::cout<< "PARENT: sending SIGQUIT"<< std::endl;
kill(pid, SIGQUIT);
sleep(3);
}
return 0;
}
https://blog.csdn.net/de_se/article/details/130372187
https://blog.csdn.net/weixin_38416696/article/details/90719388
https://www.cnblogs.com/zgq0/p/8780893.html
https://blog.csdn.net/qq_62939852/article/details/130138087
https://www.geeksforgeeks.org/socket-programming-cc/
线程间
线程间常用的通信方式有互斥锁,读写锁,条件变量,自旋锁和屏障
互斥锁(Mutex)
控制对共享资源的访问,确保同一时刻只有一个线程可以访问该资源。
读写锁(Read-Write Lock)
允许多个线程同时读取共享资源,但在写入时必须互斥。
条件变量(Condition Variable)
让线程等待某些条件满足后再继续执行,用于线程间通信和同步。
自旋锁(Spin Lock)
在使用短时间内的锁保护共享资源时,相比于互斥锁,自旋锁不会导致线程阻塞,而是一直进行忙等待直到获得锁。
屏障(Barrier)
让多个线程在特定点处等待彼此,直到所有线程都到达这个点才能继续执行后面的代码。
https://blog.csdn.net/baidu_39514357/article/details/127262001?spm=1001.2014.3001.5502
https://levelup.gitconnected.com/producer-consumer-problem-using-mutex-in-c-764865c47483