MQTT应用环路验证
前言
如今物联网的概念在如今十分普及,而与之伴随的协议就是MQTT,之前给我的感觉就是熟悉又陌生,所以最近就探究,整个MQTT的应用环路,进一步加深其应用层面上的了解。当然,笔者本着体验阿里云的产品试用的同时,来进行验证,可能有什么不是很符合实际的,大家见谅。还有,可以从下述也能看到环路并没有完整的实现,所以写下该文章也是给大家提供下MQTT方向的思路,和记录下我的尝试流程以及思考。
文中用到的各大官网以及软件:阿里云,EMQX Platform、MQTTX工具。
方案
打红叉的线路都是在下述提到的云产品(其它中间件或产品可能可以实现)中实现不了的,还是我技术水平有限了,之前猜想还以为互通的线路应该没啥问题,但是没想,多数只能单方面的写、上传,线路还是没法达到发生/接收的效果。
1.客户端+MQTT端+服务端+数据库
2.客户端+EMQX专业版+数据库
主要就是修改成EMQX专业版,可以优化处理端,减少步骤
方案1
1.客户端
客户端通常是具备网络连接能力并支持MQTT协议通信的物联网设备,例如ESP32、树莓派或部分Arduino开发板等等。不过平时测试时,通比较喜欢先用MQTTX
来,模拟设备端收发数据来调试。
2.MQTT端
MQTT端本来之前都是用的阿里云的物联网平台的公共实例的,但是之前不知道为什么不见了,不知道是不是被什么操作给卡掉了,还是怎么回事,还有那上面的云产品流转,也就是得到MQTT消息的处理方法,还挺难用的,数据目的大部分也只能是自家的云产品,当然这公共实例是免费体验,也不能要求太多。
不过也正好体验下EMQX Cloud
的Serverless服务,毕竟这个更加专注于MQTT服务,而且Serverless
版本每月都免费额度,设定好消费额度后,可以每个月都只用免费额度的量,还是非常适合用来做测试验证之类的。
Serverless
版本也有着一定的限制,就是较为关键的功能,数据集成方面给出的功能少,按文档来看其它版本才有更加多的集成,如数据库之类的,Serverless版本则没有,所以说该方案才考虑用阿里云的函数计算,搭建服务端,通过HTTP服务把消息发过去处理。
还有就是,每个连接进MQTT的客户端,都要先在控制台这里进行客户端认证,设置用户名和密码后,客户端才能拿着该信息登录MQTT服务器。这里我定义了,两个用户,一个是用来给设备客户端做数据上报,一个给服务端做数据返回。
3.服务端
也在有服务端处理需求的时候,找阿里云产品,才开始使用,看了视频介绍,也符好,可以将HTTP服务的请求进行处理,只用关注于服务端的处理程序就行了,至于服务器的购买,底层软件的安装都不用,直接按照,配置好的文件框架来。我配置的函数则是,Web函数下Debian10的python,则是基于Flask框架的。
下面也附上,我在函数计算
中的调试代码,不过MQTT的服务是有问题的。数据库读写的功能是没有问题,目前使用的是flask_mqtt
包,调试的时候一直再说SSL的验证有问题,也提供ca.crt证书了。在之前使用过paho.mqtt
包,能连接上,通过.publish也能发送信息到主题,但是代码写的是发一条,结果是一直发,查不出问题就放弃了。
from flask import Flask
from flask import request
import pymysql
from flask_mqtt import Mqtt
from flask import jsonify
import ssl
import os
REQUEST_ID_HEADER = 'x-fc-request-id'
app = Flask(__name__)
# 获取当前工作目录
current_dir = os.path.dirname(os.path.abspath(__file__))
ca_certs_path = os.path.join(current_dir, 'emqxsl-ca.crt')
# MQTT配置
app.config['MQTT_BROKER_URL'] = 'MQTT服务器'
app.config['MQTT_BROKER_PORT'] = 8883
app.config['MQTT_USERNAME'] = '用户名' # 当你需要验证用户名和密码时,请设置该项
app.config['MQTT_PASSWORD'] = '密码' # 当你需要验证用户名和密码时,请设置该项
app.config['MQTT_KEEPALIVE'] = 5 # 设置心跳时间,单位为秒
app.config['MQTT_TLS_ENABLED'] = True # 如果你的服务器支持 TLS,请设置为 True
app.config['MQTT_TLS_INSECURE'] = True # 仅用于调试,生产环境中不推荐
app.config['MQTT_TLS_CA_CERTS'] = ca_certs_path
app.config['MQTT_TLS_SSL_VERSION'] = ssl.PROTOCOL_TLSv1_2
app.config['MQTT_TLS_CERT_REQS'] = ssl.CERT_REQUIRED
mqtt_client = Mqtt(app)
# mqtt连接成功回调
@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
if rc == 0:
print('Connected successfully')
mqtt_client.subscribe(topic) # 订阅主题
else:
print('Bad connection. Code:', rc)
# 接收消息
@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
data = dict(
topic=message.topic,
payload=message.payload.decode()
)
print('Received message on topic: {topic} with payload: {payload}'.format(**data))
# 发布消息
@app.route('/get', methods=['POST'])
def publish_message():
request_data = request.get_json()
publish_result = mqtt_client.publish(request_data['topic'], request_data['msg'])
return jsonify({'code': publish_result[0]})
# 修改数据库
def update_database(temp, humi):
# 数据库操作
# 打开数据库连接
my_db = pymysql.connect(host='数据库地址',
user='用户',
password='密码',
database='表名')
# 使用cursor()方法获取操作游标
cursor = my_db.cursor()
# SQL 更新语句
sql = "UPDATE `my_db`.`mqtt` SET `temp`=%s,`humi`=%s WHERE `id`=1"
try:
# 执行SQL语句
cursor.execute(sql, (temp, humi))
# 提交到数据库执行
my_db.commit()
except Exception as e:
# 发生错误时回滚
my_db.rollback()
print(f"Error occurred: {e}")
finally:
if my_db:
# 关闭数据库连接
my_db.close()
@app.route('/', defaults={'path': ''})
@app.route('/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE'])
def hello_world(path):
rid = request.headers.get(REQUEST_ID_HEADER)
print("FC Invoke Start RequestId: " + rid)
print("Path: " + path)
# MQTT端数据解析
json_data = request.get_json(force=True)
if json_data:
print("Json Data: " + str(json_data))
# 操作方法 send or get
method = str(json_data['method'])
print("Data: " + method)
# 如果是发送数据,则更新数据库
if method == 'send':
# 提取 temp 和 humi 的值
temp_value = json_data['data'].get('temp', 0) # 默认值为 0
humi_value = json_data['data'].get('humi', 0) # 默认值为 0
update_database(temp_value, humi_value)
elif method == 'get':
# mqtt_client.publish(topic, f"Hi MQTTX!")
pass
print("FC Invoke End RequestId: " + rid)
return "OK!"
if __name__ == '__main__':
app.run(host='0.0.0.0',port=9000)
4.数据持久化
数据库选用的阿里云的MySQL,启动实例,创建账号,开通外网访问权限以获取地址,复制到服务端即可。
方案2
该方案主要步骤只是把EMQX的Serverless
版本,改为专业版
,因为数据集成自带数据库操作,就可以去掉上个方案中的服务端处理,其它的大体不变。版本介绍 | EMQX Platform
当然这个网址,也写有了各版本之间开放什么功能。
问题
问题1:MQTT端的HTTP服务,无法访问服务端
EMQX Cloud
的Serverless版本,在数据集成设定调用HTTP服务输出动作到服务端(函数计算),一直都失败,而用Hoppscotch工具,给定相同的地址,和请求头却又可以。然后我通过Beyond Compare 4(文本比较工具),比较了,这成功案例和失败案例的日志,发现原来的HTTP服务的默认端口80,与服务端提供服务的9000端口不一致,造成的,后来在输出动作的url上加:9000
就行了。