【Linux-进程间通信】了解信号量 + 共享内存 + 消息队列的应用
信号量(了解)
1.概念理论渗透
1.多个执行流(进程),能看到的同一份资源:共享资源
2.被保护起来的资源-----临界资源---同步和互斥----用互斥的方式保护共享资源----临界资源
3.互斥:任何时刻只能有一个进程在访问共享资源
4.资源要被程序员访问,资源被访问就是通过代码访问-----代码 = 访问共享资源的代码 (临界区)+ 不访问共享资源的代码(非临界区)
5.所谓的对共享资源进行保护--临界资源--本质是对访问共享资源的代码进行保护(通过加锁保护)
2.什么是信号量
信号量(Semaphore)是操作系统和并发编程中一种用于协调不同进程或线程对共享资源访问的同步机制。信号量的核心思想是通过计数的方式来控制资源的使用。它可以保证多个进程或线程不会在同一时间对同一共享资源进行访问,从而避免竞争条件(race condition)和资源冲突。
3.信号量的类型
1.计数信号量(Counting Semaphore):
计数信号量有一个整型计数器,表示可用资源的数量。当计数器值为正时,表示还有资源可以被分配;为0时,表示所有资源都已经被占用;为负时,表示有等待的线程或进程。
适用于限制多个进程/线程访问一组相同的资源,例如限制最多有N个线程可以访问某个数据库连接池。
2.二元信号量(Binary Semaphore):
又称为互斥信号量(Mutex),其值只有0和1,主要用于实现互斥锁(Mutex)。值为1时表示资源空闲,0表示资源已被占用。
适用于只有一个线程可以访问某一资源的场景(类似于锁机制)。
4.信号量的主要操作
P 操作(wait/Down/Proberen):将信号量的值减1。当信号量的值大于0时,线程可以继续执行;如果信号量值为0或负值,则线程进入等待队列,直到信号量的值大于0。
V 操作(signal/Up/Verhogen):将信号量的值加1,并唤醒一个正在等待的线程(如果有)。
申请信号量的本质:就是对公共资源的一种预定机制
System V 信号量的基本操作:
-
创建/获取信号量:semget
int semget(key_t key, int nsems, int semflg);
功能:
•semget 用于创建或获取一个信号量集。每个信号量集可以包含多个信号量。
•如果指定的信号量集不存在且 IPC_CREAT 标志被设置,则创建一个新的信号量集;否则,返回已存在的信号量集的标识符。
参数:
•key: 信号量集的键值,标识符由此生成。如果为 IPC_PRIVATE,则创建一个新的信号量集,且只能被调用进程访问。
•nsems: 要创建的信号量的数量。如果获取一个已存在的信号量集,则此参数将被忽略。
•semflg: 权限标志和控制选项,通常是 0666 | IPC_CREAT,其中权限类似于文件权限。
返回值:
•成功返回信号量集的标识符(正整数),失败返回 -1。
key_t key = ftok("somefile", 65); // 获取唯一key
int semid = semget(key, 1, 0666 | IPC_CREAT); // 创建一个信号量集,包含1个信号量
if (semid == -1) {
perror("semget failed");
}
-
P/V 操作:semop
int semop(int semid, struct sembuf *sops, size_t nsops);
功能:
•semop 用于执行对信号量的操作,包括 P 操作(等待/减1操作)和 V 操作(释放/加1操作)。
•semop 可以执行一个或多个信号量操作,通过一个 struct sembuf 数组指定每个信号量的操作。
参数:
•semid: 信号量集的标识符,由 semget 返回。
•sops: 指向信号量操作数组的指针,每个元素是一个 struct sembuf 结构体,定义信号量的具体操作。
•nsops: 操作的数量,即 sops 数组的大小。
返回值:成功返回 0,失败返回 -1。
struct sembuf 结构体:
【示例】
-
设置信号量值:semctl
int semctl(int semid, int semnum, int cmd, ...);
功能:
•semctl 用于控制信号量集或单个信号量。它可以获取、设置信号量的值,还可以删除信号量集。
•常用操作包括设置信号量的值、获取信号量的值和删除信号量集。
参数:
•semid: 信号量集的标识符。
•semnum: 信号量集中的信号量编号。对整个信号量集操作时,此值可以被忽略。
•cmd: 指定要执行的控制命令。常见命令有:
•SETVAL: 设置信号量的值。
•GETVAL: 获取信号量的值。
•IPC_RMID: 删除信号量集。
返回值:根据不同的命令,返回值可能是信号量的值、执行操作的状态等。如果失败,返回 -1。
示例 - 设置信号量值:
semctl(semid, 0, SETVAL, 1); // 将第0个信号量的值设置为1
示例 - 获取信号量值:
int val = semctl(semid, 0, GETVAL); // 获取第0个信号量的值
printf("Semaphore value: %d\n", val);
-
删除信号量:
semctl(semid, 0, IPC_RMID); // 删除信号量集
【代码示例】
#include <stdio.h>
#include <sys/ipc.h>
#include <sys/sem.h>
int main() {
// 1. 创建/获取信号量
key_t key = ftok("somefile", 65); // 获取唯一key
int semid = semget(key, 1, 0666 | IPC_CREAT); // 创建一个信号量集,包含1个信号量
// 2. 初始化信号量值
semctl(semid, 0, SETVAL, 1); // 将信号量设为1
// 3. P 操作(减1)
struct sembuf sb = {0, -1, 0}; // 操作第0个信号量,执行减1操作
semop(semid, &sb, 1); // 执行信号量操作
printf("Entering critical section...\n");
// 4. 执行临界区代码
sleep(2);
// 5. V 操作(加1)
sb.sem_op = 1; // 将信号量加1
semop(semid, &sb, 1); // 退出临界区
printf("Leaving critical section...\n");
// 6. 删除信号量
semctl(semid, 0, IPC_RMID); // 删除信号量
return 0;
}
System V 信号量的相关操作说明:
•semget: 创建或获取一个信号量集(可以包含多个信号量)。
•semop: 对信号量执行 P 或 V 操作。
•semctl: 控制信号量,可以用于设置信号量的值、获取信号量值,或者删除信号量。
共享内存应用--Sever&Client通信
client.cc
#include "Shm.hpp"
#include "namedPipe.hpp"
int main()
{
// 1. 创建共享内存
Shm shm(gpathname, gproj_id, gUser);
shm.Zero();
char *shmaddr = (char *)shm.Addr();
sleep(3);
// 2. 打开管道
NamePiped fifo(comm_path, User);
fifo.OpenForWrite();
// 当成string
char ch = 'A';
while (ch <= 'Z')
{
shmaddr[ch - 'A'] = ch;
std::string temp = "wakeup";
std::cout << "add " << ch << " into Shm, " << "wakeup reader" << std::endl;
fifo.WriteNamedPipe(temp);
sleep(2);
ch++;
}
return 0;
}
namedPipe.hpp
#pragma once
#include <iostream>
#include <cstdio>
#include <cerrno>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
const std::string comm_path = "./myfifo";
#define DefaultFd -1
#define Creater 1
#define User 2
#define Read O_RDONLY
#define Write O_WRONLY
#define BaseSize 4096
class NamePiped
{
private:
bool OpenNamedPipe(int mode)
{
_fd = open(_fifo_path.c_str(), mode);
if (_fd < 0)
return false;
return true;
}
public:
NamePiped(const std::string &path, int who)
: _fifo_path(path), _id(who), _fd(DefaultFd)
{
if (_id == Creater)
{
int res = mkfifo(_fifo_path.c_str(), 0666);
if (res != 0)
{
perror("mkfifo");
}
std::cout << "creater create named pipe" << std::endl;
}
}
bool OpenForRead()
{
return OpenNamedPipe(Read);
}
bool OpenForWrite()
{
return OpenNamedPipe(Write);
}
// const &: const std::string &XXX
// * : std::string *
// & : std::string &
int ReadNamedPipe(std::string *out)
{
char buffer[BaseSize];
int n = read(_fd, buffer, sizeof(buffer));
if(n > 0)
{
buffer[n] = 0;
*out = buffer;
}
return n;
}
int WriteNamedPipe(const std::string &in)
{
return write(_fd, in.c_str(), in.size());
}
~NamePiped()
{
if (_id == Creater)
{
int res = unlink(_fifo_path.c_str());
if (res != 0)
{
perror("unlink");
}
std::cout << "creater free named pipe" << std::endl;
}
if(_fd != DefaultFd) close(_fd);
}
private:
const std::string _fifo_path;
int _id;
int _fd;
};
server.cc
#include "Shm.hpp"
#include "namedPipe.hpp"
int main()
{
// 1. 创建共享内存
Shm shm(gpathname, gproj_id, gCreater);
char *shmaddr = (char*)shm.Addr();
shm.DebugShm();
// // 2. 创建管道
// NamePiped fifo(comm_path, Creater);
// fifo.OpenForRead();
// while(true)
// {
// // std::string temp;
// // fifo.ReadNamedPipe(&temp);
// std::cout << "shm memory content: " << shmaddr << std::endl;
// }
sleep(5);
return 0;
}
Shm.hpp
#ifndef __SHM_HPP__
#define __SHM_HPP__
#include <iostream>
#include <string>
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <unistd.h>
const int gCreater = 1;
const int gUser = 2;
const std::string gpathname = "/home/wuxu/day26/CS-shm";
const int gproj_id = 0x66;
const int gShmSize = 4097; // 4096*n
class Shm
{
private:
key_t _key;
int _shmid;
std::string _pathname;
int _proj_id;
int _who;
void *_addrshm;
private:
key_t GetCommKey()
{
key_t k = ftok(_pathname.c_str(), _proj_id);
if (k < 0)
{
perror("ftok");
}
return k;
}
int GetShmHelper(key_t key, int size, int flag)
{
int shmid = shmget(key, size, flag);
if (shmid < 0)
{
perror("shmget");
}
return shmid;
}
std::string RoleToString(int who)
{
if (who == gCreater)
return "Creater";
else if (who == gUser)
return "gUser";
else
return "None";
}
void *AttachShm()
{
if (_addrshm != nullptr)
DetachShm(_addrshm);
void *shmaddr = shmat(_shmid, nullptr, 0);
if (shmaddr == nullptr)
{
perror("shmat");
}
std::cout << "who: " << RoleToString(_who) << " attach shm..." << std::endl;
return shmaddr;
}
void DetachShm(void *shmaddr)
{
if (shmaddr == nullptr)
return;
shmdt(shmaddr);
std::cout << "who: " << RoleToString(_who) << " detach shm..." << std::endl;
}
public:
Shm(const std::string &pathname, int proj_id, int who)
: _pathname(pathname), _proj_id(proj_id), _who(who), _addrshm(nullptr)
{
_key = GetCommKey();
if (_who == gCreater)
GetShmUseCreate();
else if (_who == gUser)
GetShmForUse();
_addrshm = AttachShm();
std::cout << "shmid: " << _shmid << std::endl;
std::cout << "_key: " << ToHex(_key) << std::endl;
}
~Shm()
{
if (_who == gCreater)
{
int res = shmctl(_shmid, IPC_RMID, nullptr);
}
std::cout << "shm remove done..." << std::endl;
}
std::string ToHex(key_t key)
{
char buffer[128];
snprintf(buffer, sizeof(buffer), "0x%x", key);
return buffer;
}
bool GetShmUseCreate()
{
if (_who == gCreater)
{
_shmid = GetShmHelper(_key, gShmSize, IPC_CREAT | IPC_EXCL | 0666);
if (_shmid >= 0)
return true;
std::cout << "shm create done..." << std::endl;
}
return false;
}
bool GetShmForUse()
{
if (_who == gUser)
{
_shmid = GetShmHelper(_key, gShmSize, IPC_CREAT | 0666);
if (_shmid >= 0)
return true;
std::cout << "shm get done..." << std::endl;
}
return false;
}
void Zero()
{
if (_addrshm)
{
memset(_addrshm, 0, gShmSize);
}
}
void *Addr()
{
return _addrshm;
}
void DebugShm()
{
struct shmid_ds ds;
int n = shmctl(_shmid, IPC_STAT, &ds);
if (n < 0)
return;
std::cout << "ds.shm_perm.__key : " << ToHex(ds.shm_perm.__key) << std::endl;
std::cout << "ds.shm_nattch: " << ds.shm_nattch << std::endl;
}
};
#endif
消息队列——实现Client&Server
由于struct msgbuf中的mtype可以标识向那个进程发送消息。假设Server接收mtype为1的数据↓↓↓
Com.hpp
#pragma once
#include <iostream>
#include <cstring>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
const std::string PATH = "/home/xiaoming";
const int PROJ_ID = 999;
const int BUFF_SIZE = 1024;
enum
{
MSG_CREAT_ERR = 1,
MSG_GET_ERR,
MSG_DELETE_ERR
};
int CreateMsg()
{
key_t key = ftok(PATH.c_str(), PROJ_ID);
int msgid = msgget(key, IPC_CREAT | IPC_EXCL | 0666);
if (msgid < 0)
{
perror("msg create error");
exit(MSG_CREAT_ERR);
}
return msgid;
}
int GetMsg()
{
key_t key = ftok(PATH.c_str(), PROJ_ID);
int msgid = msgget(key, IPC_CREAT | 0666);
if (msgid < 0)
{
perror("msg get error");
exit(MSG_GET_ERR);
}
return msgid;
}
Server.cc
#include "Com.hpp"
int main()
{
struct msgbuf buffer;
int msgid = CreateMsg();
while (true)
{
msgrcv(msgid, &buffer, BUFF_SIZE, 1, 0);
std::cout << "Client say@ " << buffer.mtext << std::endl;
}
return 0;
}
Client.cc
#include "Com.hpp"
int main()
{
int msgid = GetMsg();
struct msgbuf buffer;
buffer.mtype = 1;
while (true)
{
std::cout << "Says # ";
std::string s;
std::getline(std::cin, s);
strcpy(buffer.mtext, s.c_str());
msgsnd(msgid, &buffer, BUFF_SIZE, 0);
}
return 0;
}