当前位置: 首页 > article >正文

ubuntu下openssl签名证书制作流程及验证demo

1.创建根CA证书

        生成 rootCA.key 以及 rootCA.crt. 用你的域名或者ip地址替换demo.mlopshub.com,比如192.168.10.11

openssl req -x509 \
            -sha256 -days 356 \
            -nodes \
            -newkey rsa:2048 \
            -subj "/CN=demo.mlopshub.com/C=CN/L=Beijing" \
            -keyout rootCA.key -out rootCA.crt 
或者
openssl req -x509 \
            -sha256 -days 356 \
            -nodes \
            -newkey rsa:2048 \
            -subj "/CN=192.168.10.11/C=CN/L=Beijing" \
            -keyout rootCA.key -out rootCA.crt 

如果上述命令提示Can’t load /home/username/.rnd into RNG,则需要你手动创建这个文件

2. 生成自签名证书

2.1生成server的私有key

openssl genrsa -out server.key 2048

2.2.创建证书签名请求配置

根据实际情况替换域名以及IP

cat > csr.conf <<EOF
[ req ]
default_bits = 2048
prompt = no
default_md = sha256
req_extensions = req_ext
distinguished_name = dn

[ dn ]
C = US
ST = Beijing
L = Beijing
O = MLopsHub
OU = MlopsHub Dev
CN =test.com

[ req_ext ]
subjectAltName = @alt_names

[ alt_names ]
DNS.1 = test.com
DNS.2 = test2.com
IP.1 = 192.168.10.11
IP.2 = 192.168.1.22

EOF

2.3.使用服务器私钥生成证书签名请求(CSR)

openssl req -new -key server.key -out server.csr -config csr.conf

2.4.创建一个外部文件


根据实际情况替换域名以及IP

cat > cert.conf <<EOF

authorityKeyIdentifier=keyid,issuer
basicConstraints=CA:FALSE
keyUsage = digitalSignature, nonRepudiation, keyEncipherment, dataEncipherment
subjectAltName = @alt_names

[alt_names]
DNS.1 = test.com
IP.1 = 192.168.10.62
EOF

2.5.使用自签名CA生成SSL证书

openssl x509 -req \
    -in server.csr \
    -CA rootCA.crt -CAkey rootCA.key \
    -CAcreateserial -out server.crt \
    -days 365 \
    -sha256 -extfile cert.conf

 将server.crt  server.key放到服务器目录下

将rootCA.crt放到客户端进行验签

3.python验证

3.1客户端单向/双向/不认证

(1)服务器端的代码

import socket
import ssl

# 服务器配置
SERVER_HOST = '192.168.10.62'
SERVER_PORT = 12345

def ssl_auth_double():
    # 创建一个普通的 TCP 套接字
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print("Server socket created")

    # 绑定到本地地址和端口
    server_address = (SERVER_HOST, SERVER_PORT)
    server_socket.bind(server_address)
    print("Bound to address and port")

    # 开始监听连接
    server_socket.listen(1)
    print("Listening for connections")

    # 创建 SSL 上下文
    ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ssl_context.load_cert_chain(certfile="server.crt", keyfile="server.key")
    ssl_context.load_verify_locations(cafile="root_cli.crt")
    ssl_context.verify_mode = ssl.CERT_REQUIRED

    try:
        while True:
            # 接受客户端连接
            client_socket, client_address = server_socket.accept()
            print(f"Accepted connection from {client_address}")

            # 将客户端 socket 包装成 SSL socket
            ssl_socket = ssl_context.wrap_socket(client_socket, server_side=True)

            try:
                while True:
                    # 接收客户端数据
                    data = ssl_socket.recv(1024)
                    if not data:
                        break

                    # 将接收到的数据原样返回
                    ssl_socket.sendall(data)
            except socket.error as e:
                print(f"Socket error: {e}")
            finally:
                # 关闭 SSL socket
                ssl_socket.close()
    except KeyboardInterrupt:
        print("Server stopped by user")
    finally:
        # 关闭服务器 socket
        server_socket.close()

