当前位置: 首页 > article >正文

04 - 尚硅谷 - MQTT 客户端编程

1.在Java中使用MQTT

1.1 Eclipse Paho Java Client

具体步骤:

1、创建一个Spring Boot项目,添加如下依赖

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.0.5</version>
</parent>

<dependencies>

    <!-- spring boot整合junit单元测试的起步依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>

    <!-- mqtt java客户端依赖 -->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.5</version>
    </dependency>

</dependencies>

2、建立连接代码实现

@Test
public void createConnection() throws MqttException {

    // 定义链接相关参数
    String broker = "tcp://localhost:1883";
    String username = "zhangsan";
    String password = "123";
    String clientid = "mqtt_java_client_01";

    // 创建MqttJava客户端对象
    // MqttClientPersistence: 代表一个持久的数据存储,用于在传输过程中存储出站和入站的信息
    MqttClient client = new MqttClient(broker, clientid , new MemoryPersistence());   
    MqttConnectOptions options = new MqttConnectOptions();
    options.setUserName(username);
    options.setPassword(password.toCharArray());
    client.connect(options);

    // 阻塞当前线程
    while (true) ;
}

3、发布消息代码演示

@Test
public void sendMessage() throws MqttException {

    // 定义链接相关参数
    String broker = "tcp://localhost:1883";
    String username = "zhangsan";
    String password = "123";
    String clientid = "mqtt_java_client_01";

    // 创建MqttJava客户端对象
    MqttClient client = new MqttClient(broker, clientid , new MemoryPersistence());
    MqttConnectOptions options = new MqttConnectOptions();
    options.setUserName(username);
    options.setPassword(password.toCharArray());
    client.connect(options);

    // 创建消息对象QoS
    String content = "hello mqtt";
    MqttMessage message = new MqttMessage(content.getBytes());
    message.setQos(2);
    message.setRetained(true);

    // 发送消息
    client.publish("a/c" , message);

    // 关闭链接释放资源
    client.disconnect();
    client.close();

}

4、订阅主题获取消息

@Test
public void receiveMessage() throws MqttException {

    // 定义链接相关参数
    String broker = "tcp://localhost:1883";
    String username = "zhangsan";
    String password = "123";
    String clientid = "mqtt_java_client_02";

    // 创建MqttJava客户端对象
    MqttClient client = new MqttClient(broker, clientid , new MemoryPersistence());
    MqttConnectOptions options = new MqttConnectOptions();
    options.setUserName(username);
    options.setPassword(password.toCharArray());

    // 添加回调函数获取主题消息
    client.setCallback(new MqttCallback() {
        
        @Override
        public void connectionLost(Throwable cause) {  // 连接丢失时被调用
            System.out.println("connectionLost: " + cause.getMessage());
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {  // 接收到消息时被调用
            System.out.println("topic: " + topic);
            System.out.println("Qos: " + message.getQos());
            System.out.println("message content: " + new String(message.getPayload()));
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {  // 消息接收完成时被调用
            System.out.println("deliveryComplete---------" + token.isComplete());
        }
        
    });

    // 订阅主题
    client.connect(options);
    client.subscribe("a/d" , 2);

    while(true) ;

}

1.2 spring-integration-mqtt

基础环境搭建

1、创建一个Spring Boot项目,并加入如下依赖:

<dependencies>
	
    <!-- spring boot项目web开发的起步依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

	<!-- spring boot项目集成消息中间件基础依赖 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>

	<!-- spring boot项目和mqtt客户端集成起步依赖 -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
        <version>5.4.3</version>
    </dependency>

    <!-- lombok依赖 -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>

    <!-- fastjson依赖 -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>

</dependencies>

2、编写启动类

@EnableConfigurationProperties(value = MqttConfigurationProperties.class)
@SpringBootApplication
public class MqttDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(MqttDemoApplication.class , args) ;
    }

}

3、在application.yml文件中添加如下配置

spring:
  mqtt:
    username: zhangsan
    password: 123
    url: tcp://localhost:1883
    subClientId: sub_client_id_123
    subTopic: atguigu/iot/lamp/line
    pubClientId: pub_client_id_123

