当前位置: 首页 > article >正文

mqtt单次订阅多个主题

这个组件单次只能订阅一个主题,比较局限因此在此组件的基础上升级成本组件

vue连接mqtt实现收发消息组件超级详细_vue mqtt-CSDN博客

<!--
 * @Author: mhf
 * @Date: 2024/10/14 15:18
-->
<template>
  <div>
    <mqttManyComp
      ref="mqttManyComp"
      :topics="topicArr"
      :mqttUrl="mqttUrl"
      :mqttOpts="mqttOpts"
      @message-received="getMqttMessage"
      @mqtt-close="mqttClose"
    />
  </div>
</template>

<script>
import mqttManyComp from "@/views/cockpit/modules/information/mqttManyComp.vue";

export default {
  name: "drawCar",
  components: { mqttManyComp },
  mixins: [],
  props: {},
  computed: {},
  watch: {},
  filters: {},
  data() {
    return {
      topicArr: [],
      mqttUrl: {
        head: "ws",
        host: "",
        port: ,
        tailPath: "mqtt",
      },
      mqttOpts: {
        keepalive: 60,
        clientId: "clientId-" + Math.random().toString(16).substr(2, 8),
        username: "",
        password: "",
        connectTimeout: 10 * 1000,
      },
      radarList: [],
      radarInViewport: [],
      timer: null,
    };
  },
  methods: {
    /**
     * @Event 订阅雷达轨迹主题消息
     * @description:
     * @author: mhf
     * @time: 2024-10-15 10:18:33
     **/
    subscribeTopics() {
      function getTopic(code) {
           return code;
      }

      setTimeout(() => {
        this.$refs.mqttManyComp.initMqtt();
      }, 500);

      setTimeout(() => {
        this.mqttClose();
      }, 1000);
    },

    getMqttMessage(message) {
      setTimeout(() => {
        // todo 这里是 收到的数据
      }, 500);
    },

    mqttClose() {
      /* 关闭 */
      this.assetCodeArr.forEach((code) => {
        this.$refs.mqttManyComp.mqttPublish(`smart_trans/${code}/down/track`, {
          id: code,
          switch: "on",
        });
      });
    },

    mqttEnd() {
      setTimeout(() => {
        this.assetCodeArr.forEach((code) => {
          this.$refs.mqttManyComp.mqttPublish(
            `smart_trans/${code}/down/track`,
            {
              id: code,
              switch: "off",
            },
          );
        });
      }, 1000);
      setTimeout(() => {
        this.bus.$emit("information-remove-car-marker-drawCar");
        this.$refs.mqttManyComp.handleEnd();
      }, 1500);
    },

    listenFlowCanvasNewClose() {
      this.bus.$on("reconnection", () => {
        // this.$message.success("弹窗关闭,重建链接");
        this.mqttClose();
      });
    },
  },
  created() {
  },
  mounted() {
    this.listenFlowCanvasNewClose();
  },
  destroyed() {},
};
</script>

<style lang="scss" scoped></style>
使用方式
<template>
  <div></div>
</template>

<script>
import mqtt from "mqtt";
import { refreshByTime } from "@/utils";

