mqtt单次订阅多个主题
下面链接中的组件单次只能订阅一个主题,比较局限,因此在该组件的基础上升级为本组件,支持单次订阅多个主题。原组件参见:
vue连接mqtt实现收发消息组件超级详细_vue mqtt-CSDN博客
<!--
* @Author: mhf
* @Date: 2024/10/14 15:18
-->
<template>
<div>
<mqttManyComp
ref="mqttManyComp"
:topics="topicArr"
:mqttUrl="mqttUrl"
:mqttOpts="mqttOpts"
@message-received="getMqttMessage"
@mqtt-close="mqttClose"
/>
</div>
</template>
<script>
import mqttManyComp from "@/views/cockpit/modules/information/mqttManyComp.vue";
/**
 * Example parent component showing how to drive `mqttManyComp`.
 *
 * It passes the topic list, broker URL parts and connection options down as
 * props, receives parsed messages through `message-received`, and publishes
 * `{ id, switch }` commands to `smart_trans/<code>/down/track` for every code
 * in `assetCodeArr`.
 */
export default {
  name: "drawCar",
  components: { mqttManyComp },
  mixins: [],
  props: {},
  computed: {},
  watch: {},
  filters: {},
  data() {
    return {
      // Topics handed to mqttManyComp through its `topics` prop.
      topicArr: [],
      // Codes interpolated into the publish topic by publishTrackSwitch().
      // Declared here so the forEach calls never run on `undefined`.
      assetCodeArr: [],
      mqttUrl: {
        head: "ws",
        host: "", // placeholder: fill in the broker host
        port: "", // placeholder: fill in the broker WebSocket port
        tailPath: "mqtt",
      },
      mqttOpts: {
        keepalive: 60,
        clientId: `clientId-${Math.random().toString(16).slice(2, 10)}`,
        username: "",
        password: "",
        connectTimeout: 10 * 1000,
      },
      radarList: [],
      radarInViewport: [],
      timer: null,
    };
  },
  methods: {
    /**
     * Subscribe to the radar track topics: opens the MQTT connection after
     * 500 ms and publishes the "on" switch after 1000 ms.
     *
     * NOTE(review): nothing in this snippet fills `topicArr` or
     * `assetCodeArr`; both must be populated before this runs — confirm
     * against the full component.
     */
    subscribeTopics() {
      setTimeout(() => {
        // The ref is gone if the component was destroyed in the meantime.
        this.$refs.mqttManyComp?.initMqtt();
      }, 500);
      setTimeout(() => {
        this.mqttClose();
      }, 1000);
    },
    /**
     * Handler for the child's `message-received` event.
     *
     * @param {{topic: string, message: *}} message - Topic plus parsed payload.
     */
    getMqttMessage(message) {
      setTimeout(() => {
        // TODO: handle the received data here.
      }, 500);
    },
    /**
     * Handler for the child's `mqtt-close` event; publishes switch "on" for
     * every asset code.
     *
     * NOTE(review): despite the name this sends "on", not "off" — presumably
     * a keep-alive for the track stream; confirm against the device protocol.
     */
    mqttClose() {
      this.publishTrackSwitch("on");
    },
    /**
     * Publishes switch "off" for every asset code after 1000 ms, then after
     * 1500 ms emits the marker-removal bus event and ends the MQTT client.
     */
    mqttEnd() {
      setTimeout(() => {
        this.publishTrackSwitch("off");
      }, 1000);
      setTimeout(() => {
        this.bus.$emit("information-remove-car-marker-drawCar");
        this.$refs.mqttManyComp?.handleEnd();
      }, 1500);
    },
    /**
     * Publishes `{ id: code, switch: state }` to
     * `smart_trans/<code>/down/track` for each code in `assetCodeArr`.
     * Does nothing when the child component is no longer mounted.
     *
     * @param {"on"|"off"} state - Value sent in the `switch` field.
     */
    publishTrackSwitch(state) {
      const mqttComp = this.$refs.mqttManyComp;
      if (!mqttComp) {
        return;
      }
      this.assetCodeArr.forEach((code) => {
        mqttComp.mqttPublish(`smart_trans/${code}/down/track`, {
          id: code,
          switch: state,
        });
      });
    },
    /** Bus handler for "reconnection": re-sends the "on" switch. */
    onReconnection() {
      this.mqttClose();
    },
    /**
     * Registers the "reconnection" bus listener. A named method is used so
     * the same reference can be removed again in `destroyed`.
     */
    listenFlowCanvasNewClose() {
      this.bus.$on("reconnection", this.onReconnection);
    },
  },
  created() {},
  mounted() {
    this.listenFlowCanvasNewClose();
  },
  destroyed() {
    // Remove the listener so the bus stops calling into a dead instance.
    this.bus.$off("reconnection", this.onReconnection);
  },
};
</script>
<style lang="scss" scoped></style>
mqttManyComp 组件源码(上面的 drawCar 组件即为使用方式示例)
<template>
<div></div>
</template>
<script>
import mqtt from "mqtt";
import { refreshByTime } from "@/utils";
// Consecutive reconnect attempts allowed before the client is shut down.
const MAX_RECONNECT_ATTEMPTS = 10;

