当前位置: 首页 > article >正文

python实现mqtt client 客户端

功能介绍
实现一个简单的MQTT客户端,该客户端需要连接到一个MQTT代理,并发布一些消息到指定的主题。

  1. 实现一个MQTT客户端,连接到一个MQTT代理。
  2. 能够自动重连。
  3. 支持授权登录、TLS验证
  4. 内部使用队列,用于接收外部调用发送过来的字符串消息
  5. 内部不断从队列获取消息,并发送到指定主题。
  6. 退出接口

为了实现一个简单的MQTT客户端,我们可以使用 paho-mqtt 库。这个库提供了与 MQTT 代理通信的功能。我们将按照以下步骤来实现这个客户端:

  1. 安装 paho-mqtt 库。
  2. 实现一个 MQTT 客户端类,包含连接、自动重连、消息队列处理等功能。
  3. 编写测试代码来验证客户端的功能。

1. 安装 paho-mqtt 库

首先,我们需要安装 paho-mqtt 库。你可以使用以下命令来安装它:

pip install paho-mqtt

2. 实现 MQTT 客户端类

import paho.mqtt.client as mqtt
import queue
import time
import threading

class SimpleMQTTClient:
    def __init__(self, broker_address, port=1883, keepalive=60, topic="test/topic",
                 username=None, password=None, ca_certs=None, certfile=None, keyfile=None):
        self.broker_address = broker_address
        self.port = port
        self.keepalive = keepalive
        self.topic = topic
        self.message_queue = queue.Queue()
        self.connected = False
        self.running = False
        
        client_id = f'sunoff_oid-mqtt-{int(time.time())}'
        self.client = mqtt.Client(
            client_id=client_id
        )

        if username and password:
            self.client.username_pw_set(username=username, password=password)

        if ca_certs:
            self.client.tls_set(ca_certs=ca_certs, certfile=certfile, keyfile=keyfile)
            self.client.tls_insecure_set(False)
                  
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect

    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
            self.connected = True
        else:
            print(f"Failed to connect, return code {rc}")

    def on_disconnect(self, client, userdata, rc):
        print("Disconnected from MQTT Broker")
        self.connected = False
        self.reconnect()

    def reconnect(self):          
        while not self.connected and self.running:
            print("Reconnecting to MQTT Broker...")
            try:
                self.client.reconnect()
            except Exception as e:
                print(f"Reconnection failed: {e}")
            time.sleep(5)

    def start(self):
        self.running = True
        self.client.connect(self.broker_address, self.port, self.keepalive)
        self.client.loop_start()

        threading.Thread(target=self.process_queue, daemon=True).start()

    def process_queue(self):
        while self.running:
            try:
                message = self.message_queue.get(timeout=10)
                self.client.publish(self.topic, message)
                print(f"Published message: {message}")
            except queue.Empty:
                continue

    def send_message(self, message):
        if self.connected:
            self.message_queue.put(message)
        else:
            # print("Not connected, message not sent")
            pass

    def stop(self):
        self.running = False
        self.client.loop_stop()
        self.client.disconnect()
        print("MQTT client stopped.")

3. 编写测试代码

if __name__ == "__main__":
    # 配置MQTT代理地址和主题
    broker_address = "111.190.143.158"  # 使用公共MQTT代理进行测试
    port = 9701
    topic = "rid_mq"

    # 创建MQTT客户端实例
    mqtt_client = SimpleMQTTClient(broker_address, port=port, topic=topic)

    # 启动客户端
    mqtt_client.start()

    # 发送一些消息
    for i in range(100):
        message = f"Hello MQTT {i}"
        mqtt_client.send_message(message)
        time.sleep(0.09)

    # 等待一段时间,确保消息发送完成
    time.sleep(2)

    # 停止客户端
    mqtt_client.stop()

测试截图

发送端
在这里插入图片描述

mqttx 接收端
在这里插入图片描述


http://www.kler.cn/a/507586.html

相关文章:

  • Vue篇-07
  • hadoop3.3和hive4.0安装——单节点
  • JAVA安全—JWT攻防Swagger自动化Druid泄露
  • vscode的安装与使用
  • FFmpeg开发笔记(七)欧拉系统编译安装FFmpeg
  • 我国无人机新增实名登记110.3 万架,累计完成飞行2666万小时
  • 128.最长连续序列
  • Web3D交互展示:重塑产品展示的新维度
  • ESP8266 AP模式 网页配网 arduino ide
  • vue3学习日记8 - 一级分类
  • Kivy App开发之UX控件Spinner选择框
  • qt信号槽复杂参数传递,以结构体为例
  • 递归40题!再见递归
  • React 中hooks之useReducer使用场景和方法总结
  • python学opencv|读取图像(三十六)(反)零值处理
  • springboot 利用html模版导出word
  • JavaScript笔记基础篇03——函数
  • HarmonyOS使用Grid网格实现计算器功能实现
  • AndroidStudio升级到2024.2.2项目AGP升级8.8.0版本记录
  • MyBatis(三)代理Dao方式的CRUD操作
  • uniapp 微信小程序 金额展示套餐
  • 【狂热算法篇】探秘图论之 Floyd 算法:解锁最短路径的神秘密码(通俗易懂版)
  • 算法(蓝桥杯)贪心算法5——删数问题的解题思路
  • Titans Learning to Memorize at Test Time
  • AI编程工具使用技巧——通义灵码
  • 《火焰烟雾检测开源神经网络模型:智能防火的科技护盾》