当前位置: 首页 > article >正文

MQTT客户端实战:从连接到通信。详细说明MQTT客户端和MQTT代理进行通信

在这里插入图片描述

EMQX安装

EMQX服务器安装

安装文档,见链接不另外写

https://docs.emqx.com/zh/emqx/latest/deploy/install-ubuntu.html

启动 EMQX

启动为一个 systemd 服务:

sudo systemctl start emqx

在windows安装客户端

在线 MQTT WebSocket 客户端工具,MQTTX Web 是开源的 MQTT 5.0 浏览器客户端,但是经我测试没有成功,好像有bug.

建议使用MQTT 5.0 命令行客户端工具。使用命令行上的 MQTTX,旨在帮助开发者在不需要使用图形化界面的基础上,也能更快的开发和调试 MQTT 服务与应用。
在这里插入图片描述
由于是后期被写的博文,图是借官方的。请自行区分一下。

平台安装后的地址

1,平台的地址

  • http://127.0.0.1:18083
    后台登录 用户名:test 密码:test

Laravel中处理MQTT订阅

1,安装MQTT客户端库

在Laravel项目中安装一个MQTT客户端库。你可以使用Composer来安装 php-mqtt/client:

composer require php-mqtt/client

2, 新建command文件

文件路径:app/Console/Commands/MqttClientCommand.php

这段PHP代码是一个用于处理MQTT消息的命令行工具,它使用了Simps的MQTT客户端库。代码中定义了两个类:MQTTUserConfig 和 MqttClientCommand。

MQTTUserConfig 类定义了一些常量,这些常量用于配置MQTT连接。

MqttClientCommand 类继承自 Illuminate\Console\Command,是一个命令行工具,用于订阅或发布MQTT消息。

<?php

namespace App\Console\Commands;

use App\Http\Controllers\Wxapi\DeviceReportController;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;

use Simps\MQTT\Protocol\Types;
use Simps\MQTT\Protocol\V5;
use Simps\MQTT\Tools\Common;
use Simps\MQTT\Client;
use Simps\MQTT\Config\ClientConfig;
use Simps\MQTT\Hex\ReasonCode;

use Swoole\Coroutine;
use Illuminate\Support\Facades\Redis;

class MQTTUserConfig
{    
    const SIMPS_MQTT_REMOTE_HOST = '*';
    const SIMPS_MQTT_PORT = 1883;
    const SIMPS_MQTT_SUBSCRIBE_PORT = 8083;
    const SIMPS_MQTT_USER = 'test*';
    const SIMPS_MQTT_PASSWORD = 'test*';
}

class MqttClientCommand extends Command
{
    protected $signature = 'mqtt:handle {param1}';

    protected $description = '订阅物联网mqtt消息 param1:null 订阅消息, param1:public 发布消息';
    protected  $mqtt ;
    const SWOOLE_MQTT_CONFIG = [
        'open_mqtt_protocol' => true,
        'package_max_length' => 2 * 1024 * 1024,
        'connect_timeout' => 5.0,
        'write_timeout' => 5.0,
        'read_timeout' => 5.0,
    ];

    //模拟设备
    const CLiENT_IDs = [
        'mqttx_devA',
        'mqttx_devB',
        'mqttx_devC',
        'mqttx_devD'
    ];

    public function __construct()
    {
        parent::__construct();
    }

    public function handle()
    {

        $param1 =$this->argument('param1');
//        $param2 =$this->argument('param2');
        if ($param1=='subscribe') {
            $this->info('启动订阅...');
            $this->subscribeMqtt();
        } elseif ($param1=='public') {
            $this->info('启动发布...');
            $this->publishMQTT();
        }
        echo '\r\n\r\n分配工作执行完成!!!';

    }



    protected function getTestMQTT5ConnectConfig()
        {

            $config = new ClientConfig();
            $UserConfig = new MQTTUserConfig();
            return $config->setUserName($UserConfig::SIMPS_MQTT_USER)
                ->setPassword($UserConfig::SIMPS_MQTT_PASSWORD)
                ->setClientId(Client::genClientID())
                ->setKeepAlive(10)
                ->setDelay(3000) // 3s
                ->setMaxAttempts(5)
                ->setProperties([
                    'session_expiry_interval' => 60,
                    'receive_maximum' => 65535,
                    'topic_alias_maximum' => 65535,
                ])
                ->setProtocolLevel(5)
                ->setSwooleConfig( [
                    'open_mqtt_protocol' => true,
                    'package_max_length' => 2 * 1024 * 1024,
                    'connect_timeout' => 5.0,
                    'write_timeout' => 5.0,
                    'read_timeout' => 5.0,
                ]);

    }

