当前位置: 首页 > article >正文

MQTT(Message Queuing Telemetry Transport)协议(三)

主题是什么

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
2. TCP 协议封装
tcp.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>

// 建立 TCP 连接
int tcp_connect(const char *server_ip, int server_port) {
    int sockfd;
    struct sockaddr_in server_addr;

    // 创建套接字
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        perror("socket creation failed");
        return -1;
    }

    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(server_port);

    // 将 IP 地址从点分十进制转换为二进制形式
    if (inet_pton(AF_INET, server_ip, &server_addr.sin_addr) <= 0) {
        perror("invalid address/ address not supported");
        close(sockfd);
        return -1;
    }

    // 连接服务器
    if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
        perror("connection failed");
        close(sockfd);
        return -1;
    }

    return sockfd;
}

// 发送数据
int tcp_send(int sockfd, const char *data, int length) {
    return send(sockfd, data, length, 0);
}

// 接收数据
int tcp_receive(int sockfd, char *buffer, int buffer_size) {
    return recv(sockfd, buffer, buffer_size, 0);
}

// 关闭 TCP 连接
void tcp_close(int sockfd) {
    close(sockfd);
}

tcp.h

#ifndef TCP_H
#define TCP_H

int tcp_connect(const char *server_ip, int server_port);
int tcp_send(int sockfd, const char *data, int length);
int tcp_receive(int sockfd, char *buffer, int buffer_size);
void tcp_close(int sockfd);

#endif
  1. MQTT 协议封装

在这里插入图片描述
mqtt.h

#ifndef MQTT_H
#define MQTT_H

#include <stdint.h>

// 定义主题列表节点结构体
typedef struct TopicNode {
    char *topic;
    struct TopicNode *next;
} TopicNode;

// 定义主题列表结构体
typedef struct TopicList {
    TopicNode *head;
    TopicNode *tail;
    int count;
} TopicList;

int mqtt_connect(int sockfd, const char *client_id);
int mqtt_parse_connack(int sockfd);
int mqtt_subscribe_topics(int sockfd, TopicList *topics);
int mqtt_publish(int sockfd, const char *topic, const char *message);
void init_topic_list(TopicList *list);
void add_topic(TopicList *list, const char *topic);
void free_topic_list(TopicList *list);

#endif

mqtt.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "tcp.h"
#include "mqtt.h"

// MQTT 固定报头类型
#define MQTT_CONNECT 1
#define MQTT_CONNACK 2
#define MQTT_PUBLISH 3
#define MQTT_PUBACK 4
#define MQTT_PUBREC 5
#define MQTT_PUBREL 6
#define MQTT_PUBCOMP 7
#define MQTT_SUBSCRIBE 8
#define MQTT_SUBACK 9
#define MQTT_UNSUBSCRIBE 10
#define MQTT_UNSUBACK 11
#define MQTT_PINGREQ 12
#define MQTT_PINGRESP 13
#define MQTT_DISCONNECT 14

// 生成 MQTT CONNECT 消息
int mqtt_connect(int sockfd, const char *client_id) {
    char buffer[100];
    int index = 0;

    // 固定报头
    buffer[index++] = MQTT_CONNECT << 4;

    // 剩余长度
    int remaining_length = 12 + strlen(client_id);
    do {
        unsigned char digit = remaining_length % 128;
        remaining_length /= 128;
        if (remaining_length > 0) {
            digit |= 0x80;
        }
        buffer[index++] = digit;
    } while (remaining_length > 0);

    // 可变报头
    // 协议名
    buffer[index++] = 0;
    buffer[index++] = 4;
    memcpy(buffer + index, "MQTT", 4);
    index += 4;
    // 协议级别
    buffer[index++] = 4;
    // 连接标志
    buffer[index++] = 0;
    // 保持活动时间
    buffer[index++] = 0;
    buffer[index++] = 60;

    // 有效载荷
    // 客户端 ID
    buffer[index++] = 0;
    buffer[index++] = strlen(client_id);
    memcpy(buffer + index, client_id, strlen(client_id));
    index += strlen(client_id);

    // 发送 CONNECT 消息
    return tcp_send(sockfd, buffer, index);
}

