当前位置: 首页 > article >正文

ddp pytoch多卡分布式训练

python -m torch.distributed.launch --nproc_per_node=2 main.py

这是PyTorch分布式训练的启动命令。它使用了PyTorch内置的分布式训练工具来启动多个训练进程,并将它们连接在一起以进行并行训练。其中的参数解释如下:

  • python -m torch.distributed.launch: 使用PyTorch内置的分布式训练工具启动训练进程。
  • --nproc_per_node=2: 每个节点使用的GPU数量。在这个例子中,每个节点使用2个GPU。
  • main.py: 需要运行的Python脚本。

当你在多台计算机上运行分布式训练时,你需要在每个节点上运行这个命令,以便启动多个训练进程并将它们连接在一起进行并行训练。在启动命令时,你需要使用不同的参数来指定每个节点的IP地址和端口号等信息,以便它们能够相互通信并共享训练数据。

什么是ddp

DDP(Distributed Data Parallel)是PyTorch提供的一种分布式训练策略,旨在加速模型训练和提高模型训练的效率和可扩展性。DDP使用数据并行的方式将模型和数据分散到多个GPU或机器上进行计算,并使用梯度累积和同步的方法对模型参数进行更新。

在DDP中,每个进程拥有自己的模型副本和数据子集,并在每个迭代中计算模型的损失和梯度。然后,使用reduce操作将每个进程的梯度进行累积和同步,并使用平均梯度对模型参数进行更新。通过这种方式,DDP能够显著提高模型训练的速度和效率,并使得模型训练可以扩展到多个GPU或机器上。

DDP还提供了多种优化策略,如同步BN(Batch Normalization)和随机种子固定等功能,以解决在分布式环境下出现的常见问题,如BN不同步和随机性不一致等问题。

除了提供数据并行的方式来加速模型训练和提高可扩展性外,DDP还提供了以下功能和优化策略:

  1. 同步BN

在使用BN(Batch Normalization)时,通常会计算每个GPU上的均值和标准差,然后使用全部GPU上的均值和标准差进行归一化。然而,这种方法可能会导致BN的均值和标准差不同步,从而影响模型的性能。为了解决该问题,DDP提供了同步BN的功能,可以使得每个GPU上的BN参数保持同步。通过在DDP中使用torch.nn.SyncBatchNorm类替换标准的BatchNorm类,即可实现同步BN的功能。

  1. 随机种子固定

在分布式训练中,由于每个进程拥有自己的随机数生成器,因此可能会导致随机性不一致,从而影响模型的性能和收敛速度。为了解决该问题,DDP提供了随机种子固定的功能,可以使得每个进程使用相同的随机数生成器和随机种子。通过在DDP中使用torch.manual_seed函数固定随机种子,即可实现随机种子固定的功能。

  1. 自适应优化器

DDP还提供了自适应优化器的功能,可以根据每个GPU上的梯度大小和方差自动调整学习率和动量。通过在DDP中使用torch.optim.AdaptiveLRtorch.optim.AdaptiveMomemtum优化器,即可实现自适应优化器的功能。

总之,DDP是一种用于分布式训练的优秀策略,可以显著提高模型训练的速度和效率,同时还提供了多种功能和优化策略,以解决在分布式环境下出现的常见问题。

DDP 原理

DDP(Distributed Data Parallel)是PyTorch中的一种分布式训练策略,它可以在多个GPU或多个机器之间分配和同步数据,从而加速训练过程。

DDP的基本原理如下:

  1. 将模型和数据划分为多个部分,并将它们分配到不同的GPU或机器上。

  2. 每个GPU或机器使用本地数据进行前向传播和反向传播,并计算梯度。

  3. 使用All-Reduce操作将梯度汇总到一个全局梯度中,并使用该全局梯度更新模型参数。

  4. 重复执行步骤2-3,直到训练结束。

DDP的优点是可以在多个GPU或机器之间并行计算并汇总梯度,从而加速训练过程。此外,DDP还可以自动处理数据划分和梯度同步等细节,使得分布式训练更加方便和易用。

