先从一个问题入手:verl 为何能将复杂的 RLHF/PPO 训练流程写得像一段单进程代码,而实际计算却能跑在多 GPU worker 上?答案隐藏在 HybridFlow 的设计边界中。
核心结论很清晰:HybridFlow 将 RL 算法的控制流保留在单 controller 中,而将 rollout、logprob、value、actor/critic 更新等重型计算交给 WorkerGroup。这不是一个独立的类,而是一组源码边界——RayPPOTrainer.fit()定义高层算法执行顺序;TaskRunner和 init_workers()把 role、resource pool、worker class 装配成可调用的系统;DataProto/ TensorDict承载每一步产生的训练数据;@register和 RayWorkerGroup将一次普通方法调用转化为 dispatch、remote execute、collect 流程。这套设计带来的收益是算法可读性、后端可替换性以及角色放置的灵活性;代价是 controller 成为数据往返和调度的汇聚点。本文不展开 single_controller的完整实现细节,那是下一篇的主题。这里先建立理解 HybridFlow 的最短源码路径。
先纠正一个误解:RLHF 不是普通训练脚本
阅读 HybridFlow 之前,不要从 Ray、decorator 或 worker 细节入手。更好的入口是理解 RLHF 的系统架构。普通监督训练大多可以简化为:
batch -> forward -> loss -> backward -> optimizer step
但 PPO/GRPO 后训练并非如此。它的一个 step 至少包含:
prompt-> rollout-> reward-> old/ref logprob-> value-> advantage-> actor/critic update-> weight sync-> next rollout
下面这张图请关注中间的闭环:rollout 依赖当前策略,reward/logprob/value 依赖 rollout 结果,actor 更新后还需将新权重同步给下一轮生成服务。

这张图的含义是:后训练系统的难点不仅在于“训练一个模型”,而是让训练、推理、奖励、数据协议与权重同步在同一个循环中对齐。HybridFlow 的作用,就是让这个循环在算法层依然可读,同时允许底层计算分布式执行。
HybridFlow 的边界:控制流留在 controller
本地文档 docs/hybrid_flow.rst:45-79将 RL dataflow 拆分为两层:control flow——高层算子执行的顺序,例如先 rollout,再算 advantage,最后训练;computation flow——每个高层算子内部的神经网络计算,如 forward、backward、optimizer step。verl 选择了第二种设计:单进程 controller 负责 control flow,多进程 worker 负责 computation flow。
下面这张图请对比左右两种路径的取舍。左边将控制流也下沉到 worker,在固定流程中可能更紧凑;右边是 verl 的路线,将算法顺序保留在 controller,将重型计算挂载到 worker 方法之后。

图后的源码证据有三处:docs/hybrid_flow.rst:68-79明确说明 verl 采用分离 control flow 与 computation flow 的策略;verl/trainer/ppo/ray_trainer.py:1274-1279中的 fit() docstring 指出 driver 通过 RPC 调用 worker group 的 compute functions 来构造 PPO dataflow;verl/single_controller/ray/base.py:48-66中的 func_generator()展示了一次 WorkerGroup 方法调用会经过 dispatch、remote execute、collect 和可选 unpad 步骤。
因此,HybridFlow 不仅仅是“将 PPO 分布式化”。其关键在于将算法顺序与计算执行分离:controller 负责“下一步做什么”,worker 负责“这一步如何在多 GPU 上计算”。
最短源码路径:只需抓住六个点
第二篇文章不需要读完 single_controller的所有实现。建议按照以下六个关键点逐步深入:
verl/trainer/main_ppo.py:48-98run_ppo()初始化 Ray,然后启动远程TaskRunner。verl/trainer/main_ppo.py:107-187TaskRunner建立 role 到 worker class、role 到 resource pool 的映射。verl/trainer/main_ppo.py:219-311TaskRunner.run()创建 tokenizer、dataset、resource pool manager、RayPPOTrainer,随后调用init_workers()和fit()。verl/trainer/ppo/ray_trainer.py:688-884RayPPOTrainer.init_workers()将配置和资源池转化为 WorkerGroup、LLM server manager、AgentLoopManager、CheckpointEngineManager。verl/trainer/ppo/ray_trainer.py:1274-1583RayPPOTrainer.fit()写出 PPO 的高层 dataflow。verl/workers/engine_workers.py:631-650和verl/single_controller/ray/base.py:48-66worker 方法通过@register声明分布式调用规则;WorkerGroup 调用时执行 dispatch、remote、collect。
下面这张图只解决一个问题:从入口脚本走到 PPO 主循环,中间需要经过哪些源码层。

