当前位置: 首页 > article >正文

【Python】Paho-MQTT:mqtt 信息收发

Paho-MQTT 是一款对 MQTT (Message Queuing Telemetry Transport) 协议的 Python 库实现。MQTT 是一种轻量类信息公告方式协议,应用于 IoT 环境。通过 Paho-MQTT,可以实现信息发布与接收,构建高效的通信网络。

MQTT 核心概念

  • 服务器 (Broker):信息分发与管理的中心。
  • 客户端 (Client):发布或订阅信息的终端。
  • 主题 (Topic):信息分类与路由依据。
  • QoS 级别:保证信息传输质量,分为 0 (尽力而为)、1 (至少一次)、2 (仅一次)。

功能函数详解

初始化客户端

mqtt.Client(client_id="", clean_session=True, userdata=None, protocol=mqtt.MQTTv311, transport="tcp")
  • client_id:客户端标识符。
  • clean_session:是否清除会话数据。
  • userdata:用户自定义数据。
  • protocol:使用的 MQTT 协议版本。
  • transport:选择通信协议,WebSocket 通常为 “websockets”。
import paho.mqtt.client as mqtt

client = mqtt.Client(client_id="example_client", clean_session=True)
print("MQTT 客户端初始化完成")

连接到 Broker

connect(host, port=1883, keepalive=60, bind_address="")
  • host:Broker 的 IP 地址或域名。
  • port:通信端口,WebSocket 模式通常为 8083。
  • keepalive:保活时间,单位为秒。
  • bind_address:可选参数,绑定到特定网络接口。
BROKER_URL = "ws://broker.emqx.io:8083/mqtt"

client = mqtt.Client()
client.connect(BROKER_URL, 8083, 60)
print("已成功连接到 Broker")

发布信息

publish(topic, payload=None, qos=0, retain=False)
  • topic:信息发布的主题。
  • payload:发送的数据内容。
  • qos:信息传输质量。
  • retain:是否保留消息。
client.publish("test/topic", payload="Hello MQTT", qos=1)
print("信息发布成功")

订阅信息

subscribe(topic, qos=0)
  • topic:订阅的主题。
  • qos:订阅的 QoS 级别。
client.subscribe("test/topic", qos=1)
print("已订阅 test/topic 主题")

注册回调函数

连接回调函数
def on_connect(client, userdata, flags, rc):
    print(f"连接状态:{rc}")
消息回调函数
def on_message(client, userdata, msg):
    print(f"接收到消息:主题={msg.topic} 内容={msg.payload.decode()}")
注册示例
client.on_connect = on_connect
client.on_message = on_message

阻塞的网络循环

启动一个阻塞的网络循环,处理与 MQTT Broker 的通信,包括接收消息、维护连接和调用回调函数。

client = mqtt.Client()
client.connect("broker.emqx.io", 1883, 60)
client.loop_forever()  # 进入阻塞模式,持续处理网络事件

非阻塞的网络循环

启动一个非阻塞的网络循环,在后台线程中处理与 Broker 的通信,程序主线程仍然可以继续执行其他任务。

client = mqtt.Client()
client.connect("broker.emqx.io", 1883, 60)
client.loop_start()  # 启动非阻塞网络循环

# 主线程执行其他任务
for i in range(5):
    print("主线程任务运行中...")
    time.sleep(1)

client.loop_stop()  # 停止后台网络循环

实战:IoT 环境下的信息监控

发布主题信息

import paho.mqtt.client as mqtt
import random

BROKER_URL = "ws://broker.emqx.io:8083/mqtt"
TOPIC = "iot/sensor/data"

# 产生随机温度信息

def generate_temperature():
    return round(random.uniform(20.0, 30.0), 2)

# 客户端连接
client = mqtt.Client(transport="websockets")
client.connect("broker.emqx.io", 8083, 60)

while True:
    temperature = generate_temperature()
    client.publish(TOPIC, f"Temperature: {temperature} C")

接收主题信息

import paho.mqtt.client as mqtt

BROKER_URL = "ws://broker.emqx.io:8083/mqtt"
TOPIC = "iot/sensor/data"

# 信息接收函数

def on_message(client, userdata, msg):
    print(f"接收信息:{msg.topic} - {msg.payload.decode()}")

client = mqtt.Client(transport="websockets")
client.on_message = on_message
client.connect("broker.emqx.io", 8083, 60)
client.subscribe(TOPIC)
client.loop_forever()

高级配置

完善错误处理

通过指定 on_log 函数输出 MQTT 日志以便跟踪连接、消息接收等操作。

# 定义日志输出函数

def on_log(client, userdata, level, buf):
    print(f"日志:{buf}")

client.on_log = on_log

TLS/加密连接

通过安全传输层协议(TLS)保障数据安全。

import ssl

client = mqtt.Client(transport="websockets")
client.tls_set(ca_certs="/path/to/ca.crt", certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2)
client.tls_insecure_set(False)
client.connect("broker.emqx.io", 8084, 60)

自动重连逻辑

# 处理断线重连

def on_disconnect(client, userdata, rc):
    print("连接断开,尝试重新连接...")
    while rc != 0:
        try:
            rc = client.reconnect()
            print("重连成功")
        except Exception as e:
            print(f"重连失败:{e}")

client.on_disconnect = on_disconnect

通过以上实现,可以确保 MQTT 客户端在网络异常情况下的高可用性。


http://www.kler.cn/a/504724.html

相关文章:

  • Ubuntu把应用程序放到桌面
  • 【STM32】HAL库USB实现软件升级DFU的功能操作及配置
  • ElasticSearch|ES|架构介绍|原理浅析
  • Python编程与在线医疗平台数据挖掘与数据应用交互性研究
  • 存储过程和触发器
  • 本地部署项目管理工具 Leantime 并实现外部访问
  • 40,【6】CTFHUB WEB SQL MYSQL数据库
  • rsarsa-给定pqe求私钥对密文解密
  • Day08-后端Web实战——JDBCMybatis
  • PanWeidb-使用BenchmarkSQL对磐维数据库进行压测
  • 比较之舞,优雅演绎排序算法的智美篇章
  • 数仓建模(六)从ODS到DWD、DWS、ADS
  • 过压保护电路
  • 查看linux 当前运行的 python脚本
  • Linux 系统资源监控笔记
  • MySQL表的创建实验
  • leetcode131.分割回文串
  • ISP基本框架及算法介绍
  • ROS2 准备工作(虚拟机安装,Ubuntu安装,ROS2系统安装)
  • [PAT 甲级] 1179 Chemical Equation (DFS)
  • python中的RPA->playwright自动化录制脚本实战案例笔记
  • Qt 各版本选择
  • 软定时器的删除与状态查询
  • UE材质节点Fresnel
  • 2025年互联网医院系统源码开发趋势:如何构建AI在线问诊APP
  • 2025.1.15——假期回归训练,从sql注入开始|一、SQL整数型注入