当前位置: 首页 > article >正文

Linux 消息队列的使用方法

文章目录

  • 1.概念
  • 2. 创建消息队列
  • 3. 发送消息
  • 4. 接收消息
  • 5. 消息结构体
  • 6. 消息队列控制(删除、获取队列状态)
  • 消息队列是否存在
  • 7. 使用场景
  • 8. 注意事项
  • 使用例子
  • 判断消息队列是否存在的代码
  • 获取队列空间大小

1.概念

  1. 消息队列是一种进程间通信 (IPC) 机制,允许进程之间交换数据。
  2. 每个消息队列都有一个唯一的标识符 (msqid),通过该标识符来访问消息队列。
  3. 消息队列可以存储多个消息,每个消息包含消息类型和消息内容。

2. 创建消息队列

  1. 使用 msgget() 函数创建消息队列。
  2. msgget() 函数需要两个参数:
    int msgget(key_t key, int msgflag)
    key_t key: 消息队列的键值,用于标识消息队列。
    int msgflg: 控制消息队列的创建方式,例如 IPC_CREAT 用于创建新的消息队列,IPC_EXCL 用于检查消息队列是否存在。
  3. 成功创建或访问消息队列,返回正整数 或 为0消息队列标识符(ID)。
    失败返回 < 0

使用 IPC_PRIVATE 创建的每个消息队列都是唯一的,即使是同一个进程多次创建,它们之间也是相互独立的,不会共享数据。

  • IPC_PRIVATE 用于创建一个私有的消息队列,它只对创建它的进程可见,其他进程无法访问。并且当创建者进程终止时,该 IPC 对象也会被自动销毁
  • 当一个进程使用 IPC_PRIVATE 创建消息队列时,系统会分配一个唯一的标识符(即消息队列 ID)给它。
  • 即使同一个进程多次使用 IPC_PRIVATE 创建消息队列,每次分配的标识符也会是不同的。
    因此,即使是同一个进程,每次使用 IPC_PRIVATE 创建的队列都是独立的、不同的消息队列。
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>

int main() {
    int msqid1 = msgget(IPC_PRIVATE, IPC_CREAT | 0666);
    if (msqid1 == -1) {
        perror("msgget 1");
        exit(1);
    }
    printf("Message queue ID 1: %d\n", msqid1);

    int msqid2 = msgget(IPC_PRIVATE, IPC_CREAT | 0666);
    if (msqid2 == -1) {
        perror("msgget 2");
        exit(1);
    }
    printf("Message queue ID 2: %d\n", msqid2);

    // msqid1 和 msqid2 不同,指向不同的消息队列
    if (msqid1 == msqid2) {
        printf("Error: msqid1 and msqid2 are the same!\n");
    } else {
        printf("msqid1 and msqid2 are different, as expected.\n");
    }

    // 删除消息队列
    msgctl(msqid1, IPC_RMID, NULL);
    msgctl(msqid2, IPC_RMID, NULL);

    return 0;
}

3. 发送消息

  1. 使用 msgsnd() 函数发送消息到消息队列。

  2. msgsnd() 函数需要四个参数:
    int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
    int msqid: 消息队列标识符。
    const void *msgp: 指向消息结构体的指针。
    size_t msgsz: 消息结构体的大小。
    int msgflg: 控制消息发送方式,例如 IPC_NOWAIT 用于非阻塞发送,填0则用于阻塞发送。

  3. 返回值是 0 表示发送成功,-1 表示发送失败。

4. 接收消息

  1. 使用 msgrcv() 函数从消息队列接收消息。

  2. msgrcv() 函数需要五个参数:
    ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
    int msqid: 消息队列标识符。
    void *msgp: 指向消息结构体的指针。
    size_t msgsz: 消息结构体的大小。
    long msgtyp: 消息类型,用于过滤接收的消息。
    int msgflg: 控制消息接收方式,例如 IPC_NOWAIT 用于非阻塞接收。填0则用于阻塞接受。

  3. 返回值是接收到的消息大小,如果接收失败,则返回 -1。(若使用了 IPC_NOWAIT这个标志,在无消息时也是返回-1)

5. 消息结构体

  1. 消息结构体包含消息类型 (mtype) 和消息内容 (mtext)。
  2. mtype 用于区分不同类型的消息。这个类型一点是
  3. mtext 可以存储任意数据。
// 定义消息结构体
struct msgbuf {
    long mtype;    // 消息类型,一定要放在结构体前面,而且是long类型
    char mtext[256]; // 消息内容
};

6. 消息队列控制(删除、获取队列状态)

  1. 使用 msgctl() 函数控制消息队列。
  2. msgctl() 函数需要三个参数:
    int msgctl(int msqid,int cmd,struct msqid_ds *buf)
    int msqid: 消息队列标识符。
    int cmd: 控制命令,例如 IPC_STAT 用于获取消息队列状态,IPC_RMID 用于删除消息队列。
  3. struct msqid_ds *buf: 指向消息队列控制结构体的指针,用于获取或设置消息队列属性。

不使用 IPC_PRIVATE 创建消息队列创建后,如果不删除,会一直存在于系统中,直到系统重启。

