当前位置: 首页 > article >正文

MQTT应用环路验证

前言

  如今物联网的概念在如今十分普及,而与之伴随的协议就是MQTT,之前给我的感觉就是熟悉又陌生,所以最近就探究,整个MQTT的应用环路,进一步加深其应用层面上的了解。当然,笔者本着体验阿里云的产品试用的同时,来进行验证,可能有什么不是很符合实际的,大家见谅。还有,可以从下述也能看到环路并没有完整的实现,所以写下该文章也是给大家提供下MQTT方向的思路,和记录下我的尝试流程以及思考。

文中用到的各大官网以及软件:阿里云,EMQX Platform、MQTTX工具。

方案

  打红叉的线路都是在下述提到的云产品(其它中间件或产品可能可以实现)中实现不了的,还是我技术水平有限了,之前猜想还以为互通的线路应该没啥问题,但是没想,多数只能单方面的写、上传,线路还是没法达到发生/接收的效果。

1.客户端+MQTT端+服务端+数据库

在这里插入图片描述

2.客户端+EMQX专业版+数据库

主要就是修改成EMQX专业版,可以优化处理端,减少步骤

在这里插入图片描述

方案1

1.客户端

客户端通常是具备网络连接能力并支持MQTT协议通信的物联网设备,例如ESP32、树莓派或部分Arduino开发板等等。不过平时测试时,通比较喜欢先用MQTTX来,模拟设备端收发数据来调试。

在这里插入图片描述

2.MQTT端

MQTT端本来之前都是用的阿里云的物联网平台的公共实例的,但是之前不知道为什么不见了,不知道是不是被什么操作给卡掉了,还是怎么回事,还有那上面的云产品流转,也就是得到MQTT消息的处理方法,还挺难用的,数据目的大部分也只能是自家的云产品,当然这公共实例是免费体验,也不能要求太多。

在这里插入图片描述

不过也正好体验下EMQX Cloud的Serverless服务,毕竟这个更加专注于MQTT服务,而且Serverless版本每月都免费额度,设定好消费额度后,可以每个月都只用免费额度的量,还是非常适合用来做测试验证之类的。

在这里插入图片描述

在这里插入图片描述

Serverless版本也有着一定的限制,就是较为关键的功能,数据集成方面给出的功能少,按文档来看其它版本才有更加多的集成,如数据库之类的,Serverless版本则没有,所以说该方案才考虑用阿里云的函数计算,搭建服务端,通过HTTP服务把消息发过去处理。

在这里插入图片描述

还有就是,每个连接进MQTT的客户端,都要先在控制台这里进行客户端认证,设置用户名和密码后,客户端才能拿着该信息登录MQTT服务器。这里我定义了,两个用户,一个是用来给设备客户端做数据上报,一个给服务端做数据返回。

在这里插入图片描述

3.服务端

也在有服务端处理需求的时候,找阿里云产品,才开始使用,看了视频介绍,也符好,可以将HTTP服务的请求进行处理,只用关注于服务端的处理程序就行了,至于服务器的购买,底层软件的安装都不用,直接按照,配置好的文件框架来。我配置的函数则是,Web函数下Debian10的python,则是基于Flask框架的。

在这里插入图片描述

在这里插入图片描述

下面也附上,我在函数计算中的调试代码,不过MQTT的服务是有问题的。数据库读写的功能是没有问题,目前使用的是flask_mqtt包,调试的时候一直再说SSL的验证有问题,也提供ca.crt证书了。在之前使用过paho.mqtt包,能连接上,通过.publish也能发送信息到主题,但是代码写的是发一条,结果是一直发,查不出问题就放弃了。

from flask import Flask
from flask import request
import pymysql
from flask_mqtt import Mqtt
from flask import jsonify
import ssl
import os


REQUEST_ID_HEADER = 'x-fc-request-id'

app = Flask(__name__)


# 获取当前工作目录
current_dir = os.path.dirname(os.path.abspath(__file__))
ca_certs_path = os.path.join(current_dir, 'emqxsl-ca.crt')

# MQTT配置
app.config['MQTT_BROKER_URL'] = 'MQTT服务器'
app.config['MQTT_BROKER_PORT'] = 8883
app.config['MQTT_USERNAME'] = '用户名' # 当你需要验证用户名和密码时,请设置该项
app.config['MQTT_PASSWORD'] = '密码'      # 当你需要验证用户名和密码时,请设置该项
app.config['MQTT_KEEPALIVE'] = 5                # 设置心跳时间,单位为秒
app.config['MQTT_TLS_ENABLED'] = True           # 如果你的服务器支持 TLS,请设置为 True
app.config['MQTT_TLS_INSECURE'] = True  # 仅用于调试,生产环境中不推荐
app.config['MQTT_TLS_CA_CERTS'] = ca_certs_path
app.config['MQTT_TLS_SSL_VERSION'] = ssl.PROTOCOL_TLSv1_2
app.config['MQTT_TLS_CERT_REQS'] = ssl.CERT_REQUIRED

