PyTorch DDP Quick Start (with Code)
As the title says, I have a task at hand: get up to speed with DDP quickly so I can test latency and bandwidth with RDMA in the path.
Environment setup:
Verify the PyTorch install:
python -c "import torch; print(torch.__version__, torch.cuda.device_count())"
Core concepts:
Process group: each GPU is driven by its own process
AllReduce: aggregates data (e.g. gradients) across GPUs
torchrun: the officially recommended launcher (example right after this list)
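Both scripts below are meant to be launched with torchrun, which spawns one process per GPU and sets RANK, WORLD_SIZE and the rendezvous environment for each of them. For a single node with two GPUs, the launch looks roughly like this (allreduce_test.py is a placeholder name for whichever script you saved):
torchrun --standalone --nproc_per_node=2 allreduce_test.py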
Testing all-reduce across two GPUs
A quick look at my GPUs, and the minimal two-GPU all-reduce test:
import torch
import torch.distributed as dist

# torchrun sets RANK / WORLD_SIZE / MASTER_ADDR / MASTER_PORT in the environment
dist.init_process_group(backend='nccl')
rank = dist.get_rank()
world_size = dist.get_world_size()
print(f'I am rank: {rank} of world {world_size}')

# One ~40 MB float32 tensor per rank, filled with 1.0, then summed across ranks
input_tensor = torch.rand([1024, 1024, 10], dtype=torch.float).to('cuda:%d' % rank)
input_tensor.fill_(1.0)
dist.all_reduce(input_tensor)
print(input_tensor[0][0])
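A quick sanity check on what the printed values should be: with two ranks each contributing a tensor of ones, the summed result is 2.0 everywhere (world_size in general). A line like the following could be appended to the script; it is my addition, not part of the original run:
# Every element should equal world_size after the default sum all-reduce
assert torch.allclose(input_tensor, torch.full_like(input_tensor, float(world_size)))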
Result:
I also wanted to go a step further and measure latency and bandwidth across different buffer sizes:
import os
import torch
import torch.distributed as dist
import time

# Single-node rendezvous; rank and world size still come from the launcher (torchrun)
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29500'

dist.init_process_group(backend='nccl')
rank = dist.get_rank()
torch.cuda.set_device(rank)
world_size = dist.get_world_size()

# Buffer sizes in float32 elements: from 4 B up to 1 GiB
buffer_sizes = [1, 1024, 2048, 4096, 8192,
                256 * 1024, 512 * 1024,
                1024 * 1024, 4 * 1024 * 1024,
                256 * 1024 * 1024]

def benchmark(size):
    input_tensor = torch.ones(size, dtype=torch.float32).to(f'cuda:{rank}')
    torch.cuda.synchronize()
    start = time.time()
    dist.all_reduce(input_tensor)
    torch.cuda.synchronize()
    elapsed = time.time() - start
    if rank == 0:
        data_size = size * 4  # float32 is 4 bytes per element
        bandwidth = (data_size / elapsed) / (1024**2)  # MB/s
        print(f"Size: {size/1024:.1f} KB\tLatency: {elapsed*1000:.3f} ms\tBandwidth: {bandwidth:.2f} MB/s")

if rank == 0:
    print("Buffer Size (KB)\tLatency (ms)\tBandwidth (MB/s)")
    print("------------------------------------------------")

for size in buffer_sizes:
    benchmark(size)

dist.destroy_process_group()
Result:
And with that, the single-node, two-GPU H20 test passed; bandwidth and latency are in line with expectations.
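One refinement worth noting: the first all_reduce on a fresh process group also pays for NCCL communicator setup, so the smallest buffers can look slower than they really are. A variant with warm-up iterations and averaged timing might look like the sketch below; it assumes the same torchrun launch and init_process_group setup as above, and the warmup/iters values are arbitrary, not something I measured:
# Sketch only: assumes dist.init_process_group('nccl') has already run in this process
import time
import torch
import torch.distributed as dist

def benchmark_avg(size, warmup=5, iters=20):
    rank = dist.get_rank()
    tensor = torch.ones(size, dtype=torch.float32, device=f'cuda:{rank}')
    # Warm-up: absorbs communicator setup and first-call overhead
    for _ in range(warmup):
        dist.all_reduce(tensor)
    torch.cuda.synchronize()
    start = time.time()
    for _ in range(iters):
        dist.all_reduce(tensor)
    torch.cuda.synchronize()
    elapsed = (time.time() - start) / iters
    if rank == 0:
        bandwidth = (size * 4 / elapsed) / (1024**2)  # float32 = 4 bytes; MB/s
        print(f"Size: {size/1024:.1f} KB\tAvg latency: {elapsed*1000:.3f} ms\tBandwidth: {bandwidth:.2f} MB/s")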