企业级基于SpringBoot的MQTT的构建和使用
基于SpringBoot的MQTT配置及使用
首先要使用EMQX搭建一个MQTT服务器,参考文档:EMQX快速开始
本着开源分享的观点,闲话不多说,直接上代码
导入Maven
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>5.5.13</version>
</dependency>
配置文件
spring:
mqtt:
#MQTT服务端地址,如果是集群,用逗号隔开
url: tcp://localhost:1883
#用户名
username: root
#密码
password: 123
#clientId代表该服务挂起时的名字,在MQTT服务端clientId不可重复
clientId: MqttTest
#MQTT默认的消息订阅主题,可配置多个,‘#’为通配符,详情可自己看一下MQTT文档的通配符说明
defaultTopic:
- server1/#
- server2/#
# 默认Qos,关于消息等级可以查看文档关于Qos的不同等级对信息的约束力度,2是最高,有且只接受一次,由于我的项目中对数据要求很高,所以不考虑资源消耗的情况下,我一般采用2,此处有几个默认主题,就要设置几个Qos,按照顺序对应每个主题的等级
defaultQos:
- 2
- 2
配置类
该类为一个Config类,用于接收上一步在application.yml配置文件中配置的配置信息
@Component
@Slf4j
@Getter
@Setter
@ToString
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {
private String username;
private String password;
private String url;
private String clientId;
private List<String> defaultTopic;
private List<Integer> defaultQos;
}
MqttBO类
该类用于构建发送信息的对象
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MqttBO {
// int qos,boolean retained,String topic,String message
// 发送信息的消息等级,要求高的就是2
private Integer qos;
// 是否信息保留
private Boolean retained;
// 发送信息的主题
private String topic;
// 发送信息的主体
private byte[] message;
}
MQTTClient类
由于在springboot项目中,我们只想创建一个单例的Mqttclient进行连接,所以我们创建一个类似于工厂的类,在工厂Bean加载后,创建单例的client,并做连接,订阅,断开等方法的支持,该client加载完成后,在调用时,Spring会自动注入工厂创建连接的client。
/**
* @Description MQTT客户端实现工厂,该类主要做工厂生成client,在Bean加载后,创建单例的client。在所有MQTT的Bean中第一位加载
* 同时做一些连接,订阅,断开等方法的支持
*/
@Slf4j
@Component
public class MyMqttClient {
private MqttClient client;
@Autowired
private MqttProperties mqttProperties;
/**
* 在Bean加载后,创建单例的client
* PostConstruct会在该Bean加载后执行,初始化client
*/
@PostConstruct
public void init() {
try {
this.client = new MqttClient(mqttProperties.getUrl(), mqttProperties.getClientId(), new MemoryPersistence());
log.info("MQTT客户端初始化成功");
} catch (MqttException e) {
log.error("MQTT客户端初始化失败: {}", e.getMessage(), e);
throw new RuntimeException(e);
}
}
// 连接
public synchronized void connect(MqttCallBack mqttCallBack) {
try {
if (client != null && client.isConnected()) {
log.info("发现旧连接,正在断开...");
client.disconnectForcibly(); // 强制断开旧连接
}
MqttConnectOptions options = createConnectOptions();
client = new MqttClient(mqttProperties.getUrl(), mqttProperties.getClientId(), new MemoryPersistence());
client.setCallback(mqttCallBack);
client.connect(options);
log.info("MQTT连接成功");
} catch (MqttSecurityException e) {
log.error("MQTT安全异常: {}", e.getMessage(), e);
} catch (MqttPersistenceException e) {
log.error("MQTT持久化异常: {}", e.getMessage(), e);
} catch (MqttException e) {
log.error("MQTT连接失败: {}", e.getMessage(), e);
}
}
// 创建连接选项
private MqttConnectOptions createConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
// 保留会话
options.setCleanSession(false);
// 自动重连,无需在连接断联回调方法中处理重连
options.setAutomaticReconnect(true);
// 连接超时时间(秒)
options.setConnectionTimeout(10);
// 心跳间隔
options.setKeepAliveInterval(30);
// 遗嘱消息
options.setWill("willTopic", "客户端已断开".getBytes(), 2, false);
return options;
}
// 订阅
public synchronized void subscribe() {
try {
String[] topics = mqttProperties.getDefaultTopic().toArray(new String[0]);
int[] qoses = mqttProperties.getDefaultQos().stream().mapToInt(Integer::valueOf).toArray();
client.subscribe(topics, qoses);
log.info("订阅主题成功: {}", String.join(", ", topics));
} catch (MqttException e) {
log.error("订阅主题失败: {}", e.getMessage(), e);
}
}
/**
* 发布消息
*
* @param mqttBO 消息对象
*/
public synchronized void publish(MqttBO mqttBO) {
if (mqttBO == null || mqttBO.getTopic() == null || mqttBO.getMessage() == null) {
log.warn("发布消息失败: 参数不完整");
return;
}
MqttMessage mqttMessage = new MqttMessage();
if (mqttBO.getQos()==null){
mqttBO.setQos(2); //默认2
}
mqttMessage.setQos(mqttBO.getQos());
mqttMessage.setRetained(mqttBO.getRetained());
mqttMessage.setPayload(mqttBO.getMessage());
MqttTopic mqttTopic = client.getTopic(mqttBO.getTopic());
try {
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
log.info("消息发布成功: Topic={}, Payload={}", mqttBO.getTopic(), new String(mqttBO.getMessage()));
} catch (MqttException e) {
log.error("发布消息失败: {}", e.getMessage(), e);
}
}
// 断开连接
public synchronized void disConnect() {
try {
if (client != null && client.isConnected()) {
client.disconnect();
client.close(); // 确保释放资源
log.info("成功断开连接并释放资源");
}
} catch (MqttException e) {
log.error("断开连接失败: {}", e.getMessage(), e);
}
}
// 重新连接
public synchronized void reconnect() {
try {
if (!client.isConnected()) {
log.info("尝试重新连接...");
client.connect(createConnectOptions());
log.info("重新连接成功");
}
} catch (MqttException e) {
log.error("重新连接失败: {}", e.getMessage(), e);
}
}
}
MQTT回调实现类
要想实现接收到发送到mqtt中的信息,我们需要实现回调接口。
/**
* @Description MQTT的回调函数,此处在接收回调里仅作判断,具体逻辑放在mqttService里面
*/
@Slf4j
@Component
@DependsOn("myMqttClient")
public class MqttCallBack implements MqttCallbackExtended {
// 在回调中,我们的业务逻辑都放在mqttService里面
@Autowired
private MqttService mqttService;
// 注入已有的单例Bean
@Autowired
private MyMqttClient client;
@Autowired
private MqttProperties mqttProperties;
/**
* 客户端断开连接的回调,断开后,mqtt开启了断联自动重连机制,由于在创建连接时,我们开启了自动重连机制,此处无需处理重连,如果有其他需求可以改写
*/
@Override
public void connectionLost(Throwable throwable) {
log.info("与服务器断开连接,尝试重新连接...");
// 断开后,mqtt开启了断联自动重连机制,此处无需处理
}
/**
* 接受到信息回调
* @param s
* @param mqttMessage
* @throws Exception
*/
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
// 此处我们将接收到的信息传递给mqttService处理,其中s为发送到我们这里的具体地址,MqttMessage为Mqtt返回的对象,但是返回的是字节数组,需要自己转,如果传回的是Json,我们需要转化
String message = new String(MqttMessage.getPayload());
log.info("接收到信息: Topic={}, Payload={}", s, message);
// 然后下一步我们可以用FastJson或者其他的Json工具对接收到的json进行处理
}
/**
* 通知客户端某条消息已经成功发送到 MQTT 服务器并完成交付。此处没有特殊需求无需处理
* @param iMqttDeliveryToken
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
/**
* 连接成功后回调,由于我们开启了会话保留机制,在断线后会保留会话的信息,但是首次连接需要订阅主题。
* @param reconnect
* @param serverURI
*/
@Override
public void connectComplete(boolean reconnect, String serverURI) {
// 首次连接成功时订阅
if (!reconnect) {
client.subscribe();
log.info("首次连接,订阅主题成功");
}else{
log.info("重新连接成功");
}
}
}
MQTTConfig类
MQTTConfig类是为了解耦设立,异步连接mqtt,在项目启动时,会调用mqttInitRunner方法,进行连接。
/**
* @Description mqttConfig是为了解耦设立,异步连接mqtt
*/
@Configuration
@Slf4j
public class MqttConfig {
@Autowired
private MqttCallBack mqttCallBack;
@Autowired
private MyMqttClient client;
@Bean
public ApplicationRunner mqttInitRunner() {
return args -> {
try {
client.connect(mqttCallBack);
// 连接后,此处会回调connectComplete方法去进行订阅主题
} catch (Exception e) {
log.error("MQTT 初始化失败", e);
}
};
}
}
如何使用
导入Maven后,将上述代码复制进项目后,直接启动即可,该配置不唯一,根据项目需求可以更改。
在有的需求中,需要对设备去请求信息,这时就要发送到mqtt中请求信息,然后设备订阅mqtt主题,在MqttService中,注入client,使用client.publish(mqttBO)即可发送到mqtt请求信息,设备接收后,返回信息到Mqtt中,接收信息需要在MqttCallBack中的messageArrived方法中处理。
注
一切的返回都会在MqttCallBack的messageArrived方法中返回,具体逻辑在MqttService中处理,如果返回的是json,需要自己解析。
ps:
关于技术方案的构建,不同业务场景往往存在多种实现路径,本文所述仅为其中一种实践方案。若读者在具体实施过程中遇到技术选型或架构设计方面的疑问,欢迎在评论区留言探讨,笔者将结合过往经验给予针对性建议。
需要特别说明的是,文中代码源自笔者过往工作实践中的项目积累,应用于真实的企业级开发中,现经脱敏和简化处理后开源,以供读者借鉴。
鉴于当前技术生态的演进态势,传统Java技术栈的市场空间逐渐收窄,该篇文章可能是笔者的封笔作。技术浪潮奔涌不息,青山不改,绿水长流,期待与诸位在更广阔的数字化领域相逢,共同见证科技赋能未来的无限可能。