使用阿里云 MQTT 服务进行消息传输的基本实践
使用阿里云 MQTT 服务进行消息传输的基本实践
文章目录
- 使用阿里云 MQTT 服务进行消息传输的基本实践
- 1、项目背景
- 2、引入依赖
- 3、配置阿里云 MQTT 服务
- 4、动刷新配置
- 5、消息发送示例
- 6、总结
注意:实际上我的项目是 Spring Cloud Alibaba 微服务项目,并且配置写在了 Nacos 上面!
在现代应用开发中、消息传输是分布式系统中不可或缺的一部分. 阿里云的 MQTT 服务提供了一种高效且可靠的消息传输方式、适用于物联网 (IoT) 等场景. 本文将结合代码示例、介绍如何在 Spring Boot 项目中集成阿里云 MQTT 服务、并分享一些最佳实践.
1、项目背景
我们正在开发一个基于 Spring Boot 的应用、该应用需要使用阿里云的 MQTT 服务进行消息传输. 为了实现这一目标、我们需要配置 MQTT 服务的连接信息、并编写相关的消息发送逻辑.
2、引入依赖
<!-- 云消息队列 MQTT 版 -->
<dependency>
<groupId>com.alibaba.mqtt</groupId>
<artifactId>server-sdk</artifactId>
<version>1.0.8.Final</version>
</dependency>
3、配置阿里云 MQTT 服务
首先、我们需要创建一个配置类来存储 MQTT 服务的相关配置. 这包括接入点、端口、实例 ID、AccessKey 和 SecretKey 等信息.
4、动刷新配置
为了在配置变更时自动刷新 MQTT 连接、我们可以使用 @EventListener
监听 EnvironmentChangeEvent
事件. 这样、当配置发生变化时、MQTT 连接会自动重新初始化.
下面是『3、配置阿里云 MQTT 服务』和『4、动刷新配置』的代码示例:
package com.kumy.requrchase.treasure.conf;
import com.alibaba.mqtt.server.ServerProducer;
import com.alibaba.mqtt.server.config.ChannelConfig;
import com.alibaba.mqtt.server.config.ProducerConfig;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 阿里云-云消息队列 MQTT 版配置类
* - <a href="https://help.aliyun.com/zh/apsaramq-for-mqtt/mqtt-upgraded/developer-reference/api-operations-and-parameters">云消息队列 MQTT 版文档地址</a>
* <a href="https://help.aliyun.com/zh/apsaramq-for-mqtt/mqtt-upgraded/product-overview/service-introduction-terms?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3">基本概念</a>
* 此类用于表示云消息队列 MQTT 版的配置信息.
* 包含接入点、协议端口、实例 ID、AccessKey 和 SecretKey 等字段.
*
* @author zibo
* @date 2024/10/25
* @slogan 慢慢学、不要停.
*/
@Data
@Slf4j
@Configuration
@RefreshScope
@ConfigurationProperties(prefix = "aliyuncs-mqtt")
public class AliyuncsMqttConfig {
/**
* 云消息队列 MQTT 版实例的接入点.
* 客户端通过接入点连接云消息队列 MQTT 版服务端.
*/
private String domain;
/**
* 协议端口.
* 端口和使用的协议必须匹配、云端 SDK 中、该参数值固定为 5672.
*/
private Integer port = 5672;
/**
* 您在云消息队列 MQTT 版控制台创建的实例的 ID.
*/
private String instanceId;
/**
* 您在阿里云账号管理控制台中创建的 AccessKey ID、用于身份认证.
*/
private String accessKey;
/**
* 您在阿里云账号管理控制台中创建的 AccessKey Secret、用于身份认证.
* 仅在 Signature 鉴权模式下需要设置.
*/
private String secretKey;
/**
* 此处参数内容为示意. firstTopic 是 MQTT 消息的一级 topic、需要在控制台申请才能使用.
* 如果使用了没有申请或者没有被授权的 topic 会导致鉴权失败、MQTT 服务端会断开客户端连接.
*/
private String firstTopic;
/**
* MQTT 的二级 Topic、甚至三级 Topic 都是父级 Topic 下的子类.
* 使用时无需在控制台创建、直接在代码中设置即可.
* 命名格式为: 父级 Topic 和各子级 Topic 间均使用正斜线 (/) 隔开、<父级 Topic 名称>/<二级 Topic 名称>/<三级 Topic 名称>、例如、SendMessage/demo/producer.
* 需要注意的是云消息队列 MQTT 版限制父级 Topic 和子级 Topic 的总长度为 64 个字符、如果超出长度限制将会导致客户端异常.
* 您可以使用 MQTT.fx 客户端验证子级 Topic 发布和订阅消息.
* -
* 不再配置、不同业务场景下、在代码中设置.
*/
// private String secondTopic;
@Bean
@RefreshScope
public ServerProducer serverProducer() throws IOException、TimeoutException {
return createServerProducer();
}
ServerProducer createServerProducer() throws IOException、TimeoutException {
// 创建 ChannelConfig 实例
ChannelConfig channelConfig = new ChannelConfig();
channelConfig.setDomain(domain);
channelConfig.setPort(port);
channelConfig.setInstanceId(instanceId);
channelConfig.setAccessKey(accessKey);
channelConfig.setSecretKey(secretKey);
// 创建 ServerProducer 实例
ServerProducer serverProducer = new ServerProducer(channelConfig、new ProducerConfig());
// 启动 ServerProducer
serverProducer.start();
// 实例创建日志
log.info("MQTT 配置下 ServerProducer 实例创建日志、配置: {}"、serverProducer.getChannelConfig());
// 返回 ServerProducer 实例
return serverProducer;
}
}
@Slf4j
@Component
class ServerProducerRefresher {
private final AliyuncsMqttConfig aliyuncsMqttConfig;
private ServerProducer serverProducer;
@Autowired
public ServerProducerRefresher(AliyuncsMqttConfig aliyuncsMqttConfig、ServerProducer serverProducer) {
this.aliyuncsMqttConfig = aliyuncsMqttConfig;
this.serverProducer = serverProducer;
}
@EventListener(EnvironmentChangeEvent.class)
public synchronized void onEnvironmentChange() throws IOException、TimeoutException {
// 停止旧的 ServerProducer
if (serverProducer != null) {
// 实例停止日志
log.info("MQTT 配置下 ServerProducer 实例停止日志、旧配置: {}"、serverProducer.getChannelConfig());
serverProducer.stop();
}
// 重新初始化 ServerProducer
serverProducer = aliyuncsMqttConfig.createServerProducer();
}
}
5、消息发送示例
在控制器中、我们可以通过 ServerProducer
发送消息. 以下是一个简单的示例:
package com.example.mqtt.controller;
import com.alibaba.mqtt.server.ServerProducer;
import com.example.mqtt.config.AliyuncsMqttConfig;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.IOException;
@RestController
@RequestMapping("/mqtt")
public class MqttController {
@Resource
private ServerProducer serverProducer;
@Resource
private AliyuncsMqttConfig aliyuncsMqttConfig;
@RequestMapping("/send")
public String sendMessage() {
try {
serverProducer.sendMessage(aliyuncsMqttConfig.getFirstTopic() + "/test"、"hello world".getBytes());
} catch (IOException e) {
throw new RuntimeException("消息发送失败"、e);
}
return "消息发送成功";
}
}
6、总结
通过上述步骤、我们成功地在 Spring Boot 项目中集成了阿里云的 MQTT 服务、能够实现消息的发送和接收. 在实际项目中、您可以根据业务需求进一步扩展和优化代码. 同时、确保配置的安全性、避免敏感信息泄露.
希望这篇博客能帮助您更好地理解和使用阿里云的 MQTT 服务. 如果您有任何问题或建议、欢迎在下方留言讨论.