当前位置: 首页 > article >正文

Spring Boot集成MQTT完整示例和常见问题的解决方案

Spring Boot集成MQTT

我将指导您完成将MQTT与Spring Boot集成的过程,提供完整示例和常见问题的解决方案。

集成步骤

添加maven依赖:

<!-- Spring Integration MQTT -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

现在,让我们创建MQTT配置:

package com.example.mqttdemo.config;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
public class MqttConfig {

    @Value("${mqtt.broker.url}")
    private String brokerUrl;

    @Value("${mqtt.client.id}")
    private String clientId;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Value("${mqtt.topic}")
    private String topic;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        
        options.setServerURIs(new String[] { brokerUrl });
        options.setCleanSession(true);
        
        if (username != null && !username.isEmpty()) {
            options.setUserName(username);
            options.setPassword(password.toCharArray());
        }
        
        factory.setConnectionOptions(options);
        return factory;
    }

    // 出站通道(用于发送消息)
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    // 出站消息的消息处理器
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "-publisher", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(topic);
        return messageHandler;
    }

    // 入站通道(用于接收消息)
    @Bean
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    // 入站消息的消息生产者
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = 
            new MqttPahoMessageDrivenChannelAdapter(clientId + "-subscriber", mqttClientFactory(), topic);
        
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }
}

让我们创建一个服务来处理MQTT消息:

package com.example.mqttdemo.service;

import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.springframework.messaging.MessageChannel;

@Service
@Slf4j
@RequiredArgsConstructor
public class MqttService {

    private final MessageChannel mqttOutboundChannel;

    // 向MQTT主题发布消息的方法
    public void publishMessage(String payload) {
        log.info("发布消息: {}", payload);
        Message<String> message = MessageBuilder.withPayload(payload).build();
        mqttOutboundChannel.send(message);
    }

    // 处理传入MQTT消息的方法
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public void handleMessage(Message<?> message) {
        String payload = message.getPayload().toString();
        log.info("收到消息: {}", payload);
        
        // 根据需要处理消息
        // ...
    }
}

创建一个控制器来暴露REST端点:

package com.example.mqttdemo.controller;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.example.mqttdemo.service.MqttService;

import lombok.RequiredArgsConstructor;

@RestController
@RequestMapping("/api/mqtt")
@RequiredArgsConstructor
public class MqttController {

    private final MqttService mqttService;

    @PostMapping("/publish")
    public ResponseEntity<String> publishMessage(@RequestBody String message) {
        mqttService.publishMessage(message);
        return ResponseEntity.ok("消息发布成功");
    }
}

最后,创建应用程序属性:

# 服务器配置
server.port=8080

# MQTT配置
mqtt.broker.url=tcp://localhost:1883
mqtt.client.id=spring-boot-mqtt-client
mqtt.username=
mqtt.password=
mqtt.topic=test/topic

以及主应用程序类:

package com.example.mqttdemo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.integration.annotation.IntegrationComponentScan;

@SpringBootApplication
@IntegrationComponentScan
public class MqttDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(MqttDemoApplication.class, args);
    }
}

测试集成

  1. 在本地机器上启动MQTT代理(如Mosquitto)或使用公共代理
  2. 运行您的Spring Boot应用程序
  3. 使用REST客户端向http://localhost:8080/api/mqtt/publish发送POST请求,并附带消息正文
  4. 使用MQTT客户端(如MQTT Explorer)验证消息是否已发布到主题

常见问题及解决方案

1. 连接被拒绝

问题:尝试连接到MQTT代理时出现Connection refused错误。

解决方案

  • 验证代理URL是否正确
  • 检查代理是否正在运行
  • 确保防火墙设置允许连接
  • 验证端口是否正确(非TLS默认为1883,TLS默认为8883)
// 排查连接问题
MqttConnectOptions options = new MqttConnectOptions();
options.setConnectionTimeout(30); // 增加超时时间
options.setKeepAliveInterval(60); // 增加保持活动间隔
options.setAutomaticReconnect(true); // 启用自动重连

2. 认证失败

问题:连接到代理时认证失败。

解决方案

  • 验证用户名和密码是否正确
  • 检查代理是否需要认证
  • 确保客户端具有必要的权限
// 正确的认证设置
options.setUserName(username);
options.setPassword(password.toCharArray());

3. SSL/TLS连接问题

问题:无法建立与代理的安全连接。

解决方案

  • 使用正确的协议(非安全使用tcp://,安全使用ssl://)
  • 正确配置SSL/TLS属性
// SSL/TLS配置
options.setSocketFactory(SSLSocketFactory.getDefault());

4. 消息未收到

问题:消息已发布但订阅者未收到。

解决方案

  • 验证主题名称是否完全匹配(MQTT主题区分大小写)
  • 检查QoS级别
  • 确保在发布消息之前订阅者已连接
// 设置QoS级别
adapter.setQos(1); // 0: 最多一次, 1: 至少一次, 2: 恰好一次

5. 客户端ID冲突

问题:出现Client ID already in use错误。

解决方案

  • 为每个连接使用唯一的客户端ID
  • 为客户端ID添加随机后缀或时间戳
// 生成唯一的客户端ID
String uniqueClientId = clientId + "-" + System.currentTimeMillis();

高级配置

对于更健壮的应用程序,请考虑以下增强功能:

// 遗嘱消息(LWT)
MqttConnectOptions options = new MqttConnectOptions();
options.setWill("client/status", "Offline".getBytes(), 1, true);

// 持久会话
options.setCleanSession(false);

// 连接回退策略
options.setMaxReconnectDelay(5000); // 重连尝试之间的最大延迟为5秒

这个实现为将MQTT与Spring Boot集成提供了坚实的基础。您可以根据特定需求进行扩展。


http://www.kler.cn/a/590726.html

相关文章:

  • Netty基础—8.Netty实现私有协议栈二
  • 激光slam学习笔记10---ubuntu2004部署运行fastlivo2踩坑记录
  • 【Ratis】ReferenceCountedObject接口的作用及参考意义
  • springboot多种生产打包方式教程
  • 【从零开始学习计算机】计算机网络(一)计算机网络分层结构
  • javaEE————文件IO(1)
  • MySQL使用pxc实现高可用
  • Day34 | 300. 最长递增子序列、674. 最长连续递增序列、718. 最长重复子数组、1143. 最长公共子序列
  • 卓越的用户体验需要智能内容
  • MiddleVR for Unity插件
  • Linux FILE文件操作1-文件指针、文件缓冲区(行缓冲、全缓冲、无缓冲)的验证
  • Java学习------抽象类和接口
  • 图解AUTOSAR_CP_WatchdogDriver
  • 什么是梯度方差和缩放因子
  • Effective C++ 剖析(条款1~9)
  • 前端面试:axios 是否可以取消请求?
  • jmeter分布式原理及实例
  • 流量分析实践
  • Elixir语言的开源贡献
  • 【NLP】2. TF-IDF(Log 形式)