一文了解 分布式训练
分布式训练
分布式训练是一种通过多个计算节点(如CPU、GPU或TPU)并行处理训练任务的技术,目的是加速大规模模型的训练过程。以下是分布式训练的简介,包括其定义、主要类型、优点和应用场景
定义
分布式训练是指将深度学习模型的训练任务分散到多个计算节点上进行并行处理。通过这种方式,可以充分利用多个硬件资源,从而显著加快训练速度,提高训练效率。
主要类型
-
数据并行(Data Parallelism)
原理:将训练数据分成多个子集,每个计算节点处理一个子集的数据。所有节点共享相同的模型参数,但在每次迭代中独立计算梯度,并在更新参数时进行同步。
优点:实现简单,易于扩展。适合数据量大但模型结构相对简单的场景。
缺点:需要在节点间同步梯度,可能会增加通信开销。此外,模型参数的同步可能导致训练效率降低。 -
模型并行(Model Parallelism)(了解即可)
原理:将模型的不同部分分配到不同的计算节点上。每个节点只负责模型的一部分计算,并在需要时与其他节点交换数据。
优点:适合模型结构庞大、单个设备无法容纳整个模型的场景。可以有效减少单个设备的内存压力。
缺点:实现复杂,需要仔细设计模型的分割方式,以减少节点间的通信开销。
示例:NVIDIA的Megatron-LM框架。
优点
- 加速训练过程
分布式训练通过并行处理多个任务,显著减少了训练时间。例如,在训练大规模深度学习模型时,单个GPU可能需要数天甚至数周的时间,而使用多个GPU可以将训练时间缩短到数小时或数天。
提高可扩展性 - 分布式训练允许使用更多的计算资源,从而可以处理更大的数据集和更复杂的模型。这使得研究人员和开发者能够探索更先进的模型架构和算法。
减少单点故障风险 - 在分布式系统中,即使某个节点出现故障,其他节点仍然可以继续工作。通过适当的容错机制,可以确保训练过程的连续性。
充分利用硬件资源 - 分布式训练可以充分利用多个GPU或TPU的计算能力,提高硬件资源的利用率,从而在有限的硬件条件下完成更复杂的任务。
- 支持大规模数据处理
分布式训练可以处理大规模数据集,因为数据可以被分割成多个子集,分别在不同的节点上处理。这使得训练过程更加高效,能够更好地利用数据的多样性。
实战 单机多卡(最常见)
- 数据集
#建立数据集
train_data = VideoDataset(ann_file=config.DATA.TRAIN_FILE, data_prefix=config.DATA.ROOT,
labels_file=config.DATA.LABEL_LIST, pipeline=train_pipeline)
#载入数据集
num_tasks = dist.get_world_size()
global_rank = dist.get_rank()
sampler_train = torch.utils.data.DistributedSampler(
train_data, num_replicas=num_tasks, rank=global_rank, shuffle=True
)
train_loader = DataLoader(
train_data, sampler=sampler_train,
batch_size=config.TRAIN.BATCH_SIZE,
num_workers=16,
pin_memory=True,
drop_last=True,
collate_fn=partial(mmcv_collate, samples_per_gpu=config.TRAIN.BATCH_SIZE),
)
dist.get_world_size()
返回机器总的进程数(一卡一进程)dist.get_rank()
返回进程的排名(给每个进程一个标识)(local_rank)torch.utils.data.DistributedSampler(train_data, num_replicas=num_tasks, rank=global_rank, shuffle=True
)
dataset:要分割的数据集。
num_replicas:总的进程数(world_size),即参与训练的设备总数。
rank:当前进程的全局排名(global_rank),用于确定当前进程应该处理的数据子集。
shuffle:是否在每个 epoch 开始时打乱数据。
train_loader = DataLoader( train_data, sampler=sampler_train, batch_size=config.TRAIN.BATCH_SIZE, num_workers=16, pin_memory=True, drop_last=True, collate_fn=partial(mmcv_collate, samples_per_gpu=config.TRAIN.BATCH_SIZE), )
train_data
这是训练数据集,通常是一个 torch.utils.data.Dataset 对象。
sampler=sampler_train
使用 DistributedSampler 来分割数据集,确保每个进程只处理数据集的一个子集。sampler_train 是之前创建的 DistributedSampler 实例。
batch_size=config.TRAIN.BATCH_SIZE
每个进程的局部批量大小。在分布式训练中,全局批量大小是所有进程的局部批量大小之和。
num_workers=16
数据加载器使用的子进程数量。增加 num_workers 可以提高数据加载的效率,但过多的子进程可能会导致内存占用过高或I/O瓶颈。
pin_memory=True
如果设置为 True,数据加载器会将数据加载到 CUDA 固定内存中,这样可以提高数据传输到 GPU 的速度。
drop_last=True
如果设置为 True,最后一个不完整的批次会被丢弃。这在训练过程中很有用,因为最后一个批次可能包含的样本数量少于 batch_size,导致某些操作(如 BatchNorm)出现问题。
collate_fn=partial(mmcv_collate, samples_per_gpu=config.TRAIN.BATCH_SIZE)
自定义的 collate_fn,用于将多个样本组合成一个批次。mmcv_collate 是一个来自 mmcv 库的函数,通常用于处理多模态数据(如图像和文本)。samples_per_gpu 参数指定了每个 GPU 的样本数量。
- 模型
# 从环境变量获取 local_rank
local_rank = int(os.environ['LOCAL_RANK'])
#模型
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], broadcast_buffers=False,
find_unused_parameters=False)
#--------------------------------------------------------------------------------------
# init_distributed
#从环境变量中获取全局排名(RANK)和总进程数(WORLD_SIZE)。
if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ:
rank = int(os.environ["RANK"])
world_size = int(os.environ['WORLD_SIZE'])
print(f"RANK and WORLD_SIZE in environ: {rank}/{world_size}")
else:
rank = -1
world_size = -1
# 从环境变量获取 local_rank
local_rank = int(os.environ['LOCAL_RANK'])
torch.cuda.set_device(local_rank)
#---------------------------------------------------------------
try:
torch.distributed.init_process_group(backend='nccl', init_method='env://', world_size=world_size, rank=rank)
torch.distributed.barrier(device_ids=[local_rank])
except Exception as e:
print(f"Error initializing process group: {e}")
raise
#--------------------------------------------------
seed = config.SEED + dist.get_rank()
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)
cudnn.benchmark = True
#-----------------------------------------------------------
我把代码分为四部分
1model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], broadcast_buffers=False, find_unused_parameters=False)
作用:将模型包装为分布式模型。
参数:
device_ids=[local_rank]:指定当前进程使用的 GPU 设备。
broadcast_buffers=False:不广播模型的缓冲区(如 BatchNorm 的运行均值和方差)。
find_unused_parameters=False:在每次迭代中不检查是否有未使用的参数(适用于固定模型结构的场景)。
用途:通过 DistributedDataParallel 实现模型的并行化,自动同步梯度和参数
2.torch.cuda.set_device(local_rank)
作用:将当前进程绑定到指定的 GPU 设备上。
用途:确保每个进程只使用一个 GPU,避免多个进程竞争同一个 GPU 资源
torch.distributed.init_process_group
作用:初始化分布式进程组。
参数:
backend=‘nccl’:使用 NVIDIA 的 NCCL 后端,适用于 GPU 分布式训练。
init_method=‘env://’:从环境变量中读取初始化参数。
world_size:总的进程数。
rank:当前进程的全局排名。
用途:设置分布式训练的通信后端和初始化方法。
torch.distributed.barrier
作用:同步所有进程,确保所有进程都到达同一点后再继续执行。
用途:防止某些进程提前进入下一步,导致潜在的同步问题。
4.seed = config.SEED + dist.get_rank()
这行代码的作用是为每个进程设置一个唯一的随机种子,确保在分布式训练中每个进程的随机性是独立的,同时整个训练过程仍然是可复现的
启动命令
CUDA_VISIBLE_DEVICES=0,1,2,3 torchrun --nproc_per_node=4 train.py
CUDA_VISIBLE_DEVICES:就是你当前可用的卡
–nproc_per_node=4 :就是你设置的进程