消息队列是否存在

在这里插入图片描述

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <errno.h>

int main() {
  key_t key = ftok(".", 'a'); // 获取消息队列键值
  int msgid = msgget(key, 0); // 尝试获取现有消息队列

  if (msgid == -1) {
    if (errno == ENOENT) {
      printf("Message queue with key %d does not exist.\n", key);
    } else {
      perror("msgget failed");
    }
    return 1;
  } else {
    printf("Message queue with key %d exists.\n", key);
    return 0;
  }
}

7. 使用场景

进程间通信:不同进程之间传递数据。
多线程通信:不同线程之间传递数据,但需要考虑线程同步问题。

8. 注意事项

  1. 消息队列的创建和使用需要权限控制。
  2. 消息队列需要使用 msgctl() 函数删除,否则会导致资源泄漏。
  3. 在多线程环境下使用消息队列时,需要考虑线程同步问题,避免数据竞争。
  4. 注意创建消息队列的 KEY,不能用相同的KEY获取消息队列,否则将会导致消息无法传输

总结:

Linux 消息队列是一种简单易用的进程间通信机制,可以用于进程之间或线程之间交换数据。它提供了灵活的消息类型和内容存储,但需要谨慎处理线程同步问题,并注意资源释放。

使用例子

=注意:struct msgbuf 中的long mtype一定是long类型

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define MSG_KEY 12345

// 定义消息结构体
struct msgbuf {
    long mtype;    // 消息类型
    char mtext[256]; // 消息内容
};

int main() {
    int msqid;
    struct msgbuf message;

    // 创建消息队列
    msqid = msgget(MSG_KEY, IPC_CREAT | 0666);
    if (msqid == -1) {
        perror("msgget failed");
        exit(1);
    }

    // 进程A: 发送消息
    if (fork() == 0) { // 子进程
        printf("Process A: Sending message...\n");

        // 准备消息结构体
        message.mtype = 1;
        strcpy(message.mtext, "Hello from Process A!");

        // 发送消息
        if (msgsnd(msqid, &message, strlen(message.mtext) + 1, 0) == -1) {
            perror("msgsnd failed");
            exit(1);
        }

        printf("Process A: Message sent.\n");
        exit(0);
    } else { // 父进程
        // 等待子进程发送消息
        sleep(1);

        // 进程B: 接收消息
        printf("Process B: Receiving message...\n");

        // 接收消息
        if (msgrcv(msqid, &message, sizeof(message.mtext), 1, 0) == -1) {
            perror("msgrcv failed");
            exit(1);
        }

        printf("Process B: Received message: %s\n", message.mtext);
    }

    // 清理消息队列
    if (msgctl(msqid, IPC_RMID, NULL) == -1) {
        perror("msgctl failed");
        exit(1);
    }

    return 0;
}
#include <pthread.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define MSG_KEY 12345

// 定义消息结构体
struct msgbuf {
    long mtype;    // 消息类型
    char mtext[256]; // 消息内容
};

// 线程函数: 发送消息
void *sender(void *arg) {
    int msqid;
    struct msgbuf message;

    // 获取消息队列标识符
    msqid = *(int *)arg;

    while (1) {
        // 准备消息结构体
        message.mtype = 1;
        sprintf(message.mtext, "Message from sender thread: %ld", getpid());

        // 发送消息
        if (msgsnd(msqid, &message, strlen(message.mtext) + 1, 0) == -1) {
            perror("msgsnd failed");
            exit(1);
        }

        printf("Sender thread: Message sent: %s\n", message.mtext);
        sleep(1);
    }

    return NULL;
}

// 线程函数: 接收消息
void *receiver(void *arg) {
    int msqid;
    struct msgbuf message;

    // 获取消息队列标识符
    msqid = *(int *)arg;

    while (1) {
        // 接收消息
        if (msgrcv(msqid, &message, sizeof(message.mtext), 1, 0) == -1) {
            perror("msgrcv failed");
            exit(1);
        }

        printf("Receiver thread: Received message: %s\n", message.mtext);
    }

    return NULL;
}

int main() {
    int msqid;
    pthread_t sender_thread, receiver_thread;

    // 创建消息队列
    msqid = msgget(MSG_KEY, IPC_CREAT | 0666);
    if (msqid == -1) {
        perror("msgget failed");
        exit(1);
    }

    // 创建发送线程
    if (pthread_create(&sender_thread, NULL, sender, &msqid) != 0) {
        perror("pthread_create failed");
        exit(1);
    }

    // 创建接收线程
    if (pthread_create(&receiver_thread, NULL, receiver, &msqid) != 0) {
        perror("pthread_create failed");
        exit(1);
    }

    // 等待线程结束
    pthread_join(sender_thread, NULL);
    pthread_join(receiver_thread, NULL);

    // 清理消息队列
    if (msgctl(msqid, IPC_RMID, NULL) == -1) {
        perror("msgctl failed");
        exit(1);
    }

    return 0;
}

判断消息队列是否存在的代码

msgget() 函数中,将 IPC_EXCLIPC_CREAT 标志位一起使用:

