Python通过SSH隧道访问数据库
本文介绍通过sshtunnel类库建立SSH隧道,使用paramiko通过SSH来访问数据库。
实现了两种建立SSH方式:公私钥验证、密码验证。
公私钥可读本地,也可读取Aws S3上的私钥文件。
本质上就是在本机建立SSH隧道,然后将访问DB转发到本地SSH内去访问数据库。
简单易懂,上代码:
from sshtunnel import SSHTunnelForwarder
from sqlalchemy import create_engine, text
import paramiko
import io
import socket
#### 都换成你自己的
# SSH配置
ssh_host = '' #主机
ssh_port = 22 #端口
ssh_user = 'ec2-user' #用户名
ssh_password = '' #密码(如果是密码验证)
ssh_key_path = r'C:\Users\Desktop\test_primi.pem' #私钥本地地址(如果是公私钥验证)
# 数据库配置
database_user = '' #用户名
database_password = '' #密码
database_name = '' #数据库名
database_host = '' #主机
database_port = 3306 #端口号
def get_available_port():
"""
获取可用的本地端口。
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.bind(('127.0.0.1', 0))
port = sock.getsockname()[1]
finally:
sock.close()
return port
try:
#读取S3上的私钥
# s3_client = get_s3_client()
# response = s3_client.get_object(Bucket='桶名称', Key='私钥在S3上的key')
# private_key_content = response['Body'].read().decode('utf-8')
#private_key = paramiko.RSAKey.from_private_key(io.StringIO(private_key_content))
#读取本地私钥
private_key = paramiko.RSAKey.from_private_key_file(ssh_key_path)
local_port = get_available_port()
# 创建 SSH 隧道
tunnel= SSHTunnelForwarder(
(ssh_host, ssh_port),
ssh_username=ssh_user,
ssh_pkey=private_key,
#ssh_password=ssh_password,
remote_bind_address=(database_host, database_port),
local_bind_address=('127.0.0.1', local_port),
host_pkey_directories=[]
)
try:
# 启动 SSH 隧道
tunnel.start()
# 连接数据库
engine = create_engine(
f'mysql+pymysql://{database_user}:{database_password}@127.0.0.1:{local_port}/{database_name}')
# 测试数据库连接
with engine.connect() as connection:
result = connection.execute(text("SELECT count(*) from activity_logs"))
for row in result:
print(row)
finally:
# 关闭 SSH 隧道
tunnel.stop()
except Exception as e:
print(f"出现错误: {e}")
执行结果:

注意:host_pkey_directories=[] 的意思是 不要在指定的目录中寻找密钥,如果没有将出现如下错误,但不影响程序正常执行。
可以先在本地用Navicat之类的客户端测好了,sshtunnel应该是三种验证方法都支持的,源码如下