python实现mqtt client 客户端
功能介绍:
实现一个简单的MQTT客户端,该客户端需要连接到一个MQTT代理,并发布一些消息到指定的主题。
- 实现一个MQTT客户端,连接到一个MQTT代理。
- 能够自动重连。
- 支持授权登录、TLS验证
- 内部使用队列,用于接收外部调用发送过来的字符串消息
- 内部不断从队列获取消息,并发送到指定主题。
- 退出接口
为了实现一个简单的MQTT客户端,我们可以使用 paho-mqtt 库。这个库提供了与 MQTT 代理通信的功能。我们将按照以下步骤来实现这个客户端:
- 安装 paho-mqtt 库。
- 实现一个 MQTT 客户端类,包含连接、自动重连、消息队列处理等功能。
- 编写测试代码来验证客户端的功能。
1. 安装 paho-mqtt 库
首先,我们需要安装 paho-mqtt 库。你可以使用以下命令来安装它:
pip install paho-mqtt
2. 实现 MQTT 客户端类
import paho.mqtt.client as mqtt
import queue
import time
import threading
class SimpleMQTTClient:
def __init__(self, broker_address, port=1883, keepalive=60, topic="test/topic",
username=None, password=None, ca_certs=None, certfile=None, keyfile=None):
self.broker_address = broker_address
self.port = port
self.keepalive = keepalive
self.topic = topic
self.message_queue = queue.Queue()
self.connected = False
self.running = False
client_id = f'sunoff_oid-mqtt-{int(time.time())}'
self.client = mqtt.Client(
client_id=client_id
)
if username and password:
self.client.username_pw_set(username=username, password=password)
if ca_certs:
self.client.tls_set(ca_certs=ca_certs, certfile=certfile, keyfile=keyfile)
self.client.tls_insecure_set(False)
self.client.on_connect = self.on_connect
self.client.on_disconnect = self.on_disconnect
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
self.connected = True
else:
print(f"Failed to connect, return code {rc}")
def on_disconnect(self, client, userdata, rc):
print("Disconnected from MQTT Broker")
self.connected = False
self.reconnect()
def reconnect(self):
while not self.connected and self.running:
print("Reconnecting to MQTT Broker...")
try:
self.client.reconnect()
except Exception as e:
print(f"Reconnection failed: {e}")
time.sleep(5)
def start(self):
self.running = True
self.client.connect(self.broker_address, self.port, self.keepalive)
self.client.loop_start()
threading.Thread(target=self.process_queue, daemon=True).start()
def process_queue(self):
while self.running:
try:
message = self.message_queue.get(timeout=10)
self.client.publish(self.topic, message)
print(f"Published message: {message}")
except queue.Empty:
continue
def send_message(self, message):
if self.connected:
self.message_queue.put(message)
else:
# print("Not connected, message not sent")
pass
def stop(self):
self.running = False
self.client.loop_stop()
self.client.disconnect()
print("MQTT client stopped.")
3. 编写测试代码
if __name__ == "__main__":
# 配置MQTT代理地址和主题
broker_address = "111.190.143.158" # 使用公共MQTT代理进行测试
port = 9701
topic = "rid_mq"
# 创建MQTT客户端实例
mqtt_client = SimpleMQTTClient(broker_address, port=port, topic=topic)
# 启动客户端
mqtt_client.start()
# 发送一些消息
for i in range(100):
message = f"Hello MQTT {i}"
mqtt_client.send_message(message)
time.sleep(0.09)
# 等待一段时间,确保消息发送完成
time.sleep(2)
# 停止客户端
mqtt_client.stop()
测试截图
发送端
mqttx 接收端