export default {
  name: "mqttManyComp",
  props: {
    topics: {
      type: Array,
      default: () => [],
    }, // 订阅主题数组
    mqttUrl: {
      type: Object,
      default: () => ({
        head: "", // 必须是 ws 或 wss
        host: "", // ip地址
        port: , // 服务端口
        tailPath: "", // 服务路径
      }),
    }, // 服务地址
    mqttOpts: {
      type: Object,
      default: () => ({
        keepalive: 60,
        clientId: "clientId-" + Math.random().toString(16).substr(2, 8),
        username: "你的用户名",
        password: "你的密码",
        connectTimeout: 10 * 1000,
      }),
    }, // 连接配置
  },
  data() {
    return {
      client: "",
      clientCount: 0,
      receivedMessage: null, // 用于存储接收到的消息
      refreshTimer: null, // 定时刷新
    };
  },
  watch: {
    topics(newTopics) {
      if (newTopics.length > 0 && this.client) {
        newTopics.forEach((topic) => {
          this.client.unsubscribe(topic);
        });
        newTopics.forEach((topic) => {
          this.client.subscribe(topic);
        });
      }
    },
  },
  methods: {
    async initMqtt() {
      console.log(this.topics, "mqttManyComp组件接收到的topics");
      let opts = JSON.parse(JSON.stringify(this.mqttOpts));
      this.client = mqtt.connect(
        `${this.mqttUrl.head}://${this.mqttUrl.host}:${this.mqttUrl.port}/${this.mqttUrl.tailPath}`,
        opts,
      );
      this.client.on("connect", this.handleConnect);
      this.client.on("message", this.handleMessage);
      this.client.on("reconnect", this.handleReconnect);
      this.client.on("error", this.handleError);
    },
    handleConnect() {
      this.topics.forEach((topic) => {
        console.log("mqtt_连接成功", topic);
        this.client.subscribe(topic);
      });
    },
    handleMessage(topic, message) {
      this.receivedMessage = JSON.parse(message?.toString() || {});
      this.$emit("message-received", { topic, message: this.receivedMessage });
    },
    handleReconnect(error) {
      // console.log(`正在第${this.clientCount}次重连`, error, this.topics);
      this.clientCount++;
      if (this.clientCount >= 10) {
        this.client.end();
      }
    },
    handleError(error) {
      // console.log("连接失败", error);
    },
    mqttPublish(topic, message) {
      if (!this.client.connected) {
        console.error("MQTT client is not connected");
        return;
      }
      console.log("mqtt_发送消息", topic, message);
      this.client.publish(topic, JSON.stringify(message));
    },
    refresh() {
      this.refreshTimer = refreshByTime(
        () => {
          this.$emit("mqtt-close");
        },
        false,
        1000 * 25,
      );
    },
    handleEnd() {
      this.client.end(true, {}, () => {
        // this.$message.success("MQTT连接已成功关闭");
      });
    },
  },
  async mounted() {
    await this.refresh();
    // await this.initMqtt();
  },
  beforeDestroy() {
    this.$emit("mqtt-close"); // 关闭mqtt链接需要的前置操作
    setTimeout(() => {
      this.client.end(true, {}, () => {
        console.log("MQTT连接已成功关闭");
      });
    }, 100);
    this.refresh();
  },
};
</script>

<style lang="scss" scoped></style>
 refreshByTime
**
 * 定时刷新
 * @param {function} callback 定时函数
 * @param {boolean} isFirst 是否首次执行
 * @param {number} time   定时请求周期
 */
export function refreshByTime(
  callback = () => {},
  isFirst = true,
  time = 1000 * 60 * 5,
) {
  console.error("refreshByTime 运行", timer);
  let timer = null;

  if (isFirst) {
    callback();
  }

  timer = setInterval(callback, time);

  return () => {
    if (timer) {
      console.error("refreshByTime 移除", timer);
      clearInterval(timer);
      timer = null;
    }
  };
}

 


http://www.kler.cn/news/355662.html

相关文章:

  • LeetCode146. LRU 缓存(2024秋季每日一题 37)
  • Centos7 安装升级最新版Redis7.4.1
  • 《太原理工大学学报》
  • JavaGuide(9)
  • Leetcode 最长连续有效括号
  • 服务器整合:提高数据中心效率的低成本高效策略
  • Linux中安装python3.8
  • userspace 和 kernelspace
  • 【算法】力扣:复制含有随机指针节点的链表
  • Python速成笔记——知识:图像操作
  • 十三、行为型(策略模式)
  • 数据结构顺序表超详细 (通讯录相关联) 含源码 C语言
  • uniapp移动端优惠券! 附源码!!!!
  • 数据库血缘工具学习,使用以及分享
  • 状态设计模式
  • JavaScript 第20章:Web Workers
  • 设计一个高效的日志分析系统:自动检测错误日志的实用指南
  • 计算机网络架构实例
  • Rocketmq 发送消息超时踩坑,消费正常
  • AJAX——HTTP 协议请求报文和响应报文结构