当前位置: 首页 > article >正文

docker安装Emqx并使用自签名证书开启 SSL/TLS 连接

docker安装Emqx并使用自签名证书开启 SSL/TLS 连接

  • 一、获取自签名证书
    • 1、创建openssl.cnf文件
    • 2、生成证书自签名证书
  • 二、docker安装EMQX
    • 1、初始化目录
    • 2、加载镜像文件并挂载相应的文件目录
    • 3、启动docker容器
    • 4、EMQX加载自签名证书
  • 三、客户端MQTTX连接测试
  • 四、Springboot整合

详细操作过程可以参考官方说明文档:官方说明文档
客户端TLS接入示例:客户端接入示例
从官方的文档中我们了解到,EMQX支持通过 X.509 证书实现单向和双向客户端/服务器互信认证,这里我们使用单项认证。

认证方式说明验证方式优缺点
单向认证客户端验证服务器身份,但服务器不验证客户端的身份客户端通常不需要提供证书,仅需验证服务器的证书是否由受信任的证书颁发机构(CA)签发只能实现通信数据的机密性和完整性,但无法保证通信双方的身份
双向认证服务器和客户端彼此验证对方的身份需要为每个设备签发证书,服务器验证客户端的证书以确认其身份的合法性可以确保服务器和客户端之间的互信关系,并防止中间人攻击

一、获取自签名证书

1、创建openssl.cnf文件

[ req ]
default_bits        = 2048
default_keyfile     = server-key.pem
distinguished_name  = req_distinguished_name
x509_extensions     = v3_req
prompt = no

[ req_distinguished_name ]
C = CN
ST = California
L = San Francisco
O = Example Organization
OU = Example Unit
CN = 192.168.3.51  # 这里设置为服务器的IP地址

[ v3_req ]
subjectAltName = @alt_names

[ alt_names ]
DNS.1 = 192.168.3.51  # 这里设置为服务器的IP地址
IP.1 = 192.168.3.51   # 这里设置为服务器的IP地址

2、生成证书自签名证书

在openssl.cnf目录下执行如下命令生成自签名证书:

openssl genpkey -algorithm RSA -out server-key.pem
openssl req -new -key server-key.pem -out server.csr -config openssl.cnf
openssl x509 -req -in server.csr -out server-cert.pem -signkey server-keyi.pem -days 3650 -extensions v3_req -extfile openssl.cnf

二、docker安装EMQX

1、初始化目录

mkdir -p /wz_conf/emqx/etc
mkdir -p /wz_conf/emqx/data
mkdir -p /wz_conf/emqx/etc

2、加载镜像文件并挂载相应的文件目录

