MQTT客户端实战:从连接到通信。详细说明MQTT客户端和MQTT代理进行通信
EMQX安装
EMQX服务器安装
安装文档,见链接不另外写
https://docs.emqx.com/zh/emqx/latest/deploy/install-ubuntu.html
启动 EMQX
启动为一个 systemd 服务:
sudo systemctl start emqx
在windows安装客户端
在线 MQTT WebSocket
客户端工具,MQTTX Web
是开源的 MQTT 5.0 浏览器客户端,但是经我测试没有成功,好像有bug.
建议使用MQTT 5.0 命令行客户端工具。使用命令行上的 MQTTX
,旨在帮助开发者在不需要使用图形化界面的基础上,也能更快的开发和调试 MQTT 服务与应用。
由于是后期被写的博文,图是借官方的。请自行区分一下。
平台安装后的地址
1,平台的地址
- http://127.0.0.1:18083
后台登录 用户名:test 密码:test
Laravel中处理MQTT订阅
1,安装MQTT客户端库
在Laravel项目中安装一个MQTT客户端库。你可以使用Composer来安装 php-mqtt/client:
composer require php-mqtt/client
2, 新建command文件
文件路径:app/Console/Commands/MqttClientCommand.php
这段PHP代码是一个用于处理MQTT消息的命令行工具,它使用了Simps的MQTT客户端库。代码中定义了两个类:MQTTUserConfig 和 MqttClientCommand。
MQTTUserConfig 类定义了一些常量,这些常量用于配置MQTT连接。
MqttClientCommand 类继承自 Illuminate\Console\Command,是一个命令行工具,用于订阅或发布MQTT消息。
<?php
namespace App\Console\Commands;
use App\Http\Controllers\Wxapi\DeviceReportController;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use Simps\MQTT\Protocol\Types;
use Simps\MQTT\Protocol\V5;
use Simps\MQTT\Tools\Common;
use Simps\MQTT\Client;
use Simps\MQTT\Config\ClientConfig;
use Simps\MQTT\Hex\ReasonCode;
use Swoole\Coroutine;
use Illuminate\Support\Facades\Redis;
class MQTTUserConfig
{
const SIMPS_MQTT_REMOTE_HOST = '*';
const SIMPS_MQTT_PORT = 1883;
const SIMPS_MQTT_SUBSCRIBE_PORT = 8083;
const SIMPS_MQTT_USER = 'test*';
const SIMPS_MQTT_PASSWORD = 'test*';
}
class MqttClientCommand extends Command
{
protected $signature = 'mqtt:handle {param1}';
protected $description = '订阅物联网mqtt消息 param1:null 订阅消息, param1:public 发布消息';
protected $mqtt ;
const SWOOLE_MQTT_CONFIG = [
'open_mqtt_protocol' => true,
'package_max_length' => 2 * 1024 * 1024,
'connect_timeout' => 5.0,
'write_timeout' => 5.0,
'read_timeout' => 5.0,
];
//模拟设备
const CLiENT_IDs = [
'mqttx_devA',
'mqttx_devB',
'mqttx_devC',
'mqttx_devD'
];
public function __construct()
{
parent::__construct();
}
public function handle()
{
$param1 =$this->argument('param1');
// $param2 =$this->argument('param2');
if ($param1=='subscribe') {
$this->info('启动订阅...');
$this->subscribeMqtt();
} elseif ($param1=='public') {
$this->info('启动发布...');
$this->publishMQTT();
}
echo '\r\n\r\n分配工作执行完成!!!';
}
protected function getTestMQTT5ConnectConfig()
{
$config = new ClientConfig();
$UserConfig = new MQTTUserConfig();
return $config->setUserName($UserConfig::SIMPS_MQTT_USER)
->setPassword($UserConfig::SIMPS_MQTT_PASSWORD)
->setClientId(Client::genClientID())
->setKeepAlive(10)
->setDelay(3000) // 3s
->setMaxAttempts(5)
->setProperties([
'session_expiry_interval' => 60,
'receive_maximum' => 65535,
'topic_alias_maximum' => 65535,
])
->setProtocolLevel(5)
->setSwooleConfig( [
'open_mqtt_protocol' => true,
'package_max_length' => 2 * 1024 * 1024,
'connect_timeout' => 5.0,
'write_timeout' => 5.0,
'read_timeout' => 5.0,
]);
}
private function heartbeat($message) {
if ($message) {
parse_str($message,$array);
$device = $array['imei'];
$hash = ':mqtt:heartbeat:online'.":{$device}";
Redis::expire($hash,30); ##30s有效
Redis::sAdd($hash,1);
}
}
/*
* 订阅
* private function subscribeMqtt(){
Coroutine\run(function () {
$client = new Client('39.108.230.87', 1883, $this->getTestMQTT5ConnectConfig());
....
*/
private function subscribeMqtt(){
Coroutine\run(function () {
$UserConfig = new MQTTUserConfig();
$client = new Client($UserConfig::SIMPS_MQTT_REMOTE_HOST, 1883,
$this->getTestMQTT5ConnectConfig());
$will = [
'topic' => 'simps-mqtt/dinweiyi/delete',
'qos' => 1,
'retain' => 0,
'message' => 'byebye',
'properties' => [
'will_delay_interval' => 60,
'message_expiry_interval' => 60,
'content_type' => 'test',
'payload_format_indicator' => true, // false 0 1
],
];
$client->connect(true, $will);
$topics['simps-mqtt/dinweiyi/subscribe_message'] = [
'qos' => 2,
'no_local' => true,
'retain_as_published' => true,
'retain_handling' => 2,
];
$res = $client->subscribe($topics);
$timeSincePing = time();
var_dump($res);
echo '\r\n\r\n connect success !!!';
while (true) {
try {
$buffer = $client->recv();
$message = null;
if ($buffer && $buffer !== true) {
$message = $buffer["message"];
// QoS1 PUBACK
if ($buffer['type'] === Types::PUBLISH && $buffer['qos'] === 1) {
$client->send(
[
'type' => Types::PUBACK,
'message_id' => $buffer['message_id'],
],
false
);
}
if ($buffer['type'] === Types::DISCONNECT) {
echo sprintf(
"Broker is disconnected, The reason is %s [%d]\n",
ReasonCode::getReasonPhrase($buffer['code']),
$buffer['code']
);
$client->close($buffer['code']);
break;
}
$reportObj = new DeviceReportController();
$ret = $reportObj->store($message);
var_dump("182>>>",$ret);
unset($reportObj);
}
if ($timeSincePing <= (time() - $client->getConfig()->getKeepAlive())) {
$buffer = $client->ping();
if ($buffer) {
echo 'send ping success ...' ;
$this->heartbeat($message);
$timeSincePing = time();
}
}
} catch (\Throwable $e) {
throw $e;
}
}
});
}
protected function getMessage() {
$client_ids = [
'mqttx_devA',
// 'mqttx_devB',
'mqttx_devC',
'mqttx_devD'
];
$message = [];
$message['clientID'] = self::CLiENT_IDs[array_rand($client_ids)];
$message['time'] = time();
$message['location'] = ["x"=>rand(1000,9999),"y"=>rand(1000,9999)];
return json_encode($message);
}
/*
* 发布
*/
public function publishMQTT() {
Coroutine\run(function () {
$UserConfig = new MQTTUserConfig();
$client = new Client($UserConfig::SIMPS_MQTT_REMOTE_HOST, $UserConfig::SIMPS_MQTT_PORT,
$this->getTestMQTT5ConnectConfig());
$client->connect();
while (true) {
$message = $this->getMessage();
$response = $client->publish(
'simps-mqtt/user/subscribe_message',
$message,
1,
0,
0,
[
'topic_alias' => 1,
'message_expiry_interval' => 12,
]
);
var_dump( 'publishMQTT>>>',$message);
Coroutine::sleep(1);
}
});
}
}
3, 代码流程图
使用Mermaid语法描述的上述PHP代码的流程图:
流程说明:
- 开始:程序启动。
- 构造函数 __construct:初始化命令行工具。
- handle 方法:处理命令行输入。
- param1 参数:根据输入的参数决定是订阅还是发布。
- 调用 subscribeMqtt:如果参数是
subscribe
,则调用此方法。 - 调用 publishMQTT:如果参数是
public
,则调用此方法。 - Coroutine 运行 subscribeMqtt:在协程中运行订阅方法。
- 创建 MQTT 客户端并连接:创建MQTT客户端并连接到服务器。
- 设置遗嘱消息:设置遗嘱消息,以便在客户端意外断开时发送。
- 订阅主题:订阅特定的MQTT主题。
- 接收消息:持续监听并接收消息。
- 处理消息:对接收到的消息进行处理。
- 心跳函数 heartbeat:检查设备心跳。
- 存储消息:将消息存储到数据库或其他存储系统。
- 是否断开连接:检查客户端是否断开连接。
- 关闭连接:如果断开,则关闭连接。
- Coroutine 运行 publishMQTT:在协程中运行发布方法。
- 创建 MQTT 客户端并连接:创建MQTT客户端并连接到服务器。
- 循环发布消息:循环发布消息。
- 获取测试消息:生成要发布的测试消息。
- 发布消息:将消息发布到MQTT服务器。
- 结束:程序结束。
后台常驻运行
1,php artisan命令在后台运行
- 打开您的终端或SSH到您的服务器。
- 使用nohup命令运行您的Artisan命令进行测试,如下所示
php /www/wwwroot/denwei_laraveladmin/artisan mqtt:handle subscribe
3.命令行的php的版本与web php的版本号要一致
2,使用宝塔的守护进程开启进程
也可以添加守护进程。
以上2种最好是只选一个
测试
打开emqx web ,在浏览器输入http://127.0.0.0.1:18083/#/websocket
主题:
主题跟php代码内的主题是一致的。
Payload:
是发出的字符串。由于在测试中遇到json字符串转换失败。所以选择了组装字符格式。
已发送
会出现发布的主题和内容
检查发送的结果
打开数据库,检查device_report表是否成功。成功应下图所示: