当前位置: 首页 > article >正文

springboot 引入mqtt

依赖

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

配置文件

mqtt:
  server:
    username: xxx
    password: xxx
    serverURI: tcp://127.0.0.1:1883
    clientId: xxx_${random.int[1000,9999]}
    keepAliveInterval: 120
    connectionTimeout: 30
  topic: topic1,topic2

配置类

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.List;


@Slf4j
@Configuration
public class MqttConfig {

    @Value("${mqtt.server.username}")
    private String username;

    @Value("${mqtt.server.password}")
    private String password;

    @Value("${mqtt.server.serverURI}")
    private String serverURI;

    @Value("${mqtt.server.clientId}")
    private String clientId;

    @Value("${mqtt.server.keepAliveInterval}")
    private int keepAliveInterval;

    @Value("${mqtt.server.connectionTimeout}")
    private int connectionTimeout;

    @Value("${mqtt.topic}")
    private List<String> topics;

	private MqttClient mqttClient;


    @Bean
    public MqttClient mqttClient(MqttConsumeService mqttConsumeService) {
        MqttClientPersistence persistence = new MemoryPersistence();
        MqttClient client = null;
        try {
            client = new MqttClient(serverURI, clientId, persistence);
            client.setTimeToWait(5000);
            mqttConsumeService.setMqttClient(client);
            client.setCallback(mqttConsumeService);
            this.mqttClient = client;
        } catch (MqttException e) {
            log.error("mqtt init error {}", e);
        }
        return client;
    }

	public void connectAndSubscribe(){
        try {
            mqttClient.connect(this.mqttConnectOptions());
        } catch (MqttException e) {
            log.error("mqtt connection error", e);
        }
        this.subscribe(mqttClient);
    }


    private MqttConnectOptions mqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(connectionTimeout);
        options.setKeepAliveInterval(keepAliveInterval);
        return options;
    }

    public void subscribe(MqttClient mqttClient) {
        topics.forEach(topic -> {
            try {
                mqttClient.subscribe(topic, 0);
            } catch (MqttException e) {
                log.error("mqtt subscribe error {}", e);
            }
        });
    }

}

连接类 在spring 启动完成后再连接mqtt 防止过早触发messageArrived方法(messageArrived方法里面可能存放业务处理逻辑 过早触发会导致某些业务service还没注入完成就被调用)

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class MqttConnector implements ApplicationRunner {

    @Resource
    private MqttConfig mqttConfig;

    @Override
    public void run(ApplicationArguments args){
        mqttConfig.connectAndSubscribe();
    }
}

publish service

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

@Slf4j
@Service
public class MqttPublishService {

    @Resource
    private MqttClient mqttClient;


    public void send(String topic, String message, Integer qos) {
        try {
            mqttClient.publish(topic, message.getBytes(), qos, false);
        } catch (MqttException e) {
            throw new RuntimeException("mqtt发布消息异常",e);
        }
    }

    public void send(String topic, String message) {
        try {
            mqttClient.publish(topic, message.getBytes(), 0, false);
        } catch (MqttException e) {
            throw new RuntimeException("mqtt发布消息异常",e);
        }
    }
}

consume service

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class MqttConsumeService implements MqttCallbackExtended {


    private MqttClient mqttClient;


    public void setMqttClient(MqttClient mqttClient) {
        this.mqttClient = mqttClient;
    }

    @Override
    public void connectComplete(boolean b, String s) {

    }

    @Override
    public void connectionLost(Throwable throwable) {
        try {
            mqttClient.reconnect();
        } catch (MqttException e) {
            log.error("mqtt重新连接失败",e);
        }
    }

    /**
     * topic消费方法
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        log.info(topic);
        log.info(mqttMessage.toString());
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    }
}

http://www.kler.cn/a/322733.html

相关文章:

  • 【C++滑动窗口】1248. 统计「优美子数组」|1623
  • Qt 每日面试题 -10
  • 【分布式】万字图文解析——深入七大分布式事务解决方案
  • Go 语言切片初始化与性能优化:使用 cap 参数的重要性
  • AUTOSAR_EXP_ARAComAPI的7章笔记(5)
  • SpringBoot使用AspectJ的@Around注解实现AOP全局记录接口:请求日志、响应日志、异常日志
  • Redis 缓存雪崩、缓存穿透、缓存击穿详解
  • 基于 LangChain 的自动化测试用例的生成与执行
  • Java单体服务和集群分布式SpringCloud微服务的理解
  • 17、网络安全合规审查五大内容
  • vue按钮接收键盘回车事件
  • python:基于django的html订单提交页面
  • 小程序振动
  • 从零开始Ubuntu24.04上Docker构建自动化部署(三)Docker安装Nginx
  • centos8 升级openssh-9.8p1
  • 《C++开源贡献:提升职业竞争力的新途径》
  • 搜索引擎onesearch3实现解释和升级到Elasticsearch v8系列(四)-搜索
  • Spark Job 对象 详解
  • ‌[AI问答] Auto-sklearn‌ 与 scikit-learn 区别
  • 【SpringCloud】环境和工程搭建
  • 数据分析学习之学习路线
  • AI 将会促生哪些新的职业?
  • AT89C51 利用SBIT寻址,并且在内存中实现伪动态密码的混淆
  • gRPC协议简介
  • C++的动态数组
  • 统信服务器操作系统【targetcli部署】