docker load -i emqx.tar.gz
这里也可以使用docker pull emqx/emqx:latest拉取镜像包
cd /wz_conf/emqx  
chmod 777 ./*  
docker run -d --privileged=true --name emqx emqx/emqx:latest
# 将容器中的相关配置文件挂在到物理机,方便后续修改,*******则为容器id
docker cp *********:/opt/emqx/etc /wz_conf/emqx  
docker stop *******
docker rm *******

3、启动docker容器

docker run -d \
--privileged=true \
--restart=always \
--name emqx \
-p 1883:1883 \
-p 8883:8883 \
-p 8083:8083 \
-p 8084:8084 \
-p 8081:8081 \
-p 18083:18083 \
-v /wz_conf/emqx/etc:/opt/emqx/etc \
-v /wz_conf/emqx/data:/opt/emqx/data \
-v /wz_conf/emqx/log:/opt/emqx/log \
emqx/emqx:latest

这里就不再详细说明了,如果不需要开启SSL/TLS,到这里也就可以了。

4、EMQX加载自签名证书

这里我是用的EMQX是5.8.0版本的,通过修改emqx.conf配置文件实现,也可以通过官方文档说的在dashboard中配置。
将生成的证书文件挂载到容器内/opt/emqx/etc/certs目录下,修改/opt/emqx/etc/emqx.conf配置文件末尾添加如下内容:

listeners.ssl.default {
  bind = "0.0.0.0:8883"
    ssl_options {
      # PEM 格式的文件,包含一个或多个用于验证客户端证书的根 CA 证书
      # 单向认证时,该文件内容可以为空
      # cacertfile = "etc/certs/rootCAs.pem"
      # PEM 格式的服务器证书,如果证书不是直接由根 CA 签发,那么中间 CA 的证书必须加在服务器证书的后面组成一个证书链
      certfile = "/opt/emqx/etc/certs/server-cert.pem"
      # PEM 格式的密钥文件
      keyfile = "/opt/emqx/etc/certs/server-keyi.pem"
      # 设置成 'verify_peer' 来验证客户端证书是否为 cacertfile 中某个根证书签发。双向认证时,必须设置成 'verify_peer'。
      # 设置成 'verify_none' 则不验证客户端证书,即单向认证。
      verify = verify_none
      # 如果设置成 true,但是客户端在握手时候没有发送证书,服务端会终止握手。双向认证时,必须设置成 true。
      # 如果设置成 false,那么服务端只有在客户端发送一个非法证书时才会终止握手
      fail_if_no_peer_cert = false
  }
}

配置后重启docker容器即可。重启后也可以通过dashboard查看
在这里插入图片描述
在这里插入图片描述

三、客户端MQTTX连接测试

在这里插入图片描述

四、Springboot整合

<!-- mqtt -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

核心代码块
MqttPushCallback.java

import com.webuild.ai.outside.service.OutsideBaseService;
import com.webuild.ai.utils.WelMaxEncryptionUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Collection;

@Component
@Slf4j
public class MqttPushCallback implements MqttCallback {

    @Autowired
    private MqttConfig mqttConfig;
    private static MqttClient client;

    @Override
    public void connectionLost(Throwable throwable) {
        log.info("连接断开,可以做重连");// 连接丢失后,一般在这里面进行重连
        if (client == null || !client.isConnected()) {
            mqttConfig.getMqttPushClient();
        }
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        byte[] payload = mqttMessage.getPayload();
        String mqttMessagePayload = new String(payload);// 接收消息内容(转为字符串格式)
        // TODO 自己的处理逻辑
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
    }
}

MqttPushClient.java

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.security.cert.X509Certificate;

@Component
@Slf4j
public class MqttPushClient {

    @Autowired
    private MqttPushCallback mqttPushCallback;

    private static MqttClient client;

    public MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }

    /**
     * 客户端连接
     *
     * @param host      ip+端口
     * @param clientID  客户端Id
     * @param username  用户名
     * @param password  密码
     * @param timeout   超时时间
     * @param keepalive 保留数
     */
    public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
        MqttClient client;
        try {
            client = new MqttClient(host, clientID, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            if(host.startsWith("ssl")) {
                // 配置忽略 CN 验证
                SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
                sslContext.init(null, new TrustManager[]{
                        new X509TrustManager() {
                            @Override
                            public void checkClientTrusted(X509Certificate[] chain, String authType) {
                            }
                            @Override
                            public void checkServerTrusted(X509Certificate[] chain, String authType) {
                            }
                            @Override
                            public X509Certificate[] getAcceptedIssuers() {
                                return new X509Certificate[0];
                            }
                        }
                }, new java.security.SecureRandom());
                options.setSocketFactory(sslContext.getSocketFactory());
            }
            options.setCleanSession(true);
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);
            MqttPushClient.setClient(client);
            try {
                client.setCallback(mqttPushCallback);
                client.connect(options);
            } catch (Exception e) {
                log.error("设置mqttPushCallback异常. e={}",e);
            }
        } catch (Exception e) {
            log.error("mqtt链接异常. e={}", e);
        }
    }
}

MqttClientUtil.java

