当前位置: 首页 > article >正文

企业级基于SpringBoot的MQTT的构建和使用

基于SpringBoot的MQTT配置及使用

首先要使用EMQX搭建一个MQTT服务器,参考文档:EMQX快速开始

本着开源分享的观点,闲话不多说,直接上代码

导入Maven

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.5.13</version>
        </dependency>

配置文件

spring:
  mqtt:
    #MQTT服务端地址,如果是集群,用逗号隔开
    url: tcp://localhost:1883
    #用户名
    username: root
    #密码
    password: 123
    #clientId代表该服务挂起时的名字,在MQTT服务端clientId不可重复
    clientId: MqttTest
    #MQTT默认的消息订阅主题,可配置多个,‘#’为通配符,详情可自己看一下MQTT文档的通配符说明
    defaultTopic:
      - server1/#
      - server2/#
# 默认Qos,关于消息等级可以查看文档关于Qos的不同等级对信息的约束力度,2是最高,有且只接受一次,由于我的项目中对数据要求很高,所以不考虑资源消耗的情况下,我一般采用2,此处有几个默认主题,就要设置几个Qos,按照顺序对应每个主题的等级
    defaultQos:
      - 2
      - 2

配置类

该类为一个Config类,用于接收上一步在application.yml配置文件中配置的配置信息

@Component
@Slf4j
@Getter
@Setter
@ToString
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttProperties {
    private String username;

    private String password;

    private String url;

    private String clientId;

    private List<String> defaultTopic;

    private List<Integer> defaultQos;
}

MqttBO类

该类用于构建发送信息的对象

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MqttBO {
//    int qos,boolean retained,String topic,String message
// 发送信息的消息等级,要求高的就是2
    private Integer qos;
    // 是否信息保留
    private Boolean retained;
    // 发送信息的主题
    private String topic;
    // 发送信息的主体
    private byte[] message;
}

MQTTClient类

由于在springboot项目中,我们只想创建一个单例的Mqttclient进行连接,所以我们创建一个类似于工厂的类,在工厂Bean加载后,创建单例的client,并做连接,订阅,断开等方法的支持,该client加载完成后,在调用时,Spring会自动注入工厂创建连接的client。

/**
 * @Description MQTT客户端实现工厂,该类主要做工厂生成client,在Bean加载后,创建单例的client。在所有MQTT的Bean中第一位加载
 * 同时做一些连接,订阅,断开等方法的支持
 */
@Slf4j
@Component
public class MyMqttClient {

    private MqttClient client;
    @Autowired
    private MqttProperties mqttProperties;

