测试集群内主机链路带宽并生成网络拓扑图
测试集群内主机链路带宽并生成网络拓扑图
- 一、背景
- 二、操作步骤
- 1.修改 Docker 源
- 2.启动 Redis 服务
- 3.创建 Python 3.10 容器
- 4.在容器内安装依赖
- 5.创建删除 Redis 集合的脚本 `reset.py`
- 6.创建发布本地 IP 的脚本 `publish.py`
- 7.创建带宽测试的脚本 `benchmark.py`
- 8.创建生成网络拓扑图的脚本 `summary.py`
- 8.创建自动化脚本 `auto.py`
- 9.提交 Docker 镜像
- 10.重新进入 `net_scan` 容器,运行自动化脚本
一、背景
我们希望生成集群内主机之间每条链路的带宽图。方案如下:
- 通过 Redis 共享数据
- 每台主机枚举自己的所有网卡,向 Redis 发布各网卡的 IP,并在这些 IP 上监听 TCP 5000 端口,供其他主机(客户端)测试发送带宽
- 每台主机从 Redis 读取其他主机已发布的 IP,逐个尝试连接;连接成功则发送 2MB 的数据,测得发送带宽
- 将测试记录(主机名、源 IP、目的 IP、发送带宽)发送到 Redis 服务器
- 使用独立的 Python 程序读取 Redis 上的测试记录,生成网络拓扑图,节点为主机名,边为某个网卡对另一个节点的带宽
二、操作步骤
1.修改 Docker 源
# Write the registry mirror list to Docker's daemon configuration.
# NOTE: tee overwrites /etc/docker/daemon.json; merge by hand if the file already holds other settings.
sudo tee /etc/docker/daemon.json <<EOF
{
"registry-mirrors":[
"https://docker.1ms.run",
"https://docker.xuanyuan.me"
]
}
EOF
# Reload systemd units and restart Docker so the new configuration is picked up.
sudo systemctl daemon-reload
sudo systemctl restart docker
2.启动 Redis 服务
# Start Redis 6.2 detached and publish it on host port 6379, the port every script below connects to.
# --rm removes the container as soon as it stops.
docker run -d --rm -p 6379:6379 redis:6.2
3.创建 Python 3.10 容器
# Create the working container "net_scan" (privileged, host networking) and open a shell in it.
# The current directory is mounted at /home, which is also the working directory.
docker run --shm-size=32g -it --privileged --net=host \
-v $PWD:/home -w /home \
--name net_scan python:3.10 /bin/bash
4.在容器内安装依赖
# Python packages used by the scripts below.
pip install redis psutil paramiko netifaces networkx matplotlib
# Refresh the package index first: a freshly started image usually has no
# package lists, in which case the install cannot locate dmidecode.
apt-get update
apt-get install -y dmidecode
5.创建删除 Redis 集合的脚本 reset.py
cat > /mnt/reset.py <<-'EOF'
import redis
import sys
# Connect to the Redis server whose address is the first command-line argument.
r = redis.Redis(host=sys.argv[1], port=6379)
# DEL ignores keys that do not exist, so no EXISTS pre-check is needed;
# removing both keys in one command also saves three round trips.
r.delete('host_info', 'test_results')
print("reset end")
EOF
6.创建发布本地 IP 的脚本 publish.py
cat > /mnt/publish.py <<-'EOF'
import redis
import sys
import socket
import redis
import netifaces
import threading
import subprocess
def gethostname():
    """Return the name that identifies this node in the test results.

    The ``NODENAME`` environment variable wins whenever it is set; only when
    it is absent is the baseboard serial number queried through ``dmidecode``.

    Returns:
        str: the node name.

    Raises:
        FileNotFoundError: ``NODENAME`` is unset and ``dmidecode`` is not installed.
        subprocess.CalledProcessError: ``NODENAME`` is unset and ``dmidecode``
            exits with a non-zero status.
    """
    import os

    node_name = os.environ.get('NODENAME')
    if node_name is not None:
        # Do not run dmidecode at all: its output would be discarded, and a
        # missing or failing dmidecode must not break an explicitly named node.
        return node_name
    # Argument list instead of a shell string: no shell is needed here.
    output = subprocess.check_output(["dmidecode", "-s", "baseboard-serial-number"])
    return output.decode().strip()
def get_local_ips():
    """Collect the IPv4 addresses of every non-loopback, non-virtual interface.

    Returns:
        list: address strings, in interface enumeration order.
    """
    # Interface-name prefixes that are skipped as virtual interfaces.
    virtual_prefixes = ('docker', 'br-', 'veth', 'virbr', 'vnet', 'vmnet', 'vboxnet')
    found = []
    for name in netifaces.interfaces():
        if name == 'lo' or name.startswith(virtual_prefixes):
            continue
        ipv4_entries = netifaces.ifaddresses(name).get(netifaces.AF_INET, [])
        found.extend(entry['addr'] for entry in ipv4_entries)
    return found
def publish_host_info(redis_client, hostname, ips):
    """Add this host's record to the Redis set ``host_info``.

    The record is stored as the ``str()`` of a dict with the keys
    ``hostname`` and ``ips``.
    """
    record = dict(hostname=hostname, ips=ips)
    redis_client.sadd('host_info', str(record))
def start_server(ip, port):
    """Accept TCP connections on ``ip:port`` forever, one handler thread per client.

    Args:
        ip: local address to bind to.
        port: TCP port to listen on.

    Raises:
        OSError: if the address cannot be bound or accepting fails.
    """
    # The context manager closes the listening socket if bind/accept raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        # Allow re-binding right after a previous run, while its connections
        # may still be lingering in TIME_WAIT.
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((ip, port))
        server.listen(5)
        print(f"服务器在 {ip}:{port} 上监听中...")
        while True:
            client_socket, addr = server.accept()
            threading.Thread(target=handle_client, args=(client_socket, addr)).start()
def handle_client(client_socket, addr):
    """Drain everything the peer sends, then close the connection.

    Args:
        client_socket: connected socket returned by ``accept()``.
        addr: peer address (not used).
    """
    chunk_size = 2 * 1024 * 1024  # upper bound for a single recv() call
    try:
        # recv() returns b'' once the peer has shut down its sending side.
        while client_socket.recv(chunk_size):
            pass  # the payload itself is irrelevant and is discarded
    except OSError:
        # A failed or aborted transfer ends this connection; there is nothing
        # to salvage, but the socket must still be released below.
        pass
    finally:
        client_socket.close()
def main():
    """Publish this host's addresses to Redis and start one listener per address.

    The Redis server address is taken from the first command-line argument.
    """
    test_port = 5000  # TCP port used for the bandwidth test
    client = redis.Redis(host=sys.argv[1], port=6379)
    node_name = gethostname()
    local_ips = get_local_ips()
    publish_host_info(client, node_name, local_ips)
    for address in local_ips:
        listener = threading.Thread(target=start_server, args=(address, test_port))
        listener.start()


if __name__ == '__main__':
    main()
EOF
7.创建带宽测试的脚本 benchmark.py
cat > /mnt/benchmark.py <<-'EOF'
import redis
import sys
import socket
import time
import ast
import threading
import netifaces
import subprocess
def gethostname():
    """Return the name that identifies this node in the test results.

    The ``NODENAME`` environment variable wins whenever it is set; only when
    it is absent is the baseboard serial number queried through ``dmidecode``.

    Returns:
        str: the node name.

    Raises:
        FileNotFoundError: ``NODENAME`` is unset and ``dmidecode`` is not installed.
        subprocess.CalledProcessError: ``NODENAME`` is unset and ``dmidecode``
            exits with a non-zero status.
    """
    import os

    node_name = os.environ.get('NODENAME')
    if node_name is not None:
        # Do not run dmidecode at all: its output would be discarded, and a
        # missing or failing dmidecode must not break an explicitly named node.
        return node_name
    # Argument list instead of a shell string: no shell is needed here.
    output = subprocess.check_output(["dmidecode", "-s", "baseboard-serial-number"])
    return output.decode().strip()
def get_local_ips():
    """Collect the IPv4 addresses of every non-loopback, non-virtual interface.

    Returns:
        list: address strings, in interface enumeration order.
    """
    # Interface-name prefixes that are skipped as virtual interfaces.
    virtual_prefixes = ('docker', 'br-', 'veth', 'virbr', 'vnet', 'vmnet', 'vboxnet')
    found = []
    for name in netifaces.interfaces():
        if name == 'lo' or name.startswith(virtual_prefixes):
            continue
        ipv4_entries = netifaces.ifaddresses(name).get(netifaces.AF_INET, [])
        found.extend(entry['addr'] for entry in ipv4_entries)
    return found
def test_bandwidth(redis_client, local_hostname, local_ip):
    """Measure the send bandwidth from ``local_ip`` to every address published by other hosts.

    Host records are read from the Redis set ``host_info``. Records carrying
    this host's own name, and addresses equal to ``local_ip``, are skipped.
    For every remaining address a TCP connection is opened from ``local_ip``
    and 2 MB are sent. Addresses that cannot be reached (socket error or 5 s
    timeout) are skipped silently, because most address pairs are expected
    not to be routable. Each successful measurement is printed and appended,
    as ``str(dict)``, to the Redis list ``test_results``.

    Args:
        redis_client: client object providing ``smembers`` and ``rpush``.
        local_hostname: name of this host as stored in the results.
        local_ip: local address the outgoing connections are bound to.

    Returns:
        list: one dict per successful measurement with the keys
        ``source_hostname``, ``source_ip``, ``dest_hostname``, ``dest_ip``
        and ``bandwidth`` (Mbps, rounded to two decimals).

    Errors raised by ``redis_client`` are not swallowed.
    """
    port = 5000          # port the remote listeners accept connections on
    local_port = 5001    # fixed source port of the outgoing connections
    payload = b'x' * (2 * 1024 * 1024)  # 2 MB of data
    payload_megabits = 2 * 8
    results = []
    for info_str in redis_client.smembers('host_info'):
        info = ast.literal_eval(info_str.decode('utf-8'))
        remote_hostname = info['hostname']
        if remote_hostname == local_hostname:
            continue
        for ip in info['ips']:
            if ip == local_ip:
                continue
            try:
                # The context manager closes the socket on every path,
                # including a failed bind/connect.
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
                    client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                    client.bind((local_ip, local_port))
                    client.settimeout(5)
                    client.connect((ip, port))
                    # Start timing after the handshake so that only the data
                    # transfer is measured; perf_counter is monotonic.
                    start_time = time.perf_counter()
                    client.sendall(payload)
                    client.shutdown(socket.SHUT_WR)
                    duration = time.perf_counter() - start_time
            except OSError:
                # Unreachable address or timed-out transfer: try the next one.
                continue
            # Guard against a zero reading on a coarse clock.
            bandwidth = payload_megabits / max(duration, 1e-9)  # Mbps
            result = {
                'source_hostname': local_hostname,
                'source_ip': local_ip,
                'dest_hostname': remote_hostname,
                'dest_ip': ip,
                'bandwidth': round(bandwidth, 2)
            }
            results.append(result)
            # Send the record to Redis for the summary script.
            redis_client.rpush('test_results', str(result))
            print(f"{local_hostname}:[{local_ip}] 与 {remote_hostname}:[{ip}] 的带宽为 {bandwidth:.2f} Mbps")
    return results
def main():
    """Run the bandwidth test from every local address against all published hosts.

    The Redis server address is taken from the first command-line argument.
    """
    client = redis.Redis(host=sys.argv[1], port=6379)
    node_name = gethostname()
    for address in get_local_ips():
        test_bandwidth(client, node_name, address)


if __name__ == '__main__':
    main()
EOF
8.创建生成网络拓扑图的脚本 summary.py
cat > /mnt/summary.py <<-'EOF'
import redis
import sys
import ast
import networkx as nx
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import matplotlib
# Line width drawn for the fastest link; slower links are scaled down linearly.
MAX_EDGE_WIDTH = 8
# Distance, in pixels, between a source node and the label of an outgoing edge.
LABEL_OFFSET_PIXELS = 64


def load_results(redis_host):
    """Return every record of the Redis list ``test_results`` as a dict."""
    redis_client = redis.Redis(host=redis_host, port=6379)
    return [ast.literal_eval(raw.decode('utf-8'))
            for raw in redis_client.lrange('test_results', 0, -1)]


def build_graph(records):
    """Build a directed multigraph: one node per host, one edge per measurement."""
    graph = nx.MultiDiGraph()
    for entry in records:
        # Remove the literal "NODE" from host names to keep node labels short.
        src_hostname = entry['source_hostname'].replace("NODE", "")
        dest_hostname = entry['dest_hostname'].replace("NODE", "")
        graph.add_node(src_hostname)
        graph.add_node(dest_hostname)
        graph.add_edge(src_hostname, dest_hostname, bandwidth=entry['bandwidth'])
    return graph