// 解析 MQTT CONNACK 消息
int mqtt_parse_connack(int sockfd) {
    char buffer[10];
    int len = tcp_receive(sockfd, buffer, sizeof(buffer));
    if (len < 4) {
        return -1;
    }
    if ((buffer[0] & 0xF0) != (MQTT_CONNACK << 4)) {
        return -1;
    }
    return buffer[3];
}

// 初始化主题列表
void init_topic_list(TopicList *list) {
    list->head = NULL;
    list->tail = NULL;
    list->count = 0;
}

// 添加主题到列表
void add_topic(TopicList *list, const char *topic) {
    TopicNode *new_node = (TopicNode *)malloc(sizeof(TopicNode));
    if (new_node == NULL) {
        perror("Memory allocation failed");
        return;
    }
    new_node->topic = strdup(topic);
    new_node->next = NULL;

    if (list->head == NULL) {
        list->head = new_node;
        list->tail = new_node;
    } else {
        list->tail->next = new_node;
        list->tail = new_node;
    }
    list->count++;
}

// 释放主题列表内存
void free_topic_list(TopicList *list) {
    TopicNode *current = list->head;
    TopicNode *next;
    while (current != NULL) {
        next = current->next;
        free(current->topic);
        free(current);
        current = next;
    }
    list->head = NULL;
    list->tail = NULL;
    list->count = 0;
}

// 生成 MQTT SUBSCRIBE 消息,支持多主题订阅
int mqtt_subscribe_topics(int sockfd, TopicList *topics) {
    char buffer[200];
    int index = 0;

    // 固定报头
    buffer[index++] = MQTT_SUBSCRIBE << 4 | 0x02;

    // 计算剩余长度
    int remaining_length = 2; // 报文标识符长度
    TopicNode *current = topics->head;
    while (current != NULL) {
        remaining_length += 2 + strlen(current->topic) + 1; // 主题长度 + 主题名 + QoS
        current = current->next;
    }

    // 编码剩余长度
    do {
        unsigned char digit = remaining_length % 128;
        remaining_length /= 128;
        if (remaining_length > 0) {
            digit |= 0x80;
        }
        buffer[index++] = digit;
    } while (remaining_length > 0);

    // 可变报头
    // 报文标识符
    buffer[index++] = 0;
    buffer[index++] = 1;

    // 有效载荷,遍历主题列表
    current = topics->head;
    while (current != NULL) {
        // 主题过滤器
        buffer[index++] = 0;
        buffer[index++] = strlen(current->topic);
        memcpy(buffer + index, current->topic, strlen(current->topic));
        index += strlen(current->topic);
        // QoS
        buffer[index++] = 0;
        current = current->next;
    }

    // 发送 SUBSCRIBE 消息
    return tcp_send(sockfd, buffer, index);
}

// 生成 MQTT PUBLISH 消息
int mqtt_publish(int sockfd, const char *topic, const char *message) {
    char buffer[200];
    int index = 0;

    // 固定报头
    buffer[index++] = MQTT_PUBLISH << 4;

    // 剩余长度
    int remaining_length = 2 + strlen(topic) + strlen(message);
    do {
        unsigned char digit = remaining_length % 128;
        remaining_length /= 128;
        if (remaining_length > 0) {
            digit |= 0x80;
        }
        buffer[index++] = digit;
    } while (remaining_length > 0);

    // 可变报头
    // 主题名
    buffer[index++] = 0;
    buffer[index++] = strlen(topic);
    memcpy(buffer + index, topic, strlen(topic));
    index += strlen(topic);

    // 有效载荷
    memcpy(buffer + index, message, strlen(message));
    index += strlen(message);

    // 发送 PUBLISH 消息
    return tcp_send(sockfd, buffer, index);
}
  1. main.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "tcp.h"
#include "mqtt.h"