在PyTorch中,可以使用torch.nn.parallel.DistributedDataParallel类来实现DDP。使用DDP时,需要在每个进程中初始化分布式环境,指定进程的唯一标识符和通信方式等参数,并将模型和数据分配到每个进程中。然后,使用DistributedDataParallel类对模型进行包装,并使用torch.distributed.launch命令启动多个进程进行训练。在训练过程中,DDP会自动处理梯度的同步和更新等细节,并将训练结果保存在指定的目录中。

同一台机器多张显卡可以使用ddp

是的,DDP可以在一台机器上的多张GPU上运行。在这种情况下,每张GPU都将分配到不同的进程中,并使用本地数据进行前向传播和反向传播,然后使用All-Reduce操作将梯度进行汇总和同步。这样可以利用多张GPU的计算资源,并加速模型的训练过程。

在PyTorch中使用DDP时,需要指定每个进程使用的GPU编号,可以使用CUDA_VISIBLE_DEVICES环境变量来控制。例如,如果有4张GPU可用,可以使用以下命令在4个进程中启动DDP:

CUDA_VISIBLE_DEVICES=0,1,2,3  python -m torch.distributed.launch --nproc_per_node=4 your_training_script.py

这样就可以在一台机器上的4张GPU上使用DDP进行分布式训练。在训练过程中,每个进程都会使用指定的GPU进行计算,并自动处理梯度的同步和更新等细节。

从使用角度学习DDP

手写AI

# 1. 导包:一些需要导入的库
# 模型相关
from torch.nn.parallel import DistributedDataParallel as DDP
# 数据相关
from torch.utils.data.distributed import DistributedSampler
# ddp自身的机制相关
import torch.distributed as dist

# 2.后端多卡通讯及GPU序号(RANK)
if DDP_ON:
    init_process_group(backend="nccl")
    LOCAL_RANK = device_id = int(os.environ["LOCAL_RANK"])
    WORLD_SIZE = torch.cuda.device_count()

    device = torch.device('cuda', device_id) # note that device_id is an integer but device is a datetype.
    print(f"Start running basic DDP on rank {LOCAL_RANK}.")
    logging.info(f'Using device {device_id}')

# 3. DDP model
net = DDP(net, device_ids = [device_id], output_device=device_id)


# 4.喂数据给多卡
loader_args = dict(batch_size=batch_size, num_workers=WORLD_SIZE*4, pin_memory=True) # batchsize is for a single proc
if DDP_ON:
    train_sampler = DistributedSampler(train_set)
    train_loader = DataLoader(train_set, sampler=train_sampler, **loader_args)
else:
    train_loader = DataLoader(train_set, shuffle=True, **loader_args)
    
# no need for distributed sampler for val
val_loader = DataLoader(val_set, shuffle=False, drop_last=True, **loader_args)


# 5.set_epoch 防止每次数据都是一样的(如下图)
# ref: https://blog.csdn.net/weixin_41978699/article/details/121742647
for epoch in range(start, start+epochs):
    if LOCAL_RANK == 0:
        print('lr: ', optimizer.param_groups[0]['lr']) 

    net.train()
    epoch_loss = 0

    # To avoid duplicated data sent to multi-gpu
    train_loader.sampler.set_epoch(epoch)
torchrun --nproc_per_node=4 \
          multigpu_torchrun.py \
          --batch_size 4 \
          --lr 1e-3
python -m torch.distributed.launch \
      --nproc_per_node = 4 \
        train.py \
      --batch_size 4
import argparse
import logging
import sys
from pathlib import Path

import torch
import torch.nn as nn
import torch.nn.functional as F
import wandb
from torch import optim
from torch.utils.data import DataLoader, random_split
from tqdm import tqdm

from utils.data_loading import BasicDataset, CarvanaDataset
from utils.dice_score import dice_loss
from evaluate import evaluate
from unet import UNet
import os
import torch.distributed as dist

# for reproducibility
import random
import numpy as np
import torch.backends.cudnn as cudnn

