springboot 引入mqtt
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
配置文件
mqtt:
server:
username: xxx
password: xxx
serverURI: tcp://127.0.0.1:1883
clientId: xxx_${random.int[1000,9999]}
keepAliveInterval: 120
connectionTimeout: 30
topic: topic1,topic2
配置类
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
@Slf4j
@Configuration
public class MqttConfig {
@Value("${mqtt.server.username}")
private String username;
@Value("${mqtt.server.password}")
private String password;
@Value("${mqtt.server.serverURI}")
private String serverURI;
@Value("${mqtt.server.clientId}")
private String clientId;
@Value("${mqtt.server.keepAliveInterval}")
private int keepAliveInterval;
@Value("${mqtt.server.connectionTimeout}")
private int connectionTimeout;
@Value("${mqtt.topic}")
private List<String> topics;
private MqttClient mqttClient;
@Bean
public MqttClient mqttClient(MqttConsumeService mqttConsumeService) {
MqttClientPersistence persistence = new MemoryPersistence();
MqttClient client = null;
try {
client = new MqttClient(serverURI, clientId, persistence);
client.setTimeToWait(5000);
mqttConsumeService.setMqttClient(client);
client.setCallback(mqttConsumeService);
this.mqttClient = client;
} catch (MqttException e) {
log.error("mqtt init error {}", e);
}
return client;
}
public void connectAndSubscribe(){
try {
mqttClient.connect(this.mqttConnectOptions());
} catch (MqttException e) {
log.error("mqtt connection error", e);
}
this.subscribe(mqttClient);
}
private MqttConnectOptions mqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setCleanSession(true);
options.setAutomaticReconnect(true);
options.setConnectionTimeout(connectionTimeout);
options.setKeepAliveInterval(keepAliveInterval);
return options;
}
public void subscribe(MqttClient mqttClient) {
topics.forEach(topic -> {
try {
mqttClient.subscribe(topic, 0);
} catch (MqttException e) {
log.error("mqtt subscribe error {}", e);
}
});
}
}
连接类 在spring 启动完成后再连接mqtt 防止过早触发messageArrived方法(messageArrived方法里面可能存放业务处理逻辑 过早触发会导致某些业务service还没注入完成就被调用)
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class MqttConnector implements ApplicationRunner {
@Resource
private MqttConfig mqttConfig;
@Override
public void run(ApplicationArguments args){
mqttConfig.connectAndSubscribe();
}
}
publish service
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Slf4j
@Service
public class MqttPublishService {
@Resource
private MqttClient mqttClient;
public void send(String topic, String message, Integer qos) {
try {
mqttClient.publish(topic, message.getBytes(), qos, false);
} catch (MqttException e) {
throw new RuntimeException("mqtt发布消息异常",e);
}
}
public void send(String topic, String message) {
try {
mqttClient.publish(topic, message.getBytes(), 0, false);
} catch (MqttException e) {
throw new RuntimeException("mqtt发布消息异常",e);
}
}
}
consume service
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class MqttConsumeService implements MqttCallbackExtended {
private MqttClient mqttClient;
public void setMqttClient(MqttClient mqttClient) {
this.mqttClient = mqttClient;
}
@Override
public void connectComplete(boolean b, String s) {
}
@Override
public void connectionLost(Throwable throwable) {
try {
mqttClient.reconnect();
} catch (MqttException e) {
log.error("mqtt重新连接失败",e);
}
}
/**
* topic消费方法
*/
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) {
log.info(topic);
log.info(mqttMessage.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}