4、创建实体类读取自定义配置

@Data
@ConfigurationProperties(prefix = "spring.mqtt")
public class MqttConfigurationProperties {

    private String username;
    private String password;
    private String url;
    private String subClientId ;
    private String subTopic ;
    private String pubClientId ;

}

5、创建配置类配置链接工厂

@Configuration
public class MqttConfiguration {

    @Autowired
    private MqttConfigurationProperties mqttConfigurationProperties ;

    @Bean
    public MqttPahoClientFactory mqttClientFactory(){

        // 创建客户端工厂
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();

        // 创建MqttConnectOptions对象
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setUserName(mqttConfigurationProperties.getUsername());
        options.setPassword(mqttConfigurationProperties.getPassword().toCharArray());
        options.setServerURIs(new String[]{mqttConfigurationProperties.getUrl()});
        factory.setConnectionOptions(options);

        // 返回
        return factory;
    }

}

订阅主题获取消息

具体步骤:

1、配置入站适配器

@Configuration
public class MqttInboundConfiguration {

    @Autowired
    private MqttConfigurationProperties mqttConfigurationProperties ;

    @Autowired
    private ReceiverMessageHandler receiverMessageHandler;

    /**
     * 配置消息传输通道
     * @return
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * 配置入站适配器
     */
    @Bean
    public MessageProducer messageProducer(MqttPahoClientFactory mqttPahoClientFactory) {
        MqttPahoMessageDrivenChannelAdapter adapter  =
                new MqttPahoMessageDrivenChannelAdapter(mqttConfigurationProperties.getUrl() ,
                        mqttConfigurationProperties.getSubClientId() ,
                        mqttPahoClientFactory , mqttConfigurationProperties.getSubTopic().split(",")) ;
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter ;
    }

    /**
     * 配置入站消息处理器
     * @return
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler messageHandler() {
        return this.receiverMessageHandler ;
    }

}

2、定义监听主题消息的处理器

@Component
public class ReceiverMessageHandler implements MessageHandler {

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        MessageHeaders headers = message.getHeaders();
        String receivedTopicName = (String) headers.get("mqtt_receivedTopic");
        if("atguigu/iot/lamp/line".equals(receivedTopicName)) {
            System.out.println("接收到消息:" + message.getPayload());
        }
    }

}

测试:通过MQTTX向atguigu/iot/lamp/line主题发送消息

向指定主题发送消息

具体步骤:

1、配置出站消息处理器

@Configuration
public class MqttOutboundConfiguration {

    @Autowired
    private MqttConfigurationProperties mqttConfigurationProperties ;

    @Autowired
    private MqttPahoClientFactory pahoClientFactory ;

    @Bean
    public MessageChannel mqttOutputChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutputChannel")
    public MessageHandler mqttOutboundMassageHandler() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfigurationProperties.getUrl() ,
                mqttConfigurationProperties.getPubClientId() , pahoClientFactory ) ;
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(0);
        messageHandler.setDefaultTopic("default");
        return messageHandler ;
    }

}

2、定义发送消息的网关接口

@MessagingGateway(defaultRequestChannel = "mqttOutputChannel")
public interface MqttGateway {

    /**
     * 发送mqtt消息
     * @param topic 主题
     * @param payload 内容
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * 发送包含qos的消息
     * @param topic 主题
     * @param qos 对消息处理的几种机制。
     *          * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br>
     *          * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br>
     *          * 2 多了一次去重的动作,确保订阅者收到的消息有一次。
     * @param payload 消息体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
    
}

3、定义发送消息的服务类

@Component
@AllArgsConstructor
public class MqttMessageSender {

    private MqttGateway mqttGateway;

    /**
     * 发送mqtt消息
     * @param topic 主题
     * @param message 内容
     */
    public void send(String topic, String message) {
        mqttGateway.sendToMqtt(topic, message);
    }

    /**
     * 发送包含qos的消息
     * @param topic 主题
     * @param qos 质量
     * @param message 消息体
     */
    public void send(String topic, int qos, byte[] message){
        mqttGateway.sendToMqtt(topic, qos, message);
    }
}