    /**
     * 在Bean加载后,创建单例的client
     * PostConstruct会在该Bean加载后执行,初始化client
     */
    @PostConstruct
    public void init() {
        try {
            this.client = new MqttClient(mqttProperties.getUrl(), mqttProperties.getClientId(), new MemoryPersistence());
            log.info("MQTT客户端初始化成功");
        } catch (MqttException e) {
            log.error("MQTT客户端初始化失败: {}", e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    // 连接
    public synchronized void connect(MqttCallBack mqttCallBack) {
        try {
            if (client != null && client.isConnected()) {
                log.info("发现旧连接,正在断开...");
                client.disconnectForcibly(); // 强制断开旧连接
            }
            MqttConnectOptions options = createConnectOptions();
            client = new MqttClient(mqttProperties.getUrl(), mqttProperties.getClientId(), new MemoryPersistence());
            client.setCallback(mqttCallBack);
            client.connect(options);
            log.info("MQTT连接成功");
        } catch (MqttSecurityException e) {
            log.error("MQTT安全异常: {}", e.getMessage(), e);
        } catch (MqttPersistenceException e) {
            log.error("MQTT持久化异常: {}", e.getMessage(), e);
        } catch (MqttException e) {
            log.error("MQTT连接失败: {}", e.getMessage(), e);
        }
    }

    // 创建连接选项
    private MqttConnectOptions createConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(mqttProperties.getUsername());
        options.setPassword(mqttProperties.getPassword().toCharArray());
        // 保留会话
        options.setCleanSession(false);
        // 自动重连,无需在连接断联回调方法中处理重连
        options.setAutomaticReconnect(true);
        // 连接超时时间(秒)
        options.setConnectionTimeout(10);
        // 心跳间隔
        options.setKeepAliveInterval(30);
        // 遗嘱消息
        options.setWill("willTopic", "客户端已断开".getBytes(), 2, false);
        return options;
    }

    // 订阅
    public synchronized void subscribe() {
        try {
            String[] topics = mqttProperties.getDefaultTopic().toArray(new String[0]);
            int[] qoses = mqttProperties.getDefaultQos().stream().mapToInt(Integer::valueOf).toArray();
            client.subscribe(topics, qoses);
            log.info("订阅主题成功: {}", String.join(", ", topics));
        } catch (MqttException e) {
            log.error("订阅主题失败: {}", e.getMessage(), e);
        }
    }

    /**
     * 发布消息
     *
     * @param mqttBO 消息对象
     */
    public synchronized void publish(MqttBO mqttBO) {
        if (mqttBO == null || mqttBO.getTopic() == null || mqttBO.getMessage() == null) {
            log.warn("发布消息失败: 参数不完整");
            return;
        }

        MqttMessage mqttMessage = new MqttMessage();
        if (mqttBO.getQos()==null){
            mqttBO.setQos(2); //默认2
        }
        mqttMessage.setQos(mqttBO.getQos());
        mqttMessage.setRetained(mqttBO.getRetained());
        mqttMessage.setPayload(mqttBO.getMessage());

        MqttTopic mqttTopic = client.getTopic(mqttBO.getTopic());
        try {
            MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
            token.waitForCompletion();
            log.info("消息发布成功: Topic={}, Payload={}", mqttBO.getTopic(), new String(mqttBO.getMessage()));
        } catch (MqttException e) {
            log.error("发布消息失败: {}", e.getMessage(), e);
        }
    }

    // 断开连接
    public synchronized void disConnect() {
        try {
            if (client != null && client.isConnected()) {
                client.disconnect();
                client.close(); // 确保释放资源
                log.info("成功断开连接并释放资源");
            }
        } catch (MqttException e) {
            log.error("断开连接失败: {}", e.getMessage(), e);
        }
    }

    // 重新连接
    public synchronized void reconnect() {
        try {
            if (!client.isConnected()) {
                log.info("尝试重新连接...");
                client.connect(createConnectOptions());
                log.info("重新连接成功");
            }
        } catch (MqttException e) {
            log.error("重新连接失败: {}", e.getMessage(), e);
        }
    }
}

MQTT回调实现类

要想实现接收到发送到mqtt中的信息,我们需要实现回调接口。

/**
 * @Description MQTT的回调函数,此处在接收回调里仅作判断,具体逻辑放在mqttService里面
 */
@Slf4j
@Component
@DependsOn("myMqttClient")
public class MqttCallBack implements MqttCallbackExtended {

    // 在回调中,我们的业务逻辑都放在mqttService里面
    @Autowired
    private MqttService mqttService;
    // 注入已有的单例Bean
    @Autowired
    private MyMqttClient client;
    @Autowired
    private MqttProperties mqttProperties;

    /**
     * 客户端断开连接的回调,断开后,mqtt开启了断联自动重连机制,由于在创建连接时,我们开启了自动重连机制,此处无需处理重连,如果有其他需求可以改写
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("与服务器断开连接,尝试重新连接...");
//        断开后,mqtt开启了断联自动重连机制,此处无需处理
    }

    /**
     * 接受到信息回调
     * @param s
     * @param mqttMessage
     * @throws Exception
     */
    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
    //    此处我们将接收到的信息传递给mqttService处理,其中s为发送到我们这里的具体地址,MqttMessage为Mqtt返回的对象,但是返回的是字节数组,需要自己转,如果传回的是Json,我们需要转化
    String message = new String(MqttMessage.getPayload());
    log.info("接收到信息: Topic={}, Payload={}", s, message);
    // 然后下一步我们可以用FastJson或者其他的Json工具对接收到的json进行处理
    

    }

    /**
     * 通知客户端某条消息已经成功发送到 MQTT 服务器并完成交付。此处没有特殊需求无需处理
     * @param iMqttDeliveryToken
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }

    /**
     * 连接成功后回调,由于我们开启了会话保留机制,在断线后会保留会话的信息,但是首次连接需要订阅主题。
     * @param reconnect
     * @param serverURI
     */
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        // 首次连接成功时订阅
        if (!reconnect) {
            client.subscribe();
            log.info("首次连接,订阅主题成功");
        }else{
            log.info("重新连接成功");
        }
    }
}