图后要牢记三层分工:main_ppo.py是启动和装配层,而非训练算法层;init_workers()是系统对象初始化层,负责连接资源、worker 和 manager;fit()是算法 dataflow 层,读者应在此处查看 PPO/GRPO 的阶段顺序。
装配层:先有 role,再有 resource pool
TaskRunner的职责并非训练,而是先将 RLHF 中的 high-level operator 角色化。源码中有两个核心字典:
self.role_worker_mapping = {}
self.mapping = {}
role_worker_mapping回答“哪个 role 使用哪个 worker class”;mapping回答“哪个 role 放入哪个 resource pool”。在当前代码中:actor/rollout/ref 使用 ActorRolloutRefWorker,默认映射到 global_pool,见 verl/trainer/main_ppo.py:122-142;critic 使用 TrainingWorker,默认也映射到 global_pool,见 verl/trainer/main_ppo.py:144-152;reward model 和 teacher model 先登记资源池映射,不在此处注册训练 worker,见 verl/trainer/main_ppo.py:189-208;init_resource_pool_mgr()根据 GPU 数量和节点数创建 ResourcePoolManager,见 verl/trainer/main_ppo.py:154-187。
下面这张图请关注 role、resource pool、WorkerGroup 的边界。它并非说明所有角色必须 colocate,而是强调 controller 首先看到的是角色和资源池,而非裸 Ray actor。

图后的设计含义是:算法主循环不直接关心 rank、进程和 GPU 拓扑。它只需要知道 actor、critic、ref、reward、rollout 这些 high-level operator 是否已被装配成可调用对象。
init_workers():将配置变成可调用系统
RayPPOTrainer.init_workers()是从“配置描述”走向“可调用系统”的关键步骤。它先创建资源池和 role/class 映射:self.resource_pool_manager.create_resource_pool()(verl/trainer/ppo/ray_trainer.py:695);self.resource_pool_to_cls = ...(verl/trainer/ppo/ray_trainer.py:697);actor/rollout/ref 放入 actor resource pool(verl/trainer/ppo/ray_trainer.py:699-709);critic 被转换为 TrainingWorkerConfig后放入 critic resource pool(verl/trainer/ppo/ray_trainer.py:713-738)。
随后创建 WorkerGroup:
create_colocated_worker_cls(class_dict)-> RayWorkerGroup(resource_pool, ray_cls_with_init)-> spawn(prefix_set=class_dict.keys())
对应 verl/trainer/ppo/ray_trainer.py:773-783。后半段再接入推理和环境侧组件:RewardLoopManager(verl/trainer/ppo/ray_trainer.py:812-822);LLMServerManager.create(...)(verl/trainer/ppo/ray_trainer.py:854-856);AgentLoopManager.create(...)(verl/trainer/ppo/ray_trainer.py:863-868);CheckpointEngineManager(...)(verl/trainer/ppo/ray_trainer.py:870-884)。这解释了为什么主循环中的一句 generate_sequences(...)并非普通的 model.generate(),其背后已经连接了 rollout server、agent loop、reward loop、teacher client 和 checkpoint replicas。
fit():PPO dataflow 被写成单进程程序
现在进入第二篇的核心:RayPPOTrainer.fit()。其 docstring 说明 driver process 通过 RPC 调用 worker group 的 compute functions 来构造 PPO dataflow,轻量级的 advantage computation 在 driver process 上完成,见 verl/trainer/ppo/ray_trainer.py:1274-1279。
下面这张图请观察 DataProto 如何在一个 step 中不断增加字段。rollout 增加 response,reward 增加 score,logprob/value 增加训练信号,advantage 将这些信号转化为 update 所需的字段。

图后的源码路径可以压缩为:
DataProto.from_single_dict(batch_dict)-> _get_gen_batch-> repeat rollout.n-> async_rollout_manager.generate_sequences-> batch.repeat batch.union(response)-> reward-> _compute_old_log_prob-> _compute_ref_log_prob-> _compute_values-> compute_advantage-> _update_critic-> _update_actor-> checkpoint_manager.update_weights
这些步骤集中在 verl/trainer/ppo/ray_trainer.py:1330-1583。这里有若干分支需要注意,但不需要在第二篇展开:REMAX 将 sampled rollout 和 greedy baseline 合并到同一次生成请求中,见 verl/trainer/ppo/ray_trainer.py:1358-1390;rollout correction 可以选择 bypass 或 recompute old logprobs,见 verl/trainer/ppo/ray_trainer.py:1435-1482;ref policy、critic 均按配置条件执行,见 verl/trainer/ppo/ray_trainer.py:1484-1494;actor 更新后,checkpoint_manager.update_weights(...)将训练侧权重同步给 rollout replicas,见 verl/trainer/ppo/ray_trainer.py:1581-1583。这些分支说明 fit()并非刻板脚本,而是高层 dataflow 的编排位置。
_compute_*:controller 和 worker 的桥梁
fit()中会调用几类短函数:
self._compute_old_log_prob(batch)
self._compute_ref_log_prob(batch)
self._compute_values(batch)
self._update_actor(batch)
self._update_critic(batch)
这些函数并非重型计算本身。它们的职责是将 controller 侧的 DataProto转换为 worker 侧更适合处理的 TensorDict,设置元信息,调用 WorkerGroup,再将返回结果封装回 DataProto。
下面这张图请关注数据类型和职责变化:fit()持有 DataProto,_compute_*将其转为 TensorDict,WorkerGroup 返回 logprob/value/metrics,结果再回到 DataProto。