#define SERVER_IP "your_mqtt_server_ip"
#define SERVER_PORT 1883
#define CLIENT_ID "MyClientID"
#define TOPIC1 "home/livingroom/temperature"
#define TOPIC2 "home/bedroom/humidity"
#define MESSAGE "Sensor data"

int main() {
    int sockfd;
    TopicList topics;

    // 建立 TCP 连接
    sockfd = tcp_connect(SERVER_IP, SERVER_PORT);
    if (sockfd == -1) {
        return -1;
    }

    // 发送 MQTT CONNECT 消息
    if (mqtt_connect(sockfd, CLIENT_ID) == -1) {
        tcp_close(sockfd);
        return -1;
    }

    // 解析 MQTT CONNACK 消息
    int connack_result = mqtt_parse_connack(sockfd);
    if (connack_result != 0) {
        printf("Connection failed with result code %d\n", connack_result);
        tcp_close(sockfd);
        return -1;
    }
    printf("Connected to MQTT server\n");

    // 初始化主题列表
    init_topic_list(&topics);
    // 添加主题
    add_topic(&topics, TOPIC1);
    add_topic(&topics, TOPIC2);

    // 订阅多个主题
    if (mqtt_subscribe_topics(sockfd, &topics) == -1) {
        tcp_close(sockfd);
        free_topic_list(&topics);
        return -1;
    }
    printf("Subscribed to multiple topics\n");

    // 发布消息
    if (mqtt_publish(sockfd, TOPIC1, MESSAGE) == -1) {
        tcp_close(sockfd);
        free_topic_list(&topics);
        return -1;
    }
    printf("Published message: %s on topic: %s\n", MESSAGE, TOPIC1);

    // 释放主题列表内存
    free_topic_list(&topics);
    // 关闭连接
    tcp_close(sockfd);

    return 0;
}

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

// 修改 tcp_connect 函数,添加超时处理
int tcp_connect(const char *server_ip, int server_port) {
    int sockfd;
    struct sockaddr_in server_addr;
    struct timeval timeout;

    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        perror("socket creation failed");
        return -1;
    }

    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(server_port);

    if (inet_pton(AF_INET, server_ip, &server_addr.sin_addr) <= 0) {
        perror("invalid address/ address not supported");
        close(sockfd);
        return -1;
    }

    // 设置连接超时时间
    timeout.tv_sec = 5;
    timeout.tv_usec = 0;
    setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout));

    if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
        if (errno == ETIMEDOUT) {
            printf("Connection timed out\n");
        } else {
            perror("connection failed");
        }
        close(sockfd);
        return -1;
    }

    return sockfd;
}

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


http://www.kler.cn/a/547218.html

相关文章:

  • 从VGG到Transformer:深度神经网络层级演进对模型性能的深度解析与技术实践指南
  • 使用PHP爬虫获取1688商品分类:实战案例指南
  • Create Deploy Your Website Quickly - Docusaurus GitHub Pages
  • 力扣100. 相同的树(利用分解思想解决)
  • BPMN.js 与 DeepSeek 集成:打造个性化 Web 培训项目的秘诀
  • (一)获取数据和读取数据
  • [AI.认知]李飞飞团队“50美元”训练出s1-32B,以及研究成果《s1:Simple test- time scaling》,背后的本质?
  • 软考高级《系统架构设计师》知识点(四)
  • 侯捷 C++ 课程学习笔记:C++ 新标准 11/14 的革新与实战应用
  • 2025年02月13日Github流行趋势
  • 【Pico】使用Pico进行无线串流搜索不到电脑
  • Android Studio: RxJava如何取消订阅
  • 工业物联网远程监控系统优化方案,基于巨控GRM553Y-CHE
  • 正则化(Regularization)和正则表达式(Regular Expression)区别
  • C#02项目——Checked用法
  • 算法基础之排序算法大总结1!!
  • 自定义基座实时采集uniapp日志
  • Langchain对管道操作符|的重构实现链式流程
  • 一文讲清前端热更新
  • 【NLP 21、实践 ③ 全切分函数切分句子】