从零开始的大模型强化学习框架verl解析
之前在职的时候给一些算法的同学讲解过verl的框架设计、实现细节以及超参配置,写这篇文章姑且作为离职修养这段时期的复健。
本文中提到的做法和思路可能随着时间推移有变化,或者是思想迪化,仅代表个人理解。如果有错漏的地方还请指出。
现在知乎上已有若干verl的使用相关文章了,覆盖了整体架构和快速的使用方法。本文将尝试从整体设计思路出发,致力于帮助不熟悉infra相关知识的算法同学快速理解整体框架,能自己上手魔改,并且知道各个超参的原理。
本文将尽量覆盖verl各模块的技术细节,但会排除SFT部分——当前社区已有充足的SFT框架方案可供选择。
本文中会掺杂一些算法同学可能会比较confusing的llm infra概念介绍,这里的讨论不会过于深入细节,诸如tp的横切/纵切,pp怎么消bubble,排出九文大钱讨论sequence parallel的4种写法,这些在知乎上都有非常好的文章讨论过了。这里的讨论仅旨在保持文章整体逻辑的完整性。
文中给了一些code example,应该会对部分概念和写法的理解有所帮助。
Get started without Ray
让我们不妨从sft出发,当你的上两级ld找到你,语重心长地说,小x啊,sft做不动,我们该rl了,组织已经研究决定,由你来踩这个坑,你环顾四周也实在没有谦虚的空间,接下了这个活。
此时你想的是什么,当然是从现有的sft的框架出发,看能不能魔改一下就能跑rl了。
众所周知,传统的RLHF使用的PPO,有actor/critic/reward/ref四个模块,按照如图所示的工作流迭代模型
出自HybridFlow论文的rlhf流程图
这里参考HybridFlow论文中伪代码的写法,用batch指代过程中所有变量组成的dict。
for prompts in dataloader:
# Stage 1: response生成
batch = actor.generate_sequences(prompts)
# Stage 2: 训练数据准备
batch = critic.compute_values(batch)
batch = reference.compute_log_prob(batch)
batch = reward.compute_reward(batch)
batch = compute_advantages(batch)
# Stage 3: actor和critic训练
critic_metrics = critic.update_critic(batch)
actor_metrics = actor.update_actor(batch)
整体流程的这三部分相信大家已经很熟了。可以观察到参数迭代仅发生在stage3(模型训练),而前两个阶段均为模型推理过程。
那么很自然地,一个很简单的魔改思路就是,我们可以把它们各自init,再各自用deepspeed.initialize()/FSDP()包一下,然后重新写一遍训练流就好了。至于megatron,在我们这种跑起来就能交差的场景先不考虑。
这个思路很朴素,但确实是大部分早期rlhf训练的雏形,例如trl这个框架。无它,心智成本低,方便看懂,符合sft时的使用习惯。
但是问题也很明显,当模型和序列长度开始scale up之后,哪怕是仅仅是7b以上的量级,已经慢慢出现在单机8*80g机器上运行困难,可能爆显存的问题了。
平时碰到模型跑不起来的问题,常规的做法是把zero_stage开高,从zero1到zero2往往没什么问题,但是当我们试图从zero2到zero3的时候,往往速度一下会慢几个量级。(这往往是算法同学开始向框架求援的契机)
所以我们也就从这里开始,介绍我们后面会用到的infra知识。
parallelism &SPMD
parallelism
让我们从sft出发,先介绍一些parallelism的基础知识。
简单来说,目前parallelism解决的主要问题是,在多卡场景下如何做到:
-
提升多卡训练效率,最好2卡是1卡训练速度的两倍。
-
突破单卡显存限制以支持更大模型
标准训练流程包含三个核心环节:
-
模型forward,计算loss,保存中间激活值
-
模型backward,通过中间激活值计算gradient
-
模型update,把gradient传给optimizer,更新模型weight。
我们简单地把模型看成是Y=XW的矩阵乘法。将模型抽象为Y=XW的矩阵运算时,参数切分存在两种基本策略:
-
输入切分(X维度):对应Data Parallel/Sequence Parallel
-
权重切分(W维度):对应Tensor Parallel/Pipeline Parallel/Expert Parallel
在这里,切分X要比切分W简单的多。因为我们的模型输入往往是一个或多个规整的tensor,在batch维度可以很容易地做切分。大不了就把原始数据均分放到若干个文件夹里,每一块gpu从一个文件夹里读自己的数据就好了嘛。
而切分W就要头疼得多了,一旦出现诸如诸如卷积这种非典型矩阵计算,或者unet这种前后复杂的依赖关系,都要经过精心设计才行。
举个例子来说明这两种切分方式的区别,典中典的python爬虫任务,数据并行相当于将目标URL均分给各进程独立抓取,而模型并行则类似将使用多进程,将抓取流程分段执行。前者实现成本显著低于后者。
考虑整个训练流程,如果要和单卡保持一样的batch size(bs),我们需要让每张卡拿到自己的bs/n条数据。
在step1和step2都不需要做通信,也就是每张卡算自己loss和gradient即可,并不会有什么影响。
而在step3之前,我们需要把各卡的梯度放在一起求平均,保证得到正确的完整bs的梯度,而这个操作也就是all-reduce通信。
聪明的你已经想到了,这整个流程实际上就是分布式的gradient accumulation。
让我们把目光从X上离开,重新看W部分。在目前这种朴素的data parallel策略下,每块卡都拥有完整的model weight/gradient/optimizer,尺寸和单卡训练无异。
而deepspeed使用的zero stage即是对这部分显存占用的优化。具体细节在这里不表,好的文章已经很多了。从结论来说是
-
zero1中,每张卡只需要保留1/n的optimizer参数,通信量保持不变
-
zero2在zero1的基础上,每张卡只需要保留1/n的graident,通信量保持不变
-
zero3在zero2的基础上,每张卡只需要保留1/n的model weight,通信量变为1.5倍。
其中,zero1和zero2影响的分别是optimizer和graident,对应的是后两步,并没有影响forward部分
而Zero3模式下的训练流程演进为:
1. Forward阶段:all-gather获取完整参数→计算loss→释放参数→保存中间激活
2. Backward阶段:all-gather获取完整参数→计算梯度→释放参数
3. Update阶段:reduce-scatter获取梯度切片→优化器更新局部参数"
要注意的是,zero123本质仍然属于data parallel,不属于model parallel的范畴,尽管zero3看起来做了模型参数的切分,但实际上计算时会先做all gather得到完整的模型参数,计算时使用的也是完整的参数和切分后的输入。
对比tp/pp,它们从头到尾都只存模型参数的一部分,计算时使用的是切分后的参数和完整的输入。
-
对于dp,通信的是模型参数,也就是W和它对应的weight/optimizer
-
对于tp/pp,通信的是中间激活值,例如PP需要将上一个rank计算得到的中间结果传给下一个rank继续计算。
SPMD
在典型的多卡训练场景中(如使用torchrun或accelerate launch),通过nvidia-smi可观察到每块GPU对应独立进程,这种模式本质源于SPMD(Single Program Multiple Data)架构。
那么问题来了,是torchrun之类的启动脚本把它们“分配”到每张卡上的吗?实际上并不是。主流并行框架(DDP/DeepSpeed/Megatron)均基于SPMD范式:所有进程执行相同代码逻辑,通过环境变量差异自主确定行为模式,无需中心调度节点。
一段经典的PyTorch分布式训练初始化的代码
import torch
import os
print(os.environ['RANK'], os.environ['WORLD_SIZE'], os.environ['MASTER_ADDR'], os.environ['MASTER_PORT'])
torch.distributed.init_process_group(backend="nccl")
torch.cuda.set_device(torch.distributed.get_rank())
当我们使用torchrun启动这段代码后,会启动多个进程,每个进程有着不同的环境变量,标识它们属于哪一台机器和端口,是第几个进程和进程总数。
之后torch.distributed.init_process_group会根据这些环境变量构建通信组,这是一个阻塞操作,所有进程都要完成init_process_group后才会继续往下走。
最后set_device将当前进程绑定到一块gpu上,对于RANK=0的进程绑定在0号卡,RANK=1的进程绑定在1号卡,以此类推,不存在一个进程去调度安排它们的行为,它们运行的是完全相同的代码,只是用不同的rank区分开他们的行为。
以SPMD的思维模式去思考代码的写法,就像是思考在没有老师的班级里,学生们应该怎样才能过有序的校园生活。
-
学生们报道后首先拿到自己的学号(torchrun拿到每个进程的rank),确认自己是班级的多少号,坐在第几排第几列,这一排的同学和这一列的同学都有谁(init_process_group),然后找到自己对应的座位坐下(set_device);
-
奇数学号去一食堂,偶数学号去二食堂;
-
坐在第一排的同学负责收作业,坐在后面的同学把作业往前传;
-
即使是算班级各科平均分也不用老师来计算和公布,而是大家把自己的成绩写小纸条上传两圈,每个人就都知道班级平均分了。
以naive dp为例,会发现在训练过程中并不存在各个dp rank之间对齐参数的行为,这是因为只要保证各个rank初始化时的模型参数保持一致,之后每个step的gradient一致,从而optimizer对模型参数的更新是一致的,自然每个rank的模型就是一致的。
这也就引出了一个问题,SPMD的编程模式心智负担较重,相信写过Megatron的朋友都有感受,当逻辑复杂以后要考虑不同rank之间的不同行为和通信,以及避免corner case造成的stuck,一写一个不吱声,都是容易掉头发的环节。
总结来说,SPMD由于没有中心控制器,在运行时更为高效,完全由worker自驱。但由于在编程模式上需要各worker运行相同的程序,灵活性不如single-controller模式。
我们会在后续ray相关的部分做更详细的阐述。
接下来,我们通过介绍TP来进一步理解SPMD的编程模式。不论是使用Megatron进行训练,亦或是使用vLLM做推理,tp都是绕不过去的内容。这里不会介绍tp的更多使用细节,仅从一个简单的例子出发。
# 我们用torchrun启动这段代码
import torch
import torch.nn as nn
torch.distributed.init_process_group(backend="nccl")
torch.cuda.set_device(int(os.environ['RANK']))
torch.manual_seed(42)
torch.cuda.manual_seed_all(42)
# 构建一个从column维度切分的linear layer
class ColumnTPLayer(torch.nn.Module):
def __init__(self, input_size, output_size):
super().__init__()
self.layer = nn.Linear(input_size, output_size // int(os.environ['WORLD_SIZE']), bias=False).to(device='cuda')
def forward(self, x):
ret = self.layer(x.to(device='cuda'))
output_tensor = torch.zeros(size=(int(os.environ['WORLD_SIZE']), ret.shape[0], ret.shape[1]), dtype=ret.dtype, device=ret.device)
torch.distributed.all_gather_into_tensor(output_tensor, ret, async_op=False)
output_tensor = torch.cat(output_tensor.unbind(dim=0), dim=-1)
return output_tensor
def load_weights(self, weight):
rank = int(os.environ['RANK'])
world_size = int(os.environ['WORLD_SIZE'])
dim_per_rank = weight.shape[0] // world_size
self.layer.weight.data.copy_(weight[rank*dim_per_rank: (rank+1)*dim_per_rank, :])
batch_size = 10
input_data = torch.randn(batch_size, 2)
# init一个PyTorch的linear layer,并让我们构建的layer和它保持参数一致。
full_layer = torch.nn.Linear(2, 6, bias=False)
weight = full_layer.state_dict()['weight']
tp_layer = ColumnTPLayer(2, 6)
tp_layer.load_weights(weight)
tp_ret = tp_layer(input_data).cpu()
fl_ret = full_layer(input_data).cpu()
torch.testing.assert_close(tp_ret, fl_ret)
这是一个2GPU的column tensor parallel的例子。在这段示例代码里,我们先做了分布式的初始化。然后构建了ColumnTPLayer,最后将它和完整的原始linear计算结果比较。
从这段代码里我们可以观察到TP和DP的差别
**Tensor Parallel (TP)**:
-
同组内各rank接收相同输入/输出
-
权重矩阵被切分存储(如列切分)
-
每个TP组构成完整模型副本(如TP=2时,2个GPU组成1个副本)
**Data Parallel (DP)**:
-
同组内各rank处理不同数据分片
-
保持完整权重复制
-
DP数量直接对应模型副本数(如DP=4即4个完整副本)
回想TP和zero3的区别,zero3在每次计算前会先gather完整的参数,计算后释放;而tp则是在计算前后对输入和计算结果做通信。
可以看出zero3的中间激活值是完整的,tp则是切分过的,在例子中仅是为了演示结果才立刻做了all_gather,实际上可以把两个linear层组合在一起,在入口和出口处才做通信,中间激活值的size减少为1/n。
在verl里一个很通用的场景是dp和tp互转,假设在world_size=8的集群里,整体pipeline上有三个模型,model1和model3没有做切分,使用的是dp=8;而model2使用的是tp=8的切分。
此时要如何安排数据流呢。
首先model1的输出结果在各个rank上是内容不同的tensor [bs, d],不能直接用于tp切分的model2的计算,需要经过all gather操作,使得各个rank得到一个内容相同的tensor [8*bs, d],作为model2的输入;
而model2的输出在各个rank上是一个内容相同的[8*bs, d]的tensor,固然可以直接给model3做计算,但显然是重复计算的,每个rank上的model是一个完整模型,可以接收不同的输入。所以在这里把[8*bs, d]的结果切回到各个rank上[bs, d]的形式,做model3的计算。
DP阶段 → TP阶段转换:
-
原始输入:[bs, d](各rank独立)
-
通过all-gather沿batch维度拼接 → [8bs, d](全局统一)
TP阶段计算:
-
每个rank维护1/8模型参数
-
执行分片矩阵运算
TP阶段 → DP阶段还原:
-
输出结果沿batch维度切分 → [bs, d]
-
避免DP阶段重复计算
我们再举一个llava的例子。对于llava这种模型,本质是视觉部分产出一个vision embedding,文本部分出一个text embedding,把它们拼在一起,放进transformers里面开始计算。
一个非常常见的场景是,视觉部分的模型很小,大概3b以下,而文本部分的模型很大,可能要到30b。这种情况下我们一般不会对vit部分做切分,而是对llm部分用tp切一下。这样改起来成本比较低,vit太小没必要切,而且把tp塞进去也要掉一点头发;而llm的切分比较成熟。
这种时候我们就可以动一点心思了,假设world_size=2,tp=2,vit不切,则共有两份完整的vit模型,即vit部分dp=2。
因此,对于一个batch我们可以切成两份,分别送给vit拿到vision embedding,再gather起来,避免重复计算。 然后文本部分由于tp=2,就正常做vocab embedding的切分,正常做transformer的计算。当然可以在vision embedding通信的时候async执行vocab embedding的计算,算是可以做一下overlap。
牢记tp group的输入需要保持一致,而dp group的输入不一致,就足以理解verl的数据流处理代码了。
rollout优化
好了,让我们回到之前的问题,为什么naive的方式难以scale up。首先最大的问题就是transformers原生的generate在zero3时表现拙劣,而不用zero3又会OOM。
Zero3模式下,每次前向计算都需通过all-gather获取完整模型参数。在自回归生成场景下,每个token的生成都触发独立的前向传播,导致通信量与模型参数量呈线性增长关系。对于更大的模型,这种通信模式将产生难以承受的带宽压力。
而stage3的actor model training部分,由于只需要做一次forward,耗时上升不那么明显。
TRL框架对此的优化策略是通过deepspeed.zero.GatheredParameters上下文管理器。
with deepspeed.zero.GatheredParameters(model.parameters()):
outputs = model.generate(...)
该方案将原本逐token的all-gather操作转换为单次全局参数收集,显著降低通信频率。但生成期间需持续占用完整参数显存。
这个时候通过分段耗时分析可见,性能瓶颈主要集中于actor.generate()阶段。即使在zero2配置下正常运行时,响应生成环节也常占据总耗时50%-80%的比例。我们自然第一个要拿它开刀。
显然基于huggingface transformers的actor_model.generate()这种形式无法满足我们的需要,一个很自然的想法就是利用现成的inference engine来接管这部分工作,verl中使用的是vLLM(以及coming song的SGLang)
这种做法的好处在于可以复用inference engine里成熟的组件,如
-
专用推理算子(如PagedAttention)
-
高效的KV Cache内存管理
-
自适应调度策略。
并且通过这种方式可以将actor的并行策略和rollout的并行策略分离,训练归训练,推理归推理。不过使用vllm不方便的地方也是显而易见的,对于vllm不支持的模型就只能退化回原始方案了。
在增加了inference engine作为我们rollout模块后,我们每次训练前需要对齐inference engine与actor model的参数,因为actor model在每个step后都会更新。vllm实质上会init一个自己的模型,相比hf的实现,其中的算子都会替换成自己实现的高效推理算子。尽管看起来增加了参数对齐的时间,但引入inference engine带来的速度提升收益远超过了这些。并且我们可以通过一些手段来降低开销,这个放在之后说。
引入vLLM存在两个主要问题:
-
vLLM广泛使用ray来调度任务,有一个driver来实现各GPU之间的工作调度,不是完全SPMD的,融合进现有框架的逻辑比较复杂。为了保护头发,在目前我们的思路里暂时还没有出现ray,仍然由torchrun拉起任务。
-
vLLM rollout的模型和权重都源自actor model,而原生vLLM通过hf权重初始化,需要更改模型初始化部分的逻辑。
基于这两点,verl在verl/third_party/vllm目录下有若干个vllm魔改SPMD的版本。魔改后,就可以脱离ray的依赖,使用torchrun的形式来使用vllm了。从而能融入目前SPMD的训练框架中。
不过目前最新版的vLLM已经支持了原生SPMD, 而verl的相应适配应该也在最新版本里做好了。
https://github.com/vllm-project/vllm/issues/11400我们这里看看verl是如何魔改vLLM的。
-
新建了SPMDGPUExecutor,去除ray的driver相关逻辑,完全让各个ray worker完成工作。用伪代码来表示就类似于:
# 原Ray依赖逻辑
class RayWorker:
def execute_model(self, inputs):
return driver.dispatch(inputs)
# SPMD化改造后
class SPMDWorker:
def execute_model(self, inputs):
return self._model(inputs) # 各worker独立执行
-
在初始化分布式环境的时候,去除driver计算node和rank的逻辑,而是直接根据torchrun设置的LOCAL_RANK/RANK/WORLD_SIZE得到init_process_group的参数。
-
Worker的execute_model实现中,不再需要driver把输入分发到每个worker里,而是每个worker有完全相同的输入,不需要额外的同步操作。
-
在计算logits后,之前的逻辑是gather到driver节点,而改成完全SPMD格式则需要在这里使用all-gather,让所有worker保存完整的logits
2. 增加了init_cache_engine()和free_cache_engine(), 其中前者是kv cache相关,后者是模型相关。
-
vllm会预先allocate一部分显存用来存储kv cache,所以vllm的显存占用实际上是model weight+kv cache,类似于tensorflow拿来跑mnist都会把所有显存吃完。实际上是程序预先占用掉了,并不是实际上就用了这么多。对于vllm部署的场景无关紧要,但是在rl训练的场景下,我们还有其它的模型要使用,自然是要严格管理vllm占用的显存。
-
vllm在模型初始化完成后,会运行一次dummy input,记录在该过程中的显存占用。然后根据用户指定的gpu_memory_utilization参数,将剩余的显存用来存储kv cache。
-
例如对于80G的显存,设置gpu_memory_utilization=0.5,dummy input记录下来最大的显存占用是20G,则将有80*0.5-20=20G用于存储kv cache。此时显存里有actor/critic/ref/reward四个模型,可能还有rollout模型的占用。
-
这里verl魔改了gpu_memory_utilization的定义,改为剩余显存的比例,参数配置同上的情况下,会有(80-20)*0.5=30G用于存储kv cache。这个改动比较合理,否则需要时刻关注不同并行情况下其它部分的显存占用,反复调整gpu_memory_utilization。
分配策略 | 原生vLLM | Verl改进版 |
---|---|---|
总可用显存 | total * util | (total - dummy_usage) |
KV Cache配额 | total * util - dummy_usage | (total - dummy_usage) * util |
-
这部分kv cache占用的显存在生成完成后可以释放,留给后面的stage使用。体现在代码上就是将self.cache_engine和self.gpu_cache设置为None。在下一个step的生成开始时再重新init回来。
https://github.com/volcengine/verl/blob/main/verl/workers/rollout/vllm_rollout/vllm_rollout.py#L157
3. sync_model_weights与offload_model_weights
-
offload_model_weights用于在生成结束后,把模型参数指向cpu上的一个empty tensor。最后在torch.cuda.empty_cache时释放掉。
-
我们这里主要看看sync_model_weights,在将vllm改造成SPMD模式后,如何实现高效的rollout与actor的参数同步。
FSDP
我们先来考虑fsdp的实现,fsdp从原理上来说和deepspeed是类似的,verl中只考虑了前者。
这部分实现的入口在FSDPVLLMShardingManager下
https://github.com/volcengine/verl/blob/b46f55ecc98e40fb36af465fdde9b7f7613e5e50/verl/workers/sharding_manager/fsdp_vllm.py#L36实际上它实现了一个context manager,将rollout.generate_sequence及其前后处理的过程包了起来。
我们假设在一个world_size=8,vllm tp_size=4,使用FULL_SHARD,也就是zero3的配置下运行这段逻辑,来捋一下流程。
首先我们需要考虑,vLLM是如何读取weight部分的逻辑在
https://github.com/vllm-project/vllm/blob/v0.7.2/vllm/model_executor/models/llama.py#L378概括一下就是对于一个vllm model,给定一个weight dict,在vLLM通常的使用方法下,这个weight dict是通过读取权重文件获得的;在verl的场景下,这个weight dict是从actor得到的。
-
遍历weight dict得到actor model的name和weight
-
根据name找到vllm model对应的子模块的param
-
这些子模块在定义时已经实现了weight_loader函数,
-
通过param_data.copy_(loaded_weight)的方式把actor的参数复制到vllm model里
让D指导给我们举个例子:
# 参数同步的快递员难题(通俗版)
## 场景设定
想象我们要把训练好的"Actor Model"的参数(快递包裹)准确投递到"推理引擎vLLM"(收货方),但遇到三个难题:
1. **地址不匹配**
vLLM的收货地址簿(参数命名规则)和actor model不同
→ 需要人工制作地址对照表(hard-coded weight loader)
2. **包裹分箱规则不同**
- 训练时用FSDP把actor model包裹拆成8个随机碎箱(FULL_SHARD)
- 推理时vLLM要求按固定规则分4箱(TP=4)
→ 需要先拼合碎箱,再按vLLM规则分装
3. **运输通道限制**
每次只能传送整箱货物,不能传碎片
## 分步快递流程
### 步骤1:收集碎片
```python
with FSDPVLLMShardingManager(): # 启动临时快递中心
# 把FSDP分散在8个GPU的参数碎片收集成完整包裹
full_weights = gather_fsdp_shards(actor_model)
# 使用预制地址对照表(如llama专用翻译器)
translator = get_weight_loader("llama")for param_name in full_weights:
vllm_name = translator(param_name) # 例如把"layers.0.qkv"转为"model.layers.0.attn.W_pack"
# 按vLLM的TP=4要求分箱for vllm_param in vllm_model.parameters():# 例如把[4096,4096]的矩阵按列切成4份
shard = split_parameter(full_weights[vllm_name], dim=1)
vllm_param.copy_(shard[rank]) # 每个GPU拿自己那份
这里要这样折腾一遍,原因是:
-
vllm内部有一套llm的实现,而这套实现和现有的开源模型并不完全匹配,因此需要调整checkpoint的格式,例如qkv和moe的ffn层,实现是concat还是seperate的。所以针对这些,目前采用hard coding的形式定义了各种模型的loader的形式,如gpt2_dtensor_weight_loader/llama_dtensor_weight_loader/deepseekv2_dtensor_weight_loader等,将开源的模型和vllm内部的模型在weight和name上做对齐。
-
因为vllm的模型使用了tensor parallel。在目前SPMD的编程模式下,需要确定每个rank会拿到weight的哪一部分,在哪个维度做切分。
-
FULL_SHARD切出来的tensor并不规整,不能直接使用,需要先通过full_tensor()获取完整的tensor。
# ColumnParallelLinear
def weight_loader(self, param: Parameter, loaded_weight: torch.Tensor):
tp_rank = get_tensor_model_parallel_rank()
output_dim = getattr(param, "output_dim", None) # 这里为0
param_data = param.data
shard_size = param_data.shape[output_dim]
start_idx = tp_rank * shard_size
loaded_weight = loaded_weight.narrow(output_dim, start_idx,
shard_size)
assert param_data.shape == loaded_weight.shape
param_data.copy_(loaded_weight)
# RowParallelLinear
def weight_loader(self, param: Parameter, loaded_weight: torch.Tensor):
tp_rank = get_tensor_model_parallel_rank()
input_dim = getattr(param, "input_dim", None) # 这里为1
param_data = param.data
shard_size = param_data.shape[input_dim]
start_idx = tp_rank * shard_size
loaded_weight = loaded_weight.narrow(input_dim, start_idx,
shard_size)
assert param_data.shape == loaded_weight.shape
param_data.copy_(loaded_weight)
总体来看,FSDPVLLMShardingManager在__enter__部分,各rank先获取完整参数,再保留自己需要的部分。
以目前world_size=8/vllm tp_size=4为例,由于目前使用的是FULL_SHARD,每个rank只保留了1/8的参数,先通过full_tensor()获取完整参数,再通过weight_loader的逻辑确定保留区间。
以[16,16]维度的RowParallelLinear参数为例,其沿dim=1维度被切分为4个[16,4]的子张量,分别分配给rank0-3和rank4-7。这种切分策略实际形成两个完整模型副本(dp=2)。
到这里,我们已经把actor model的weight复制到了vllm model。
但是仍然有很多细节需要进一步处理,才能跑起整个pipeline。
-
在sync_model_weights完成同步后,立即通过del指令和torch.cuda.empty_cache()释放state_dict中full_tensor()产生的临时显存占用。。此处vllm model的显存无需特殊处理,因其初始化后即被offload至CPU,参数以空张量形式存储,即使复制actor参数后仍驻留CPU。
def __enter__(self):
#省略部分代码
params = self.module.state_dict()
load_format = 'hf'if self.full_params else'dtensor'
self.inference_engine.sync_model_weights(params, load_format=load_format)
del params
torch.cuda.empty_cache()
log_gpu_memory_usage('After del state_dict and empty_cache in sharding manager', logger=logger)
class vLLMRollout(BaseRollout):
def __init__(self, actor_module: nn.Module, config: DictConfig, tokenizer, model_hf_config, **kwargs):
# 省略中间代码
self.inference_engine = LLM(
actor_module,
# 省略参数配置
)
# Offload vllm model to reduce peak memory usage
self.inference_engine.offload_model_weights()
-
随机状态管理。对于一个分布式的vllm model,需要保证在一个tp group内的random state是一致的,在这里也就是rank0-3一致,rank4-7一致。回想generate的过程,在做sampling的时候会用到随机数,此时需要保证tp group内不同rank sample出来的结果是一致的,否则不同tp rank sample出不同的结果,在进行下一步的decoding计算时得到的结果无意义。
-
因此这里创建了一个gen_random_states,对于同一个dp内的所有tp rank使用相同的seed。在__enter__时切换这个random state,结束后切换回之前的torch_random_states。
gen_dp_rank = self.device_mesh['dp'].get_local_rank()
torch.cuda.manual_seed(gen_dp_rank + 1000) # make sure all tp ranks have the same random states
self.gen_random_states = torch.cuda.get_rng_state()
-
非vllm组件采用dp=8并行,而vllm model采用dp2 tp4的混合并行。也就是vllm model只有2份完整模型,而其它的模型都是8份,同时接收8份不同的数据作为输入。
-
在开始的介绍部分我们强调过,tp rank之间接收相同输入,dp rank之间接收不同输入。所以这里的做法是把8份数据打包成2份,rank0-3的数据concat为data0,作为该rank0-3的vllm model的输入;rank4-7的数据concat为data1,作为rank4-7的vllm model输入。
-
假设之前每个rank数据的batch_size为4,则vllm model处理的batch_size为4*4=16。这也就是preprocess_data中的做法,把tp group内的数据打包在一起,作为相同的输入。
-
在generate完成后,再做拆分,把这个bs=16的生成结果切回4个bs=4的小块,发回给各个rank。这样在后续dp=8的环境下,各个dp的输入又是不同的了。
data.batch = allgather_dict_tensors(data.batch.contiguous(),
size=vllm_ps.get_tensor_model_parallel_world_size(),
group=vllm_ps.get_tensor_model_parallel_group(),
dim=0)
-
根据free_cache_engine参数确定,是否在生成后就释放掉vllm allocate的cache engine,直到下一次生成才重新build。本质是用时间换显存空间。一般默认开启,收益非常大。
-
在generate和后处理完成后,退出context manager的过程中,通过offload_model_weights把vllm model也放回cpu,并再次torch.cuda.empty_cache()。
-
此时显存中不再有rollout相关的内容,可以继续进行stage2的data preparation和stage3的model training。可以看出整个rollout过程中对显存的管理比较谨慎,经常做offload和empty_cache,保证有完整的显存块可以用于后续stage。
以上是整个fsdp的hybrid engine的实现。我们来整理一下这部分的思路,相比简单的actor.generate()做了哪些改动
-
架构解耦:分离actor model和rollout model,使用vLLM作为推理后端,充分利用其高效算子与KV Cache管理机制
-
框架适配:将vLLM改造为SPMD模式,移除Ray依赖,保持torchrun的使用方式,避免引入额外复杂性
-
参数同步:通过FSDP的state_dict获取完整参数,基于tp_rank进行权重分区加载
-
显存优化:CPU初始化→按需加载→及时卸载→强制回收,实现吞吐量与内存占用的平衡。事了拂衣去,深藏功与名。不论是offload还是free cache的时间,相比generate花的几十上百秒都不值一提,而获得的收益却是可以降低并行度或是提高batch size,大大提高吞吐。
-
状态一致性:建立DP-TP混合并行下的数据流控制体系,确保随机状态、输入数据的拓扑一致性。
Megatron
接下来我们要开始看Megatron的写法了。最主要的改动在于vLLM model的构建。Megatron+vLLM的写法在actor model和rollout model做 sync weights的时候有很多改动,不同于FSDP的zero3转tp,Megatron的TP+PP转TP的写法也就是HybirdFlow论文中提到的3D Hybird Engine的完全体。
首先需要了解的背景是,目前的主流推理引擎在模型切分方式的选择上使用的都是TP而非PP,除非对于DS v3这种单机TP=8也塞不下的场景,不得已才使用PP。个人认为原因还是会出现pipeline各rank之间等待的bubble,GPU存在空闲。如果要用切micro batch的方式来减少bubble,会让每次计算的bs打不高,decoding阶段的memory bound更加严重,得不偿失。当然我没有读现在推理框架里PP的实现,不知道现在是否已经有成熟的解法了。
因此,这里衍生出了两个问题:
-
在推理时不能使用PP,需要将原有的PP维度换掉。
-
并行超参往往是我们考虑到后面的训练部分设置的,因为训练时显存需要存储optimizer/gradient/intermediate activations,所以会设置得比较保守。但是推理时是不需要的,推理时显存只需要全部留给kv cache即可,此时应该降低并行程度提高吞吐(由于TP通信开销很大,尽管降低TP维度会增加model size和per token的kv cache,使并发batch下降,但在吞吐上仍是稳步上升的)
所以得出的结论就是,actor model和rollout model的并行方式应该独立设置。对于一台8卡的机器,假设actor model使用的是TP=4 PP=2的设置,那么rollout model应该使用类似DP=4 TP=2这种更轻量的并行。
有了这个概念,我们就可以来看verl中关于这部分的代码了。verl中分为两步走,先通过AllGatherPPModel的设计将PP维度转换成DP维度,再通过合并TP rank的方式降低TP维度。
并且在这种设计下,可以让actor model和rollout model共享同一块显存空间,也就是不需要像FSDP中那样rollout model需要额外的显存了。
首先我们需要维护两套不同的distributed env,Megatron和vLLM的切分方式不同,需要不同的group。
这部分的写法在verl\third_party\vllm\vllm_v_0_6_3\parallel_state.py。通过分别使用Megatron的mpu.initialize_model_parallel和vllm的vllm_ps.initialize_parallel_state,将两套环境分别初始化在vllm_ps和mpu下,避免全局变量满天飞的情况。
-
PP to DP
我们首先要解决第一个问题,将PP维度转换成DP。
总体思路是将推理时的每一个PP rank都能拿到完整的属于当前tp的参数,这样说可能有些绕。我们用一个简单的示意图表示,对于tp=2 pp=3的场景,在转换后rank0/2/4拥有相同的模型左半边参数,rank1/3/5拥有相同的模型右半边参数。
此时inference engine中不再有PP维度,集群中有3份完整模型。然后我们就可以使用dp了,将原始输入切成三块,分别喂给这3份模型,提高吞吐。这也就是PP转DP的做法。
具体到实现阶段也很有意思。一个naive的实现是根据初始化后的rollout model,遍历其中的weight name,对每个pp rank做broadcast,互通有无。
这里verl用了一个比较优雅的做法,通过定义AllGatherPPModel来解决这一问题。AllGatherPPModel被用于actor和rollout的init部分,即HybridEngine。
-
在初始化时,一般的做法是每个pp rank只构建当前rank的model,而这里的做法是每个pp rank会遍历构建所有pp rank的model,也即在rollout时当前rank所需的所有参数。
-
指定训练时要用到的部分为this_rank_models。
-
遍历时,在构建model后,会构建一个MemoryBuffer,将当前model weight重新指向一块完整的显存空间。因为后续需要频繁在pp rank间通信,以及offload或者.cuda()等操作,使用一整块tensor统一通信要好于遍历每个tensor依次通信。这个做法和思路类似于DP中使用bucket。
-
将非当前rank的参数offload到cpu备用。
当PP=3时,构建出来的模型如图所示。每个小框指的是一块MemoryBuffer,它把model weights放在一整块显存空间里。
结合这张图,接下来的流程就很清晰了。
def __enter__(self):
# create a new cuda space for parameters not in this pp rank
self.module.load_params_to_cuda()
# broadcast the parameters from pp rank to other ranks
self.module.allgather_params()
# obtain name to parameters in pp/vpp
params = self.module.get_all_params()
# bind the params to inference engine
self.params = normalize_pp_vpp_params(params=params,
num_hidden_layers=self.model_config.num_hidden_layers,
layer_name='layers')
self.origin_params = self._post_process_params(self.params)
self.inference_engine.sync_model_weights(self.params, load_format='megatron')
在进入rollout阶段时:
1. 因为我们在rollout时不使用PP,所以要先把其它pp rank的参数放到gpu上。然后将这些weight指向MemoryBuffer在gpu上的tensor的对应slice。把这里的操作当成是指针就很好理解了。而非当前rank的模型参数用的是torch.zeros,因为上一轮rollout的模型参数已经没有意义,无需再copy一遍。
2. 对于每个rank=n,只有rank=n部分的参数是经过上一轮迭代的正确参数,所以需要将它们broadcast给其它rank。
3. 准备actor model的weight dict,在后续喂给vllm model。这一步会将各PP rank的参数聚合成一份完整的参数。
4. Build vllm model,改一改actor weight dict里的key,从而和vLLM的参数名对齐。然后将vLLM对应的tensor指向和state_dict里的tensor相同的显存空间。这也就是说,这种方式的vLLM model无需额外的显存空间,因为和actor使用的同一块显存。和之前FSDP使用dtensor的代码对比如下所示。
# Megatron weight loader
def default_weight_loader(param: torch.Tensor, loaded_weight: torch.Tensor) -> None:
assert param.size() == loaded_weight.size()
assert (param.data.dtype == loaded_weight.data.dtype
), "if we want to shared weights, the data type should also be the same"
param.data = loaded_weight.data
# dtensor/hf weight loader from vLLM,省略部分逻辑
def default_weight_loader(param: torch.Tensor,
loaded_weight: torch.Tensor) -> None:
assert param.size() == loaded_weight.size(), (
f"Attempted to load weight ({loaded_weight.size()}) "
f"into parameter ({param.size()})")
param.data.copy_(loaded_weight)
由于我们将PP转换成了DP,原本所有的PP rank使用同一份数据,所以需要将这些数据做切分,使得转换后的DP rank使用的是不同的数据。
在生成结束后,我们再将不属于本rank的参数offload回cpu,并且将vllm model重新指向cpu的空间,并通过empty_cache()回收这部分显存。对于数据,把切分后的数据在gather到一起,保证每个PP rank有相同的输入。
-
Tp to dp
在第一步中我们解决了从PP到DP的问题,假设对actor model我们使用的是TP=4 PP=2的配置,则在第一步之后变成了TP=4 DP=2;我们想继续减少并行程度。
对于一个TP group内的rank[0,1,2,3],按照vLLM/Megatron的切分方式,在tp=4时每个rank包含的参数分别是[m_0], [m_1], [m_2], [m_3],在tp=2时分别是[m_0, m_1], [m_2, m_3], [m_0, m_1], [m_2, m_3],原因是Megatron切分时优先级是TP>DP,所以rank0和rank2重复,rank1和rank3重复。
这样就导致一个问题,在通信时必须先all gather全量参数,再做slice。例如对于rank2,原来的参数是[m_2],而需要的参数却是[m_0, m_1]。
verl对此改进了切分优先级,让TP<DP,这样tp=2时的模型参数是[m_0, m_1], [m_0, m_1], [m_2, m_3], [m_2, m_3]。这样就只需要rank0和rank1做all gather,rank2和rank3做all gather即可,通信量变为原来的1/2。
verl基于这一规则重新构建了MICRO_DATA_PARALLEL_GROUP,而非使用vLLM和Megatron默认规则构建的group。
之后增加的逻辑就很好理解了。我们将TP=4 DP=2转换为TP=2 DP=4,
-
模型参数上,将属于一个micro_dp_group的参数gather并concat起来,满足vLLM model在tp=2时的需要,生成完成后复原。
-
数据上,将原始输入根据micro_dp_group切分,并在生成结束后再gather并concat起来。
这里实际上就可以看出并行时的数据处理套路了。即
-
dp维度减小,就要gather;
-
dp维度增加,就要chunk。
技术选型上即尽量降低并行程度,尽量打高batch size,不要让显存白白浪费,看用offload换能开更大的batch size提高throughput是否值得。
stage2和stage3的优化
在完成stage1优化后,我们继续分析stage2和stage3的潜在优化空间。
首先能想到的是加入remove_padding,这个在两年前就已经是一个比较通用的做法了。
现在llm的输入一般是由input_ids和attention_mask组成。其中的PAD Token会参与全程的计算,这部分计算量实际上是浪费的。
因此考虑把[bs, seq_len]的input_ids打平成一维[total_seqlen]的做法,去除其中的PAD token,这对于embedding/lm_head/mlp层是无影响的,因为它们不存在token之间的交互,打平成一维并去除PAD token不会影响其它token的计算。
而attention部分则将原来使用的flash_attn_func替换为flash_attn_varlen_func,将打平后的qkv和cu_seqlens作为输入即可。cu_seqlens指的是每个样本实际长度的累加和,例如当bs=2,实际长度为3和4,则cu_seqlens为[0, 3, 7]。
而rope部分,我们可以通过类似的方式计算出position_id_unpad,然后apply到每一个q_unpad和k_unpad即可。
基于这种方式,我们可以先把原来(input_ids, attention_mask)转换为(input_ids_unpad, cu_seqlens, position_id_unpad),之后的计算流程就将移除PAD token。这在stage2和stage3会更加有效,因为得到stage1的generate结果往往包含prompt的left padding和response的right padding。
从这个角度来说,在使用remove padding之后,我们可以以token视角来看整个训练流程,而非以batch视角。
verl通过模块化封装,把原有的model包了一层,实现了一些通用功能。
-
Actor组件:dp_actor/megatron_actor
-
Critic组件:dp_critic/megatron_critic
-
功能:
-
compute_log_prob:策略评估
-
compute_values:价值估计
-
update_actor/update_critic:参数更新
对于这部分具体该怎么计算,即PPO/GRPO/Remax等该怎么实现,本文不做说明,这个已经有很多相关公式的介绍了。这里我们仅看几个框架上的实现细节。
-
Logits重计算 不论是transformers还是vllm的generate,都可以通过修改配置返回生成过程中每个token对应的probs,但是verl中会强制在stage2重算一遍,不会使用generate给的结果。原因有二:
-
后处理精度问题:在generate时如果使用了一些采样的后处理调整策略,例如penalty等,此时inference engine返回的logits值是经过了这些策略修改后的值,并不是原本从模型中计算出来的值,这一点虽然可以通过一些比较tricky的方式魔改,但还是有很多不方便的地方。
-
算子差异:对于vLLM这类的推理引擎,算子优化会做的更加激进,可能造成同样的数据,尽管vLLM model和actor model参数是对齐的,算出来的logits仍有一些diff。这种diff可能并不会影响每次计算出来的candidate token的排列顺序,也就不太会影响最终的采样结果。但是当logits的具体数值用于后续计算时,我们还是希望能尽量和actor model的实际结果对齐,不要给本就不太稳定的RL训练添加更多不确定要素了。多算一遍就当是花一次inference的时间保平安了。毕竟时间大头都在generate上。
2. 显存优化策略 对于actor和ref的compute_log_prob,critic的compute_values,reward的compute_rm_score。
-
这里只涉及单纯的前向计算,但是做法并不是一把梭,把generate部分传过来的一整个batch的prompt+response做一次infer就完事儿。但是这种写法实际上在scale up上去之后很容易OOM。
-
compute_log_prob时模型的输出结果是logits,shape是[total_seqlen, vocab_size],简单计算一下,对于bs=16,prompt+response平均长度为1000,vocab_size=150000,此时总参数量为16k*150k=2.4B,如果为了精度使用fp32,此时占用显存9.6G,已经是一个无法忽视的值了。
-
此时会出现的现象是:举手,为啥我训0.5B的模型还能OOM啊。因为0.5B的模型的vocab_size是一样的,反而因为0.5B的时候batch_size开得比较奔放然后OOM了。
-
早期的logprobs_from_logits计算比较粗放,都直接从trl拿过来。
def logprobs_from_logits(logits, labels):
logp = F.log_softmax(logits, dim=-1)
logpy = gather_from_labels(logp, labels)
return logpy
后面大家都意识到这种做法是有问题的,中间结果logp的shape和logits是一样的,导致了双倍的显存占用,开始用一些别的写法来解决这个问题了。可以参考openrlhf里的这个PR。
[small change] Use selective log-softmax to reduce peak vram consumption by tyler-romero · Pull Request #718 · OpenRLHF/OpenRLHFverl这里的解法有两种,一种是直接用flash_attn的cross_entropy,把这部分当-cross_entropy_loss来用;另一种是通过log_softmax(x_i) = x_i - logsumexp(x),两个tensor都是一维的,减少了显存占用。
当然,以上的方法不能完全避免大logits的出现。目前还采用了把当前batch切成micro_batch的实现,将一次infer变成多次infer,不再会因为中间结果过大而OOM。
但是仍然存在的问题是,当我们使用remove padding后,从token的角度来看,micro batch之间长度不均衡的问题,碰上corner case依然存在OOM的可能。
这里的解法实际上也很简单,我们切分batch的方式从batch_size改为max_token_len即可,之前是确定每个micro batch有几条样本,现在改为确定每个micro batch一共要计算多少token。
这样看起来每个micro batch里的样本数是不确定的,这也就是参数中use_dynamic_bsz=True里的dynamic的由来,而确定每个micro batch里有多少token,就是参数中max_token_len/ppo_max_token_len_per_gpu的含义。
此外用到的一些超参,use_liger是把transformers里实现的组件替换成liger实现的性能更好的组件;ulysses_sequence_parallel_size则是Deepspeed ulysses的实现,这里按下不表,不然太跑题了。
目前的流程梳理
verl团队比较推荐使用FSDP+vLLM方案,因为FSDP相比megatron要友好很多,用FSDP(model)就好了,而无需对模型结构本身做侵入式的改动,而zero3的方式也足够支持给70B+百卡这个量级的训练了,相应的把vllm的tp_size开大就好,对于95%以上的需求都是足够用的。另外还可以开启gradient checkpoint/offload等方案把显存挤出来。
现在,我们再来从模型和数据角度捋一遍verl使用FSDP+vLLM方案的全流程。
init阶段
给定prompt数据集与N卡集群,建立分布式训练框架的训练环境。
数据分片策略
在使用FSDP+vLLM方案下,除了rollout之外的模块使用的都是dp。也就是说我们在构建Dataloader的时候采用DistributedSampler实现数据分片,确保各rank获取batch内不同数据分区。Rollout模块外组件均使用数据并行(DP)
模型初始化流程
对init好的transformers模型使用liger的_apply_liger_kernel_to_instance,替换成高性能内核实现。然后对actor/critic/ref/reward四个模型用FSDP包起来。
HybridEngine构建
构建rollout的vllm model和ShardingManager,它们共同组成HybridEngine。
vLLM适配:改造v0.7前版本实现SPMD并行,移除Ray依赖
构建模型 对vllm model传入actor的相关参数,确保在它和actor是同一个模型结构,然后暂时把rollout model offload到内存。
对齐参数 ShardingManager则需要将vLLM model的参数和actor model对齐,并放到gpu上;而在生成结束后,将vLLM model offload回去。同时需要处理输入前后的数据。
完善功能 然后对于模型的不同角色,把原始model warp一下实现不同角色模型的功能,如actor在stage2计算logprobs,在stage3做model update;critic在stage2计算values,在stage3做model update。
forward阶段
-
stage1
从dataloader拿到数据后,进入stage1生成response。
此时每个rank上遍历actor model的state_dict,对于每个weight做full_tensor()拿到完整的weight,然后对它做一些split/concat的操作,保证它是和vLLM的实现对齐,weight.copy_(),把vllm model放到对应rank的gpu上,然后释放掉之前的state_dict节约显存。
然后对数据,根据vllm的tp_size实现相应的all gather操作。在vllm的generate结束后,再offload模型和kv cache,并把数据分发到之前的rank。
对于GRPO这种需要对一条样本sample多次的方式,目前verl的处理方式是在stage1将原始数据repeat n份,在stage2再根据uid计算相同样本之间的mean和var
-
stage2
actor/critic/ref/reward各自需要做inference only的计算,使用micro_batch和use_dynamic_bsz优化显存占用,避免显存成为batch size开不大导致throughout上不去的瓶颈。
-
stage3
在stage3,训练数据已经收集齐全了,常规训练即可。有个要注意的地方是,这里在超参的设置上有data_batch_size/mini_batch_size/micro_batch_size的区别要注意,这个我们会在最后讨论。
当然,由于ray的存在,我们上面介绍的流程和verl的实际流程仍有一点区别,我们将在下一章解释。这里只是提出一种可以不依赖ray的方案。看完以上FSDP+vLLM的写法,相信你已经有能力魔改一个去除ray和Megatron的丐版verl了。
如果有一些Megatron基础,把满血版3D HybridEngine加上去也不是难事了。把这些组件糊到一起,我们就可以得到一个不依赖ray的verl版本了。根据HybridFlow论文和之前的实践经验,这种方式在模型不大且gpu较少(大约是<100卡)时拥有很好的性能。
Dive into Ray
西卡西,从头审视一下,有一种可能的写法我们一直在回避。我们一直是把actor/critic/ref/reward四个模型摊平部署在同一个gpu集群的,即每一块卡里都至少有这四个模型的一部分,也就是论文中提到的colocate。
但是有一种更直觉的想法,为什么不把所有的机器切成四个集群,分别放置这四个模型呢,这样每个集群里的机器只有一个模型。这种做法是否会更高效。最早的朴素的想法就是,我们完全可以把ref/reward这种只有inference的model做成远程rpc请求。
也就是除了第一张图的写法,我们完全可以用右边两张图的写法。
如果每个模型单独放置一个集群,从图中可以看出,除了stage2在actor recompute的情况下没有资源闲置,在stage1和stage3都存在资源浪费的情况。当然我们可以对这个方案做一点改进,如果把actor model和reference model放在一组机器,critic model和reward model放在一组机器,此时stage3的浪费情况得到缓解。
尽管分开放置可能存在一些闲置的情况,但是它有如下的好处:
-
在colocate的做法中,每张gpu上都需要放4个模型,也就意味着对于每个模型,需要使用比分开放置更高的并行度才能放得下。而更高的并行度带来了额外的通信开销。随着模型尺寸scale up,这一问题更加明显。
-
colocate所有模型也就意味着所有模型共享一种并行策略,对于很多情况并不灵活。例如早期大critic小actor的训练方式。小actor被迫和大critic共享一种并行度;对于ref和reward,它们不需要optimizer,和actor/critic使用同一并行度也是吃亏的。
-
随着集群规模scale up,colocate的方法会开更高的dp,这也带来了更多的通信开销;而分开放置的方式可以用更小的集群去运行不同角色的模型。
总而言之,
-
colocate的做法好处在于,从头到尾都是在工作的,全流程里不存在各模型之间的互相等待和空转;
-
分开放置的好处在于,在并行度的设置上可以更加灵活,降低通信开销。
-
实际上就是看通信开销和集群空转的开销哪个是更可以接受的。
-
在HybridFlow论文中的第六章讨论的Auto Device Mapping,实际上也是为了搜索出一个效率最高的放置方式。例如对于13B 128卡的训练,最优的放置方式是actor 64卡,critic 32卡,reference model和reward model各16卡。
但是问题来了,分开放置的思路似乎有些超出SPMD能处理的范畴了。
我们之前colocate的写法中,放置4个模型的方式无非是把放置1个模型的逻辑重复4次,而计算流和数据流也可以很自然地复用现有SPMD的DP/PP/TP的那一套逻辑。
但对于分开放置之后,问题变得不一样了。一部分gpu只负责actor的计算,另一部分gpu只负责critic的计算,已经不符合单一程序并行执行的假设了。
从通信的角度来说,之前colocate时只存在计算时的通信,如DP的梯度all reduce,TP的activation all-gather/reduce-scatter;
现在我们还需要考虑计算外的通信,也就是将数据从一个角色传到另一个角色,例如stage1-2需要从actor传到critic/ref/reward,stage2-3需要从ref/reward传到actor/critic。这部分用SPMD的写法来实现会非常别扭。乱动SPMD内部也会导致BP梯度断掉等奇怪的问题。
那不如考虑这种写法。我们划分集群1/2/3/4,分别运行actor/critic/ref/reward。集群内由于角色单一,仍然保留SPMD的写法,该FSDP用FSDP,该Megatron用Megatron,复用这些并行框架;而集群间的通信通过RPC框架来实现,例如TorchRPC这种。
此外,数据在角色间的传输上也有一些要考虑的地方。举个例子,集群1有8卡,分布式方式是DP2 TP4,集群2有16卡,分布式方式是DP8 TP2,我们要把集群1计算完的数据发给集群2,如何优雅地实现呢?
-
一把梭的方式是,集群1把所有数据gather到rank0,然后发给集群2的rank0,再分配给各个rank;
-
优雅一些的方式是,集群1各rank算好自己要把数据发给集群2的哪个rank,然后各自发送。
注意到我们这里纠结的点都在于数据流,所以verl提出的解法是引入ray,用ray driver来管理数据流,而各个ray worker做自己的计算即可。也就是在目前的实现基础上,ray worker专注于自己内部的计算,不需要再考虑各个role之间数据传递的逻辑,交给ray driver去处理就好。
总之在我们上面章节所说的torchrun版本,使用colocate的方式回避了role之间传输数据,
而引入ray则是来处理非colocate的情况。
HybridFlow原文中提到了single-controler和multi-controler,一个负责数据流一个负责计算流。我们可以把它看作有形的大手和无形的大手。
目前的LLM分布式训练的范式就是SPMD,运行过程中它们之间遵循一定的规则,根据rank领取自己的weights和data,自己算自己的部分,到点了通信一下就好了,不需要调度,就像无形的大手让它们各自正常工作。
但是在rl场景中,正如我们上面分析的一样,单纯的SPMD在split的场景下会使得沟通效率低下,逻辑复杂繁琐,此时需要引入有形的大手,也就是ray driver来主动调度任务和管理数据流,将数据流和计算流解耦。
-
对ray driver来说,模型如何infer如何train都是黑盒,它只需要编排整体工作流;
-
对ray worker来说,内部有独立的环境实现分布式计算,不用再担心有别的逻辑扰乱它SPMD的范式,可以充分复用之前的分布式计算的技术。
用ray把数据流和计算流分开来是一个很舒服的改动。之前没有ray的时候,在魔改时最常见的错误就是不熟悉SPMD的范式,破坏了各个rank之间同步。例如rank0多做了一次random,导致random state出了问题。或者是tensor的shape没有保持一致,导致broadcast的时候卡住。
现在我们可以让ray driver上负责轻量的算法逻辑,调度各个任务;而让ray worker来做复杂的分布式计算逻辑,做到二者的分离。一般来说我们要尝试新的rlhf算法,魔改的是前者而非后者。实际上是用rpc带来的overhead的性能损失,换来不需要和分布式计算的SPMD范式打交道,
而当我们需要优化分布式的计算流效率时,也无需考虑改了之后是否会导致原来的算法跑不起来。毕竟对于算法来说,这部分分布式计算变成了黑盒,可以把之前pretrain和sft的优化经验无缝迁移过来。
引入有形的大手在具体实现上会有什么门槛呢,这个时候我们执行计算的对象,由TORCHRUN拉起的process,变成了Ray Worker。
TORCHRUN Worker | Ray Worker | |
---|---|---|
执行单元 | Python脚本进程 | Class实例化对象 |
任务触发方式 | 同步执行 | RPC异步调用 |
分布式通信 | NCCL原生通信 | Ray Object Store |
Ray相关的实现
看过几个ray tutorial的example后,我们大概对ray的概念就是要用@ray.remote作为decorator。
那么直接用形如这种形式是否可行呢?
@ray.remote(num_gpus=4)
class Actor:
pass
@ray.remote(num_gpus=2)
class Critic:
pass
...
答案是否定的。这种方式启动的ray worker本质是单进程的,类似于直接执行python xxx.py去使用多GPU。而非torchrun xxx.py的形式。除非手动在内部拉起多进程,否则是无法优雅地用到多GPU的。这样也不符合我们说的,在ray worker里复用FSDP/Megatron这些SPMD的写法。
因此,要使用多GPU的SPMD范式,我们需要init多个worker,然后通过一套可复用的范式去管理它们。这也就是verl里WorkerGroup的概念,对于rl中的每个role,都有一组ray worker和一组GPU来构成相应的WorkerGroup。
我们先看Worker的init。在使用torchrun时,它会给我们配置好每个进程对应的world_size/rank/master_addr/master_port环境变量。之后我们丝滑地使用torch.distributed.init_process_group(backend="nccl")即可。
但是使用ray时,我们需要自己根据集群的设置来配置这些环境变量了。
下面用一个Column的TensorParallel的例子。如果不记得了它是什么,还是看这张图:
import ray
import torch
import torch.nn as nn
import socket
import os
import time
import logging
class ColumnTPLayer(torch.nn.Module):
def __init__(self, input_size, output_size):
super().__init__()
if not torch.distributed.is_initialized():
torch.distributed.init_process_group(backend="nccl")
self.layer = nn.Linear(input_size, output_size // int(os.environ['WORLD_SIZE']), bias=False).cuda()
def forward(self, x):
ret = self.layer(x.cuda())
output_tensor = torch.zeros(size=(int(os.environ['WORLD_SIZE']), ret.shape[0], ret.shape[1]), dtype=ret.dtype, device=ret.device)
torch.distributed.all_gather_into_tensor(output_tensor, ret, async_op=False)
output_tensor = torch.cat(output_tensor.unbind(dim=0), dim=-1)
return output_tensor
def load_weights(self, weight):
rank = int(os.environ['RANK'])
world_size = int(os.environ['WORLD_SIZE'])
dim_per_rank = weight.shape[0] // world_size
self.layer.weight.data.copy_(weight[rank*dim_per_rank: (rank+1)*dim_per_rank, :])
def state_dict(self):
return self.layer.state_dict()
ray.init()
master_addr = ray._private.services.get_node_ip_address()
with socket.socket() as sock:
sock.bind(('', 0))
master_port = sock.getsockname()[1]
num_gpus = 2
workers = []
for i in range(num_gpus):
options = {'runtime_env': {'env_vars': {'WORLD_SIZE': str(num_gpus), 'RANK': str(i), 'MASTER_ADDR': master_addr, 'MASTER_PORT': str(master_port)}}}
workers.append(ray.remote(num_gpus=1)(ColumnTPLayer).options(**options).remote(2, 6))
batch_size = 10
input_data = torch.randn(batch_size, 2)
full_layer = torch.nn.Linear(2, 6, bias=False)
weight = full_layer.state_dict()['weight']
ret_list = []
for i in range(num_gpus):
_ = ray.get(workers[i].load_weights.remote(weight))
for i in range(num_gpus):
ret_list.append(workers[i].forward.remote(input_data))
ret = ray.get(ret_list)
ray.shutdown()
fl_ret = full_layer(input_data).cpu()
torch.testing.assert_close(ret[0], ret[1])
torch.testing.assert_close(ret[0].cpu(), fl_ret)
这个例子是我们之前那个TP例子的ray版,同样的使用2GPU。可以看出,在具体的ColumnTPLayer实现上并没有什么区别。主要是增加了init_process_group的逻辑。
-
在torchrun版本中,每个rank会完整运行整段代码,所以init_process_group可以放在外面;
-
而ray版本每个Worker只会运行class部分的代码,所以需要写在class的function里。
同时,torchrun会配置环境变量和拉起进程,这部分需要ray补上。可以看到init的流程变成了在options中配置好了一串环境变量,然后通过ray.remote(num_gpus=1)拉起了worker。
而对于load_weights和forward操作,也需要ray driver通过remote方法去让ray worker运行相应的函数。而在torchrun写法中则因为是SPMD写法,不存在driver,直接用worker运行就好。
我们接下来看之前没有注意到的地方。这里使用的是多个ray.remote(num_gpus=1)来启动一个role。在所有role完全分开放置,也就是完全不colocate的情况下是没有什么问题的。
但是当我们想把actor/critic/ref/reward两两放在同一个集群,问题来了。此时是两个模型共享一个gpu集群,也就是说每块gpu上都要有这两个模型的一部分。但num_gpus=1的写法是独占一整块内存的。
固然我们可以转而尝试ray.remote(num_gpus=0.5)这种写法。但将上面代码改成0.5会得到这个报错。
Duplicate GPU detected : rank 1 and rank 0 both on CUDA device 80
因为这样本质是在gpu0上init两个worker,每个worker占了gpu0的“一半“,而不是在gpu0的“一半”和gpu1的“一半”上分别init。而后者却是我们想要的。
因此我们需要更细粒度的资源管理方式,也就是verl中的RayResourcePool。
def get_placement_groups(self, strategy="STRICT_PACK", name=None):
pg_scheme = [[{
"CPU": self.max_collocate_count,
"GPU": 1
} if self.use_gpu else {
"CPU": self.max_collocate_count
} for _ in range(process_count)] for process_count in self._store]
lifetime = 'detached'if self.detached else None
pgs = [
placement_group(bundles=bundles, strategy=strategy, name=pg_name_prefix + str(idx), lifetime=lifetime)
for idx, bundles in enumerate(pg_scheme)
]
ray.get([pg.ready() for pg in pgs])
self.pgs = pgs
return pgs
RayResourcePool先将机器资源切分为若干bundle,每个bundle包含1块gpu。然后在worker调度时,就可以指定将当前worker调度在哪一个bundle上,写法例如:
for gpu_idx in range(num_gpus):
ray_options = {"placement_group": pg, "placement_group_bundle_index": gpu_idx, 'num_gpus': 0.5}
rst = run.options(**ray_options).remote(x)
在明确了RayResourcePool的init,以及如何将它绑定到worker上之后。接下来我们要思考数据流的问题了。
从我们ColumnTPLayer的例子看出,现在class内部只负责计算以及进行计算所必须的通信,不再考虑数据流了。
我们再举一个2gpu DP和TP混合的例子,可能更好理解现在这种写法的区别。我们在这个例子中实现了这样的一个计算,模拟使用ray在不同并行度的模型之间调度的逻辑。
import ray
import torch
import torch.nn as nn
import socket
import os
import logging
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
# Follow defination above
@ray.remote
class DPModel(torch.nn.Module):
def __init__(self, input_size, output_size):
super().__init__()
if not torch.distributed.is_initialized():
torch.distributed.init_process_group(backend="nccl")
self.layer = nn.Linear(input_size, output_size, bias=False).cuda()
def forward(self, x):
x = dp_data_future_process(x)
return self.layer(x.cuda())
def load_weights(self, weight):
self.layer.weight.data.copy_(weight)
def state_dict(self):
return self.layer.state_dict()
@ray.remote
class ColumnTPModel(torch.nn.Module):
def __init__(self, input_size, output_size):
super().__init__()
if not torch.distributed.is_initialized():
torch.distributed.init_process_group(backend="nccl")
self.layer = nn.Linear(input_size, output_size // int(os.environ['WORLD_SIZE']), bias=False).cuda()
def forward(self, x):
if isinstance(x, ray.ObjectRef):
x = ray.get(x)
elif isinstance(x, list) and isinstance(x[0], ray.ObjectRef):
x = torch.cat(ray.get(x), dim=0)
ret = self.layer(x.cuda())
output_tensor = torch.zeros(size=(int(os.environ['WORLD_SIZE']), ret.shape[0], ret.shape[1]), dtype=ret.dtype, device=ret.device)
torch.distributed.all_gather_into_tensor(output_tensor, ret, async_op=False)
output_tensor = torch.cat(output_tensor.unbind(dim=0), dim=-1)
return output_tensor
def load_weights(self, weight):
rank = int(os.environ['RANK'])
world_size = int(os.environ['WORLD_SIZE'])
dim_per_rank = weight.shape[0] // world_size
self.layer.weight.data.copy_(weight[rank*dim_per_rank: (rank+1)*dim_per_rank, :])
def state_dict(self):
return self.layer.state_dict()
def create_placement_group(num_gpus):
"""Create and return a placement group for GPU allocation."""
bundles = [{"CPU": 4, "GPU": 1} for _ in range(num_gpus)]
pg = placement_group(bundles=bundles, strategy="STRICT_PACK")
ray.get(pg.ready())
return pg
def get_network_config():
"""Get network configuration for distributed setup."""
master_addr = ray._private.services.get_node_ip_address()
with socket.socket() as sock:
sock.bind(('', 0))
master_port = sock.getsockname()[1]
return master_addr, master_port
def create_worker_options(pg, rank, num_gpus, master_addr, master_port):
"""Create options for Ray workers."""
return {
'runtime_env': {
'env_vars': {
'WORLD_SIZE': str(num_gpus),
'RANK': str(rank),
'MASTER_ADDR': master_addr,
'MASTER_PORT': str(master_port)
}
},
'scheduling_strategy': PlacementGroupSchedulingStrategy(
placement_group=pg,
placement_group_bundle_index=rank
),
'num_gpus': 0.2
}
def dp_data_future_process(x):
rank = int(os.environ['RANK'])
world_size = int(os.environ['WORLD_SIZE'])
if isinstance(x, list):
if isinstance(x[0], ray.ObjectRef):
x = torch.cat(ray.get(x), dim=0)
else:
x = torch.cat(x, dim=0)
if isinstance(x, torch.Tensor):
bs_per_rank = x.shape[0] // world_size
x = x[bs_per_rank*rank: bs_per_rank*(rank+1)]
elif isinstance(x, ray.ObjectRef):
x = ray.get(x)
return x
# Initialize Ray
ray.init()
# Configuration
num_gpus = 2
batch_size = 10
# Create placement group
pg = create_placement_group(num_gpus)
# Layer configurations
layer_configs = [
{'type': DPModel, 'num_gpus': 1, 'input_size': 2, 'output_size': 6},
{'type': ColumnTPModel, 'num_gpus': 2, 'input_size': 6, 'output_size': 8},
{'type': DPModel, 'num_gpus': 2, 'input_size': 8, 'output_size': 4}
]
# Create all layer workers
all_worker_group = []
for config in layer_configs:
master_addr, master_port = get_network_config()
worker_group = []
for i in range(config['num_gpus']):
options = create_worker_options(pg, i, config['num_gpus'], master_addr, master_port)
worker_group .append(config['type'].options(**options).remote(config['input_size'], config['output_size']))
all_worker_group.append(worker_group)
worker_group_1, worker_group_2, worker_group_3 = all_worker_group
# Prepare input data and reference layers
input_data = torch.randn(batch_size, 2)
reference_layers = [
torch.nn.Linear(2, 6, bias=False),
torch.nn.Linear(6, 8, bias=False),
torch.nn.Linear(8, 4, bias=False)
]
weights = [layer.state_dict()['weight'] for layer in reference_layers]
# Load weights
ray.get(worker_group_1[0].load_weights.remote(weights[0]))
for i in range(2): # max_gpus
ray.get(worker_group_2[i].load_weights.remote(weights[1]))
ray.get(worker_group_3[i].load_weights.remote(weights[2]))
cur_data = ray.put(input_data)
# Forward passes
# First layer (DP with 1 GPU)
batch_per_gpus = batch_size // num_gpus
cur_result = worker_group_1[0].forward.remote(cur_data)
ret = [cur_result]
# Second layer (ColumnTP with 2 GPUs)
ret_list_2 = []
batch_per_gpus = batch_size // num_gpus
for i in range(num_gpus):
cur_result = worker_group_2[i].forward.remote(ret)
ret_2 = [cur_result]
# Third layer (DP with 2 GPUs)
final_list = []
for i in range(num_gpus):
cur_data = ret_2
cur_result = worker_group_3[i].forward.remote(cur_data)
final_list.append(cur_result)
# Get and combine results
final = ray.get(final_list)
ret = torch.cat(final, dim=0)
# Compare with reference implementation
fl_ret = input_data
for i in range(3):
fl_ret = reference_layers[i](fl_ret)
torch.testing.assert_close(ret.cpu(), fl_ret)
# Cleanup
ray.shutdown()
我们沿用了之前的模型定义和计算流,三个模型作为三个worker group初始化,内部各自维护了一套分布式环境。
而driver只需要编排模型之间计算的顺序,维护数据流即可。具体来说,driver会管理当前batch的全部数据,区别于我们之前写的SPMD版本,数据是每个rank维护自己那份。
在这种方式下,用户通过改动driver部分的逻辑,即可实现自定义训练流程,以及各角色的资源分配,而无需关注worker部分的实现细节,深入worker部分去改数据流,对着一堆nccl hang掉头发。
以上的代码中,在worker group之间传递的不是原始Tensor,而是ray.ObjectRef。这样我们就可以把ray.get()这部分的逻辑放在worker里异步执行,而不必在driver启动任务的时候因为前一个任务的ray.get(),阻塞后一个任务的启动。这也就是HybridFlow中的这张图
在上面的例子中,我们定义了dp_data_future_process,用于将driver的数据分配到各个dp rank。
这实际上是一种数据处理标准化的写法。对应在verl里就是verl/single_controller/base/decorator.py中定义的transfer protocols。
不同的通信方式需要对数据做相应的操作。例如dp中需要在计算前把数据切分到各个rank,计算后又要将它们gather到一起。
进一步地,我们上面的例子中colocate的每个模型都有一个单独的进程,由ray拉起。
而verl中通过create_colocated_worker_cls,把colocate的模型放在同一个进程去运行,更SPMD了。或许会有一些性能上的收益,如同一个进程在显存释放和回收上可以做得更加高效。这个在这篇文章中有更详细的解析。
白强伟:【AI Infra】【RLHF框架】二、VeRL中colocate实现源码解析
至此,我们可以来完整过一遍verl中ray的使用逻辑了。
首先用户需要编写好各个Worker,即verl中给的ActorRolloutRefWorker或CriticWorker,它们的写法和SPMD别无二致,初始化模型,并封装给对应的角色,能够做相应角色的计算。
https://github.com/volcengine/verl/blob/v0.2.0.post2/verl/workers/fsdp_workers.py#L72稍有不同的是,对于需要暴露给ray driver调度的函数,需要增加@register(dispatch_mode)的decorator,从而将driver里的输入dispatch给worker,并且对worker的返回值按照一定的规则collect起来。
-
对于模型保存,每个worker的save path需要和driver给的save path一致;
-
对于计算部分,在dp时要把数据切分给各个worker,计算完成后再合并起来给driver。
单个worker定义好了之后,verl会把它们封装进RayWorkerGroup。在我们上面的demo中,WorkerGroup就是第一个简单的List[Worker],而verl的实现接口更加干净,但是思路和定位是类似的。
每个RayWorkerGroup初始化时,需要传入resource_pool和ray_cls_with_init。
-
前者就是我们上面介绍的PlacementGroup,即当前WorkerGroup所需要的机器资源;
-
而后者是对原始的worker class简单魔改了ray.remote拉起worker的逻辑,把PlacementGroup的配置加进了options里。
接下来比较关键的点在于魔改worker的function,让它能适配driver给过来的输入,并返回给driver正确的输出。我们先看@register(dispatch_mode)在做什么。
def register(dispatch_mode=Dispatch.ALL_TO_ALL, execute_mode=Execute.ALL, blocking=True, materialize_futures=True):
_check_dispatch_mode(dispatch_mode=dispatch_mode)
_check_execute_mode(execute_mode=execute_mode)
def decorator(func):
@wraps(func)
def inner(*args, **kwargs):
if materialize_futures:
args, kwargs = _materialize_futures(*args, **kwargs)
return func(*args, **kwargs)
attrs = {'dispatch_mode': dispatch_mode, 'execute_mode': execute_mode, 'blocking': blocking}
setattr(inner, MAGIC_ATTR, attrs)
return inner
return decorator
可以看出register只是给挂了该decorator的函数新增了一个MAGIC_ATTR,存储几个后续需要的变量。而主要的逻辑在_bind_worker_method中。
代码从这里开始
https://github.com/volcengine/verl/blob/4d27461d0296d1cd602c264e578d294b739795e5/verl/single_controller/base/worker_group.py#L138
def _bind_worker_method(self, user_defined_cls, func_generator):
user_defined_cls即为用户定义的worker,func_generator的定义则是定义了dispatch->execute->ray.get->collect的工作流。
def func_generator(self, method_name, dispatch_fn, collect_fn, execute_fn, blocking):
def func(*args, **kwargs):
args, kwargs = dispatch_fn(self, *args, **kwargs)
output = execute_fn(method_name, *args, **kwargs)
if blocking:
output = ray.get(output)
output = collect_fn(self, output)
return output
return func
_bind_worker_method首先遍历所有class的所有method,找出带有MAGIC_ATTR,也就是有decorator的function。
function.MAGIC_ATTR存储了dispatch_mode/execute_mode/blocking,根据不同的值找到预制好的dispatch/collect/execute逻辑,配置进func_generator中。
最后将原始的method替换成func_generator给定的魔改过的function,从而在原始function融入数据流的处理逻辑。
这一逻辑可复用且干净,不用再一个个手写相似的数据处理逻辑了,例如我们反复提到的DP的gather和split。
实际上这部分代码的本质是,verl这里把不同的SPMD并行模式用到的数据流逻辑做成“预制菜”,不需要对每种情况都写一遍相似的代码,从而来让上层接口变得简洁。
之前遗漏的超参分析
我们结合verl的这两个文档来看。
Performance Tuning Guide
Config Explanationrollout往往是rlhf过程中耗时的大头。config中的data.train_batch_size指的是跑一个iter过的数据条数。注意它既不是每个gpu/dp的batch_size,也不是rollout的batch_size。
事实上不存在rollout batch_size的说法。我们的场景下无需考虑延时,只需要最大化吞吐即可。在目前推理引擎使用Continuous Batching的前提下,batch_size都是“动态”的,这也意味着即使data.train_batch_size=114514,在rollout的时候也不会OOM(实际上后面的阶段也不会OOM)
vLLM会根据自己的max_num_seqs/max_num_batched_tokens去调度每轮需要跑多少数据,当然还会受vLLM预先分配的kv cache占用限制。
-
因此需要在不OOM的前提下调大gpu_memory_utilization,更大的kv cache能够跑更大的batch,提高吞吐量。
-
需要调高max_num_seqs和max_num_batched_tokens,同样能增大吞吐量,但是也要避免开太大导致频繁达到预分配的kv cache上限,频繁出现抢占/重新计算等情况。
-
调低tp size。通常来说更小的tp size意味着更高的显存消耗,但我们前面已经说过tp通信开销很大,一般来说只要不OOM,就尽量开小。
而data_batch_size/mini_batch_size/micro_batch_size 的区别用这段伪代码就能说明白。由于使用了ray,这些代码都是在ray driver中运行的,也就是这里的数据实际上指的是在整个集群运行的数据。
batch_data = dataset.sample(data_batch_size)
#stage1
batch_data = rollout_model.generate(batch_data)
...
#stage3
for mini_batch in batch_data.sample(mini_batch_size):
for micro_batch in mini_batch.sample(micro_batch_size):
loss1 = actor_model(micro_batch)
loss2 = critic_model(micro_batch)
loss1.backward()
loss2.backward()
actor_optim.step()
critic_optim.step()
一点碎碎念
不过随着技术的演进,现在GRPO把critic model优化掉了,用数学题这种可以通过function而非model来给定reward的方式把reward model优化掉了,最近又有提出可以不用kld约束,把reference model也优化掉了。现在糊一个RLHF流程的难度大大下降了,之后是否仍然会回到SPMD为主的范式,尚未可知。
文章来源:https://zhuanlan.zhihu.com/p/30876678559