3.智能灯泡案例

需求:

1、智能灯泡设备上线以后向MQTT服务端发送消息,后端服务从MQTT中获取消息记录设备信息到数据库中

2、后端微服务向MQTT服务端发送开灯或者关灯消息,设备端从MQTT中获取消息控制灯泡的开和关

3、设备端对灯泡进行开和关操作的时候向MQTT中发送消息,后端服务获取MQTT消息记录灯泡的开关状态

3.1 环境准备

具体步骤:

1、创建对应的数据库表

-- 智能灯泡设备表
CREATE TABLE `tb_lamp` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `deviceId` varchar(50) DEFAULT NULL,
  `status` int DEFAULT NULL COMMENT '1:上线  0:下线',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ,
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

-- 智能灯泡设备状态表
CREATE TABLE `tb_lamp_status` (
  `id` int NOT NULL AUTO_INCREMENT,
  `deviceId` varchar(50) DEFAULT NULL,
  `status` int DEFAULT NULL COMMENT '0: 关灯   1:开灯',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

2、在spring-integration-mqtt案例中加入如下依赖

<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.5.3.1</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.32</version>
</dependency>

3、在application.yml文件中加入如下依赖

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://192.168.136.147:3306/lamp_test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: 1234

mybatis-plus:
  configuration:
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
    map-underscore-to-camel-case: true
  mapper-locations: classpath*:mapper/*Mapper.xml

4、通过mybatis的逆向工程生成tb_lamp和tb_lamp_status表对应的基础代码

5、在启动类上添加@MapperScan注解指定Mapper接口的包路径

3.2 服务端获取设备上线消息

接口说明

接口一:设备上线

当终端设备连接上EMQX以后,发送上线消息到EMQX服务端,说明如下:

主题: atguigu/iot/lamp/line
消息内容:
    {
        "deviceId": "xxxxxx",
        "online": 1
    }
数据说明:
	deviceId: 设备id
	online:   上线状态,1表示上线,0表示离线

业务代码

ReceiverMessageHandler类的代码进行如下改造:

@Component
public class ReceiverMessageHandler implements MessageHandler {

    @Autowired
    private TbLampService tbLampService ;

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        MessageHeaders headers = message.getHeaders();
        String receivedTopicName = (String) headers.get("mqtt_receivedTopic");
        if("atguigu/iot/lamp/line".equals(receivedTopicName)) {
            tbLampService.updateLampOnlineStatus(message.getPayload().toString()) ;        // 更新智能灯泡的上线状态
        }
    }

}

TbLampServiceImpl类的代码进行如下改造:

@Service
public class TbLampServiceImpl extends ServiceImpl<TbLampMapper, TbLamp> implements TbLampService {

    @Override
    public void updateLampOnlineStatus(String jsonInfo) {

        // 解析消息获取设备id和上线状态
        Map<String ,  Object> map = JSON.parseObject(jsonInfo, Map.class);
        String deviceId = map.get("deviceId").toString();
        Integer status = Integer.parseInt(map.get("online").toString());

        // 根据设备的id查询设备数据
        LambdaQueryWrapper<TbLamp> lambdaQueryWrapper = new LambdaQueryWrapper<>() ;
        lambdaQueryWrapper.eq(TbLamp::getDeviceid , deviceId) ;
        TbLamp tbLamp = this.getOne(lambdaQueryWrapper);
        if(tbLamp == null) {        // 设备不存在,新增设备
           tbLamp = new TbLamp() ;
           tbLamp.setDeviceid(deviceId);
           tbLamp.setStatus(status);
           this.save(tbLamp) ;
        }else {     // 设备已经存在,修改设备的状态
            tbLamp.setStatus(status);
            tbLamp.setUpdateTime(new Date());
            this.updateById(tbLamp) ;
        }
    }

}

3.3 服务端发送关灯开灯消息到MQTT

接口说明

接口三:后端发送消息控制智能灯泡开关

后端可以发送控制灯泡状态消息到EMQX中,设备端监听指定主题获取消息,控制灯泡的开关状态,说明如下:

主题: atguigu/iot/lamp/server/status
消息内容:
	{
		"deviceId": "xxxxxx",
		"status": 0
	}
数据说明:		
	status:	0:关灯   , 1:开灯

业务代码

@RestController
@RequestMapping(value = "/api/lamp")
public class LampApiController {

    @Autowired
    private MqttMessageSender mqttMessageSender;

    @GetMapping(value = "/{deviceId}/{status}")
    public String sendStatusLampMsg(@PathVariable(value = "deviceId") String deviceId , @PathVariable(value = "status") Integer status) {
        Map<String , Object> map = new HashMap<>() ;
        map.put("deviceId" , deviceId) ;
        map.put("status" , status) ;
        String json = JSON.toJSONString(map);
        mqttMessageSender.send("atguigu/iot/lamp/server/status" , json);
        return "ok" ;
    }

}

3.4 服务端获取设备开灯关灯消息

接口说明

接口四:设备端改变智能灯泡开关的状态,状态发给给后端,后端记录状态

主题:atguigu/iot/lamp/device/status
消息内容:
	{
		"deviceId": "xxxxx"  
		"status": 0
	}
数据说明:	
	deviceId:设备id
	status:0:关灯   , 1:开灯

业务代码

@Override
public void handleMessage(Message<?> message) throws MessagingException {
    MessageHeaders headers = message.getHeaders();
    String receivedTopicName = (String) headers.get("mqtt_receivedTopic");
    if("atguigu/iot/lamp/line".equals(receivedTopicName)) {
        tbLampService.updateLampOnlineStatus(message.getPayload().toString()) ;        // 更新智能灯泡的上线状态
    }else if("atguigu/iot/lamp/device/status".equals(receivedTopicName)) {
        tbLampStatusService.saveDeviceStatus(message.getPayload().toString()) ;
    }
}
@Service
public class TbLampStatusServiceImpl extends ServiceImpl<TbLampStatusMapper, TbLampStatus> implements TbLampStatusService {

    @Override
    public void saveDeviceStatus(String json) {

        // 获取消息内容
        Map<String , Object> map = JSON.parseObject(json, Map.class);
        String deviceId = map.get("deviceId").toString();
        Integer status = Integer.parseInt(map.get("status").toString());

        // 创建对象封装消息
        TbLampStatus tbLampStatus = new TbLampStatus() ;
        tbLampStatus.setDeviceid(deviceId);
        tbLampStatus.setStatus(status);
        this.save(tbLampStatus) ;

    }

}


http://www.kler.cn/a/408437.html

相关文章:

  • 搜索引擎中广泛使用的文档排序算法——BM25(Best Matching 25)
  • (动画)Qt控件 QLCDNumer
  • 01Web3.0行业
  • Oracle 19c Rac + ADG搭建(源库:RAC,目标库FS)
  • 常见面试题----深入源码理解MQ长轮询优化机制
  • VSCode汉化教程【简洁易懂】
  • 一加ACE 3 Pro手机无法连接电脑传输文件问题
  • Window11+annie 视频下载器安装
  • Sketch在线版不存在?即时设计来填补空白
  • Flink【基于时间的双流联结 Demo】
  • 时序预测 | Matlab实现PSO-Elman粒子群优化递归神经网络时间序列预测
  • raw文件如何打开
  • shell编程之sed
  • 探索 Python 任务自动化的新境界:Invoke 库揭秘
  • 如何用Python统计Excel文件中的特定字段数量
  • 【Java系列】随机生成大小写混合的卡密
  • 大数取模 详解
  • Redis除了做缓存,还能做什么???
  • 密码系统设计实验3-2
  • SQLite 管理工具 SQLiteStudio 3.4.5 发布
  • C语言中的指针和字符串的赋值
  • 3.13MayBeSomeJava that are BUTTON and listener
  • 基于网页的大语言模型聊天机器人
  • java中的最小堆
  • 深入理解 Seata:分布式事务的最佳解决方案
  • Vue.js 学习总结(15)—— 如何快速删除 node_modules 依赖文件