以 _compute_old_log_prob()为例,源码路径为:batch.to_tensordict()(verl/trainer/ppo/ray_trainer.py:1171);left_right_2_no_padding(...)(verl/trainer/ppo/ray_trainer.py:1173);tu.assign_non_tensor(...)(verl/trainer/ppo/ray_trainer.py:1176-1181);self.actor_rollout_wg.compute_log_prob(batch_td)(verl/trainer/ppo/ray_trainer.py:1182);DataProto.from_tensordict(...)(verl/trainer/ppo/ray_trainer.py:1201-1202)。同样的桥接模式也出现在 _compute_ref_log_prob()、_compute_values()、_update_actor()、_update_critic(),集中在 verl/trainer/ppo/ray_trainer.py:1130-1272。这说明 controller 不做 actor forward/backward,也不做 critic forward/backward。controller 的任务是准备 batch、选择阶段、调用对应的 WorkerGroup、合并返回结果。
@register:普通方法调用背后的分布式协议
桥梁的另一端是 worker 方法上的 @register。ActorRolloutRefWorker中有三个关键方法:
@register(dispatch_mode=make_nd_compute_dataproto_dispatch_fn(mesh_name="ref"))
def compute_ref_log_prob(...)
@register(dispatch_mode=make_nd_compute_dataproto_dispatch_fn(mesh_name="actor"))
def compute_log_prob(...)
@register(dispatch_mode=make_nd_compute_dataproto_dispatch_fn(mesh_name="actor"))
def update_actor(...)
对应 verl/workers/engine_workers.py:631-650。下面这张图请关注三层关系:worker 源码上有装饰器,装饰器将 dispatch/execute/blocking 元信息挂在方法上,controller 侧看到的只是一个普通方法调用。

图后的关键源码为:register()在 verl/single_controller/base/decorator.py:398-444,它为方法挂上 dispatch_mode、execute_mode、blocking;make_nd_compute_dataproto_dispatch_fn(mesh_name)在 verl/single_controller/base/decorator.py:300-304,它返回 mesh-specific dispatch/collect 函数;func_generator()在 verl/single_controller/ray/base.py:48-66,它实际执行 dispatch、Ray remote execute、collect 和可选 unpad。
因此 controller 写:self.actor_rollout_wg.compute_log_prob(batch_td),背后发生的是:dispatch_fn-> execute_fn(method_name, ...)-> optional ray.get-> collect_fn-> unpad if needed。下一篇讲 single_controller,就是要拆解这条链路。
DataProto:高层 dataflow 中流动的是训练证据
DataProto是 controller 侧 dataflow 的标准容器。其定义在 verl/protocol.py:318-328,由三部分组成:
batch: TensorDict
non_tensor_batch: dict
meta_info: dict
下面这张图需与 fit()中的 batch.union(...)一起理解。DataProto 不仅存放 tensor;它还需让 uid、reward extra info、temperature、global step、metrics 等上下文与 tensor 字段保持在同一 batch 轴。

在第二篇中,需要记住 DataProto 的五个操作:from_single_dict()将 dataloader 输出转换为 DataProto,见 verl/protocol.py:480-493;repeat()支持 rollout.n多采样,见 verl/protocol.py:971-1100;union()将每个阶段的新字段合并回 batch,见 verl/protocol.py:781-798;chunk()/ concat()支持切分和合并,见 verl/protocol.py:864-961;to_tensordict()支持进入 worker 前变换视图,见 verl/protocol.py:1102-1126。这就是为什么我们将 DataProto 称为“训练证据”的载体。rollout 后有 response;reward 后有 score;logprob/value 后有 policy/value 信号;advantage 后才有 actor/critic update 所需的字段。
取舍:可读算法流换来中心化数据往返
到这里可以回到设计取舍。HybridFlow 的收益是:算法顺序集中在 fit(),计算后端被 worker 抽象隔离,角色和资源可以通过 mapping 重放置。docs/hybrid_flow.rst:203-206也将“更换 computation backend”和“更改 WorkerGroup/ResourcePool placement”列为此范式的重要 takeaways。但收益并非免费。本地文档 docs/hybrid_flow.rst:75-79已指出,分离 control flow 与 computation flow 会带来额外的数据通信开销。
下面这张图请关注四类成本:字段增长、object columns、序列化、对齐风险。它并非说明 DataProto 设计错误,而是提醒读者:统一协议越有用,它经过 controller 时也可能变得越重。

