当前位置: 首页 > article >正文

浅谈Spring Boot MQTT功能并实现手动连接操作

在Spring Boot中使用自身的MQTT功能并实现手动连接操作,可以通过以下步骤实现。以下是基于搜索结果中的信息整理的实现方法:

1. 添加依赖

pom.xml文件中添加Spring Boot的MQTT依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

2. 创建MQTT配置类

创建一个配置类MqttConfiguration,用于加载MQTT连接的相关配置信息:

package com.example.config;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

@Configuration
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfiguration {
    private String username;
    private String password;
    private String hostUrl;
    private String clientId;
    private int timeout;
    private int keepalive;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setServerURIs(new String[]{hostUrl});
        options.setCleanSession(true);
        options.setConnectionTimeout(timeout);
        options.setKeepAliveInterval(keepalive);
        options.setAutomaticReconnect(true); // 自动重连
        factory.setConnectionOptions(options);
        return factory;
    }

    // Getters and Setters
}

3. 创建MQTT客户端工具类

创建一个工具类MqttClientUtil,用于手动管理MQTT连接、订阅和发布消息:

package com.example.util;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MqttClientUtil {
    private MqttClient client;

    @Autowired
    private MqttConfiguration mqttConfig;

    public void connect() throws Exception {
        if (client != null && client.isConnected()) {
            System.out.println("客户端已连接,无需重复连接。");
            return;
        }

        client = new MqttClient(mqttConfig.getHostUrl(), mqttConfig.getClientId(), new MemoryPersistence());
        MqttConnectOptions options = mqttConfig.getMqttClientFactory().getConnectionOptions();
        client.connect(options);
        System.out.println("MQTT连接成功:" + mqttConfig.getHostUrl());
    }

    public void subscribe(String topic, int qos) throws MqttException {
        if (!client.isConnected()) {
            throw new IllegalStateException("未连接到MQTT服务器");
        }
        client.subscribe(topic, qos);
        System.out.println("订阅主题成功:" + topic);
    }

    public void publish(String topic, String message, int qos) throws MqttException {
        if (!client.isConnected()) {
            throw new IllegalStateException("未连接到MQTT服务器");
        }
        MqttMessage mqttMessage = new MqttMessage(message.getBytes());
        mqttMessage.setQos(qos);
        client.publish(topic, mqttMessage);
        System.out.println("消息发布成功:" + topic + " -> " + message);
    }

    public void disconnect() throws MqttException {
        if (client != null && client.isConnected()) {
            client.disconnect();
            System.out.println("MQTT连接已断开");
        }
    }
}

4. 配置文件

application.properties中配置MQTT参数:

spring.mqtt.username=your-username
spring.mqtt.password=your-password
spring.mqtt.hostUrl=tcp://broker.hivemq.com:1883
spring.mqtt.clientId=client-id
spring.mqtt.timeout=10
spring.mqtt.keepalive=60

5. 创建测试接口

创建一个测试接口MqttController,用于测试MQTT连接、订阅和发布消息:

package com.example.controller;

import com.example.util.MqttClientUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/mqtt")
public class MqttController {
    @Autowired
    private MqttClientUtil mqttClientUtil;

    @GetMapping("/connect")
    public String connect() {
        try {
            mqttClientUtil.connect();
            return "MQTT连接成功";
        } catch (Exception e) {
            return "连接失败:" + e.getMessage();
        }
    }

    @GetMapping("/subscribe")
    public String subscribe(@RequestParam String topic) {
        try {
            mqttClientUtil.subscribe(topic, 1);
            return "订阅成功:" + topic;
        } catch (Exception e) {
            return "订阅失败:" + e.getMessage();
        }
    }

    @GetMapping("/publish")
    public String publish(@RequestParam String topic, @RequestParam String message) {
        try {
            mqttClientUtil.publish(topic, message, 1);
            return "消息发布成功:" + topic + " -> " + message;
        } catch (Exception e) {
            return "发布失败:" + e.getMessage();
        }
    }

    @GetMapping("/disconnect")
    public String disconnect() {
        try {
            mqttClientUtil.disconnect();
            return "MQTT连接已断开";
        } catch (Exception e) {
            return "断开失败:" + e.getMessage();
        }
    }
}

6. 测试

启动Spring Boot应用后,可以通过以下方式测试MQTT功能:

连接到MQTT服务器

GET http://localhost:8080/mqtt/connect

订阅主题

GET http://localhost:8080/mqtt/subscribe?topic=test/topic

发布消息

GET http://localhost:8080/mqtt/publish?topic=test/topic&message=Hello%20MQTT

断开连接

GET http://localhost:8080/mqtt/disconnect


http://www.kler.cn/a/551389.html

相关文章:

  • 500. 键盘行 771. 宝石与石头 简单 find接口的使用
  • 机械学习基础-6.更多分类-数据建模与机械智能课程自留
  • 基于kafka、celery的日志收集报警项目
  • NCV4275CDT50RKG 车规级LDO线性电压调节器芯片——专为新能源汽车设计的高可靠性电源解决方案
  • 网络安全特性
  • 【CSS进阶】常见的页面自适应的方法
  • 2.17日学习总结
  • 解决 MyBatis Plus 在 PostgreSQL 中 BigDecimal 精度丢失的问题
  • 什么是 大语言模型中Kernel优化
  • 在不使用 Spring Security 的情况下获取用户登录参数
  • 基于Cilium的全栈eBPF服务网格:颠覆传统Sidecar模式的云原生通信革命
  • Windows程序设计25:MFC中常用窗口类及关系
  • Linux-ISCSI
  • spring boot和spring cloud的关系
  • blender骨骼分层问题:某一层的骨骼怎么移动到第一层
  • Oracle 12c 并发统计信息收集功能:技术解析与实践指南
  • 执行pnpm run dev报错:node:events:491 throw er; // Unhandled ‘error‘ event的解决方案
  • 深入理解大语言模型的文本数据处理流程
  • 避雷,Ubuntu通过ollama本地化部署deepseek,open-webui前端显示
  • MoE架构中的专家选择门控机制:稀疏激活如何实现百倍效率突破?