当前位置: 首页 > article >正文

Java模拟Mqtt客户端连接Mqtt Broker

Java模拟Mqtt客户端基本流程

引入Paho MQTT客户端库

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.mqttv5.client</artifactId>
    <version>1.2.5</version>
</dependency>

设置mqtt配置数据

在application.yml中添加如下配置

mqtt:
    broker-url: tcp://42.194.132.44:1883
    client-id: mqtt_receive_server
    username: mqtt_server
    password: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0

MqttClient配置

将MqttClient加入到IoC容器,并连接客户端

package com.angel.ocean.config;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqttConfig {

    @Value("${mqtt.broker-url}")
    private String brokerUrl;

    @Value("${mqtt.client-id}")
    private String clientId;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password}")
    private String password;

    @Bean
    public MqttClient mqttClient() throws MqttException {
        MqttClient client = new MqttClient(brokerUrl, clientId);
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setCleanSession(true);
        client.connect(options);
        return client;
    }
}

MqttService

mqtt客户端,一些基本操作:连接、订阅、发消息,断开连接

package com.angel.ocean.mqtt;

import com.angel.ocean.contants.MqttTopicConstant;
import com.angel.ocean.kafka.KafkaService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;

@Slf4j
@Service
public class MqttService {

    @Resource
    private MqttClient client;

    @Resource
    private KafkaService kafkaService;

    @PostConstruct
    public void init() throws MqttException {
        client.setCallback(new MqttCallbackHandler(kafkaService));
        subscribe(MqttTopicConstant.ACTIVATE);
        subscribe(MqttTopicConstant.RESET);
        subscribe(MqttTopicConstant.ONLINE);
        subscribe(MqttTopicConstant.OFFLINE);
        subscribe(MqttTopicConstant.REPORT);
    }

    /**
     * 连接
     */
    public void connect(String username, String password) throws MqttException {

        if(!client.isConnected()) {
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setCleanSession(true);
            client.connect(options);
        }
    }

    /**
     * 发送消息
     */
    public void publish(String topic, String data) {

        if(client.isConnected()) {
            MqttMessage message = new MqttMessage(data.getBytes());
            message.setQos(0);
            try {
                client.publish(topic, message);
                log.info("Message published:{}, topic:{}, content:{}", client.getClientId(), topic, data);
            } catch (MqttException e) {
                log.error("Message publish failed:{}, topic:{}", client.getClientId(), topic, e);
            }
            return;
        }

        log.info("Message publish failed, client:{} not online.", client.getClientId());
    }

    /**
     * 订阅
     */
    public void subscribe(String topic) {

        if(client.isConnected()) {
            try {
                client.subscribe(topic);
                log.info("Message subscribed:{}, topic:{}", client.getClientId(), topic);
            } catch (MqttException e) {
                log.error("Message subscribe failed:{}, topic:{}", client.getClientId(), topic, e);
            }
            return;
        }

        log.info("Message subscribe failed, client:{} not online.", client.getClientId());
    }

    /**
     * 断开连接
     */
    public void disconnect() {
        try {
            client.disconnect();
            client.close();
            log.info("Disconnected:{}", client.getClientId());
        } catch (MqttException e) {
            log.error("Message disconnect failed:{}", client.getClientId(), e);
        }
    }
}

自定义MqttCallback

对客户端连接丢失,收到消息做一些模拟处理

package com.angel.ocean.mqtt;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.angel.ocean.domain.UpData;
import com.angel.ocean.domain.UpKafKaData;
import com.angel.ocean.kafka.KafkaService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.stereotype.Component;
import static com.angel.ocean.contants.KafkaTopicConstant.UP_DATA_TOPIC;

@Slf4j
public class MqttCallbackHandler implements MqttCallback {

    private KafkaService kafkaService;

    public MqttCallbackHandler(KafkaService kafkaService) {
        this.kafkaService = kafkaService;
    }

    @Override
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        log.info("连接断开...", cause);
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String data = new String(message.getPayload());
        log.info("接收消息主题:{}, Qos:{}, 消息内容:{}", topic, message.getQos(), data);
        UpData upData = JSONObject.parseObject(data, UpData.class);
        UpKafKaData upKafKaData = new UpKafKaData(topic, data);
        log.info("upKafKaData: {}", JSON.toJSONString(upKafKaData));
        kafkaService.sendData(UP_DATA_TOPIC, upData.getClientId(), JSON.toJSONString(upKafKaData));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("deliveryComplete---------:{}", token.isComplete());
    }
}

MqttController

用于模拟客户端行为

package com.angel.ocean.controller;

import com.angel.ocean.common.ApiResult;
import com.angel.ocean.common.BaseController;
import com.angel.ocean.mqtt.MqttService;
import io.swagger.annotations.Api;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

/**
 *  前端控制器
 *
 * @author Jaime.yu
 * @time 2024-12-01
 */
@Api(value = "接口", tags = {"相关接口"})
@RestController
@RequestMapping("/mqtt/client")
public class MqttController extends BaseController {

    @Resource
    private MqttService mqttService;

    @GetMapping("/subscribe")
    public ApiResult<?> subscribe(String topic) {
        mqttService.subscribe(topic);
        return ApiResult.success();
    }

    @GetMapping("/publish")
    public ApiResult<?> publish(String topic, String message) {
        mqttService.publish(topic, message);
        return ApiResult.success();
    }

    @GetMapping("/disconnect")
    public ApiResult<?> disconnect() {
        mqttService.disconnect();
        return ApiResult.success();
    }
}

代码验证

启动mqtt客户端

如下图客户端已上线:
在这里插入图片描述

发送消息

在这里插入图片描述如下图mqtt broker该客户端的日志,接收到了我们发送的数据:hello world
在这里插入图片描述

接收数据

首先我们先订阅个主题:mqtt/0/0

在这里插入图片描述

使用MQTTX客户端向该主题发消息

在这里插入图片描述

Java mqtt客户端接收数据

查询本地Java mqtt客户收到的消息,如下图收到该消息
在这里插入图片描述mqtt broker 也可以看到该日志:
在这里插入图片描述

断开连接

在这里插入图片描述如下图本地客户端862024121819020已断开连接:
在这里插入图片描述


http://www.kler.cn/a/449017.html

相关文章:

  • 0.96寸OLED显示屏详解
  • Mysql数据究竟是如何存储的
  • 百度热力图数据处理,可直接用于论文
  • pyparsing如何实现嵌套捕获
  • CTFHUB 历年真题 afr-1
  • Spring常见问题
  • 增强LabVIEW与PLC通信稳定性
  • 培训机构Day15
  • 小型 Vue 项目,该不该用 Pinia 、Vuex呢?
  • 云原生大数据计算服务 MaxCompute 是什么?
  • 图像处理基础 | 查看两张图像的亮度差异,Y通道相减
  • 《深入浅出 Servlet:Java Web 开发的基石》(二)
  • JMeter 使用详解
  • 银河麒麟 SSH Vscode连接
  • C++简明教程(文章要求学过一点C语言)(2)
  • 【网络云计算】2024第52周-每日【2024/12/23】小测-理论实操
  • 【一文了解】C#重点-委托1
  • Linux下Java通过JNI调用C++
  • NodeMCU驱动28BYJ-48型步进电机(Arduino)
  • Golang中的Goroutine调度策略
  • 爬虫自动化(DrissionPage)
  • [机器学习]XGBoost(3)——确定树的结构
  • python实现Excel转图片
  • Flutter Visibility控件详解
  • 天锐绿盾加密软件与Ping32两款企业防泄密软件对比:分析文件防止泄露解决方案
  • Qt获取本地计算的CPU温度