当前位置: 首页 > article >正文

vue3中使用mqtt数据传输(封装)

使用版本

"mqtt": "^5.8.0",

安装指令

npm install mqtt --save
------
yarn add mqtt

介绍mqtt

参考使用文档

配置

connection: {
  protocol: "ws",
  host: "broker.emqx.io",
  port: 8083,
  endpoint: "/mqtt",
  clean: true,
  connectTimeout: 30 * 1000, // ms
  reconnectPeriod: 4000, // ms
  clientId: "emqx_vue_" + Math.random().toString(16).substring(2, 8),
  // 随机数 每次不能重复
  username: "emqx_test",
  password: "emqx_test",
},

连接

import mqtt from "mqtt";
let client = {}
client = mqtt.connect(url, options)

client.on('connect', (e) => {
  // 订阅主题
  
})

订阅主题

client.subscribe(topic, { qos: 1 }, (err) => {
  if (!err) {
    console.log('订阅成功')
  } else {
    console.log('消息订阅失败!')
  }
})

消息发布

给后端发送格式,是和后端约定好的数据格式,一般为JSON传输。

client.publish(publishTopic, `{"messageType":1,"messageContent":""}`, { qos: 0 }, (err) => {
  if (!err) {
    console.log('发送成功')
    client.subscribe(topic, { qos: 1 }, (err) => {
      if (!err) {
        console.log('订阅成功')
      } else {
        console.log('消息订阅失败!')
      }
    })
  } else {
    console.log('消息发送失败!')
  }
})

取消订阅

client.unsubscribe(topicList, (error) => {
  console.log('主题为' + topicList + '取消订阅成功', error)
})

断开连接

export function unconnect() {
  client.end()
  client = null
  // Message.warning('服务器已断开连接!')
  console.log('服务器已断开连接!')
}

mqtt封装使用(ts版)

import type { IClientOptions, MqttClient } from 'mqtt';
import mqtt from 'mqtt';

interface ClientOptions extends IClientOptions {
  clientId: string;
}

interface SubscribeOptions {
  topic: string;
  callback: (topic: string, message: string) => void;
  subscribeOption?: mqtt.IClientSubscribeOptions;
}

interface PublishOptions {
  topic: string;
  message: string;
}

class Mqtt {
  private static instance: Mqtt;
  private client: MqttClient | undefined;
  private subscribeMembers: Record<string, ((topic: string, message: string) => void) | undefined> = {};
  private pendingSubscriptions: SubscribeOptions[] = [];
  private pendingPublications: PublishOptions[] = [];
  private isConnected: boolean = false;

  private constructor(url?: string) {
    if (url) {
      this.connect(url);
    }
  }

  public static getInstance(url?: string): Mqtt {
    if (!Mqtt.instance) {
      Mqtt.instance = new Mqtt(url);
    } else if (url && !Mqtt.instance.client) {
      Mqtt.instance.connect(url);
    }
    return Mqtt.instance;
  }

  private connect(url: string): void {
    console.log(url, clientOptions);
    if (!this.client) {
      this.client = mqtt.connect(url, clientOptions);
      this.client.on('connect', this.onConnect);
      this.client.on('reconnect', this.onReconnect);
      this.client.on('error', this.onError);
      this.client.on('message', this.onMessage);
    }
  }

  public disconnect(): void {
    if (this.client) {
      this.client.end();
      this.client = undefined;
      this.subscribeMembers = {};
      this.isConnected = false;
      console.log(`服务器已断开连接!`);
    }
  }