def label_position(ax, start, end):
    """Return the data-space point LABEL_OFFSET_PIXELS along the edge from ``start`` to ``end``."""
    x_start, y_start = start
    x_end, y_end = end
    # Measure the edge in pixels so the offset is a fixed on-screen distance.
    start_pixel = ax.transData.transform((x_start, y_start))
    end_pixel = ax.transData.transform((x_end, y_end))
    dx_pixel = end_pixel[0] - start_pixel[0]
    dy_pixel = end_pixel[1] - start_pixel[1]
    edge_length_pixel = (dx_pixel ** 2 + dy_pixel ** 2) ** 0.5
    if edge_length_pixel == 0:
        # Both ends coincide (e.g. a self-loop): there is no direction to move along.
        return x_start, y_start
    fraction = LABEL_OFFSET_PIXELS / edge_length_pixel
    return (x_start + (x_end - x_start) * fraction,
            y_start + (y_end - y_start) * fraction)


def draw_topology(graph, output_path='topo.png'):
    """Render ``graph`` as a 1920x1080 image and save it to ``output_path``.

    Edge thickness is proportional to bandwidth. Every (source, destination)
    pair gets one label listing the bandwidths of all its parallel edges.
    """
    edges = list(graph.edges(data=True))
    bandwidths = [attrs['bandwidth'] for _, _, attrs in edges]
    max_bandwidth = max(bandwidths)
    if max_bandwidth > 0:
        edge_widths = [(bw / max_bandwidth) * MAX_EDGE_WIDTH for bw in bandwidths]
    else:
        # All measurements rounded to zero: draw thin lines instead of dividing by zero.
        edge_widths = [1.0] * len(bandwidths)

    # Several interfaces may link the same two hosts; collect all of their
    # bandwidths so that no measurement disappears from the picture.
    edge_labels = {}
    for u, v, attrs in edges:
        edge_labels.setdefault((u, v), []).append(f"{attrs['bandwidth']}")

    # 19.2 x 10.8 inches at 100 dpi gives 1920x1080 pixels.
    plt.figure(figsize=(19.2, 10.8), dpi=100)
    pos = nx.circular_layout(graph)
    nx.draw_networkx_nodes(graph, pos, node_size=1000, node_color='red')
    nx.draw_networkx_labels(graph, pos, font_size=12)
    nx.draw_networkx_edges(graph, pos, width=edge_widths, arrowstyle='->', arrowsize=20)

    ax = plt.gca()
    for (u, v), values in edge_labels.items():
        x, y = label_position(ax, pos[u], pos[v])
        plt.text(x, y, " / ".join(values), fontsize=10, ha='center', va='center',
                 bbox=dict(facecolor='white', alpha=0.5, edgecolor='none'))

    plt.axis('off')
    plt.tight_layout()
    # No plt.show(): the non-interactive Agg backend cannot display a window.
    plt.savefig(output_path, dpi=100)


def main():
    """Read the test results from the Redis server given as argv[1] and write topo.png."""
    records = load_results(sys.argv[1])
    if not records:
        sys.exit("no test results found in Redis; run the benchmark first")
    draw_topology(build_graph(records))


if __name__ == '__main__':
    main()
EOF
8.创建自动化脚本 auto.py
cat > /mnt/auto.py <<-'EOF'
import paramiko
import threading
import sys
import time
import os
if len(sys.argv) != 4:
    print(f"{sys.argv[0]} redis_server docker_image_url hostfile")
    # A usage error is a failure: exit non-zero, and via sys.exit so that
    # buffered output is flushed.
    sys.exit(1)
redis_server = sys.argv[1]
docker_image_url = sys.argv[2]
# Each hostfile line holds: node name, SSH address, SSH port, user name, password.
hosts = []
with open(sys.argv[3], "r") as f:
    for line in f:
        # split() tolerates tabs and repeated spaces.
        fields = line.split()
        if fields:  # skip blank lines, which would break the 5-field unpacking later
            hosts.append(fields)
def ssh_interactive_shell(hostname, port, username, password, ssh_requests, flog):
    """Run ``ssh_requests`` in an interactive shell on a remote host and mirror its output.

    Everything the remote shell prints is echoed to stdout and written
    unmodified to ``flog``. The call returns once the remote shell has
    exited. Connection errors are reported on stdout instead of being raised.

    Args:
        hostname: SSH server address.
        port: SSH server port.
        username: login name.
        password: login password.
        ssh_requests: shell text sent to the remote shell; it must end the
            shell itself (e.g. with ``exit``), otherwise this call never returns.
        flog: binary file object receiving the raw output.
    """
    ssh = paramiko.SSHClient()
    # NOTE: AutoAddPolicy accepts unknown host keys without verification.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(hostname=hostname, port=port, username=username, password=password)
        channel = ssh.invoke_shell()

        def forward(data):
            # A 1024-byte chunk may end inside a multi-byte character;
            # errors='replace' keeps the reader alive in that case.
            sys.stdout.write(data.decode(errors='replace'))
            sys.stdout.flush()
            flog.write(data)

        def recv_data():
            while True:
                if channel.recv_ready():
                    data = channel.recv(1024)
                    if not data:
                        break
                    forward(data)
                elif channel.exit_status_ready():
                    # The shell has exited: pick up output that arrived since
                    # the last check, then stop.
                    while channel.recv_ready():
                        data = channel.recv(1024)
                        if not data:
                            break
                        forward(data)
                    break
                else:
                    # Nothing to read yet; yield the CPU instead of spinning.
                    time.sleep(0.05)

        recv_thread = threading.Thread(target=recv_data)
        recv_thread.start()
        channel.send(ssh_requests + '\n')
        recv_thread.join()
    except Exception as e:
        # The password is deliberately left out of the message: stdout may be logged.
        print(f"Error: {e} {hostname},{port},{username}")
    finally:
        ssh.close()
