MQTT(Message Queuing Telemetry Transport)协议(一)
MQTT(Message Queuing Telemetry Transport)协议是一种基于发布 / 订阅模式的轻量级物联网消息传输协议,它通常封装在 TCP/IP 协议之上 ,也可以基于其他可靠的流传输协议,以下为你详细介绍:
基于 TCP/IP 协议
原因:TCP/IP 协议提供了可靠的字节流传输服务,保证数据的无差错、按顺序交付。MQTT 作为一种需要确保消息可靠传输的应用层协议,选择 TCP/IP 作为底层支撑,能够满足其对数据传输可靠性的要求。例如,在物联网设备向服务器上报关键状态信息时,必须保证信息准确无误地到达。
工作方式:MQTT 客户端通过 TCP 连接与 MQTT 服务器建立通信。在建立连接时,客户端会向服务器发送连接请求报文,其中包含客户端 ID、连接标志、保持连接时间等信息。服务器收到连接请求后,会进行相应的验证和处理,如果连接成功,双方就可以在这个 TCP 连接上进行 MQTT 报文的收发。例如,在智能家居场景中,智能灯具作为 MQTT 客户端,通过 TCP 连接到家庭网关中的 MQTT 服务器,上报灯具的开关状态或接收调光指令。
基于 TLS/SSL 协议
原因:TLS(Transport Layer Security)及其前身 SSL(Secure Sockets Layer)协议用于在网络通信中提供加密和身份验证功能。MQTT 在传输一些敏感数据(如设备控制指令、用户隐私信息等)时,为防止数据被窃取或篡改,需要 TLS/SSL 提供的安全保障。
工作方式:MQTT 客户端和服务器在基于 TCP 建立连接后,可以进一步协商启用 TLS/SSL 加密。客户端和服务器需要各自拥有数字证书(用于身份验证和密钥交换)。在握手过程中,双方会验证对方的证书,协商加密算法和密钥,之后的数据传输都会在加密的通道中进行。例如,在工业物联网中,对生产设备的远程监控和控制数据通过 MQTT 传输时,利用 TLS/SSL 加密确保生产数据的安全性和隐私性。
其他可选的底层协议:理论上,只要能提供稳定的数据流传输,MQTT 也可以基于其他协议实现。例如,在一些资源受限且对实时性要求极高的场景下,可能会考虑基于 UDP(User Datagram Protocol)进行定制化封装。但 UDP 本身是不可靠传输协议,需要在 MQTT 上层增加额外的机制来保证消息的可靠性 。
选择合适的底层协议来封装 MQTT 需要综合考虑多个因素,以下从不同维度为你分析并给出选择建议:
可靠性需求
要求高可靠性场景
若业务场景不能容忍消息丢失或错误,如工业自动化中的设备控制指令传输、金融交易数据的上报等,应选择 TCP 或基于 TCP 的 TLS/SSL 协议。TCP 提供可靠的字节流服务,通过确认机制、重传机制和滑动窗口协议确保数据无差错、按顺序交付。而 TLS/SSL 在 TCP 的基础上增加了加密和身份验证功能,能进一步保障数据的安全性和完整性。
例如,在智能电网中,电表数据的实时采集和传输对可靠性要求极高,使用基于 TLS/SSL 的 MQTT 可以确保电力数据准确无误且安全地传输到电力管理系统。
可容忍一定丢包场景
对于一些对实时性要求高但可以容忍一定程度丢包的场景,如视频监控中的实时状态信息推送、体育赛事的实时比分更新等,可以考虑基于 UDP 定制封装 MQTT。虽然 UDP 是不可靠传输协议,但通过在应用层增加简单的确认和重传机制,可以在一定程度上弥补其不足,同时利用 UDP 的低延迟特性满足实时性需求。
安全性需求
对安全性要求低场景
如果数据本身不涉及敏感信息,且网络环境相对安全,如一些内部局域网中的简单设备状态监控(如办公室内的空调温度监控),可以直接使用 TCP 作为底层协议。这样可以减少 TLS/SSL 握手过程带来的开销,提高通信效率。
对安全性要求高场景
当传输的数据包含敏感信息,如用户的个人身份信息、设备的控制密钥等,或者网络环境存在安全风险(如通过公共网络进行通信),则必须使用基于 TLS/SSL 的 TCP 协议。TLS/SSL 可以对数据进行加密,防止数据在传输过程中被窃取或篡改,同时通过身份验证机制确保通信双方的身份真实性。例如,在远程医疗设备与医疗信息系统之间的通信中,使用 TLS/SSL 加密的 MQTT 可以保护患者的隐私数据。
网络环境和资源限制
网络带宽充足、设备资源丰富场景
在网络带宽较高且设备计算能力和存储资源充足的情况下,如企业级数据中心内部的设备通信、高速局域网环境中的智能家居系统,优先选择 TCP 或 TLS/SSL 协议。虽然 TLS/SSL 会增加一定的计算和带宽开销,但对于资源丰富的设备和网络来说,这些开销是可以接受的,并且能提供更好的安全性和可靠性。
网络带宽有限、设备资源受限场景
对于一些物联网设备,如传感器节点、智能水表等,它们通常具有有限的计算能力、存储容量和网络带宽。在这种情况下,需要尽量减少通信开销。可以考虑直接使用 TCP 协议,如果对安全性有一定要求但设备资源无法支持完整的 TLS/SSL 握手过程,可以采用轻量级的加密算法或预共享密钥的方式来实现部分安全功能。另外,在一些特殊场景下,如低功耗广域网(LPWAN)环境中,由于网络带宽极低且对功耗要求严格,可能需要对 MQTT 协议进行裁剪和优化,并选择合适的底层传输协议。
应用场景的实时性要求
实时性要求高场景
对于实时性要求极高的应用场景,如工业自动化中的实时控制、车联网中的车辆实时定位和状态更新等,需要选择低延迟的底层协议。在这种情况下,基于 UDP 的定制化 MQTT 实现可能更合适,因为 UDP 不需要建立连接和维护复杂的状态信息,数据传输延迟较低。但需要注意的是,要在应用层实现必要的可靠性机制。
实时性要求不高场景
如果应用场景对实时性要求不高,如设备的定期数据上报、日志信息的传输等,TCP 或 TLS/SSL 协议是比较好的选择。这些协议虽然在建立连接和数据传输过程中会有一定的延迟,但可以保证数据的可靠传输,适合对实时性要求相对较低的应用。
如何在基于UDP的MQTT中实现简单的确认和重传机制?
在基于 UDP 的 MQTT 中实现简单的确认和重传机制可以从消息编号、确认消息、重传定时器和重传次数限制几个方面入手,以下为你详细介绍实现步骤,并给出示例代码。
实现步骤
消息编号:为每个发出的 MQTT 消息分配一个唯一的编号。接收方在收到消息后,需要记录已接收消息的编号。
确认消息:接收方在收到消息后,立即向发送方发送一个确认消息(ACK),其中包含已成功接收消息的编号。
重传定时器:发送方在发送每条消息时启动一个重传定时器。如果在定时器超时之前没有收到对应的确认消息,则重传该消息。
重传次数限制:为了避免无限重传,设置一个最大重传次数。当重传次数达到上限时,停止重传并进行错误处理。
以下是一个用 C 语言实现基于 UDP 的 MQTT 简单确认和重传机制的示例代码。该代码分为发送端和接收端两部分:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <time.h>
#define SERVER_PORT 10000
#define MAX_RETRIES 3
#define TIMEOUT_SECONDS 1
// 发送方函数
void sender() {
int sockfd;
struct sockaddr_in server_addr;
char messages[][20] = {"Message 1", "Message 2", "Message 3"};
int num_messages = sizeof(messages) / sizeof(messages[0]);
// 创建 UDP 套接字
if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
server_addr.sin_addr.s_addr = INADDR_ANY;
for (int msg_id = 0; msg_id < num_messages; msg_id++) {
int retries = 0;
char send_buffer[50];
// 格式化要发送的消息,格式为 "消息编号:消息内容"
snprintf(send_buffer, sizeof(send_buffer), "%d:%s", msg_id, messages[msg_id]);
while (retries < MAX_RETRIES) {
// 发送消息
if (sendto(sockfd, send_buffer, strlen(send_buffer), 0, (const struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("sendto failed");
continue;
}
// 设置超时时间
struct timeval timeout;
timeout.tv_sec = TIMEOUT_SECONDS;
timeout.tv_usec = 0;
if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) < 0) {
perror("setsockopt failed");
continue;
}
char recv_buffer[10];
socklen_t len = sizeof(server_addr);
ssize_t n = recvfrom(sockfd, recv_buffer, sizeof(recv_buffer), 0, (struct sockaddr *)&server_addr, &len);
if (n > 0) {
int ack_id;
sscanf(recv_buffer, "%d", &ack_id);
if (ack_id == msg_id) {
printf("Message %d sent successfully\n", msg_id);
break;
}
} else {
// 超时重传
printf("Timeout for message %d, retrying...\n", msg_id);
retries++;
}
}
if (retries == MAX_RETRIES) {
printf("Failed to send message %d after %d retries\n", msg_id, MAX_RETRIES);
}
}
close(sockfd);
}
// 接收方函数
void receiver() {
int sockfd;
struct sockaddr_in server_addr, client_addr;
socklen_t len = sizeof(client_addr);
// 创建 UDP 套接字
if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
server_addr.sin_addr.s_addr = INADDR_ANY;
// 绑定地址和端口
if (bind(sockfd, (const struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
char buffer[50];
while (1) {
// 接收消息
ssize_t n = recvfrom(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr *)&client_addr, &len);
if (n > 0) {
int msg_id;
char message[20];
sscanf(buffer, "%d:%19s", &msg_id, message);
printf("Received message %d: %s\n", msg_id, message);
// 发送确认消息
char ack_buffer[10];
snprintf(ack_buffer, sizeof(ack_buffer), "%d", msg_id);
if (sendto(sockfd, ack_buffer, strlen(ack_buffer), 0, (const struct sockaddr *)&client_addr, len) < 0) {
perror("sendto (ACK) failed");
}
}
}
close(sockfd);
}
int main() {
// 创建接收方进程
pid_t pid = fork();
if (pid < 0) {
perror("fork failed");
return EXIT_FAILURE;
} else if (pid == 0) {
// 子进程作为接收方
receiver();
} else {
// 父进程等待一段时间后作为发送方
sleep(1);
sender();
}
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <time.h>
#include <pthread.h>
#include <errno.h>
#define SERVER_PORT 10000
#define MAX_RETRIES 3
#define INITIAL_TIMEOUT_SECONDS 1
#define MAX_MESSAGE_LENGTH 256
// 消息结构体
typedef struct {
int msg_id;
char message[MAX_MESSAGE_LENGTH];
int retries;
time_t last_send_time;
int timeout;
} Message;
// 环形缓冲区结构体
typedef struct {
Message *messages;
int head;
int tail;
int size;
int capacity;
} CircularBuffer;
// 初始化环形缓冲区
void init_circular_buffer(CircularBuffer *buffer, int capacity) {
buffer->messages = (Message *)malloc(capacity * sizeof(Message));
buffer->head = 0;
buffer->tail = 0;
buffer->size = 0;
buffer->capacity = capacity;
}
// 向环形缓冲区添加消息
int add_message(CircularBuffer *buffer, int msg_id, const char *message) {
if (buffer->size == buffer->capacity) {
return -1; // 缓冲区已满
}
Message *msg = &buffer->messages[buffer->tail];
msg->msg_id = msg_id;
strncpy(msg->message, message, MAX_MESSAGE_LENGTH - 1);
msg->message[MAX_MESSAGE_LENGTH - 1] = '\0';
msg->retries = 0;
msg->last_send_time = time(NULL);
msg->timeout = INITIAL_TIMEOUT_SECONDS;
buffer->tail = (buffer->tail + 1) % buffer->capacity;
buffer->size++;
return 0;
}
// 从环形缓冲区移除消息
int remove_message(CircularBuffer *buffer, int msg_id) {
int index = buffer->head;
for (int i = 0; i < buffer->size; i++) {
if (buffer->messages[index].msg_id == msg_id) {
// 移除消息
for (int j = index; j < buffer->size - 1; j++) {
buffer->messages[j] = buffer->messages[j + 1];
}
buffer->tail = (buffer->tail - 1 + buffer->capacity) % buffer->capacity;
buffer->size--;
return 0;
}
index = (index + 1) % buffer->capacity;
}
return -1; // 未找到消息
}
// 发送消息
void send_message(int sockfd, struct sockaddr_in *server_addr, Message *msg) {
char send_buffer[MAX_MESSAGE_LENGTH + 10];
snprintf(send_buffer, sizeof(send_buffer), "%d:%s", msg->msg_id, msg->message);
if (sendto(sockfd, send_buffer, strlen(send_buffer), 0, (const struct sockaddr *)server_addr, sizeof(*server_addr)) < 0) {
perror("sendto failed");
}
msg->last_send_time = time(NULL);
msg->retries++;
// 动态调整超时时间
msg->timeout = msg->timeout * 2;
}
// 发送线程函数
void *sender_thread(void *arg) {
int sockfd = *(int *)arg;
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
server_addr.sin_addr.s_addr = INADDR_ANY;
CircularBuffer buffer;
init_circular_buffer(&buffer, 10);
char messages[][20] = {"Message 1", "Message 2", "Message 3"};
int num_messages = sizeof(messages) / sizeof(messages[0]);
for (int msg_id = 0; msg_id < num_messages; msg_id++) {
add_message(&buffer, msg_id, messages[msg_id]);
send_message(sockfd, &server_addr, &buffer.messages[buffer.head]);
}
while (buffer.size > 0) {
time_t current_time = time(NULL);
int index = buffer.head;
for (int i = 0; i < buffer.size; i++) {
Message *msg = &buffer.messages[index];
if (current_time - msg->last_send_time >= msg->timeout) {
if (msg->retries < MAX_RETRIES) {
send_message(sockfd, &server_addr, msg);
} else {
printf("Failed to send message %d after %d retries\n", msg->msg_id, MAX_RETRIES);
remove_message(&buffer, msg->msg_id);
}
}
index = (index + 1) % buffer.capacity;
}
sleep(1);
}
free(buffer.messages);
return NULL;
}
// 接收线程函数
void *receiver_thread(void *arg) {
int sockfd = *(int *)arg;
struct sockaddr_in client_addr;
socklen_t len = sizeof(client_addr);
char buffer[MAX_MESSAGE_LENGTH + 10];
while (1) {
ssize_t n = recvfrom(sockfd, buffer, sizeof(buffer), 0, (struct sockaddr *)&client_addr, &len);
if (n > 0) {
int msg_id;
char message[MAX_MESSAGE_LENGTH];
sscanf(buffer, "%d:%19s", &msg_id, message);
printf("Received message %d: %s\n", msg_id, message);
// 发送确认消息
char ack_buffer[10];
snprintf(ack_buffer, sizeof(ack_buffer), "%d", msg_id);
if (sendto(sockfd, ack_buffer, strlen(ack_buffer), 0, (const struct sockaddr *)&client_addr, len) < 0) {
perror("sendto (ACK) failed");
}
}
}
return NULL;
}
int main() {
int sockfd;
if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
perror("socket creation failed");
return EXIT_FAILURE;
}
pthread_t sender_tid, receiver_tid;
if (pthread_create(&sender_tid, NULL, sender_thread, &sockfd) != 0) {
perror("pthread_create (sender) failed");
close(sockfd);
return EXIT_FAILURE;
}
if (pthread_create(&receiver_tid, NULL, receiver_thread, &sockfd) != 0) {
perror("pthread_create (receiver) failed");
close(sockfd);
return EXIT_FAILURE;
}
pthread_join(sender_tid, NULL);
// 此处可以添加逻辑来终止接收线程
close(sockfd);
return 0;
}