mqtt_client = Mqtt(app)

# mqtt连接成功回调
@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
   if rc == 0:
       print('Connected successfully')
       mqtt_client.subscribe(topic) # 订阅主题
   else:
       print('Bad connection. Code:', rc)

# 接收消息
@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
   data = dict(
       topic=message.topic,
       payload=message.payload.decode()
  )
   print('Received message on topic: {topic} with payload: {payload}'.format(**data))



# 发布消息
@app.route('/get', methods=['POST'])
def publish_message():
   request_data = request.get_json()
   publish_result = mqtt_client.publish(request_data['topic'], request_data['msg'])
   return jsonify({'code': publish_result[0]})


# 修改数据库
def update_database(temp, humi):
        # 数据库操作
        # 打开数据库连接
        my_db = pymysql.connect(host='数据库地址',
                user='用户',
                password='密码',
                database='表名')
        # 使用cursor()方法获取操作游标 
        cursor = my_db.cursor()
        
        # SQL 更新语句
        sql = "UPDATE `my_db`.`mqtt` SET `temp`=%s,`humi`=%s WHERE `id`=1"
        try:
                # 执行SQL语句
                cursor.execute(sql, (temp, humi))
                # 提交到数据库执行
                my_db.commit()
        except Exception as e:
                # 发生错误时回滚
                my_db.rollback()
                print(f"Error occurred: {e}")
        finally:
                if my_db:
                        # 关闭数据库连接
                        my_db.close()


@app.route('/', defaults={'path': ''})
@app.route('/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE'])
def hello_world(path):
        rid = request.headers.get(REQUEST_ID_HEADER)
        print("FC Invoke Start RequestId: " + rid)
        print("Path: " + path)
        # MQTT端数据解析
        json_data = request.get_json(force=True)
        if json_data:
                print("Json Data: " + str(json_data))
                # 操作方法 send or get
                method = str(json_data['method'])
                print("Data: " + method)
                # 如果是发送数据,则更新数据库
                if method == 'send':
                        # 提取 temp 和 humi 的值
                        temp_value = json_data['data'].get('temp', 0)  # 默认值为 0
                        humi_value = json_data['data'].get('humi', 0)  # 默认值为 0
                        update_database(temp_value, humi_value)
                elif method == 'get':
                        # mqtt_client.publish(topic, f"Hi MQTTX!")
                        pass
                        
        print("FC Invoke End RequestId: " + rid)
        return "OK!"

if __name__ == '__main__':
        app.run(host='0.0.0.0',port=9000)

4.数据持久化

数据库选用的阿里云的MySQL,启动实例,创建账号,开通外网访问权限以获取地址,复制到服务端即可。

在这里插入图片描述

方案2

该方案主要步骤只是把EMQX的Serverless版本,改为专业版,因为数据集成自带数据库操作,就可以去掉上个方案中的服务端处理,其它的大体不变。版本介绍 | EMQX Platform
当然这个网址,也写有了各版本之间开放什么功能。

在这里插入图片描述

在这里插入图片描述

问题

问题1:MQTT端的HTTP服务,无法访问服务端

EMQX Cloud的Serverless版本,在数据集成设定调用HTTP服务输出动作到服务端(函数计算),一直都失败,而用Hoppscotch工具,给定相同的地址,和请求头却又可以。然后我通过Beyond Compare 4(文本比较工具),比较了,这成功案例和失败案例的日志,发现原来的HTTP服务的默认端口80,与服务端提供服务的9000端口不一致,造成的,后来在输出动作的url上加:9000就行了。

在这里插入图片描述

在这里插入图片描述


http://www.kler.cn/a/565676.html

相关文章:

  • Godot4.3 显示像素风格图片模糊如何设置?
  • Debian安装C语言环境
  • 自主可控:国产CAE一体化平台如何筑基新能源车未来
  • leetcode 75.颜色分类(详解)数组分块c++
  • 【Spring】AOP
  • Ubuntu 下 nginx-1.24.0 源码分析 - ngx_conf_t
  • [深度学习] 大模型学习2-提示词工程指北
  • 【落羽的落羽 C++】C++入门基础·其之一
  • 芯麦GC1277:电脑散热风扇驱动芯片的优质之选 并可替代传统的0CH477/灿瑞芯片。
  • API,URL,Token,XML,JSON是干嘛的
  • 某镇江 app 练手
  • linux之crosstool-NG(1)生成toolchain
  • TCP/IP 5层协议簇:数据链路层(交换机工作原理)
  • 如何获取mac os 安装盘
  • AI 自动化编程:从效率革命到未来教育的革新
  • 论文绘图工具
  • 自动化测试框架设计
  • 细说 Java 线程池
  • 商城源码的框架
  • c++中迭代器和指针有什么区别?