MQTT(Message Queuing Telemetry Transport)协议(三)
主题是什么
2. TCP 协议封装
tcp.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
// 建立 TCP 连接
int tcp_connect(const char *server_ip, int server_port) {
int sockfd;
struct sockaddr_in server_addr;
// 创建套接字
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1) {
perror("socket creation failed");
return -1;
}
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(server_port);
// 将 IP 地址从点分十进制转换为二进制形式
if (inet_pton(AF_INET, server_ip, &server_addr.sin_addr) <= 0) {
perror("invalid address/ address not supported");
close(sockfd);
return -1;
}
// 连接服务器
if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
perror("connection failed");
close(sockfd);
return -1;
}
return sockfd;
}
// 发送数据
int tcp_send(int sockfd, const char *data, int length) {
return send(sockfd, data, length, 0);
}
// 接收数据
int tcp_receive(int sockfd, char *buffer, int buffer_size) {
return recv(sockfd, buffer, buffer_size, 0);
}
// 关闭 TCP 连接
void tcp_close(int sockfd) {
close(sockfd);
}
tcp.h
#ifndef TCP_H
#define TCP_H
int tcp_connect(const char *server_ip, int server_port);
int tcp_send(int sockfd, const char *data, int length);
int tcp_receive(int sockfd, char *buffer, int buffer_size);
void tcp_close(int sockfd);
#endif
- MQTT 协议封装
mqtt.h
#ifndef MQTT_H
#define MQTT_H
#include <stdint.h>
// 定义主题列表节点结构体
typedef struct TopicNode {
char *topic;
struct TopicNode *next;
} TopicNode;
// 定义主题列表结构体
typedef struct TopicList {
TopicNode *head;
TopicNode *tail;
int count;
} TopicList;
int mqtt_connect(int sockfd, const char *client_id);
int mqtt_parse_connack(int sockfd);
int mqtt_subscribe_topics(int sockfd, TopicList *topics);
int mqtt_publish(int sockfd, const char *topic, const char *message);
void init_topic_list(TopicList *list);
void add_topic(TopicList *list, const char *topic);
void free_topic_list(TopicList *list);
#endif
mqtt.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "tcp.h"
#include "mqtt.h"
// MQTT 固定报头类型
#define MQTT_CONNECT 1
#define MQTT_CONNACK 2
#define MQTT_PUBLISH 3
#define MQTT_PUBACK 4
#define MQTT_PUBREC 5
#define MQTT_PUBREL 6
#define MQTT_PUBCOMP 7
#define MQTT_SUBSCRIBE 8
#define MQTT_SUBACK 9
#define MQTT_UNSUBSCRIBE 10
#define MQTT_UNSUBACK 11
#define MQTT_PINGREQ 12
#define MQTT_PINGRESP 13
#define MQTT_DISCONNECT 14
// 生成 MQTT CONNECT 消息
int mqtt_connect(int sockfd, const char *client_id) {
char buffer[100];
int index = 0;
// 固定报头
buffer[index++] = MQTT_CONNECT << 4;
// 剩余长度
int remaining_length = 12 + strlen(client_id);
do {
unsigned char digit = remaining_length % 128;
remaining_length /= 128;
if (remaining_length > 0) {
digit |= 0x80;
}
buffer[index++] = digit;
} while (remaining_length > 0);
// 可变报头
// 协议名
buffer[index++] = 0;
buffer[index++] = 4;
memcpy(buffer + index, "MQTT", 4);
index += 4;
// 协议级别
buffer[index++] = 4;
// 连接标志
buffer[index++] = 0;
// 保持活动时间
buffer[index++] = 0;
buffer[index++] = 60;
// 有效载荷
// 客户端 ID
buffer[index++] = 0;
buffer[index++] = strlen(client_id);
memcpy(buffer + index, client_id, strlen(client_id));
index += strlen(client_id);
// 发送 CONNECT 消息
return tcp_send(sockfd, buffer, index);
}
// 解析 MQTT CONNACK 消息
int mqtt_parse_connack(int sockfd) {
char buffer[10];
int len = tcp_receive(sockfd, buffer, sizeof(buffer));
if (len < 4) {
return -1;
}
if ((buffer[0] & 0xF0) != (MQTT_CONNACK << 4)) {
return -1;
}
return buffer[3];
}
// 初始化主题列表
void init_topic_list(TopicList *list) {
list->head = NULL;
list->tail = NULL;
list->count = 0;
}
// 添加主题到列表
void add_topic(TopicList *list, const char *topic) {
TopicNode *new_node = (TopicNode *)malloc(sizeof(TopicNode));
if (new_node == NULL) {
perror("Memory allocation failed");
return;
}
new_node->topic = strdup(topic);
new_node->next = NULL;
if (list->head == NULL) {
list->head = new_node;
list->tail = new_node;
} else {
list->tail->next = new_node;
list->tail = new_node;
}
list->count++;
}
// 释放主题列表内存
void free_topic_list(TopicList *list) {
TopicNode *current = list->head;
TopicNode *next;
while (current != NULL) {
next = current->next;
free(current->topic);
free(current);
current = next;
}
list->head = NULL;
list->tail = NULL;
list->count = 0;
}
// 生成 MQTT SUBSCRIBE 消息,支持多主题订阅
int mqtt_subscribe_topics(int sockfd, TopicList *topics) {
char buffer[200];
int index = 0;
// 固定报头
buffer[index++] = MQTT_SUBSCRIBE << 4 | 0x02;
// 计算剩余长度
int remaining_length = 2; // 报文标识符长度
TopicNode *current = topics->head;
while (current != NULL) {
remaining_length += 2 + strlen(current->topic) + 1; // 主题长度 + 主题名 + QoS
current = current->next;
}
// 编码剩余长度
do {
unsigned char digit = remaining_length % 128;
remaining_length /= 128;
if (remaining_length > 0) {
digit |= 0x80;
}
buffer[index++] = digit;
} while (remaining_length > 0);
// 可变报头
// 报文标识符
buffer[index++] = 0;
buffer[index++] = 1;
// 有效载荷,遍历主题列表
current = topics->head;
while (current != NULL) {
// 主题过滤器
buffer[index++] = 0;
buffer[index++] = strlen(current->topic);
memcpy(buffer + index, current->topic, strlen(current->topic));
index += strlen(current->topic);
// QoS
buffer[index++] = 0;
current = current->next;
}
// 发送 SUBSCRIBE 消息
return tcp_send(sockfd, buffer, index);
}
// 生成 MQTT PUBLISH 消息
int mqtt_publish(int sockfd, const char *topic, const char *message) {
char buffer[200];
int index = 0;
// 固定报头
buffer[index++] = MQTT_PUBLISH << 4;
// 剩余长度
int remaining_length = 2 + strlen(topic) + strlen(message);
do {
unsigned char digit = remaining_length % 128;
remaining_length /= 128;
if (remaining_length > 0) {
digit |= 0x80;
}
buffer[index++] = digit;
} while (remaining_length > 0);
// 可变报头
// 主题名
buffer[index++] = 0;
buffer[index++] = strlen(topic);
memcpy(buffer + index, topic, strlen(topic));
index += strlen(topic);
// 有效载荷
memcpy(buffer + index, message, strlen(message));
index += strlen(message);
// 发送 PUBLISH 消息
return tcp_send(sockfd, buffer, index);
}
- main.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "tcp.h"
#include "mqtt.h"
#define SERVER_IP "your_mqtt_server_ip"
#define SERVER_PORT 1883
#define CLIENT_ID "MyClientID"
#define TOPIC1 "home/livingroom/temperature"
#define TOPIC2 "home/bedroom/humidity"
#define MESSAGE "Sensor data"
int main() {
int sockfd;
TopicList topics;
// 建立 TCP 连接
sockfd = tcp_connect(SERVER_IP, SERVER_PORT);
if (sockfd == -1) {
return -1;
}
// 发送 MQTT CONNECT 消息
if (mqtt_connect(sockfd, CLIENT_ID) == -1) {
tcp_close(sockfd);
return -1;
}
// 解析 MQTT CONNACK 消息
int connack_result = mqtt_parse_connack(sockfd);
if (connack_result != 0) {
printf("Connection failed with result code %d\n", connack_result);
tcp_close(sockfd);
return -1;
}
printf("Connected to MQTT server\n");
// 初始化主题列表
init_topic_list(&topics);
// 添加主题
add_topic(&topics, TOPIC1);
add_topic(&topics, TOPIC2);
// 订阅多个主题
if (mqtt_subscribe_topics(sockfd, &topics) == -1) {
tcp_close(sockfd);
free_topic_list(&topics);
return -1;
}
printf("Subscribed to multiple topics\n");
// 发布消息
if (mqtt_publish(sockfd, TOPIC1, MESSAGE) == -1) {
tcp_close(sockfd);
free_topic_list(&topics);
return -1;
}
printf("Published message: %s on topic: %s\n", MESSAGE, TOPIC1);
// 释放主题列表内存
free_topic_list(&topics);
// 关闭连接
tcp_close(sockfd);
return 0;
}
// 修改 tcp_connect 函数,添加超时处理
int tcp_connect(const char *server_ip, int server_port) {
int sockfd;
struct sockaddr_in server_addr;
struct timeval timeout;
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == -1) {
perror("socket creation failed");
return -1;
}
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(server_port);
if (inet_pton(AF_INET, server_ip, &server_addr.sin_addr) <= 0) {
perror("invalid address/ address not supported");
close(sockfd);
return -1;
}
// 设置连接超时时间
timeout.tv_sec = 5;
timeout.tv_usec = 0;
setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (char *)&timeout, sizeof(timeout));
if (connect(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
if (errno == ETIMEDOUT) {
printf("Connection timed out\n");
} else {
perror("connection failed");
}
close(sockfd);
return -1;
}
return sockfd;
}