浅谈Spring Boot MQTT功能并实现手动连接操作
在Spring Boot中,可以借助Spring Integration提供的MQTT模块(底层使用Eclipse Paho客户端)接入MQTT,并实现手动连接操作,可以通过以下步骤实现。以下是整理后的实现方法:
1. 添加依赖
在pom.xml文件中添加Spring Integration的MQTT依赖(第5步的测试接口使用了Spring MVC注解,项目中还需要已引入spring-boot-starter-web):
<!-- Spring Boot starter for Spring Integration. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<!-- Spring Integration MQTT module; the Java code below uses its client factory
     together with the Eclipse Paho client classes.
     NOTE(review): no <version> is given for either dependency; presumably it is
     supplied by Spring Boot's dependency management - confirm for your build. -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
2. 创建MQTT配置类
创建一个配置类MqttConfiguration,用于加载MQTT连接的相关配置信息:
package com.example.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
/**
 * Holds the {@code spring.mqtt.*} connection settings and exposes a Paho client
 * factory pre-configured with them.
 *
 * <p>The setters are required so that the properties can be bound onto this bean;
 * the getters are used by callers that create their own client from the same
 * settings (for example a manual-connect helper).
 */
@Configuration
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfiguration {

    /** Broker user name; may be left unset for anonymous brokers. */
    private String username;

    /** Broker password; may be left unset for anonymous brokers. */
    private String password;

    /** Broker URI, e.g. {@code tcp://host:1883}. Mandatory. */
    private String hostUrl;

    /** Client identifier presented to the broker. */
    private String clientId;

    /** Connection timeout; falls back to the Paho default when not configured. */
    private int timeout = MqttConnectOptions.CONNECTION_TIMEOUT_DEFAULT;

    /** Keep-alive interval; falls back to the Paho default when not configured. */
    private int keepalive = MqttConnectOptions.KEEP_ALIVE_INTERVAL_DEFAULT;

    /**
     * Builds the client factory carrying the connection options derived from the
     * bound properties.
     *
     * @return a factory whose connection options reflect this configuration
     * @throws IllegalStateException if {@code spring.mqtt.hostUrl} is missing or blank
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        if (hostUrl == null || hostUrl.trim().isEmpty()) {
            // Fail with a readable message instead of an obscure error inside Paho.
            throw new IllegalStateException("spring.mqtt.hostUrl must be configured");
        }

        MqttConnectOptions options = new MqttConnectOptions();
        // Credentials are optional: only apply them when configured, so an
        // unauthenticated broker does not trigger a NullPointerException.
        if (username != null) {
            options.setUserName(username);
        }
        if (password != null) {
            options.setPassword(password.toCharArray());
        }
        options.setServerURIs(new String[]{hostUrl});
        options.setCleanSession(true);
        options.setConnectionTimeout(timeout);
        options.setKeepAliveInterval(keepalive);
        options.setAutomaticReconnect(true); // automatic reconnect

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getHostUrl() {
        return hostUrl;
    }

    public void setHostUrl(String hostUrl) {
        this.hostUrl = hostUrl;
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public int getTimeout() {
        return timeout;
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    public int getKeepalive() {
        return keepalive;
    }

    public void setKeepalive(int keepalive) {
        this.keepalive = keepalive;
    }
}
3. 创建MQTT客户端工具类
创建一个工具类MqttClientUtil,用于手动管理MQTT连接、订阅和发布消息:
package com.example.util;
import com.example.config.MqttConfiguration;
import java.nio.charset.StandardCharsets;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Helper bean that manages a single Paho {@link MqttClient} by hand: connect,
 * subscribe, publish and disconnect are all triggered explicitly by the caller.
 *
 * <p>No message callback is registered by this class, so messages arriving on
 * subscribed topics are not processed here.
 *
 * <p>The methods are not synchronized; callers that invoke them concurrently
 * must provide their own coordination.
 */
@Component
public class MqttClientUtil {

    /** Lazily created on the first {@link #connect()} and reused afterwards. */
    private MqttClient client;

    @Autowired
    private MqttConfiguration mqttConfig;

    /**
     * Connects to the configured broker unless a connection is already open.
     *
     * <p>The client instance is created once and reused for later reconnects, so
     * repeated connect/disconnect cycles do not leave abandoned clients behind.
     *
     * @throws Exception if the client cannot be created or the connection fails
     */
    public void connect() throws Exception {
        if (client != null && client.isConnected()) {
            System.out.println("客户端已连接,无需重复连接。");
            return;
        }
        if (client == null) {
            client = new MqttClient(mqttConfig.getHostUrl(), mqttConfig.getClientId(), new MemoryPersistence());
        }
        // The bean method is named mqttClientFactory(); whether it returns the
        // container-managed factory or a fresh one, the options are built from
        // the same bound properties.
        MqttConnectOptions options = mqttConfig.mqttClientFactory().getConnectionOptions();
        client.connect(options);
        System.out.println("MQTT连接成功:" + mqttConfig.getHostUrl());
    }

    /**
     * Subscribes to a topic on the current connection.
     *
     * @param topic topic filter to subscribe to
     * @param qos   requested quality of service
     * @throws MqttException         if the subscription fails
     * @throws IllegalStateException if there is no open connection
     */
    public void subscribe(String topic, int qos) throws MqttException {
        requireConnected();
        client.subscribe(topic, qos);
        System.out.println("订阅主题成功:" + topic);
    }

    /**
     * Publishes a text message, encoded as UTF-8, to a topic.
     *
     * @param topic   destination topic
     * @param message payload text
     * @param qos     quality of service for the message
     * @throws MqttException         if publishing fails
     * @throws IllegalStateException if there is no open connection
     */
    public void publish(String topic, String message, int qos) throws MqttException {
        requireConnected();
        // Explicit charset so the payload does not depend on the platform default.
        MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
        mqttMessage.setQos(qos);
        client.publish(topic, mqttMessage);
        System.out.println("消息发布成功:" + topic + " -> " + message);
    }

    /**
     * Disconnects from the broker if a connection is open; otherwise does nothing.
     *
     * @throws MqttException if the disconnect fails
     */
    public void disconnect() throws MqttException {
        if (client != null && client.isConnected()) {
            client.disconnect();
            System.out.println("MQTT连接已断开");
        }
    }

    /** Rejects the call when connect() was never invoked or the connection is closed. */
    private void requireConnected() {
        if (client == null || !client.isConnected()) {
            throw new IllegalStateException("未连接到MQTT服务器");
        }
    }
}
4. 配置文件
在application.properties中配置MQTT参数:
# Credentials passed to the broker (placeholders - replace with real values).
spring.mqtt.username=your-username
spring.mqtt.password=your-password
# Broker URI used both for the connect options and for creating the client.
spring.mqtt.hostUrl=tcp://broker.hivemq.com:1883
# Client identifier presented to the broker.
# NOTE(review): brokers generally expect this to be unique per connected client - confirm.
spring.mqtt.clientId=client-id
# Passed to MqttConnectOptions.setConnectionTimeout / setKeepAliveInterval.
# NOTE(review): presumably both are in seconds - confirm against the Paho documentation.
spring.mqtt.timeout=10
spring.mqtt.keepalive=60
5. 创建测试接口
创建一个测试接口MqttController,用于测试MQTT连接、订阅和发布消息:
package com.example.controller;
import com.example.util.MqttClientUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
 * REST endpoints under {@code /mqtt} for driving {@link MqttClientUtil} by hand.
 *
 * <p>Every endpoint returns a plain-text status line: a success message, or a
 * failure message followed by the exception's message.
 */
@RestController
@RequestMapping("/mqtt")
public class MqttController {

    @Autowired
    private MqttClientUtil mqttClientUtil;

    /**
     * Opens the MQTT connection.
     *
     * @return status text describing the outcome
     */
    @GetMapping("/connect")
    public String connect() {
        String outcome;
        try {
            mqttClientUtil.connect();
            outcome = "MQTT连接成功";
        } catch (Exception ex) {
            outcome = "连接失败:" + ex.getMessage();
        }
        return outcome;
    }

    /**
     * Subscribes to the given topic with QoS 1.
     *
     * @param topic topic to subscribe to
     * @return status text describing the outcome
     */
    @GetMapping("/subscribe")
    public String subscribe(@RequestParam String topic) {
        String outcome;
        try {
            mqttClientUtil.subscribe(topic, 1);
            outcome = "订阅成功:" + topic;
        } catch (Exception ex) {
            outcome = "订阅失败:" + ex.getMessage();
        }
        return outcome;
    }

    /**
     * Publishes a message to the given topic with QoS 1.
     *
     * @param topic   destination topic
     * @param message payload text
     * @return status text describing the outcome
     */
    @GetMapping("/publish")
    public String publish(@RequestParam String topic, @RequestParam String message) {
        String outcome;
        try {
            mqttClientUtil.publish(topic, message, 1);
            outcome = "消息发布成功:" + topic + " -> " + message;
        } catch (Exception ex) {
            outcome = "发布失败:" + ex.getMessage();
        }
        return outcome;
    }

    /**
     * Closes the MQTT connection.
     *
     * @return status text describing the outcome
     */
    @GetMapping("/disconnect")
    public String disconnect() {
        String outcome;
        try {
            mqttClientUtil.disconnect();
            outcome = "MQTT连接已断开";
        } catch (Exception ex) {
            outcome = "断开失败:" + ex.getMessage();
        }
        return outcome;
    }
}
6. 测试
启动Spring Boot应用后,可以通过以下方式测试MQTT功能:
连接到MQTT服务器:
GET http://localhost:8080/mqtt/connect
订阅主题:
GET http://localhost:8080/mqtt/subscribe?topic=test/topic
发布消息:
GET http://localhost:8080/mqtt/publish?topic=test/topic&message=Hello%20MQTT
断开连接:
GET http://localhost:8080/mqtt/disconnect