当前位置: 首页 > article >正文

MQTT--Java整合EMQX

目录

  • 1、简介
  • 2、准备
  • 3、使用步骤
    • 3.1 引入依赖
    • 3.2 创建生产者和消费者
    • 3.3 测试
  • 总结
  • PS:


1、简介

本文章实现了一个简单的MQTT客户端,使用Eclipse Paho库让Java和EMQX整合,测试客户端初始化时配置Broker地址、客户端ID、用户名和密码。连接成功后,订阅主题并发布消息

2、准备

前提是启动了EMQX服务,可以打开这个页面(目的是为了更清楚看见客户端连接和消息的发送):
在这里插入图片描述
不会的可以看这篇文章:MQTT–EMQX入门+MQTTX使用

3、使用步骤

3.1 引入依赖

<dependency>
  <groupId>org.eclipse.paho</groupId>
  <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  <version>1.2.2</version>
</dependency>

3.2 创建生产者和消费者

生产者类:

package com.itxhj.emqxdemo.io;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


public class App {
    public static void main(String[] args) {
        String subTopic = "testtopic/#";
        String pubTopic = "testtopic/1";
        String content = "Hello World";
        int qos = 2;
        String broker = "tcp://192.168.176.128:1883"; // 地址修改成你开启EMQX的主机地址
        String clientId = "emqx_test";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);

            // MQTT 连接选项
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName("admin");	// 客户端的账号,并非EMQX的
            connOpts.setPassword("123456".toCharArray());  // 客户端的密码,并非EMQX的
            // 保留会话
            connOpts.setCleanSession(true);

            // 设置回调
            client.setCallback(new OnMessageCallback());

            // 建立连接
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);

            System.out.println("Connected");
            System.out.println("Publishing message: " + content);

            // 订阅
            client.subscribe(subTopic);

            // 消息发布所需参数
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            client.publish(pubTopic, message);
            System.out.println("Message published");

            Thread.sleep(10000);	// 因为断开连接EMQX那边就看不见连接了,所以sleep一会

            client.disconnect();
            System.out.println("Disconnected");
            client.close();
            System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

消费者类:

package com.itxhj.emqxdemo.io;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class OnMessageCallback implements MqttCallback {
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        System.out.println("连接断开,可以做重连");
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        System.out.println("接收消息主题:" + topic);
        System.out.println("接收消息Qos:" + message.getQos());
        System.out.println("接收消息内容:" + new String(message.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}

3.3 测试

启动main方法后,看见这个表示成功
在这里插入图片描述
可以看见EMQX中也有连接显示

在这里插入图片描述


总结

总体的步骤如下:

  • 引入依赖:Eclipse Paho库
  • 编写生产者:配置了Broker地址、客户端ID、用户名和密码,成功连接后订阅了主题并发布了消息。
  • 编写消费者:实现消息回调,处理连接丢失和接收到的消息。
  • 启动项目测试:启动项目后在控制台和EMQX管理界面中查看了连接和消息的发送情况

PS:

感谢您的阅读!如果您觉得本篇文章对您有所帮助,请给予博主一个赞喔~


http://www.kler.cn/news/334913.html

相关文章:

  • 网页前端开发之Javascript入门篇(5/9):函数
  • 第5篇:Windows命令行文件下载方式汇总----应急响应之权限维持篇
  • czx前端(看完理解完必拿至少9Koffer)不断更新中
  • Pikachu-SSRF(curl / file_get_content)
  • 【Postman】接口测试工具使用
  • 【数据结构】什么是哈希表(散列表)?
  • html中src/href区别
  • LeetCode题练习与总结:完全平方数--279
  • 大厂出来的人为什么不比你高效?
  • 商品详情接口使用方法和对接流程如下
  • 【系统分析师】-案例篇-信息系统安全
  • 仿RabbitMQ实现消息队列三种主题的调试及源码
  • nacos1.4-CP架构源码
  • RabbitMQ 入门到精通指南
  • springboot项目中属性的使用优先级;maven编译插件切换环境变量
  • Qt操作主/从视图及XML——实例:汽车管理系统
  • 【Spring】“请求“ 之传递 JSON 数据
  • Lucene最新最全面试题及参考答案
  • How to Write an Effective Introduction for a Research Article
  • 数据结构--包装类简单认识泛型