2int msqid = msgget(key, IPC_CREAT | IPC_EXCL | 0666);
key: 消息队列的键值。
IPC_CREAT: 如果消息队列不存在,则创建新的消息队列。
IPC_EXCL: 如果消息队列已存在,则返回错误,并设置 errnoEEXIST
0666: 设置消息队列的访问权限。

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>

int main() {
    int msqid;
    key_t key = 12345;

    // 第一次创建消息队列,成功
    msqid = msgget(key, IPC_CREAT | IPC_EXCL | 0666);
    if (msqid == -1) {
        perror("msgget failed");
        exit(1);
    }
    printf("Message queue created successfully.\n");

    // 第二次创建消息队列,失败,因为消息队列已经存在
    msqid = msgget(key, IPC_CREAT | IPC_EXCL | 0666);
    if (msqid == -1) {
        if (errno == EEXIST) {
            printf("Message queue already exists.\n");
        } else {
            perror("msgget failed");
            exit(1);
        }
    }

    return 0;
}

解释:

  • 第一次调用 msgget() 时,使用 IPC_CREAT | IPC_EXCL 标志位,成功创建消息队列。
  • 第二次调用 msgget() 时,使用相同的键值,由于消息队列已存在,IPC_EXCL 标志位会使 msgget() 返回错误,errno 为 EEXIST。

使用场景:

  • 当需要确保创建的消息队列是新的,并且不允许重复创建时,使用 IPC_EXCL 标志位。
  • 在多进程或多线程环境中,使用 IPC_EXCL 可以防止多个进程或线程同时创建相同的消息队列。

获取队列空间大小

步骤:

  1. 获取消息队列控制结构体:
    使用 msgctl() 函数,并设置 cmd 为 IPC_STAT,将消息队列控制结构体指针作为第三个参数传入。
    在这里插入图片描述

  2. 获取 msg_qbytes 字段:
    msqid_ds 结构体中包含消息队列的各种属性,包括 msg_qbytes 字段,它表示消息队列的最大容量(字节)。

  3. 计算队列大小:
    msg_qbytes 代表的是消息队列的总容量如果需要计算可以容纳的消息数量,需要考虑每个消息的大小
    int max_messages = msqid_ds.msg_qbytes / sizeof(struct msgbuf); // 假设 struct msgbuf是你的消息结构体

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>

int main() {
    int msqid;
    key_t key = 12345;

    // 创建消息队列
    msqid = msgget(key, IPC_CREAT | 0666);
    if (msqid == -1) {
        perror("msgget failed");
        exit(1);
    }

    // 获取消息队列控制结构体
    struct msqid_ds msqid_ds;
    if (msgctl(msqid, IPC_STAT, &msqid_ds) == -1) {
        perror("msgctl failed");
        exit(1);
    }

    // 打印消息队列最大容量
    printf("Message queue size: %ld bytes\n", msqid_ds.msg_qbytes);

    // 计算可以容纳的消息数量(假设消息结构体大小为 100 字节)
    int max_messages = msqid_ds.msg_qbytes / 100;
    printf("Maximum number of messages: %d\n", max_messages);

    // 删除消息队列
    if (msgctl(msqid, IPC_RMID, NULL) == -1) {
        perror("msgctl failed");
        exit(1);
    }

    return 0;
}

在这里插入图片描述


http://www.kler.cn/a/517658.html

相关文章:

  • 9.business english-agreement
  • ThinkPhp伪静态设置后,访问静态资源也提示找不到Controller
  • 0164__【GNU】gcc -O编译选项 -Og -O0 -O1 -O2 -O3 -Os
  • impact 影响分析学习笔记(一)
  • 小利特惠源码/生活缴费/电话费/油卡燃气/等充值业务类源码附带承兑系统
  • 【C++高并发服务器WebServer】-2:exec函数簇、进程控制
  • 团体程序设计天梯赛-练习集——L1-016 查验身份证
  • java —— 面向对象(上)
  • [Dialog屏幕开发] 屏幕绘制(Table Control控件)
  • 为什么IDEA提示不推荐@Autowired❓️如果使用@Resource呢❓️
  • K8S中ingress详解
  • 数据结构测试题1
  • DeepSeek-R1:将强化学习用于激励大型语言模型的推理能力
  • 设计模式:春招面试的关键知识储备
  • ubunut22.04安装docker(基于阿里云 Docker 镜像源安装 Docker)
  • mapbox加载geojson,鼠标移入改变颜色,设置样式以及vue中的使用
  • web前端8--浮动
  • Python面向对象编程:精雕细琢对象的“名片”——重写 `__str__()` 和 `__repr__()` 方法
  • 【函数题】6-1 单链表逆转
  • 三高“高性能、高并发、高可靠”系统架构设计系列文章
  • 计算机视觉之三维重建-单视几何
  • jenkins-通过api获取所有job及最新build信息
  • hedfs和hive数据迁移后校验脚本
  • Rust 中的引用与借用:深入理解所有权与数据安全
  • 多模态数据融合的基本流程与关键环节
  • elementPlus-button组件二次封装