MQTT框架和使用
目录
MQTT框架
1. MQTT概述
1.1 形象地理解三个角色
1.2 消息的传递
2. 在Windows上体验MQTT
2.1 安装APP
2.2 启动服务器
2.3 使用MQTTX
2.3.1 建立连接
2.3.2 订阅主题
2.3.3 发布主题
2.4 使用mosquitto
2.4.1 发布消息
2.4.2 订阅消息
3. kawaii-mqtt源码分析
3.1 使用
3.2 kawaii-mqtt内部实现
3.2.1 主要代码
3.2.2 处理函数记录在链表里
3.2.3 流程图
MQTT框架
参考资料:
-
kawaii-mqtt源码:
-
作者发布源码:GitHub - jiejieTop/mqttclient: A high-performance, high-stability, cross-platform MQTT client, developed based on the socket API, can be used on embedded devices (FreeRTOS / LiteOS / RT-Thread / TencentOS tiny), Linux, Windows, Mac, with a very concise The API interface realizes the quality of service of QOS2 with very few resources, and seamlessly connects the mbedtls encryption library.
-
大牛维护的:GitHub - longtengmcu/kawaii-mqtt: 基于socket API的MQTT客户端,以极少的资源实现qos2服务质量,并且实现mbedtls支持,此仓库是专门为RT-Thread做的软件包,原始仓库位于:https://github.com/jiejieTop/mqttclient
-
-
博客
-
作者博客:
-
你不得不看的图文并茂的MQTT协议通信过程!!!
-
MQTT协议简介及协议原理
-
mqttclient设计与实现方式
-
-
大牛笔记:
-
记一次解决MQTT软件包内存泄露的心路历程
-
-
-
APP
-
Download | Eclipse Mosquitto
-
MQTTX:全功能 MQTT 客户端工具
-
1. MQTT概述
1.1 形象地理解三个角色
MQTT通信模型示意图如下:
使用电视台、记者、观众三个角色来类比更容易理解:
-
电视台:在MQTT里被称为服务器(broker),有如下作用
-
接受来自客户的网络连接; // 记者/观众连接进电视台
-
接受客户发布的应用信息; // 接受记者发布的消息
-
处理来自客户端的订阅和退订请求; // 处理观众的订阅、退订请求
-
向订阅的客户转发应用程序消息 // 给观众转发记者报道的消息
-
-
记者和观众都是客户端,记者也可以当观众,观众也可以当记者,有如下作用:
-
发布信息; // publish,记者
-
订阅消息;// Subscribe ,观众
-
退订或删除消息;
-
断开与服务器连接
-
1.2 消息的传递
还是以日常生活为例,提几个问题:
-
观众:我只关心财经新闻,那么只订阅"财经新闻",不订阅"体育新闻"
-
记者:我是财经记者,我可以发布"财经新闻",不发布"体育新闻"
这个过程中,引入两个概念:
-
主题(Topic):是财经类的?还是体育类的?
-
消息(Message)或负载(Playload):具体的新闻信息
具体的流程是这样的:
-
观众打电话到电视台:connect
-
观众向电视台订阅"财经新闻": Subscribe 某个 Topic
-
记者打电话到电视台:connect
-
记者向电视台发布"财经新闻":Public某个Topic的某个Playload
-
电视台向"订阅了财经新闻的观众"发布"某条消息":Public某个Playload给Subscriber
整个过程中,电视台和记者、电视台和观众直接的电话要保存连接状态,还要时不时确认一下:
-
记者要时不时给电视台喊一声"喂":确保电视台还正常
-
观众要时不时给电视台喊一声"喂":确保电视台还正常
2. 在Windows上体验MQTT
2.1 安装APP
安装这2个APP:
2.2 启动服务器
使用DOS命令行,进入mosquitto-2.0.14-install-windows-x64的安装目录,执行命令:
cd "c:\Program Files\mosquitto"
.\mosquitto.exe -v
在下面的实验中,无论是使用MQTTX还是使用mosquitto_pub/mosquitto_sub,都要保持mosquitto.exe在运行。
2.3 使用MQTTX
2.3.1 建立连接
运行MQTTX后,如下图操作:
2.3.2 订阅主题
建立连接后,如下图操作:
2.3.3 发布主题
如下操作:
2.4 使用mosquitto
2.4.1 发布消息
参数说明:
mosquitto_pub 命令参数说明
1. -d 打印debug信息
2. -f 将指定文件的内容作为发送消息的内容
3. -h 指定要连接的域名 默认为localhost
4. -i 指定要给哪个clientId的用户发送消息
5. -I 指定给哪个clientId前缀的用户发送消息
6. -m 消息内容
7. -n 发送一个空(null)消息
8. -p 连接端口号
9. -q 指定QoS的值(0,1,2)
10. -t 指定topic
11. -u 指定broker访问用户
12. -P 指定broker访问密码
13. -V 指定MQTT协议版本
14. --will-payload 指定一个消息,该消息当客户端与broker意外断开连接时发出。该参数需要与--will-topic一起使用
15. --will-qos Will的QoS值。该参数需要与--will-topic一起使用
16. --will-retain 指定Will消息被当做一个retain消息(即消息被广播后,该消息被保留起来)。该参数需要与--will-topic一起使用
17. --will-topic 用户发送Will消息的topic
使用DOS命令行,进入mosquitto-2.0.14-install-windows-x64的安装目录,执行命令:
cd "c:\Program Files\mosquitto"
.\mosquitto_pub.exe -h 127.0.0.1 -p 1883 -t "100ask"
如图:
2.4.2 订阅消息
参数说明:
mosquitto_sub 命令参数说明
1. -c 设定‘clean session’为无效状态,这样一直保持订阅状态,即便是已经失去连接,如果再次连接仍旧能够接收的断开期间发送的消息。
2. -d 打印debug信息
3. -h 指定要连接的域名 默认为localhost
4. -i 指定clientId
5. -I 指定clientId前缀
6. -k keepalive 每隔一段时间,发PING消息通知broker,仍处于连接状态。 默认为60秒。
7. -q 指定希望接收到QoS为什么的消息 默认QoS为0
8. -R 不显示陈旧的消息
9. -t 订阅topic
10. -v 打印消息
11. --will-payload 指定一个消息,该消息当客户端与broker意外断开连接时发出。该参数需要与--will-topic一起使用
12. --will-qos Will的QoS值。该参数需要与--will-topic一起使用
13. --will-retain 指定Will消息被当做一个retain消息(即消息被广播后,该消息被保留起来)。该参数需要与--will-topic一起使用
14. --will-topic 用户发送Will消息的topic
使用DOS命令行,进入mosquitto-2.0.14-install-windows-x64的安装目录,执行命令:
3. kawaii-mqtt源码分析
3.1 使用
几条代码使用MQTT:
void my_message_handler_t(void* client, message_data_t* msg)
{
}
int main(void)
{
int err;
mqtt_client_t *client = NULL;
err = mqtt_connect(client);
err = mqtt_subscribe(client, "100ask-topic", QOS0, my_message_handler_t);
while (1);
}
从上述代码中,提2个问题:
答案:
3.2 kawaii-mqtt内部实现
3.2.1 主要代码
kawaii-mqtt内部处理都是使用mqtt_yield_thread线程来处理:
主要函数是mqtt_yield:
mqtt_yield里的核心函数是对数据包的处理:mqtt_packet_handle
3.2.2 处理函数记录在链表里
mqtt_client结构体里有2个链表:
MQTT Client向Broker发出某些数据包时,期待得到回应(ACK):会启动一个定时器。如果定时器超时表示没有收到ACK:
-
要么重发
-
要么出错
-
对于ACK包,一般无需提供处理函数
要订阅某个主题时,MQTT Client会发出SUBCRIBE包,期待得到回应的数据包:SUBACK包。代码如下:
mqtt_subscribe
msg_handler = mqtt_msg_handler_create(topic_filter, qos, handler);
rc = mqtt_ack_list_record(c, SUBACK, packet_id, len, msg_handler);
/* create a ack handler node */
ack_handler = mqtt_ack_handler_create(c, type, packet_id, payload_len, handler);
platform_timer_cutdown(&ack_handler->timer, c->mqtt_cmd_timeout);
mqtt_list_add_tail(&ack_handler->list, &c->mqtt_ack_handler_list);
如果在指定时间里没有收到SUBACK包,那么就会在mqtt_ack_handler_list中删除该handler。
如果收到队列SUBACK包,那么要做两件事:
-
在mqtt_ack_handler_list中删除该handler
-
把该handler放到mqtt_msg_handler_list中:以后收到PUBLISH数据包时这个handler被调用