def ssl_auth_single():
    # 创建一个socket对象
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print("Socket created")

    # 绑定到本地地址和端口
    server_address = (SERVER_HOST, SERVER_PORT)
    server_socket.bind(server_address)
    print("Bound to address and port")

    # 开始监听连接
    server_socket.listen(1)
    print("Listening for connections")

    # 创建SSL上下文
    ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ssl_context.load_cert_chain(certfile='server.crt', keyfile='server.key')
    print('Server is listening on {}:{}'.format(*server_address))
    while True:
        # 接受客户端连接
        client_socket, client_address = server_socket.accept()
        print('Accepted connection from {}:{}'.format(*client_address))

        # 将客户端socket包装成SSL socket
        ssl_socket = ssl_context.wrap_socket(client_socket, server_side=True)

        try:
            while True:
                # 接收客户端数据
                data = ssl_socket.recv(1024)
                if not data:
                    break

                # 将接收到的数据原样返回
                ssl_socket.sendall(data)
        except socket.error as e:
            print('Socket error:', e)
        finally:
            # 关闭SSL socket
            ssl_socket.close()

def ssl_only():
    # 创建一个普通的 TCP/IP 套接字
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind((SERVER_HOST, SERVER_PORT))
    server_socket.listen(5)

    # 创建 SSL 上下文,禁用客户端证书验证
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_cert_chain(certfile='server.crt', keyfile='server.key')
    context.verify_mode = ssl.CERT_NONE  # 不验证客户端证书

    # 将普通套接字封装为 SSL 套接字,开启 SSL 加密
    server_socket_ssl = context.wrap_socket(server_socket, server_side=True)

    print(f"Server is listening on {SERVER_HOST}:{SERVER_PORT}...")

    while True:
        # 等待客户端连接
        client_socket, addr = server_socket_ssl.accept()
        print(f"Connection from {addr} established!")

        try:
            # 接收数据
            data = client_socket.recv(1024)
            if data:
                print(f"Received: {data.decode('utf-8')}")
                # 将接收到的数据原样返回
                client_socket.sendall(data)
            else:
                print("No data received")
        except Exception as e:
            print(f"Error: {e}")
        finally:
            # 关闭客户端连接
            client_socket.close()
if __name__ == '__main__':
    ssl_only()

(2)客户端代码

import socket
import ssl
import os
import time
# 客户端配置
SERVER_HOST = '192.168.10.62'
SERVER_PORT = 12345
CA_CERT_FILE = 'rootCA.crt'  # CA 证书(客户端用于验证服务端)
current_directory = os.getcwd()
print(f"当前目录是: {current_directory}")

def ssl_auth_double():
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print("Client socket created")

    # 创建 SSL 上下文
    ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    ssl_context.load_cert_chain(certfile="client.crt", keyfile="client.key")
    ssl_context.load_verify_locations(cafile="rootCA.crt")
    ssl_context.verify_mode = ssl.CERT_REQUIRED

    # 将套接字包装为 SSL 套接字
    client_socket_ssl = ssl_context.wrap_socket(client_socket, server_hostname=SERVER_HOST)
    print("Client SSL socket wrapped")

    try:
        client_socket_ssl.connect((SERVER_HOST, SERVER_PORT))
        print(f"Connected to {SERVER_HOST}:{SERVER_PORT} over SSL.")

        while True:
            # 发送数据
            client_socket_ssl.sendall(b"Hello from SSL client!")
            print("Data sent to server")

            # 接收响应
            data = client_socket_ssl.recv(1024)
            print(f"Received from server: {data.decode('utf-8')}")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # 关闭连接
        client_socket_ssl.close()