    private function heartbeat($message) {
        if ($message) {
            parse_str($message,$array);
            $device = $array['imei'];
            $hash = ':mqtt:heartbeat:online'.":{$device}";
            Redis::expire($hash,30);  ##30s有效
            Redis::sAdd($hash,1);
        }

    }
    /*
     * 订阅
     *  private function subscribeMqtt(){


        Coroutine\run(function () {
            $client = new Client('39.108.230.87', 1883, $this->getTestMQTT5ConnectConfig());
            ....
     */
    private function subscribeMqtt(){

        Coroutine\run(function () {
            $UserConfig = new MQTTUserConfig();
            $client = new Client($UserConfig::SIMPS_MQTT_REMOTE_HOST, 1883,
            $this->getTestMQTT5ConnectConfig());
            $will = [
                'topic' => 'simps-mqtt/dinweiyi/delete',
                'qos' => 1,
                'retain' => 0,
                'message' => 'byebye',
                'properties' => [
                    'will_delay_interval' => 60,
                    'message_expiry_interval' => 60,
                    'content_type' => 'test',
                    'payload_format_indicator' => true, // false 0 1
                ],
            ];
            $client->connect(true, $will);

            $topics['simps-mqtt/dinweiyi/subscribe_message'] = [
                'qos' => 2,
                'no_local' => true,
                'retain_as_published' => true,
                'retain_handling' => 2,
            ];

            $res = $client->subscribe($topics);
            $timeSincePing = time();
            var_dump($res);

            echo '\r\n\r\n connect success !!!';
            while (true) {
                try {
                    $buffer = $client->recv();
                    $message = null;
                    if ($buffer && $buffer !== true) {
                        $message = $buffer["message"];

                        // QoS1 PUBACK
                        if ($buffer['type'] === Types::PUBLISH && $buffer['qos'] === 1) {
                            $client->send(
                                [
                                    'type' => Types::PUBACK,
                                    'message_id' => $buffer['message_id'],
                                ],
                                false
                            );
                        }
                        if ($buffer['type'] === Types::DISCONNECT) {
                            echo sprintf(
                                "Broker is disconnected, The reason is %s [%d]\n",
                                ReasonCode::getReasonPhrase($buffer['code']),
                                $buffer['code']
                            );
                            $client->close($buffer['code']);
                            break;
                        }
                        $reportObj = new DeviceReportController();

                        $ret = $reportObj->store($message);
                        var_dump("182>>>",$ret);
                        unset($reportObj);
                    }
                    if ($timeSincePing <= (time() - $client->getConfig()->getKeepAlive())) {
                        $buffer = $client->ping();
                        if ($buffer) {
                            echo 'send ping success ...' ;
                            $this->heartbeat($message);
                            $timeSincePing = time();
                        }
                    }

                } catch (\Throwable $e) {
                    throw $e;
                }
            }

        });


    }

    protected function getMessage() {
        $client_ids = [
            'mqttx_devA',
//            'mqttx_devB',
            'mqttx_devC',
            'mqttx_devD'
        ];
        $message = [];
        $message['clientID'] = self::CLiENT_IDs[array_rand($client_ids)];
        $message['time'] = time();
        $message['location'] = ["x"=>rand(1000,9999),"y"=>rand(1000,9999)];
        return json_encode($message);
    }
    /*
     * 发布
     */
    public function publishMQTT() {
        Coroutine\run(function () {
            $UserConfig = new MQTTUserConfig();
            $client = new Client($UserConfig::SIMPS_MQTT_REMOTE_HOST, $UserConfig::SIMPS_MQTT_PORT,
            $this->getTestMQTT5ConnectConfig());
            $client->connect();
            while (true) {
                $message = $this->getMessage();
                $response = $client->publish(
                    'simps-mqtt/user/subscribe_message',
                    $message,
                    1,
                    0,
                    0,
                    [
                        'topic_alias' => 1,
                        'message_expiry_interval' => 12,
                    ]
                );
                var_dump( 'publishMQTT>>>',$message);
                Coroutine::sleep(1);
            }
        });
    }

}

3, 代码流程图

使用Mermaid语法描述的上述PHP代码的流程图:

subscribe
public
收到消息
心跳超时
开始
构造函数 __construct
handle 方法
param1 参数
调用 subscribeMqtt
调用 publishMQTT
Coroutine 运行 subscribeMqtt
创建 MQTT 客户端并连接
设置遗嘱消息
订阅主题
接收消息
处理消息
发送心跳
心跳函数 heartbeat
存储消息
是否断开连接
关闭连接
Coroutine 运行 publishMQTT
创建 MQTT 客户端并连接
循环发布消息
获取测试消息
发布消息
结束

流程说明:

  1. 开始:程序启动。
  2. 构造函数 __construct:初始化命令行工具。
  3. handle 方法:处理命令行输入。
  4. param1 参数:根据输入的参数决定是订阅还是发布。
  5. 调用 subscribeMqtt:如果参数是subscribe,则调用此方法。
  6. 调用 publishMQTT:如果参数是public,则调用此方法。
  7. Coroutine 运行 subscribeMqtt:在协程中运行订阅方法。
  8. 创建 MQTT 客户端并连接:创建MQTT客户端并连接到服务器。
  9. 设置遗嘱消息:设置遗嘱消息,以便在客户端意外断开时发送。
  10. 订阅主题:订阅特定的MQTT主题。
  11. 接收消息:持续监听并接收消息。
  12. 处理消息:对接收到的消息进行处理。
  13. 心跳函数 heartbeat:检查设备心跳。
  14. 存储消息:将消息存储到数据库或其他存储系统。
  15. 是否断开连接:检查客户端是否断开连接。
  16. 关闭连接:如果断开,则关闭连接。
  17. Coroutine 运行 publishMQTT:在协程中运行发布方法。
  18. 创建 MQTT 客户端并连接:创建MQTT客户端并连接到服务器。
  19. 循环发布消息:循环发布消息。
  20. 获取测试消息:生成要发布的测试消息。
  21. 发布消息:将消息发布到MQTT服务器。
  22. 结束:程序结束。

后台常驻运行

1,php artisan命令在后台运行
  1. 打开您的终端或SSH到您的服务器。
  2. 使用nohup命令运行您的Artisan命令进行测试,如下所示
php /www/wwwroot/denwei_laraveladmin/artisan mqtt:handle subscribe

3.命令行的php的版本与web php的版本号要一致

2,使用宝塔的守护进程开启进程

在这里插入图片描述
也可以添加守护进程。
以上2种最好是只选一个

测试

打开emqx web ,在浏览器输入http://127.0.0.0.1:18083/#/websocket

在这里插入图片描述
主题:
主题跟php代码内的主题是一致的。
Payload:
是发出的字符串。由于在测试中遇到json字符串转换失败。所以选择了组装字符格式。
已发送
会出现发布的主题和内容

检查发送的结果

打开数据库,检查device_report表是否成功。成功应下图所示:
在这里插入图片描述

实操完成


http://www.kler.cn/a/320578.html

相关文章:

  • 【redis】ubuntu18安装redis7
  • 微服务Docker相关指令
  • 使用python搭建Web项目
  • swiper3匀速滚动会卡顿问题,已解决
  • Linux线程同步—竞态条件与互斥锁、读写锁(C语言)
  • <Java>String类型变量的使用
  • 基于Python flask的医院管理学院,医生能够增加/删除/修改/删除病人的数据信息,有可视化分析
  • 【c语言数据结构】栈的详解! 超级详细!(模拟实现,OJ练习题)
  • kubernetes K8S 挂载分布式存储 ceph
  • 算法基础8-双链表
  • Xcode16 iOS18 编译问题适配
  • 微信小程序教程:如何在个人中心实现头像贴纸功能
  • python爬虫解析工具BeautifulSoup(bs4)和CSS选择器——处理HTML和XML数据(7)
  • Windows系统修改Tomcat虚拟机内存参数
  • 《CUDA编程》3.简单CUDA程序的基本框架
  • 计算机毕业设计python+spark知识图谱房价预测系统 房源推荐系统 房源数据分析 房源可视化 房源大数据大屏 大数据毕业设计 机器学习
  • RuoYi-App根据不同角色权限实现功能按钮显隐
  • OpenHarmony(鸿蒙南向)——平台驱动指南【I2C】
  • 简易STL实现 | 红黑树的实现
  • SpringCloud-07 GateWay01 网关技术
  • 使用Okhttp-服务器不支持缓存的解决办法