当前位置: 首页 > article >正文

测试集群内主机链路带宽并生成网络拓扑图

测试集群内主机链路带宽并生成网络拓扑图

  • 一、背景
  • 二、操作步骤
    • 1.修改 Docker 源
    • 2.启动 Redis 服务
    • 3.创建 Python 3.10 容器
    • 4.在容器内安装依赖
    • 5.创建删除 Redis 集合的脚本 `reset.py`
    • 6.创建发布本地 IP 的脚本 `publish.py`
    • 7.创建带宽测试的脚本 `benchmark.py`
    • 8.创建生成网络拓扑图的脚本 `summary.py`
    • 8.创建自动化脚本 `auto.py`
    • 9.提交 Docker 镜像
    • 10.重新进入 `net_scan` 容器,运行自动化脚本

在这里插入图片描述

一、背景

我们希望生成集群内主机之间每条链路的带宽图。方案如下:

  • 通过 Redis 共享数据
  • 枚举所有的网卡,向 Redis 发布自己的 IP,监听请求,用于客户端测试发送带宽
  • 从 Redis 接受发布的 IP 并记录,尝试连接,如果连接成功则发送 2MB 的数据,测试发送带宽
  • 将测试记录(主机名、源 IP、目的 IP、发送带宽)发送到 Redis 服务器
  • 使用独立的 Python 程序读取 Redis 上的测试记录,生成网络拓扑图,节点为主机名,边为某个网卡对另一个节点的带宽

二、操作步骤

1.修改 Docker 源