def ssl_auth_single():
    # 创建一个普通的 TCP 套接字
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print("Client socket created")

    # 将套接字包装为 SSL 套接字
    client_socket_ssl = ssl.wrap_socket(client_socket, certfile=None, keyfile=None, server_side=False, ssl_version=ssl.PROTOCOL_SSLv23, ca_certs=CA_CERT_FILE, cert_reqs=ssl.CERT_REQUIRED)
    print("Client SSL socket wrapped")

    try:
        client_socket_ssl.connect((SERVER_HOST, SERVER_PORT))
        print(f"Connected to {SERVER_HOST}:{SERVER_PORT} over SSL.")
        while True:
            # 发送数据
            client_socket_ssl.sendall(b"Hello from SSL client!")
            print("Data sent to server")

            # 接收响应
            data = client_socket_ssl.recv(1024)
            print(f"Received from server: {data.decode('utf-8')}")
            time.sleep(1)
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # 关闭连接
        client_socket_ssl.close()
        print("Connection closed")
def ssl_only():
        # 创建一个普通的 TCP 套接字
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print("Client socket created")

    # 创建 SSL 上下文
    ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE
    # 设置支持的加密套件
    ssl_context.set_ciphers('ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-DSS-AES128-GCM-SHA256:kEDH+AESGCM:ECDHE-RSA-AES128-SHA256:ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA:ECDHE-ECDSA-AES128-SHA:ECDHE-RSA-AES256-SHA384:ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA:ECDHE-ECDSA-AES256-SHA:DHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA:DHE-DSS-AES128-SHA256:DHE-RSA-AES256-SHA256:DHE-DSS-AES256-SHA:DHE-RSA-AES256-SHA:AES128-GCM-SHA256:AES256-GCM-SHA384:AES128-SHA256:AES256-SHA256:AES128-SHA:AES256-SHA:AES:CAMELLIA:DES-CBC3-SHA:!aNULL:!eNULL:!EXPORT:!DES:!RC4:!MD5:!PSK:!aECDH:!EDH-DSS-DES-CBC3-SHA:!EDH-RSA-DES-CBC3-SHA:!KRB5-DES-CBC3-SHA')

    # 将套接字包装为 SSL 套接字
    client_socket_ssl = ssl_context.wrap_socket(client_socket, server_hostname=SERVER_HOST)
    print("Client SSL socket wrapped")

    try:
        client_socket_ssl.connect((SERVER_HOST, SERVER_PORT))
        print(f"Connected to {SERVER_HOST}:{SERVER_PORT} over SSL.")
        
        while True:
            # 发送数据
            client_socket_ssl.sendall(b"Hello from SSL client!")
            print("Data sent to server")

            # 接收响应
            data = client_socket_ssl.recv(1024)
            print(f"Received from server: {data.decode('utf-8')}")
            time.sleep(1)
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # 关闭连接
        client_socket_ssl.close()
        print("Connection closed")
if __name__ == '__main__':
    ssl_only()


http://www.kler.cn/a/391501.html

相关文章:

  • PySpark用sort-merge join解决数据倾斜的完整案例
  • STORM:从多时间点2D图像中快速重建动态3D场景的技术突破
  • 【算法学习】——整数划分问题详解(动态规划)
  • 【1】Word:邀请函
  • Perl语言的网络编程
  • 初识@ffmpeg/ffmpeg库
  • XML Schema 字符串数据类型
  • 干式电抗器的故障诊断和排除方法是什么?
  • 【大数据学习 | HBASE】hbase的写数据流程与hbase插入数据
  • Python教程笔记(1)
  • macOS sw_vers 查看当前系统版本
  • 【题解】—— LeetCode一周小结45
  • 光控资本:“自主可控”将进入新估值阶段
  • 【旷视科技-注册/登录安全分析报告】
  • FlinkSql读取kafka数据流的方法(scala)
  • 实现一个BLE HID鼠标
  • 面向对象的需求分析和设计(一)
  • 使用element UI实现表格行/列合并
  • 【Android】 插件化原理
  • 【网络】HTTP 协议
  • Python驱动ansys执行apdl文件
  • Qwen2.5-Coder-32B-Instruct Docker 部署openai接口
  • 网约车管理:规范发展,保障安全与便捷
  • windows tomcat 报错后如何让窗口不闪退
  • 知识竞赛活动中礼仪小姐有哪些工作,要如何安排
  • <AI 学习> 下载 Stable Diffusions via Windows OS