# ABOUT DDP
# for model loading in ddp mode
from torch.nn.parallel import DistributedDataParallel as DDP
# for data loading in ddp mode
from torch.utils.data.distributed import DistributedSampler

import torch.multiprocessing as mp
from torch.distributed import init_process_group, destroy_process_group



def init_seeds(seed=0, cuda_deterministic=True):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Speed-reproducibility tradeoff https://pytorch.org/docs/stable/notes/randomness.html
    if cuda_deterministic:  # slower, more reproducible
        cudnn.deterministic = True
        cudnn.benchmark = False
    else:  # faster, less reproducible
        cudnn.deterministic = False
        cudnn.benchmark = True

def train_net(net,
              device,
              start: int = 0,
              epochs: int = 5,
              batch_size: int = 1,
              learning_rate: float = 1e-5,
              val_percent: float = 0.1,
              save_checkpoint: bool = True,
              img_scale: float = 0.5,
              amp: bool = False):
    

    if DDP_ON: # modify the net's attributes when using ddp
        net.n_channels = net.module.n_channels
        net.n_classes  = net.module.n_classes

    # 1. Create dataset
    try:
        dataset = CarvanaDataset(dir_img, dir_mask, img_scale)
    except (AssertionError, RuntimeError):
        dataset = BasicDataset(dir_img, dir_mask, img_scale)

    # 2. Split into train / validation partitions
    n_val = int(len(dataset) * val_percent)
    n_train = len(dataset) - n_val
    train_set, val_set = random_split(dataset, [n_train, n_val], generator=torch.Generator().manual_seed(0))

    # 3. Create data loaders
    loader_args = dict(batch_size=batch_size, num_workers=WORLD_SIZE*4, pin_memory=True) # batchsize is for a single process(GPU)

    if DDP_ON:
        train_sampler = DistributedSampler(train_set)
        train_loader = DataLoader(train_set, sampler=train_sampler, **loader_args)
    else:
        train_loader = DataLoader(train_set, shuffle=True, **loader_args)
    
    
    # no need for distributed sampler for val
    val_loader = DataLoader(val_set, shuffle=False, drop_last=True, **loader_args)
    
    # (Initialize logging)
    if LOCAL_RANK == 0:
        experiment = wandb.init(project='U-Net-DDP', resume='allow', anonymous='must')
        experiment.config.update(dict(epochs=epochs, batch_size=batch_size, learning_rate=learning_rate,
                                  val_percent=val_percent, save_checkpoint=save_checkpoint, img_scale=img_scale,
                                  amp=amp))
            
        logging.info(f'''Starting training:
                Epochs:          {epochs}
                Start from:      {start}
                Batch size:      {batch_size}
                Learning rate:   {learning_rate}
                Training size:   {n_train}
                Validation size: {n_val}
                Checkpoints:     {save_checkpoint}
                Device:          {device.type}
                Images scaling:  {img_scale}
                Mixed Precision: {amp}
            ''')

    # 4. Set up the optimizer, the loss, the learning rate scheduler and the loss scaling for AMP
    criterion = nn.CrossEntropyLoss() 
    
    optimizer = optim.AdamW(net.parameters(), lr=learning_rate, weight_decay=1e-8)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs, eta_min=1e-7)
    grad_scaler = torch.cuda.amp.GradScaler(enabled=amp)
    global_step = 0

    # 5. Begin training
    for epoch in range(start, start+epochs):
        if LOCAL_RANK == 0:
            print('lr: ', optimizer.param_groups[0]['lr']) 
        
        net.train()
        epoch_loss = 0

        # To avoid duplicated data sent to multi-gpu
        train_loader.sampler.set_epoch(epoch)

        disable = False if LOCAL_RANK == 0 else True

        with tqdm(total=n_train, desc=f'Epoch {epoch}/{epochs+start}', unit='img', disable=disable) as pbar:
            for batch in train_loader:
                images = batch['image']
                true_masks = batch['mask']
                    
                assert images.shape[1] == net.n_channels, \
                    f'Network has been defined with {net.n_channels} input channels, ' \
                    f'but loaded images have {images.shape[1]} channels. Please check that ' \
                    'the images are loaded correctly.'

                images = images.to(device=device, dtype=torch.float32)
                true_masks = true_masks.to(device=device, dtype=torch.long)

                with torch.cuda.amp.autocast(enabled=amp):
                    masks_pred = net(images)
                    loss = criterion(masks_pred, true_masks) \
                           + dice_loss(F.softmax(masks_pred, dim=1).float(),
                                       F.one_hot(true_masks, net.n_classes).permute(0, 3, 1, 2).float(),
                                       multiclass=True)

                optimizer.zero_grad(set_to_none=True)
                grad_scaler.scale(loss).backward()
                grad_scaler.step(optimizer)
                grad_scaler.update()

                pbar.update(images.shape[0])
                global_step += 1
                epoch_loss += loss.item()

                if LOCAL_RANK == 0:
                    experiment.log({
                        'train loss': loss.item(),
                        'step': global_step,
                        'epoch': epoch
                    })
                pbar.set_postfix(**{'loss (batch)': loss.item()})

                # Evaluation round
                division_step = (n_train // (5 * batch_size))
                if division_step > 0:
                    if global_step % division_step == 0:
                        histograms = {}
                        for tag, value in net.named_parameters():
                            tag = tag.replace('/', '.')
                            if not torch.isinf(value).any():
                                histograms['Weights/' + tag] = wandb.Histogram(value.data.cpu())
                            if not torch.isinf(value.grad).any():
                                histograms['Gradients/' + tag] = wandb.Histogram(value.grad.data.cpu())

                        val_score = evaluate(net, val_loader, device, disable_log = disable)

                        if LOCAL_RANK == 0:
                            logging.info('Validation Dice score: {}'.format(val_score))
                            experiment.log({
                                'learning rate': optimizer.param_groups[0]['lr'],
                                'validation Dice': val_score,
                                'images': wandb.Image(images[0].cpu()),
                                'masks': {
                                    'true': wandb.Image(true_masks[0].float().cpu()),
                                    'pred': wandb.Image(masks_pred.argmax(dim=1)[0].float().cpu()),
                                },
                                'step': global_step,
                                'epoch': epoch,
                                **histograms
                            })
        scheduler.step()
        if save_checkpoint and LOCAL_RANK == 0 and (epoch % args.save_every == 0):
            Path(dir_checkpoint).mkdir(parents=True, exist_ok=True)
            torch.save(net.module.state_dict(), str(dir_checkpoint / 'DDP_checkpoint_epoch{}.pth'.format(epoch)))
            
            logging.info(f'Checkpoint {epoch} saved!')


##################################### arguments ###########################################
parser = argparse.ArgumentParser(description='Train the UNet on images and target masks')
parser.add_argument('--epochs', '-e', metavar='E', type=int, default=5, help='Number of epochs')
parser.add_argument('--batch-size', '-b', dest='batch_size', metavar='B', type=int, default=1, help='Batch size')
parser.add_argument('--learning-rate', '-l', metavar='LR', type=float, default=1e-5,
                    help='Learning rate', dest='lr')
parser.add_argument('--load', '-f', type=str, default=False, help='Load model from a .pth file')
parser.add_argument('--scale', '-s', type=float, default=0.5, help='Downscaling factor of the images')
parser.add_argument('--validation', '-v', dest='val', type=float, default=10.0,
                    help='Percent of the data that is used as validation (0-100)')
parser.add_argument('--amp', action='store_true', default=False, help='Use mixed precision')
parser.add_argument('--bilinear', action='store_true', default=False, help='Use bilinear upsampling')
parser.add_argument('--classes', '-c', type=int, default=2, help='Number of classes')
parser.add_argument('--exp_name', type=str, default='hgb_exp')
parser.add_argument('--ddp_mode', action='store_true')
parser.add_argument('--save_every', type=int, default=5)
parser.add_argument('--start_from', type=int, default=0)




args = parser.parse_args()

dir_img = Path('./data/imgs/')
dir_mask = Path('./data/masks/')
dir_checkpoint = Path('./checkpoints/')

DDP_ON = True if args.ddp_mode else False

#########################################################################################

if DDP_ON:
    init_process_group(backend="nccl")
    LOCAL_RANK = device_id = int(os.environ["LOCAL_RANK"])
    WORLD_SIZE = torch.cuda.device_count()

    device = torch.device('cuda', device_id) # note that device_id is an integer but device is a datetype.
    print(f"Start running basic DDP on rank {LOCAL_RANK}.")
    logging.info(f'Using device {device_id}')


if __name__ == '__main__':
    #!highly recommended]
    # ref: pytorch org ddp tutorial 
    # 1. https://pytorch.org/tutorials/beginner/ddp_series_multigpu.html#multi-gpu-training-with-ddp
    # 2. https://pytorch.org/tutorials/beginner/ddp_series_multigpu.html
    
    init_seeds(0)
    # Change here to adapt to your data
    # n_channels=3 for RGB images
    # n_classes is the number of probabilities you want to get per pixel
    net = UNet(n_channels=3, n_classes=args.classes, bilinear=args.bilinear)
    
    if LOCAL_RANK == 0:
        print(f'Network:\n'
            f'\t{net.n_channels} input channels\n'
            f'\t{net.n_classes} output channels (classes)\n'
            f'\t{"Bilinear" if net.bilinear else "Transposed conv"} upscaling')

    if args.load:
        # ref: https://blog.csdn.net/hustwayne/article/details/120324639  use method 2 with module
        # net.load_state_dict(torch.load(args.load, map_location=device))
        net.load_state_dict({k.replace('module.', ''): v for k, v in                 
                       torch.load(args.load, map_location=device).items()})

        logging.info(f'Model loaded from {args.load}')


    torch.cuda.set_device(LOCAL_RANK)
    net.to(device=device)
    # wrap our model with ddp
    net = DDP(net, device_ids = [device_id], output_device=device_id)

    try:
        train_net(net=net,
                  start=args.start_from,
                  epochs=args.epochs,
                  batch_size=args.batch_size,
                  learning_rate=args.lr,
                  device=device,
                  img_scale=args.scale,
                  val_percent=args.val / 100,
                  amp=args.amp)
    except KeyboardInterrupt:
        torch.save(net.module.state_dict(), 'INTERRUPTED_DDP.pth')
        logging.info('Saved interrupt')
        raise
    destroy_process_group()

如何使用ddp?

使用DDP进行分布式训练可以加速深度学习模型的训练过程,减少训练时间。下面是使用PyTorch实现DDP的一般步骤:

  1. 初始化分布式环境

在进行DDP训练之前,需要初始化分布式环境。可以使用torch.distributed.init_process_group函数来初始化分布式环境,该函数需要指定分布式后端、主机名、端口号、进程编号等参数。例如,以下代码初始化4个进程的分布式环境:

import torch
import torch.distributed as dist

dist.init_process_group(backend="nccl", init_method="tcp://localhost:23456", rank=0, world_size=4)
  1. 加载数据和模型

在初始化分布式环境后,需要加载训练数据和模型。可以使用PyTorch的数据加载器(如torch.utils.data.DataLoader)加载训练数据,并使用torch.nn.parallel.DistributedDataParallel类对模型进行包装,以便在多个GPU或机器之间分配和同步数据。例如,以下代码加载训练数据和模型,并对模型进行DDP包装:

import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.parallel

# 加载训练数据
train_dataset = ...

# 定义模型
model = ...

# 使用DDP包装模型
model = nn.parallel.DistributedDataParallel(model, device_ids=[torch.cuda.current_device()])
  1. 定义优化器和损失函数

在加载数据和模型后,需要定义优化器和损失函数。可以使用PyTorch的优化器(如torch.optim.SGD)定义优化器,并使用PyTorch的损失函数(如torch.nn.CrossEntropyLoss)定义损失函数。例如,以下代码定义了一个SGD优化器和一个交叉熵损失函数:

import torch.optim as optim
import torch.nn as nn

# 定义优化器
optimizer = optim.SGD(model.parameters(), lr=0.01)

# 定义损失函数
criterion = nn.CrossEntropyLoss()
  1. 训练模型

在定义优化器和损失函数后,可以开始训练模型。可以使用PyTorch的训练循环(如for循环)遍历训练数据集,并在每个迭代中计算模型的损失和梯度,并使用优化器更新模型参数。例如,以下代码展示了一个简单的训练循环:

for epoch in range(num_epochs):
    for inputs, targets in train_dataset:
        # 将输入和目标数据移到GPU上
        inputs = inputs.to(device)
        targets = targets.to(device)

        # 计算模型输出
        outputs = model(inputs)

        # 计算损失函数值
        loss = criterion(outputs, targets)

        # 计算梯度并更新模型参数
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
  1. 清理分布式环境

在训练完成后,需要清理分布式环境。可以使用torch.distributed.destroy_process_group函数来清理分布式环境,例如:

dist.destroy_process_group()

需要注意的是,在使用DDP进行分布式训练时,需要确保每个进程使用的GPU编号是不同的,以避免数据重复计算和同步问题。可以使用CUDA_VISIBLE_DEVICES环境变量来指定每个进程可以使用的GPU编号。例如,可以使用以下命令来启动4个进程,每个进程使用不同的GPU:

CUDA_VISIBLE_DEVICES=0 python train.py --rank 0 --world-size 4
CUDA_VISIBLE_DEVICES=1 python train.py --rank 1 --world-size 4
CUDA_VISIBLE_DEVICES=2 python train.py --rank 2 --world-size 4
CUDA_VISIBLE_DEVICES=3 python train.py --rank 3 --world-size 4

其中,--rank参数指定当前进程的编号,--world-size参数指定进程总数。

mmseg的ddp

mmseg支持使用DDP进行分布式训练,以下是使用mmseg进行DDP训练的一般步骤:

  1. 初始化分布式环境

在进行DDP训练之前,需要初始化分布式环境。可以使用torch.distributed.launch命令启动多个进程,并使用--nproc_per_node参数指定每个节点使用的GPU数量。例如,以下命令启动4个进程,每个进程使用1个GPU:

python -m torch.distributed.launch --nproc_per_node=1 train.py --launcher pytorch

train.py脚本中,需要使用mmcv.runner.init_dist函数初始化分布式环境。例如:

import mmcv.runner

mmcv.runner.init_dist()
  1. 加载数据和模型

在初始化分布式环境后,需要加载训练数据和模型。可以使用mmseg的数据加载器(如mmseg.datasets.build_dataset)加载训练数据,并使用mmseg.models.build_segmentor函数构建分割模型。例如,以下代码加载训练数据和模型:

from mmseg.datasets import build_dataset
from mmseg.models import build_segmentor

# 加载训练数据
train_dataset = build_dataset(cfg.data.train)

# 构建分割模型
model = build_segmentor(cfg.model)
  1. 定义优化器和学习率调度器

在加载数据和模型后,需要定义优化器和学习率调度器。可以使用mmseg的优化器(如mmcv.optim.build_optimizer)定义优化器,并使用mmseg的学习率调度器(如mmcv.runner.build_lr_scheduler)定义学习率调度器。例如,以下代码定义了一个SGD优化器和一个余弦退火学习率调度器:

from mmcv.optim import build_optimizer
from mmcv.runner import build_lr_scheduler

# 定义优化器
optimizer = build_optimizer(model, cfg.optimizer)

# 定义学习率调度器
lr_scheduler = build_lr_scheduler(
    optimizer, cfg.lr_scheduler, total_iters_per_epoch=len(train_dataset))
  1. 构建DDP模型

在定义优化器和学习率调度器后,需要使用mmcv.runner.DistributedDataParallel类将模型进行包装,以便在多个GPU或机器之间分配和同步数据。例如,以下代码构建一个DDP模型:

from mmcv.runner import DistSamplerSeedHook, Runner

# 构建DDP模型
model = mmcv.runner.DistributedDataParallel(
    model.cuda(),
    device_ids=[torch.cuda.current_device()],
    broadcast_buffers=False)

# 定义分布式采样器的随机数种子
dist_sampler_seed = cfg.get('dist_sampler_seed', None)

# 构建Runner对象
runner = Runner(
    model,
    batch_processor,
    optimizer,
    work_dir=cfg.work_dir,
    logger=logger,
    meta=cfg.get('meta', {}),
    max_iters=num_iterations,
    dist_sampler_seed=dist_sampler_seed)

# 注册分布式采样器的随机数种子
if dist_sampler_seed is not None:
    runner.register_hook(DistSamplerSeedHook(dist_sampler_seed))
  1. 训练模型

在构建DDP模型后,可以使用mmseg的训练器(如mmcv.runner.IterBasedRunner)进行分布式训练。可以使用mmseg的训练循环(如for循环)遍历训练数据集,并在每个迭代中计算模型的损失和梯度,并使用优化器更新模型的参数。例如,以下代码训练分割模型:

from mmcv.runner import IterBasedRunner

# 构建训练循环
runner = IterBasedRunner(
    model,
    batch_processor,
    optimizer,
    work_dir=cfg.work_dir,
    logger=logger,
    meta=cfg.get('meta', {}),
    max_iters=num_iterations,
    iter_based=True)

# 开始训练
runner.run(train_loader, valid_loader=valid_loader, lr_scheduler=lr_scheduler)

参考资料

pytorch官方

  1. https://pytorch.org/tutorials/beginner/ddp_series_multigpu.html#multi-gpu-training-with-ddp

  2. https://pytorch.org/tutorials/beginner/ddp_series_multigpu.html

使用多卡训练可以显著提高深度学习模型的训练速度和效率。在PyTorch中,可以使用DataParallel或DDP等工具来实现多卡训练。下面是使用DataParallel进行多卡训练的示例代码:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor

# 定义模型
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout2d(0.25)
        self.dropout2 = nn.Dropout2d(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = nn.functional.relu(x)
        x = self.conv2(x)
        x = nn.functional.relu(x)
        x = nn.functional.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = nn.functional.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = nn.functional.log_softmax(x, dim=1)
        return output

# 加载数据
train_data = MNIST(root='data', train=True, transform=ToTensor(), download=True)
train_loader = DataLoader(train_data, batch_size=64, shuffle=True)

# 定义模型、损失函数和优化器
model = Net()
criterion = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

# 多卡训练
device_ids = [0, 1, 2, 3] # 定义使用的GPU设备编号
model = nn.DataParallel(model, device_ids=device_ids) # 将模型包装为DataParallel模型
model.to(device_ids[0]) # 将模型和数据移动到第一个GPU设备上

num_epochs = 10
for epoch in range(num_epochs):
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device_ids[0]), target.to(device_ids[0])
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % 100 == 0:
            print('Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))

在上述代码中,我们首先定义了一个包含两个卷积层、两个全连接层和Dropout层的卷积神经网络模型,并加载了MNIST数据集。然后,我们使用DataParallel工具将模型包装为多卡模型,并指定使用的GPU设备编号。接着,我们将模型和数据移动到第一个GPU设备上,并使用多个GPU设备来进行训练。在训练过程中,我们需要将数据和模型移动到使用的GPU设备上,并在反向传播时累积梯度并更新模型参数。

DataParallel和DDP都是PyTorch中用于实现多卡训练的工具,但它们有一些不同之处。

区别:

  1. 数据并行方式不同

DataParallel采用数据并行的方式,即将输入数据划分为多份,在多个GPU设备上分别计算,然后将结果合并。每个GPU设备上都有一个完整的模型,并独立地计算梯度,最后将梯度合并并更新模型参数。

DDP采用模型并行的方式,即将模型分为多份,在多个GPU设备上分别计算,然后通过交换信息实现模型参数的更新。每个GPU设备上只有模型的一部分,并且每个设备上的模型参数都会随着训练的进行而不断更新。

  1. 通信方式不同

DataParallel使用All-Reduce算法对梯度进行合并,在每个GPU设备上计算梯度后,通过All-Reduce算法将梯度合并到单个梯度中,然后使用单个梯度来更新模型参数。

DDP使用分布式同步机制来更新模型参数,每个GPU设备上的模型参数都会在训练过程中进行同步。DDP使用了一种称为“全局同步”的方法,其中所有进程都在每个训练步骤中等待最慢进程完成计算,然后使用相同的参数更新所有进程的模型。

联系:

  1. 都可以在多个GPU设备上实现模型的并行计算和参数更新。

  2. 都需要在训练过程中对数据和模型进行划分和同步。

  3. 都可以显著提高模型训练的速度和效率。

选择使用哪种工具取决于不同的应用场景和硬件条件。如果您有多个GPU设备,并且每个设备都有足够的内存来存储模型和数据,那么可以考虑使用DataParallel。如果您的模型非常大,并且需要在多个节点上进行训练,那么可以考虑使用DDP。

无法一概而论哪种多卡训练方式更好,选择哪种方式取决于具体场景和需求。

DataParallel的优点在于实现简单,易于使用,可以在单个节点的多个GPU设备上进行训练,适用于小型或中等规模的模型。但是,DataParallel的缺点在于需要复制整个模型到每个GPU设备上,因此对于大型模型和数据集,可能会导致内存不足,训练速度变慢。

DDP的优点在于可以在多个节点上进行训练,适用于大型模型和数据集。DDP采用模型并行的方式,可以将模型分解为多个部分,从而使得每个部分可以在单个GPU设备上进行训练。此外,DDP使用了分布式同步机制来更新模型参数,可以避免由于网络传输的延迟和带宽限制而导致的训练效率下降。但是,DDP的实现比较复杂,需要对分布式环境和同步机制有一定的了解。

因此,选择哪种多卡训练方式应该根据具体的应用场景和需求来决定。如果您只有单个节点和多个GPU设备,并且模型较小,那么可以选择DataParallel;如果您的模型较大,需要在多个节点上进行训练,并且具备分布式环境的条件,那么可以选择DDP。


http://www.kler.cn/a/15634.html

相关文章:

  • PlantUML——时序图
  • 【项目开发】URL中井号(#)的技术细节
  • AWTK-WIDGET-WEB-VIEW 实现笔记 (4) - Ubuntu
  • 【异常解决】Linux shell报错:-bash: [: ==: 期待一元表达式 解决方法
  • linux笔记(防火墙)
  • 【软件测试】一个简单的自动化Java程序编写
  • Vue3之setup参数介绍
  • Java学习过程(韩顺平661-665)
  • 浅谈测试用例设计 | 京东云技术团队
  • 【HQL - 查询用户的累计消费金额及VIP等级】
  • 霍兰德人格分析雷达图
  • 【Qt】根据界面所在显示器自适应调整ui大小
  • 省钱!NewBing硬核新玩法;手把手教你训练AI模特;用AI替代同事的指南;B站最易上手AI绘画教程 | ShowMeAI日报
  • Raft 共识算法4-选举限制
  • 【JavaEE初阶】多线程(四)阻塞队列 定时器 线程池
  • async函数学习总结
  • Navicat和Dbeaver有什么区别
  • Java --- springboot2的静态资源配置原理
  • 轻量级「行泊一体」爆发前夜!这家智驾Tier1正加码抢占市场
  • VC++运行时库整理
  • Word转PDF:简单步骤,轻松完成!推荐两个实现的方法
  • 为什么重写equals时必须重写hashCode()
  • 【容器化应用程序设计和开发】2.2 Dockerfile 的编写和最佳实践
  • ChatGPT会一直火热下去吗?他会是下一个AR,区块链吗?
  • WRF模式应用:天气预报、模拟分析观测气温、降水、风场、水汽和湿度、土地利用变化、土壤及近地层能量水分通量、土壤、水体、植被等相关气象变量
  • 分治算法(Divide and Conquer)