def remote_exec(alias, nodename, hostname, port, username, password, cmdline, flog, is_blocking):
    """Run ``cmdline`` on a remote host inside a container of ``docker_image_url``.

    The container is named ``alias`` and gets ``NODENAME=nodename`` in its
    environment. With ``is_blocking`` false the container is started detached
    (``-d``); otherwise ``docker run`` stays in the foreground.
    """
    detach_flag = "" if is_blocking else "-d"
    script = f'''
docker run --rm {detach_flag} --name {alias} -e NODENAME={nodename} --privileged --net=host {docker_image_url} {cmdline}
sleep 1
exit
'''
    ssh_interactive_shell(hostname, port, username, password, script, flog)
def stop_docker(hostname, port, username, password, flog):
    """Stop the ``publish`` container on a remote host and pull ``docker_image_url`` there."""
    print(f"stop_docker:{hostname}")
    cleanup = f"docker stop publish;docker pull {docker_image_url};exit"
    ssh_interactive_shell(hostname, port, username, password, cleanup, flog)
if __name__ == "__main__":
    output_file = "log.txt"
    # The context manager flushes and closes the log even if a step raises.
    with open(output_file, 'wb') as flog:
        # 1. Stop leftover publish containers and pull the image on every host.
        for nodename, hostname, port, username, password in hosts:
            stop_docker(hostname, port, username, password, flog)
        # 2. Clear the previous host list and results in Redis.
        for nodename, hostname, port, username, password in hosts:
            cmdline = f"python /mnt/reset.py {redis_server}"
            remote_exec("reset", nodename, hostname, port, username, password, cmdline, flog, True)
        # 3. Start the detached publish containers.
        for nodename, hostname, port, username, password in hosts:
            cmdline = f"python /mnt/publish.py {redis_server}"
            remote_exec("publish", nodename, hostname, port, username, password, cmdline, flog, False)
        # NOTE(review): placed after the loop as a single grace period before
        # the benchmarks start; confirm against the author's intended layout.
        time.sleep(2)
        # 4. Run the benchmark on one host at a time.
        for nodename, hostname, port, username, password in hosts:
            cmdline = f"python /mnt/benchmark.py {redis_server}"
            remote_exec("benchmark", nodename, hostname, port, username, password, cmdline, flog, True)
        # 5. Stop the publish containers again.
        for nodename, hostname, port, username, password in hosts:
            stop_docker(hostname, port, username, password, flog)
EOF
9.提交 Docker 镜像
# Snapshot the prepared container (installed dependencies plus the /mnt/*.py scripts) as an image
# and push it, so that every host can pull it.
docker commit net_scan <镜像地址>
docker push <镜像地址>
10.重新进入 `net_scan` 容器,运行自动化脚本
# The container stops when its initial bash exits; start it again before attaching.
docker start net_scan
docker exec -it net_scan bash
创建 `hostfile` 文件(每行一台主机,字段之间用空格分隔):
# One host per line, space-separated: node name, SSH address, SSH port, user name, password.
cat > hostfile << 'EOF'
<NODE1> <IP> <端口> <用户名> <密码>
<NODE2> <IP> <端口> <用户名> <密码>
EOF
运行自动化脚本:
# Run the whole test on all hosts listed in hostfile; the remote output is also written to log.txt.
python /mnt/auto.py <Redis 服务 IP> <镜像地址> hostfile
# Read the collected results from Redis and render topo.png.
python /mnt/summary.py <Redis 服务 IP>
执行完上述步骤后,将在当前目录生成集群内主机链路带宽的拓扑图 `topo.png`,可用于分析集群网络性能。