GPU集群上分布式训练大模型
总结一下如何在超算系统上进行预训练大模型的分布式训练 / 微调,文中代码已上传至 github
实验环境
集群1:国家广州超算 星逸A800智能AI集群
GPU:8 * Nvdia Tesla-A800 80G显存
CPU:2 * 28核 Intel Xeon Gold 6348
内存:1024GB
集群2:并行科技 中国国家网格 N12 区(cngrid12)
GPU:4 * Nvdia Tesla-V100 16G显存
CPU:20 核 Intel® Xeon® CPU E5-2640 v4
内存:128GB
在超算分布式环境上和本地训练有几点不同:
- 超算环境无法科学上网,需要手动下载并上传:数据、tokenizer、模型和模型参数,并在代码中作相应修改。
- 通过 slurm 进行作业管理,编写并提交 sbatch 脚本来运行作业。
- 每个集群的环境各不相同,移植时需要注意配置环境和预加载相关的库。
- 训练超大模型时,单个GPU显存有限,仅使用
torch.nn.parallel
数据并行常常无法加载完整模型无法完成训练,或只能小批次训练。因此需要用到分布式训练框架,常见的分布式训练框架有Horovod,Megatron-LM,DeepSpeed等。
1 举例:bert-large
1.1 本地单卡训练bert-large(假设GPU显存足够大)
Step1 编写训练代码 run_bert.py
:
import torch
from transformers import BertTokenizer, BertForMaskedLM, DataCollatorForLanguageModeling
from datasets import load_dataset
from torch.utils.data import DataLoader
from tqdm import tqdm
# 加载预训练的tokenizer和模型
tokenizer = BertTokenizer.from_pretrained("bert-large-uncased")
model = BertForMaskedLM.from_pretrained("bert-large-uncased")
# 加载WikiText数据集
dataset = load_dataset("wikitext", "wikitext-2-raw-v1", split="train")
# 数据预处理
def tokenize_function(examples):
return tokenizer(examples["text"], truncation=True, padding="max_length", max_length=512)
tokenized_dataset = dataset.map(tokenize_function, batched=True, remove_columns=["text"])
# 设置数据加载器
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=True, mlm_probability=0.15)
train_loader = DataLoader(tokenized_dataset, batch_size=4, shuffle=True, collate_fn=data_collator) # 根据显存调整batch_size
# 设置设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# 设置优化器
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)
# 设置训练参数
num_epochs = 3
gradient_accumulation_steps = 8 # 梯度累积步数,根据显存调整
model.train()
# 手动实现训练循环
for epoch in range(num_epochs):
print(f"Epoch {epoch + 1}/{num_epochs}")
epoch_loss = 0
for step, batch in enumerate(tqdm(train_loader)):
# 将数据移到GPU
batch = {k: v.to(device) for k, v in batch.items()}
# 前向传播
outputs = model(**batch)
loss = outputs.loss
loss = loss / gradient_accumulation_steps # 梯度累积
# 反向传播
loss.backward()
# 更新参数并清空梯度
if (step + 1) % gradient_accumulation_steps == 0:
optimizer.step()
optimizer.zero_grad()
# 记录损失
epoch_loss += loss.item()
avg_loss = epoch_loss / len(train_loader)
print(f"Epoch {epoch + 1} finished with average loss: {avg_loss:.4f}")
print("Training complete.")
Step2 运行:
python run_bert.py
1.2 迁移到分布式训练(微调大模型)
按如下步骤转换为分布式训练代码,并移植到超算平台上完成训练(单节点多卡):
Step1 下载数据、tokenizer、模型
从huggingface官网或镜像网站(国内)下载对应文件,模型和tokenizer搜索bert-large-uncased,数据集搜索wikitext:
模型:config.json,pytorch_model.bin
数据:train-00000-of-00001.parquet
tokenizer:tokenizer.json,vocab.txt,tokenizer_config.json(可选)
Step2 修改训练代码 run_bert.py
:
- 分布式训练框架我用微软提供的deepspeed框架进行训练。deepspeed支持3D并行训练,同时集成了一些优化,如ZeRO、CPU-offload、混合精度训练等,能够提供比原生pytorch更加高效的训练。
- 修改内容:
- tokenizer、model、dataset 分别修改为从本地加载
- 添加 import deepspeed 和 import torch.distributed as dist
- 添加 deepspeed.init_distributed() 初始化分布式环境
- 添加 deepspeed.initialize() 将模型转换为deepspeed模型
- 获取 world_size 和 rank ,并将 .to(device) 替换为 .to(rank)
- 删除 device, optimizer, gradient_accumulation_steps 配置,转移到 ds_config.json 中
- 将所有的 model 替换为 model_engine ,即deepspeed模型
- 训练循环中只保留 model_engine(input_ids), model_engine.backward(), model_engine.step() 三个函数
- 添加 time.time() 函数用于计时
- 修改数据集加载和预处理逻辑,增加sampler,删除shuffle选项,使其符合分布式训练
- 对模型和数据添加 .contiguous() 以保证 tensor 的连续性
import torch
from transformers import BertTokenizer, BertForMaskedLM, DataCollatorForLanguageModeling
from datasets import load_dataset
from torch.utils.data import DataLoader, DistributedSampler
from tqdm import tqdm
import deepspeed
import torch.distributed as dist
import time
# 初始化分布式环境
deepspeed.init_distributed()
world_size = dist.get_world_size()
rank = dist.get_rank()
# 加载预训练的tokenizer和模型
tokenizer = BertTokenizer.from_pretrained("./tokenizer/")
model = BertForMaskedLM.from_pretrained("./model")
# 加载WikiText数据集
dataset = load_dataset("parquet", data_files="./data/train-00000-of-00001.parquet")
# 数据预处理
def tokenize_function(examples):
return tokenizer(examples["text"], truncation=True, padding="max_length", max_length=512)
tokenized_dataset = dataset.map(tokenize_function, batched=True, remove_columns=["text"])
train_dataset = tokenized_dataset["train"]
# 使用分布式采样器
sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank)
# 设置数据加载器
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=True, mlm_probability=0.15)
train_loader = DataLoader(train_dataset, batch_size=4, collate_fn=data_collator, sampler=sampler)
# 设置训练参数
epochs = 3
# 确保模型参数的连续性
for param in model.parameters():
param.data = param.data.contiguous()
# 初始化deepspeed模型,将model转换成model_engine
model_engine, optimizer, _, _ = deepspeed.initialize(
args=None,
model=model,
model_parameters=model.parameters(),
config='./ds_config.json'
)
model_engine.train()
start_time = time.time()
if rank == 0:
print("Training start...")
# 手动实现训练循环
for epoch in range(epochs):
epoch_loss = 0
for step, batch in enumerate(tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}", disable=(rank != 0), mininterval=20)): # 设置只有0号进程打印进度条且打印间隔为20s
# 将数据移到GPU
batch = {k: v.to(rank) for k, v in batch.items()}
# 前向传播
outputs = model_engine(**batch)
loss = outputs.loss
# 反向传播
model_engine.backward(loss)
# 更新参数
model_engine.step()
# 记录损失
epoch_loss += loss.item()
end_time = time.time()
elapsed_time = end_time - start_time
print(f"\nRank {rank}, Epoch {epoch+1}/{epochs}, Batch {step+1}/{len(train_loader)}, Loss: {epoch_loss/len(train_loader)}, total_time_used: {elapsed_time / 60:.2f} mins")
if rank == 0:
print("Training complete.")
- 上面代码是加载了预训练模型权重文件
pytorch_model.bin
然后对模型进行微调,若需要从头训练,只需要将model = BertForMaskedLM.from_pretrained("./model")
改为:
from transformers import BertConfig
config = BertConfig.from_pretrained('./model/config.json')
model = BertForMaskedLM(config)
Step3 编写运行脚本 sbatch.sh
:
#!/bin/bash
#SBATCH --nodes=1 # 节点数
#SBATCH --ntasks=4 # 任务数
#SBATCH --partition=gpu # 分区名称(根据集群修改)
#SBATCH --gres=gpu:4 # 设置使用的GPU数
module load nvidia/cuda/12.2
module load mpich/3.4.1-gcc9.3 # 加载gcc-5版本以上
deepspeed --num_nodes=1 \
--num_gpus=4 \
--launcher slurm \
run_bert.py
Step4 创建deepspeed的配置文件 ds_config.json
:
{
"train_batch_size": 4, // batch_size,必须等于 train_micro_batch_size_per_gpu * gradient_accumulation_steps * GPU数,且和训练代码中设置相同
"train_micro_batch_size_per_gpu": 1, // 每个GPU上micro_batch的数量
"gradient_accumulation_steps": 1, // 梯度累积多少个batch同步一次
// 设置使用ZeRO-3优化
"zero_allow_untested_optimizer": true,
"zero_optimization":{
"stage": 3
},
// 配置优化器
"optimizer": {
"type": "Adam",
"params": {
"lr": 1e-4,
"betas": [0.9, 0.999],
"eps": 1e-8
}
}
}
Step5 上传到服务器
- 将所有文件打包上传到服务器,其中模型文件
pytorch_model.bin
比较大,可能需要单独上传。上传后解压,文件结构如下:
bert-large
│ run_bert.py
│ sbatch.sh
│ ds_config.json
│
└───data
│ │ train-00000-of-00001.parquet
│
└───model
│ config.json
│ pytorch_model.bin
│
└───tokenizer
│ │ tokenizer.json
│ │ vocab.txt
│
Step6 配置服务器运行环境
# 创建虚拟环境
$ conda create -n bert-large python==3.10
$ conda activate bert-large
# 安装必要的库
$ pip install -r requirement.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
# 使用 conda 安装mpi4py,因为这个库需要的依赖太多了,pip很容易报错
$ conda install mpi4py
其中,requirement.txt
:
torch==2.4.1
transformers==4.46.0
deepspeed==0.15.2
datasets
tensorboard
fire==0.4.0
pytz==2021.1
loguru==0.5.3
sh==1.14.2
pytest==6.2.5
tqdm==4.62.3
Step7 命令行运行:
$ cd bert-large
$ sbatch sbatch.sh
运行结果(部分)