/**
 * Renderless MQTT component that subscribes to several topics at once.
 *
 * Props:
 *   - topics:   array of topics to subscribe to.
 *   - mqttUrl:  `{ head, host, port, tailPath }`, joined as
 *               `head://host:port/tailPath`.
 *   - mqttOpts: options object passed to `mqtt.connect`.
 *
 * Events:
 *   - message-received: `{ topic, message }` with the JSON-parsed payload.
 *   - mqtt-close: emitted every 25 s by the refresh timer and once in
 *     `beforeDestroy`.
 *
 * The connection is not opened automatically; the parent calls `initMqtt()`
 * through a ref.
 */
export default {
  name: "mqttManyComp",
  props: {
    topics: {
      type: Array,
      default: () => [],
    }, // topics to subscribe to
    mqttUrl: {
      type: Object,
      default: () => ({
        head: "", // must be "ws" or "wss"
        host: "", // broker IP address
        port: "", // broker port (placeholder, supplied by the parent)
        tailPath: "", // service path
      }),
    }, // broker address
    mqttOpts: {
      type: Object,
      default: () => ({
        keepalive: 60,
        clientId: `clientId-${Math.random().toString(16).slice(2, 10)}`,
        username: "你的用户名",
        password: "你的密码",
        connectTimeout: 10 * 1000,
      }),
    }, // connection options
  },
  data() {
    return {
      client: null, // MQTT client, null until initMqtt() runs
      clientCount: 0, // reconnect attempts since the last successful connect
      receivedMessage: null, // last parsed message
      refreshTimer: null, // stop function returned by refreshByTime
    };
  },
  watch: {
    /**
     * Keeps the broker subscriptions in step with the `topics` prop:
     * topics that disappeared are unsubscribed, all current ones are
     * (re)subscribed.
     *
     * When the parent mutates the array in place, both arguments are the
     * same array, so removed topics cannot be detected in that case.
     */
    topics(newTopics, oldTopics) {
      if (!this.client) {
        return;
      }
      const current = newTopics || [];
      const removed = (oldTopics || []).filter(
        (topic) => !current.includes(topic),
      );
      removed.forEach((topic) => {
        this.client.unsubscribe(topic);
      });
      current.forEach((topic) => {
        this.client.subscribe(topic);
      });
    },
  },
  methods: {
    /**
     * Opens the MQTT connection and wires the event handlers. A client left
     * over from an earlier call is force-closed first so it is not leaked.
     */
    async initMqtt() {
      console.log(this.topics, "mqttManyComp组件接收到的topics");
      if (this.client) {
        this.client.end(true);
      }
      this.clientCount = 0;
      // Plain copy so the MQTT library does not receive Vue's reactive object.
      const opts = JSON.parse(JSON.stringify(this.mqttOpts));
      const { head, host, port, tailPath } = this.mqttUrl;
      this.client = mqtt.connect(`${head}://${host}:${port}/${tailPath}`, opts);
      this.client.on("connect", this.handleConnect);
      this.client.on("message", this.handleMessage);
      this.client.on("reconnect", this.handleReconnect);
      this.client.on("error", this.handleError);
    },
    /** On connect: reset the reconnect counter and subscribe to every topic. */
    handleConnect() {
      this.clientCount = 0;
      this.topics.forEach((topic) => {
        console.log("mqtt_连接成功", topic);
        this.client.subscribe(topic);
      });
    },
    /**
     * Parses an incoming payload as JSON and emits `message-received`.
     * An empty payload is reported as `{}`; a payload that is not valid JSON
     * is logged and dropped instead of throwing inside the event handler.
     *
     * @param {string} topic - Topic the message arrived on.
     * @param {*} message - Raw payload; converted with `toString()`.
     */
    handleMessage(topic, message) {
      const text = message?.toString() ?? "";
      let payload = {};
      if (text !== "") {
        try {
          payload = JSON.parse(text);
        } catch (error) {
          console.error("MQTT message is not valid JSON", topic, error);
          return;
        }
      }
      this.receivedMessage = payload;
      this.$emit("message-received", { topic, message: this.receivedMessage });
    },
    /** Counts reconnect attempts and ends the client once the limit is hit. */
    handleReconnect() {
      this.clientCount++;
      if (this.clientCount >= MAX_RECONNECT_ATTEMPTS) {
        this.client?.end();
      }
    },
    /** Logs client errors instead of silently discarding them. */
    handleError(error) {
      console.error("MQTT connection error", error);
    },
    /**
     * Publishes `message` (JSON-stringified) to `topic`. Logs an error and
     * returns when there is no connected client.
     *
     * @param {string} topic - Destination topic.
     * @param {*} message - Value to serialize with `JSON.stringify`.
     */
    mqttPublish(topic, message) {
      if (!this.client?.connected) {
        console.error("MQTT client is not connected");
        return;
      }
      console.log("mqtt_发送消息", topic, message);
      this.client.publish(topic, JSON.stringify(message));
    },
    /**
     * (Re)starts the 25 s timer that emits `mqtt-close`. Any timer already
     * running is stopped first so repeated calls never stack intervals.
     */
    refresh() {
      this.stopRefresh();
      this.refreshTimer = refreshByTime(
        () => {
          this.$emit("mqtt-close");
        },
        false,
        1000 * 25,
      );
    },
    /** Stops the refresh timer, if one is running. */
    stopRefresh() {
      if (typeof this.refreshTimer === "function") {
        this.refreshTimer();
      }
      this.refreshTimer = null;
    },
    /** Force-closes the MQTT connection; a no-op when none was opened. */
    handleEnd() {
      this.client?.end(true);
    },
  },
  mounted() {
    this.refresh();
  },
  beforeDestroy() {
    // Stop the interval; calling refresh() here would start a second one.
    this.stopRefresh();
    this.$emit("mqtt-close"); // lets the parent run its pre-close step
    const client = this.client;
    if (!client) {
      return;
    }
    // Presumably delayed so the parent's publishes go out before the socket
    // closes — TODO confirm.
    setTimeout(() => {
      client.end(true, {}, () => {
        console.log("MQTT连接已成功关闭");
      });
    }, 100);
  },
};
</script>
<style lang="scss" scoped></style>
refreshByTime
/**
 * Runs `callback` on a fixed interval.
 *
 * @param {function} callback - Function invoked on every tick.
 * @param {boolean} isFirst - When true, `callback` is also invoked once
 *   immediately, before the interval starts.
 * @param {number} time - Interval period in milliseconds.
 * @returns {function(): void} Stop function that clears the interval;
 *   calling it more than once is harmless.
 */
export function refreshByTime(
  callback = () => {},
  isFirst = true,
  time = 1000 * 60 * 5,
) {
  if (isFirst) {
    callback();
  }
  // Declared at the point of use: the original read `timer` before its
  // `let` declaration, which threw a ReferenceError on every call.
  let timer = setInterval(callback, time);
  return () => {
    if (timer !== null) {
      clearInterval(timer);
      timer = null;
    }
  };
}