MQTTConfig类

MQTTConfig类是为了解耦设立,异步连接mqtt,在项目启动时,会调用mqttInitRunner方法,进行连接。

/**
 * @Description mqttConfig是为了解耦设立,异步连接mqtt
 */
@Configuration
@Slf4j
public class MqttConfig {

    @Autowired
    private MqttCallBack mqttCallBack;
    @Autowired
    private MyMqttClient client;
    @Bean
    public ApplicationRunner mqttInitRunner() {
        return args -> {
            try {
                client.connect(mqttCallBack);
//                连接后,此处会回调connectComplete方法去进行订阅主题
            } catch (Exception e) {
                log.error("MQTT 初始化失败", e);
            }
        };
    }

}

如何使用

导入Maven后,将上述代码复制进项目后,直接启动即可,该配置不唯一,根据项目需求可以更改。
在有的需求中,需要对设备去请求信息,这时就要发送到mqtt中请求信息,然后设备订阅mqtt主题,在MqttService中,注入client,使用client.publish(mqttBO)即可发送到mqtt请求信息,设备接收后,返回信息到Mqtt中,接收信息需要在MqttCallBack中的messageArrived方法中处理。

一切的返回都会在MqttCallBack的messageArrived方法中返回,具体逻辑在MqttService中处理,如果返回的是json,需要自己解析。

ps:
关于技术方案的构建,不同业务场景往往存在多种实现路径,本文所述仅为其中一种实践方案。若读者在具体实施过程中遇到技术选型或架构设计方面的疑问,欢迎在评论区留言探讨,笔者将结合过往经验给予针对性建议。

需要特别说明的是,文中代码源自笔者过往工作实践中的项目积累,应用于真实的企业级开发中,现经脱敏和简化处理后开源,以供读者借鉴。
鉴于当前技术生态的演进态势,传统Java技术栈的市场空间逐渐收窄,该篇文章可能是笔者的封笔作。技术浪潮奔涌不息,青山不改,绿水长流,期待与诸位在更广阔的数字化领域相逢,共同见证科技赋能未来的无限可能。


http://www.kler.cn/a/611035.html

相关文章:

  • Vue3项目中的.vscode文件夹
  • 【软考-架构】9.3、端口扫描-安全体系-网络安全技术和协议
  • 海思烧录工具HITool电视盒子刷机详解
  • 初识MySQL · 约束
  • 【react18】react项目使用mock模拟后台接口
  • 【R语言可视化】人口金字塔
  • 游戏引擎学习第183天
  • 7.5 窗体事件
  • 如何理解FFMPEG两个宏 1.MATCH_PER_TYPE_OPT, 2.MATCH_PER_STREAM_OPT
  • 【Python】编程50个经典操作
  • 图解神经网络和强化学习
  • 回滚日志(Undo Log) 的 Purge
  • 【SpringCloud】认识微服务
  • JVM类加载过程详解
  • 【Python-OpenCV】手势控制贪吃蛇
  • Linux系统管理与编程08:任务驱动综合应用
  • 音视频 三 看书的笔记 MediaPlayer的C/S架构
  • 基于LLM的Agent框架全面比较分析:MGX(MetaGPT X)、AutoGen、OpenHands与秒哒(MiaoDa)
  • WMS WCS系统架构
  • MongoDB 实际工作中应用场景