【Python】Paho-MQTT:mqtt 信息收发
Paho-MQTT 是一款对 MQTT (Message Queuing Telemetry Transport) 协议的 Python 库实现。MQTT 是一种轻量类信息公告方式协议,应用于 IoT 环境。通过 Paho-MQTT,可以实现信息发布与接收,构建高效的通信网络。
MQTT 核心概念
- 服务器 (Broker):信息分发与管理的中心。
- 客户端 (Client):发布或订阅信息的终端。
- 主题 (Topic):信息分类与路由依据。
- QoS 级别:保证信息传输质量,分为 0 (尽力而为)、1 (至少一次)、2 (仅一次)。
功能函数详解
初始化客户端
mqtt.Client(client_id="", clean_session=True, userdata=None, protocol=mqtt.MQTTv311, transport="tcp")
- client_id:客户端标识符。
- clean_session:是否清除会话数据。
- userdata:用户自定义数据。
- protocol:使用的 MQTT 协议版本。
- transport:选择通信协议,WebSocket 通常为 “websockets”。
import paho.mqtt.client as mqtt
client = mqtt.Client(client_id="example_client", clean_session=True)
print("MQTT 客户端初始化完成")
连接到 Broker
connect(host, port=1883, keepalive=60, bind_address="")
- host:Broker 的 IP 地址或域名。
- port:通信端口,WebSocket 模式通常为 8083。
- keepalive:保活时间,单位为秒。
- bind_address:可选参数,绑定到特定网络接口。
BROKER_URL = "ws://broker.emqx.io:8083/mqtt"
client = mqtt.Client()
client.connect(BROKER_URL, 8083, 60)
print("已成功连接到 Broker")
发布信息
publish(topic, payload=None, qos=0, retain=False)
- topic:信息发布的主题。
- payload:发送的数据内容。
- qos:信息传输质量。
- retain:是否保留消息。
client.publish("test/topic", payload="Hello MQTT", qos=1)
print("信息发布成功")
订阅信息
subscribe(topic, qos=0)
- topic:订阅的主题。
- qos:订阅的 QoS 级别。
client.subscribe("test/topic", qos=1)
print("已订阅 test/topic 主题")
注册回调函数
连接回调函数
def on_connect(client, userdata, flags, rc):
print(f"连接状态:{rc}")
消息回调函数
def on_message(client, userdata, msg):
print(f"接收到消息:主题={msg.topic} 内容={msg.payload.decode()}")
注册示例
client.on_connect = on_connect
client.on_message = on_message
阻塞的网络循环
启动一个阻塞的网络循环,处理与 MQTT Broker 的通信,包括接收消息、维护连接和调用回调函数。
client = mqtt.Client()
client.connect("broker.emqx.io", 1883, 60)
client.loop_forever() # 进入阻塞模式,持续处理网络事件
非阻塞的网络循环
启动一个非阻塞的网络循环,在后台线程中处理与 Broker 的通信,程序主线程仍然可以继续执行其他任务。
client = mqtt.Client()
client.connect("broker.emqx.io", 1883, 60)
client.loop_start() # 启动非阻塞网络循环
# 主线程执行其他任务
for i in range(5):
print("主线程任务运行中...")
time.sleep(1)
client.loop_stop() # 停止后台网络循环
实战:IoT 环境下的信息监控
发布主题信息
import paho.mqtt.client as mqtt
import random
BROKER_URL = "ws://broker.emqx.io:8083/mqtt"
TOPIC = "iot/sensor/data"
# 产生随机温度信息
def generate_temperature():
return round(random.uniform(20.0, 30.0), 2)
# 客户端连接
client = mqtt.Client(transport="websockets")
client.connect("broker.emqx.io", 8083, 60)
while True:
temperature = generate_temperature()
client.publish(TOPIC, f"Temperature: {temperature} C")
接收主题信息
import paho.mqtt.client as mqtt
BROKER_URL = "ws://broker.emqx.io:8083/mqtt"
TOPIC = "iot/sensor/data"
# 信息接收函数
def on_message(client, userdata, msg):
print(f"接收信息:{msg.topic} - {msg.payload.decode()}")
client = mqtt.Client(transport="websockets")
client.on_message = on_message
client.connect("broker.emqx.io", 8083, 60)
client.subscribe(TOPIC)
client.loop_forever()
高级配置
完善错误处理
通过指定 on_log
函数输出 MQTT 日志以便跟踪连接、消息接收等操作。
# 定义日志输出函数
def on_log(client, userdata, level, buf):
print(f"日志:{buf}")
client.on_log = on_log
TLS/加密连接
通过安全传输层协议(TLS)保障数据安全。
import ssl
client = mqtt.Client(transport="websockets")
client.tls_set(ca_certs="/path/to/ca.crt", certfile=None, keyfile=None, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2)
client.tls_insecure_set(False)
client.connect("broker.emqx.io", 8084, 60)
自动重连逻辑
# 处理断线重连
def on_disconnect(client, userdata, rc):
print("连接断开,尝试重新连接...")
while rc != 0:
try:
rc = client.reconnect()
print("重连成功")
except Exception as e:
print(f"重连失败:{e}")
client.on_disconnect = on_disconnect
通过以上实现,可以确保 MQTT 客户端在网络异常情况下的高可用性。