import com.webuild.ai.config.MqttPushClient;
import com.webuild.ai.config.SpringValueConfig;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MqttClientUtil {

    @Autowired
    private MqttPushClient mqttPushClient;

    /**
     * 发布消息
     *
     * @Description
     * @Param pushMessage
     * @Param topic
     * @Return void
     * @Author Administrator
     * @Date 2024/9/2 11:13
     **/
    public void publish(String pushMessage, String topic) {
        try {
            log.info(">>>>>>>mqtt推送消息>>>>>>>>. pushMessage={}, topic={}", pushMessage, topic);
            publish(pushMessage, topic, 0, false);
        } catch (Exception e) {
            log.error("mqtt发布消息异常. topic={}, pushMessage={}", topic, pushMessage);
        }
    }

    /**
     * 发布消息
     * 
     * @Description 
     * @Param pushMessage
     * @Param topic
     * @Param qos
     * QoS 0: 至多一次(At most once) 消息会被尽力而为地传递,但不保证消息会被送达。也就是说,消息可能会丢失,不会有确认消息发送给发布者
     * 1: 至少一次(At least once) 消息会被至少传递一次。发布者发送消息后会等待接收方的确认(PUBACK)。如果没有收到确认,消息会被重新发送,直到收到确认消息为止。这意味着接收方可能会收到重复的消息。
     * 2: 只一次(Only once)消息确保只会传递一次。通过四步握手(四个消息交换:PUBLISH、PUBREC、PUBREL、PUBCOMP)来保证消息既不会丢失也不会重复。这是最可靠的传输模式,但也会带来最大的开销
     * @Param retained
     * True (保留消息):当retained设为true时,Broker会保留这条消息,当新的客户端订阅这个Topic时,Broker会立即将这条保留消息发送给订阅者。新的保留消息会覆盖之前的保留消息。
     * False (非保留消息):当retained设为false时,消息不会被Broker保留。只有在消息发布后已有订阅者能收到该消息,新的订阅者不会收到这个消息。
     * @Return void
     * @Author Administrator
     * @Date 2024/9/2 11:07     
     **/
    public void publish(String pushMessage, String topic, int qos, boolean retained) {
        MqttMessage message = new MqttMessage();
        message.setPayload(pushMessage.getBytes());
        message.setQos(qos);
        message.setRetained(retained);
        MqttTopic mqttTopic = mqttPushClient.getClient().getTopic(topic);
        if (null == mqttTopic) {
            log.error("topic is not exist");
        }
        MqttDeliveryToken token;//Delivery:配送
        synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁
            try {
                token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件
//                token.waitForCompletion(5000L);
            } catch (Exception e) {
                log.error("mqtt publish error:={}", e);
            }
        }
    }
}


http://www.kler.cn/a/429409.html

相关文章:

  • C++单例模式的设计
  • SpringBoot链接Kafka
  • 迅翼SwiftWing | ROS 固定翼开源仿真平台正式发布!
  • C++并发编程之std::async的异常安全性
  • 提供的 IP 地址 10.0.0.5 和子网掩码位 /26 来计算相关的网络信息
  • Oracle Dataguard(主库为双节点集群)配置详解(5):将主库复制到备库并启动同步
  • 数据库之连接池Druid
  • ZZCMS2023存在跨站脚本漏洞(CNVD-2024-44822、CVE-2024-44818)
  • sock_poll内核函数
  • No module named ‘_ssl‘ No module named ‘_ctypes‘
  • 如何防范顶级应用程序安全威胁
  • 【大语言模型】LangChain ModelsIO与Models I/O Promopts详解
  • 【CKS最新模拟真题】Dockerfile修改
  • CTF-RE/WEB: python-Hook(钩子)
  • 电子信息工程自动化 基于单片机的居室安全报警系统
  • 为什么 JavaScript 中的回调函数未按顺序执行?
  • Pydantic 动态字段:使用和不使用 `@computed_field` 的对比指南
  • 如何使用 JavaScript 获取页面滚动位置?
  • Java项目实战II基于微信小程序的跑腿系统(开发文档+数据库+源码)
  • Hasura 中间件API go操作示例
  • 专为高性能汽车设计的Armv9架构的Neoverse V3AE CPU基础知识与软件编码特性解析
  • 管理系统前端框架开发案例学习
  • redis-stack redisSearch环境安装搭建
  • 记录一下,解决js内存溢出npm ERR! code ELIFECYCLEnpm ERR! errno 134 以及 errno 9009
  • 智创 AI 新视界 -- AI 引领下的未来社会变革预测(16 - 6)
  • DP协议:术语表