源码中的表现很具体:rollout 结果回到 controller,再通过 batch.union(gen_batch_output)合并,见 verl/trainer/ppo/ray_trainer.py:1404-1406;reward、old logprob、ref logprob、values 都会将结果合并回 batch,见 verl/trainer/ppo/ray_trainer.py:1426-1494;advantage 在 driver/controller 上计算,见 verl/trainer/ppo/ray_trainer.py:1496-1541;actor/critic update 的 metrics 回到 controller,见 verl/trainer/ppo/ray_trainer.py:1543-1586;actor 更新后还要同步权重给 rollout replicas,见 verl/trainer/ppo/ray_trainer.py:1581-1583。因此本文的判断可以更精确地表述为:HybridFlow 用 controller 上的可读算法流程,换取了更多的跨 worker 数据往返、collect 等待和协议字段管理成本。这不是缺点清单,而是理解 verl 后续文章的入口。第三篇讲 single_controller,就是要看这种“普通方法调用”的成本与收益如何在 dispatch/collect 层落地。
小结:第二篇只需记住这条边界
读完第二篇,应记住三句话:
RayPPOTrainer.fit()写 RLHF/PPO 的高层 dataflow
WorkerGroup @register将 high-level operator 变成分布式执行
DataProto / TensorDict在 controller 和 worker 之间携带训练证据
回到本系列地图,第一篇解释了为何 AI 后训练不是一个训练脚本;第二篇将系统拆解为 dataflow、controller、worker 和协议边界。下一篇将进入 Single Controller:一次看似普通的 Python 方法调用,如何变成一组 Ray worker 上的 dispatch、execute、collect。
本文源码索引
概念文档:
docs/hybrid_flow.rst:45-79:control flow / computation flow 以及 verl 的分离策略。docs/hybrid_flow.rst:160-177:dispatch、collect 与@register的文档解释。docs/hybrid_flow.rst:180-206:PPO 主循环示意和 takeaways。
入口和装配:
verl/trainer/main_ppo.py:48-98:run_ppo()初始化 Ray 并启动远程TaskRunner。verl/trainer/main_ppo.py:107-187:TaskRunner的 role mapping 与 resource pool mapping。verl/trainer/main_ppo.py:219-311:TaskRunner.run()装配 dataset、resource pool、trainer,并调用init_workers()/fit()。verl/trainer/ppo/ray_trainer.py:688-884:init_workers()创建 WorkerGroup、reward loop、LLM server、agent loop 和 checkpoint manager。
PPO 主循环:
verl/trainer/ppo/ray_trainer.py:1274-1279:fit()docstring 对 PPO dataflow 的说明。verl/trainer/ppo/ray_trainer.py:1330-1406:dataloader batch、uid、rollout、repeat、union。verl/trainer/ppo/ray_trainer.py:1426-1541:reward、old logprob、ref logprob、values、advantage。verl/trainer/ppo/ray_trainer.py:1543-1583:critic update、actor update、weight sync。
controller 到 worker:
verl/trainer/ppo/ray_trainer.py:1130-1272:_compute_values()、_compute_ref_log_prob()、_compute_old_log_prob()、_update_actor()、_update_critic()。verl/workers/engine_workers.py:76-81:TrainingWorker的定位。verl/workers/engine_workers.py:238-385:TrainingWorker的train_mini_batch()、train_batch()、infer_batch()。verl/workers/engine_workers.py:439-650:ActorRolloutRefWorker的 actor/ref/rollout 组合及 high-level methods。
协议和分发:
verl/protocol.py:318-328:DataProto 三层结构。verl/protocol.py:480-493:from_single_dict()。verl/protocol.py:781-798:union()。verl/protocol.py:864-961:chunk()/concat()。verl/protocol.py:971-1100:repeat()。verl/protocol.py:1102-1126:to_tensordict()。verl/single_controller/base/decorator.py:300-304:mesh-specific DataProto dispatch/collect 函数生成。verl/single_controller/base/decorator.py:398-444:register()为 worker method 挂分布式元信息。verl/single_controller/ray/base.py:48-66:func_generator()执行 dispatch、remote、collect。