sudo tee /etc/docker/daemon.json <<EOF
{
    "registry-mirrors":[
        "https://docker.1ms.run",
        "https://docker.xuanyuan.me"
    ]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker

2.启动 Redis 服务

docker run -d --rm -p 6379:6379 redis:6.2

3.创建 Python 3.10 容器

docker run --shm-size=32g -it --privileged --net=host \
    -v $PWD:/home -w /home \
    --name net_scan python:3.10 /bin/bash

4.在容器内安装依赖

pip install redis
pip install psutil
pip install paramiko
pip install netifaces
pip install networkx
pip install matplotlib
apt install dmidecode -y

5.创建删除 Redis 集合的脚本 reset.py

cat > /mnt/reset.py <<-'EOF'
import redis
import sys
r = redis.Redis(host=sys.argv[1], port=6379)
if r.exists('host_info'):
    r.delete('host_info')

if r.exists('test_results'):
    r.delete('test_results')
print("reset end")
EOF

6.创建发布本地 IP 的脚本 publish.py

cat > /mnt/publish.py <<-'EOF'
import redis
import sys
import socket
import redis
import netifaces
import threading
import subprocess

def gethostname():
    command="dmidecode -s baseboard-serial-number"
    serial_number=subprocess.check_output(command,shell=True).decode().strip()
    import os
    return os.environ.get('NODENAME',serial_number)
    
def get_local_ips():
    """获取本地主机的所有IP地址"""
    ips = []
    interfaces = netifaces.interfaces()
    for iface in interfaces:
        # 排除回环接口
        if iface == 'lo':
            continue
        # 排除虚拟接口
        if iface.startswith(('docker', 'br-', 'veth', 'virbr', 'vnet', 'vmnet', 'vboxnet')):
            continue
        addrs = netifaces.ifaddresses(iface)
        if netifaces.AF_INET in addrs:
            for addr_info in addrs[netifaces.AF_INET]:
                ip=addr_info['addr']
                ips.append(ip)
    return ips

def publish_host_info(redis_client, hostname, ips):
    """将主机名和IP地址发布到Redis"""
    host_info = {'hostname': hostname, 'ips': ips}
    redis_client.sadd('host_info', str(host_info))

def start_server(ip, port):
    """在指定IP和端口上启动TCP服务器"""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind((ip, port))
    server.listen(5)
    print(f"服务器在 {ip}:{port} 上监听中...")
    while True:
        client_socket, addr = server.accept()
        threading.Thread(target=handle_client, args=(client_socket, addr)).start()

def handle_client(client_socket, addr):
    """处理客户端连接,接收数据"""
    while True:
        data = client_socket.recv(2 * 1024 * 1024)  # 接收2MB数据
        if not data:
            break
    client_socket.close()

def main():
    redis_client = redis.Redis(host=sys.argv[1], port=6379)
    hostname = gethostname()
    ips = get_local_ips()
    publish_host_info(redis_client, hostname, ips)
    port = 5000  # 定义测试使用的端口
    for ip in ips:
        threading.Thread(target=start_server, args=(ip, port)).start()

if __name__ == '__main__':
    main()
EOF

7.创建带宽测试的脚本 benchmark.py

cat > /mnt/benchmark.py <<-'EOF'
import redis
import sys
import socket
import time
import ast
import threading
import netifaces
import subprocess

def gethostname():
    command="dmidecode -s baseboard-serial-number"
    serial_number=subprocess.check_output(command,shell=True).decode().strip()
    import os
    return os.environ.get('NODENAME',serial_number)
    
def get_local_ips():
    """获取本地主机的所有IP地址"""
    ips = []
    interfaces = netifaces.interfaces()
    for iface in interfaces:
        # 排除回环接口
        if iface == 'lo':
            continue
        # 排除虚拟接口
        if iface.startswith(('docker', 'br-', 'veth', 'virbr', 'vnet', 'vmnet', 'vboxnet')):
            continue
        addrs = netifaces.ifaddresses(iface)
        if netifaces.AF_INET in addrs:
            for addr_info in addrs[netifaces.AF_INET]:
                ip=addr_info['addr']
                ips.append(ip)
    return ips


def test_bandwidth(redis_client, local_hostname, local_ip):
    """测试与其他主机的带宽"""
    host_info_list = redis_client.smembers('host_info')
    port = 5000
    local_port=5001
    results = []
    for info_str in host_info_list:
        info = ast.literal_eval(info_str.decode('utf-8'))
        remote_hostname = info['hostname']
        ips = info['ips']
        if remote_hostname == local_hostname:
            continue
        for ip in ips:
            if ip == local_ip:
                continue
            try:
                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                client.bind((local_ip, local_port))
                client.settimeout(5)
                start_time = time.time()
                client.connect((ip, port))
                data = b'x' * (2 * 1024 * 1024)  # 2MB的数据
                client.sendall(data)
                client.shutdown(socket.SHUT_WR)
                end_time = time.time()
                client.close()
                duration = end_time - start_time
                bandwidth = (2 * 8) / duration  # 单位:Mbps
                result = {
                    'source_hostname': local_hostname,
                    'source_ip': local_ip,
                    'dest_hostname': remote_hostname,
                    'dest_ip': ip,
                    'bandwidth': round(bandwidth, 2)
                }
                results.append(result)
                # 将结果发送到Redis
                redis_client.rpush('test_results', str(result))
                print(f"{local_hostname}:[{local_ip}] 与 {remote_hostname}:[{ip}] 的带宽为 {bandwidth:.2f} Mbps")
            except Exception as e:
                #print(f"无法连接到 {ip}:{port},错误:{e}")
                continue

def main():
    redis_client = redis.Redis(host=sys.argv[1], port=6379)
    local_hostname = gethostname()
    local_ips = get_local_ips()
    for local_ip in local_ips:
        test_bandwidth(redis_client, local_hostname, local_ip)

if __name__ == '__main__':
    main()
EOF

8.创建生成网络拓扑图的脚本 summary.py

cat > /mnt/summary.py <<-'EOF'
import redis
import sys
import ast
import networkx as nx
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import matplotlib

plt.figure(figsize=(19.2, 10.8), dpi=100)

redis_client = redis.Redis(host=sys.argv[1], port=6379)
test_results = redis_client.lrange('test_results', 0, -1)

data=[]
for result_str in test_results:
    result = ast.literal_eval(result_str.decode('utf-8'))
    data.append(result)

# 创建一个有向多重图
G = nx.MultiDiGraph()

# 添加节点和边到图中
for entry in data:
    src_hostname = entry['source_hostname'].replace("NODE","")
    dest_hostname = entry['dest_hostname'].replace("NODE","")
    bandwidth = entry['bandwidth']

    # 添加节点(如果节点已存在则不会重复添加)
    G.add_node(src_hostname)
    G.add_node(dest_hostname)

    # 添加边,带宽作为属性
    G.add_edge(src_hostname, dest_hostname, bandwidth=bandwidth)

# 获取边列表以及对应的带宽,用于设置边的粗细
edges = G.edges(data=True)
bandwidths = [d['bandwidth'] for (u, v, d) in edges]

# 计算边宽的归一化值,使得最大边宽不超过10
max_width = 8  # 定义最大边宽
max_bandwidth = max(bandwidths)  # 最大带宽值
edge_widths = [(bandwidth / max_bandwidth) * max_width for bandwidth in bandwidths]

# 准备边标签,显示带宽
edge_labels = {(u, v): f"{d['bandwidth']}" for u, v, d in edges}

# 设置图片大小为1920x1080像素
plt.figure(figsize=(19.2, 10.8), dpi=100)

# 使用spring布局来定位节点
#pos = nx.spring_layout(G)
pos = nx.circular_layout(G)
# 绘制节点
nx.draw_networkx_nodes(G, pos, node_size=1000, node_color='red')

# 绘制节点标签(主机名)
nx.draw_networkx_labels(G, pos, font_size=12)

# 绘制边,边的粗细根据带宽归一化调整
nx.draw_networkx_edges(G, pos, width=edge_widths, arrowstyle='->', arrowsize=20)

# 从绘图中获取坐标转换器
ax = plt.gca()

# 计算边标签的位置,距离起始节点30个像素
edge_label_pos = {}
pixel_offset = 64  # 偏移量,像素
for (u, v, d) in edges:
    # 获取起点和终点的坐标(数据坐标)
    x_start, y_start = pos[u]
    x_end, y_end = pos[v]

    # 计算边的方向向量
    dx = x_end - x_start
    dy = y_end - y_start

    # 计算边的长度(数据坐标系)
    edge_length = (dx**2 + dy**2) ** 0.5

    # 计算单位方向向量
    ux = dx / edge_length
    uy = dy / edge_length

    # 将数据坐标转换为像素坐标
    start_pixel = ax.transData.transform((x_start, y_start))
    end_pixel = ax.transData.transform((x_end, y_end))

    # 计算边的像素长度
    dx_pixel = end_pixel[0] - start_pixel[0]
    dy_pixel = end_pixel[1] - start_pixel[1]
    edge_length_pixel = (dx_pixel**2 + dy_pixel**2) ** 0.5

    # 计算偏移量在数据坐标系中的长度
    offset_data = (pixel_offset / edge_length_pixel) * edge_length

    # 计算新的标签位置(沿边方向偏移指定的像素距离)
    label_x = x_start + ux * offset_data
    label_y = y_start + uy * offset_data

    # 将标签位置存储起来
    edge_label_pos[(u, v)] = (label_x, label_y)

# 使用 Matplotlib 在指定位置绘制边标签
for (u, v), (x, y) in edge_label_pos.items():
    label = edge_labels[(u, v)]
    plt.text(x, y, label, fontsize=10, ha='center', va='center', bbox=dict(facecolor='white', alpha=0.5, edgecolor='none'))

# 隐藏坐标轴
plt.axis('off')
# 调整图形边距
plt.tight_layout()
plt.savefig('topo.png', dpi=100)
plt.show()
EOF

8.创建自动化脚本 auto.py

cat > /mnt/auto.py <<-'EOF'
import paramiko
import threading
import sys
import time
import os

if len(sys.argv)!=4:
    print(f"{sys.argv[0]} redis_server docker_image_url hostfile")
    os._exit(0)
    
redis_server=sys.argv[1]
docker_image_url=sys.argv[2]
hosts=[]

with open(sys.argv[3],"r") as f:
    for line in f.readlines():
        hosts.append([x.strip() for x in line.split(" ")])

def ssh_interactive_shell(hostname, port, username, password,ssh_requests,flog):
    ssh = paramiko.SSHClient()
    #print(hostname,ssh_requests)
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(hostname=hostname, port=port, username=username, password=password)
        channel = ssh.invoke_shell()
        def recv_data():
            while True:
                if channel.recv_ready():
                    data = channel.recv(1024)
                    if not data:
                        break
                    sys.stdout.write(data.decode())
                    sys.stdout.flush()
                    flog.write(data)
                else:
                    if channel.exit_status_ready():
                        break
        recv_thread = threading.Thread(target=recv_data)
        recv_thread.start()
        channel.send(ssh_requests + '\n')
        recv_thread.join()
    except Exception as e:
        print(f"Error: {e} {hostname},{port},{username},{password}")
    finally:
        ssh.close()

def remote_exec(alias,nodename,hostname,port,username,password,cmdline,flog,is_blocking):
    if is_blocking:
        mode=""
    else:
        mode="-d"
    ssh_requests = f'''
docker run --rm {mode} --name {alias} -e NODENAME={nodename} --privileged --net=host {docker_image_url} {cmdline}
sleep 1
exit
    '''
    ssh_interactive_shell(hostname, port, username, password, ssh_requests,flog)

def stop_docker(hostname,port,username,password,flog):
    print(f"stop_docker:{hostname}")
    ssh_interactive_shell(hostname, port, username, password, 
           f"docker stop publish;docker pull {docker_image_url};exit",flog)

if __name__ == "__main__":

    output_file = "log.txt"
    flog = open(output_file, 'wb')

    for nodename,hostname,port,username,password in hosts:
        stop_docker(hostname,port,username,password,flog)
        
    for nodename,hostname,port,username,password in hosts:
        cmdline=f"python /mnt/reset.py {redis_server}"
        remote_exec("reset",nodename,hostname,port,username,password,cmdline,flog,True)
        
    for nodename,hostname,port,username,password in hosts:
        cmdline=f"python /mnt/publish.py {redis_server}"
        remote_exec("publish",nodename,hostname,port,username,password,cmdline,flog,False)
        
    time.sleep(2)
    for nodename,hostname,port,username,password in hosts:
        cmdline=f"python /mnt/benchmark.py {redis_server}"
        remote_exec("benchmark",nodename,hostname,port,username,password,cmdline,flog,True)

    for nodename,hostname,port,username,password in hosts:
        stop_docker(hostname,port,username,password,flog)
        
    flog.close()
EOF

9.提交 Docker 镜像

docker commit net_scan <镜像地址>
docker push <镜像地址>

10.重新进入 net_scan 容器,运行自动化脚本

docker exec -it net_scan bash

创建 hostfile 文件:

cat > hostfile << 'EOF'
<NODE1> <IP> <端口> <用户名> <密码>
<NODE2> <IP> <端口> <用户名> <密码>
EOF

运行自动化脚本:

python /mnt/auto.py <Redis 服务 IP> <镜像地址> hostfile
python /mnt/summary.py <Redis 服务 IP>

执行完上述步骤后,将生成集群内主机链路带宽的拓扑图 topo.png,可用于分析集群网络性能。


http://www.kler.cn/a/509808.html

相关文章:

  • Docker安装PostGreSQL docker安装PostGreSQL 完整详细教程
  • HackMyVM-Klim靶机的测试报告
  • QT笔记- Qt6.8.1 Android编程 添加AndroidManifest.xml文件以支持修改权限
  • Ubuntu VPS 上 Docker 部署 Nginx 服务器详细教程
  • 楚慧杯Web
  • ZooKeeper 核心知识全解析:架构、角色、节点与应用
  • TextButton组件的功能与用法
  • STM32 FreeRTOS 任务挂起和恢复---实验
  • 【机器学习:二十一、避免高偏差和高方差】
  • Java 8 Stream API
  • C++ macro: Variadic macros (可变参数宏)
  • ArkUI概述
  • idea中远程调试中配置的参数说明
  • 3,Linux文件与目录管理命令及其参数(基于Ubuntu示例进行讲解)
  • SOA(面向服务架构)全面解析
  • 合格的前端,使用xlsx
  • 蓝桥杯真题 - 公因数匹配 - 题解
  • 【LLM】Openai-o1及o1类复现方法
  • 《C++11》深入剖析正则表达式库:解锁文本处理的高效之道
  • vue | 插值表达式
  • K近邻算法实战——电影分类算法
  • 迅为瑞芯微RK3562开发板/核心板应用于人脸跟踪、身体跟踪、视频监控、自动语音识别(ASR)、图像分类驾驶员辅助系统(ADAS)...
  • QQ邮箱登录逆向
  • 前端包管理工具npm、pnpm 和 Yarn 的总结对比
  • Python爬虫(5) --爬取网页视频
  • C# (图文教学)在C#的编译工具Visual Studio中使用SQLServer并对数据库中的表进行简单的增删改查--14