  public subscribe({ topic, callback }: SubscribeOptions): void {
    if (this.isConnected) {
      this.client?.subscribe(topic, { qos: 1 }, error => {
        if (error) {
          console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}失败: `, error);
        } else {
          console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}成功`);
        }
      });
      this.subscribeMembers[topic] = callback;
    } else {
      this.pendingSubscriptions.push({ topic, callback });
    }
  }

  public unsubscribe(topic: string): void {
    if (!this.client) {
      return;
    }
    this.client.unsubscribe(topic, error => {
      if (error) {
        console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}失败: `, error);
      } else {
        console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}成功`);
      }
    });
    this.subscribeMembers[topic] = undefined;
  }

  public publish({ topic, message }: PublishOptions): void {
    if (this.isConnected) {
      this.client?.publish(topic, message, { qos: 1 }, e => {
        if (e) {
          console.log(`客户端: ${clientOptions.clientId}, 发送主题为: ${topic} 的消息, 发送失败: `, e);
        }
      });
    } else {
      this.pendingPublications.push({ topic, message });
    }
  }

  private onConnect = (e: any): void => {
    console.log(`客户端: ${clientOptions.clientId}, 连接服务器成功:`, e);
    this.isConnected = true;
    this.processPendingSubscriptions();
    this.processPendingPublications();
  };

  private onReconnect = (): void => {
    console.log(`客户端: ${clientOptions.clientId}, 正在重连:`);
    this.isConnected = false;
  };

  private onError = (error: Error): void => {
    console.log(`客户端: ${clientOptions.clientId}, 连接失败:`, error);
    this.isConnected = false;
  };

  private onMessage = (topic: string, message: Buffer): void => {
    // console.log(
    //   `客户端: ${clientOptions.clientId}, 接收到来自主题: ${topic} 的消息: `,
    //   message.toString(),
    // );
    const callback = this.subscribeMembers?.[topic];
    callback?.(topic, message.toString());
  };

  private processPendingSubscriptions(): void {
    while (this.pendingSubscriptions.length > 0) {
      const { topic, callback, subscribeOption } = this.pendingSubscriptions.shift()!;
      this.subscribe({ topic, callback, subscribeOption });
    }
  }

  private processPendingPublications(): void {
    while (this.pendingPublications.length > 0) {
      const { topic, message } = this.pendingPublications.shift()!;
      this.publish({ topic, message });
    }
  }
}

const clientOptions: ClientOptions = {
  clean: true,
  connectTimeout: 500,
  protocolVersion: 5,
  rejectUnauthorized: false,
  username: 'admin',
  password: 'Anjian-emqx',
  clientId: `client-${Date.now()}`
};

// export default Mqtt.getInstance("ws://192.168.11.14:8083/mqtt");
// export default Mqtt.getInstance("ws://192.168.11.14:8083/mqtt");
// export default Mqtt.getInstance(JSON.parse(import.meta.env.VITE_OTHER_SERVICE_BASE_URL).mqtt);
const { protocol, host } = window.location;
export default Mqtt.getInstance(`${protocol.replace('http', 'ws')}//${host.replace('localhost', '127.0.0.1')}/mqtt/`);

注意:

  1. 环境配置
    .env.test
VITE_OTHER_SERVICE_BASE_URL= `{
  "mqtt": "ws://192.168.11.14:8083/mqtt"
}`
  1. qos设置 前后端统一为1
    在这里插入图片描述

http://www.kler.cn/a/376982.html

相关文章:

  • 二十三种设计模式-原型模式
  • Vscode辅助编码AI神器continue插件
  • C++和Python中负数取余结果的区别
  • Boost.Asio 同步读写及客户端 - 服务器实现详解
  • 内网基础-防火墙-隧道技术
  • 计算机毕业设计Python中华古诗词知识图谱可视化 古诗词智能问答系统 古诗词数据分析 古诗词情感分析模型 自然语言处理NLP 机器学习 深度学习
  • 使用Postman进行API测试
  • 论文翻译 | Ignore Previous Prompt: Attack Techniques For Language Models
  • 【OD-支持在线评测】周末爬山(200分)
  • 移植 AWTK 到 纯血鸿蒙 (HarmonyOS NEXT) 系统 (2) - 移植 nanovg
  • 《深入浅出HTTPS​​​​》读书笔记(4):密码学
  • Flutter报错信息Unhandled Exception: Binding has not yet been initialized.
  • Facebook直播按钮缺失现象的深入分析
  • expand,None索引,permute【pytorch】
  • 数据结构之——选择树
  • leetcode hot100【LeetCode 322. 零钱兑换】java实现
  • Linux下Nginx的安装与使用
  • 红队-shodan搜索引擎篇
  • k8s 小版本升级
  • VS+Qt解决提升控件后,包含头文件格式不对问题处理
  • C++设计模式结构型模式———装饰模式
  • 房贷利率定价调整机制变更的一点理解
  • 数学建模学习(134):使用Python基于WISP的多准则决策分析
  • 练习LabVIEW第三十四题
  • 我们来学mysql -- 查询成本之索引选择(原理篇)
  • 政策推动下的少儿编程行业规范发展:从校外到校内的全方位布局