游乐游手机版
首页/AI教程/文章详情

HybridFlow:将RLHF视为高层数据流

时间:2026-07-02 12:05
先从一个问题入手:verl 为何能将复杂的 RLHF PPO 训练流程写得像一段单进程代码,而实际计算却能跑在多 GPU worker 上?答案隐藏在 HybridFlow 的设计边界中。 核心结论很清晰:HybridFlow 将 RL 算法的控制流保留在单 controller 中,而将 roll

先从一个问题入手:verl 为何能将复杂的 RLHF/PPO 训练流程写得像一段单进程代码,而实际计算却能跑在多 GPU worker 上?答案隐藏在 HybridFlow 的设计边界中。

核心结论很清晰:HybridFlow 将 RL 算法的控制流保留在单 controller 中,而将 rollout、logprob、value、actor/critic 更新等重型计算交给 WorkerGroup。这不是一个独立的类,而是一组源码边界——RayPPOTrainer.fit()定义高层算法执行顺序;TaskRunnerinit_workers()把 role、resource pool、worker class 装配成可调用的系统;DataProto/ TensorDict承载每一步产生的训练数据;@registerRayWorkerGroup将一次普通方法调用转化为 dispatch、remote execute、collect 流程。这套设计带来的收益是算法可读性、后端可替换性以及角色放置的灵活性;代价是 controller 成为数据往返和调度的汇聚点。本文不展开 single_controller的完整实现细节,那是下一篇的主题。这里先建立理解 HybridFlow 的最短源码路径。

先纠正一个误解:RLHF 不是普通训练脚本

阅读 HybridFlow 之前,不要从 Ray、decorator 或 worker 细节入手。更好的入口是理解 RLHF 的系统架构。普通监督训练大多可以简化为:

batch -> forward -> loss -> backward -> optimizer step

但 PPO/GRPO 后训练并非如此。它的一个 step 至少包含:

prompt-> rollout-> reward-> old/ref logprob-> value-> advantage-> actor/critic update-> weight sync-> next rollout

下面这张图请关注中间的闭环:rollout 依赖当前策略,reward/logprob/value 依赖 rollout 结果,actor 更新后还需将新权重同步给下一轮生成服务。

\

这张图的含义是:后训练系统的难点不仅在于“训练一个模型”,而是让训练、推理、奖励、数据协议与权重同步在同一个循环中对齐。HybridFlow 的作用,就是让这个循环在算法层依然可读,同时允许底层计算分布式执行。

HybridFlow 的边界:控制流留在 controller

本地文档 docs/hybrid_flow.rst:45-79将 RL dataflow 拆分为两层:control flow——高层算子执行的顺序,例如先 rollout,再算 advantage,最后训练;computation flow——每个高层算子内部的神经网络计算,如 forward、backward、optimizer step。verl 选择了第二种设计:单进程 controller 负责 control flow,多进程 worker 负责 computation flow。

下面这张图请对比左右两种路径的取舍。左边将控制流也下沉到 worker,在固定流程中可能更紧凑;右边是 verl 的路线,将算法顺序保留在 controller,将重型计算挂载到 worker 方法之后。

\

图后的源码证据有三处:docs/hybrid_flow.rst:68-79明确说明 verl 采用分离 control flow 与 computation flow 的策略;verl/trainer/ppo/ray_trainer.py:1274-1279中的 fit() docstring 指出 driver 通过 RPC 调用 worker group 的 compute functions 来构造 PPO dataflow;verl/single_controller/ray/base.py:48-66中的 func_generator()展示了一次 WorkerGroup 方法调用会经过 dispatch、remote execute、collect 和可选 unpad 步骤。

因此,HybridFlow 不仅仅是“将 PPO 分布式化”。其关键在于将算法顺序与计算执行分离:controller 负责“下一步做什么”,worker 负责“这一步如何在多 GPU 上计算”。

最短源码路径:只需抓住六个点

第二篇文章不需要读完 single_controller的所有实现。建议按照以下六个关键点逐步深入:

  • verl/trainer/main_ppo.py:48-98run_ppo()初始化 Ray,然后启动远程 TaskRunner
  • verl/trainer/main_ppo.py:107-187TaskRunner建立 role 到 worker class、role 到 resource pool 的映射。
  • verl/trainer/main_ppo.py:219-311TaskRunner.run()创建 tokenizer、dataset、resource pool manager、RayPPOTrainer,随后调用 init_workers()fit()
  • verl/trainer/ppo/ray_trainer.py:688-884RayPPOTrainer.init_workers()将配置和资源池转化为 WorkerGroup、LLM server manager、AgentLoopManager、CheckpointEngineManager。
  • verl/trainer/ppo/ray_trainer.py:1274-1583RayPPOTrainer.fit()写出 PPO 的高层 dataflow。
  • verl/workers/engine_workers.py:631-650verl/single_controller/ray/base.py:48-66worker 方法通过 @register声明分布式调用规则;WorkerGroup 调用时执行 dispatch、remote、collect。

下面这张图只解决一个问题:从入口脚本走到 PPO 主循环,中间需要经过哪些源码层。

\

图后要牢记三层分工:main_ppo.py是启动和装配层,而非训练算法层;init_workers()是系统对象初始化层,负责连接资源、worker 和 manager;fit()是算法 dataflow 层,读者应在此处查看 PPO/GRPO 的阶段顺序。

装配层:先有 role,再有 resource pool

TaskRunner的职责并非训练,而是先将 RLHF 中的 high-level operator 角色化。源码中有两个核心字典:

self.role_worker_mapping = {}
self.mapping = {}

role_worker_mapping回答“哪个 role 使用哪个 worker class”;mapping回答“哪个 role 放入哪个 resource pool”。在当前代码中:actor/rollout/ref 使用 ActorRolloutRefWorker,默认映射到 global_pool,见 verl/trainer/main_ppo.py:122-142;critic 使用 TrainingWorker,默认也映射到 global_pool,见 verl/trainer/main_ppo.py:144-152;reward model 和 teacher model 先登记资源池映射,不在此处注册训练 worker,见 verl/trainer/main_ppo.py:189-208init_resource_pool_mgr()根据 GPU 数量和节点数创建 ResourcePoolManager,见 verl/trainer/main_ppo.py:154-187

下面这张图请关注 role、resource pool、WorkerGroup 的边界。它并非说明所有角色必须 colocate,而是强调 controller 首先看到的是角色和资源池,而非裸 Ray actor。

\

图后的设计含义是:算法主循环不直接关心 rank、进程和 GPU 拓扑。它只需要知道 actor、critic、ref、reward、rollout 这些 high-level operator 是否已被装配成可调用对象。

init_workers():将配置变成可调用系统

RayPPOTrainer.init_workers()是从“配置描述”走向“可调用系统”的关键步骤。它先创建资源池和 role/class 映射:self.resource_pool_manager.create_resource_pool()verl/trainer/ppo/ray_trainer.py:695);self.resource_pool_to_cls = ...verl/trainer/ppo/ray_trainer.py:697);actor/rollout/ref 放入 actor resource pool(verl/trainer/ppo/ray_trainer.py:699-709);critic 被转换为 TrainingWorkerConfig后放入 critic resource pool(verl/trainer/ppo/ray_trainer.py:713-738)。

随后创建 WorkerGroup:

create_colocated_worker_cls(class_dict)-> RayWorkerGroup(resource_pool, ray_cls_with_init)-> spawn(prefix_set=class_dict.keys())

对应 verl/trainer/ppo/ray_trainer.py:773-783。后半段再接入推理和环境侧组件:RewardLoopManagerverl/trainer/ppo/ray_trainer.py:812-822);LLMServerManager.create(...)verl/trainer/ppo/ray_trainer.py:854-856);AgentLoopManager.create(...)verl/trainer/ppo/ray_trainer.py:863-868);CheckpointEngineManager(...)verl/trainer/ppo/ray_trainer.py:870-884)。这解释了为什么主循环中的一句 generate_sequences(...)并非普通的 model.generate(),其背后已经连接了 rollout server、agent loop、reward loop、teacher client 和 checkpoint replicas。

fit():PPO dataflow 被写成单进程程序

现在进入第二篇的核心:RayPPOTrainer.fit()。其 docstring 说明 driver process 通过 RPC 调用 worker group 的 compute functions 来构造 PPO dataflow,轻量级的 advantage computation 在 driver process 上完成,见 verl/trainer/ppo/ray_trainer.py:1274-1279

下面这张图请观察 DataProto 如何在一个 step 中不断增加字段。rollout 增加 response,reward 增加 score,logprob/value 增加训练信号,advantage 将这些信号转化为 update 所需的字段。

\

图后的源码路径可以压缩为:

DataProto.from_single_dict(batch_dict)-> _get_gen_batch-> repeat rollout.n-> async_rollout_manager.generate_sequences-> batch.repeat   batch.union(response)-> reward-> _compute_old_log_prob-> _compute_ref_log_prob-> _compute_values-> compute_advantage-> _update_critic-> _update_actor-> checkpoint_manager.update_weights

这些步骤集中在 verl/trainer/ppo/ray_trainer.py:1330-1583。这里有若干分支需要注意,但不需要在第二篇展开:REMAX 将 sampled rollout 和 greedy baseline 合并到同一次生成请求中,见 verl/trainer/ppo/ray_trainer.py:1358-1390;rollout correction 可以选择 bypass 或 recompute old logprobs,见 verl/trainer/ppo/ray_trainer.py:1435-1482;ref policy、critic 均按配置条件执行,见 verl/trainer/ppo/ray_trainer.py:1484-1494;actor 更新后,checkpoint_manager.update_weights(...)将训练侧权重同步给 rollout replicas,见 verl/trainer/ppo/ray_trainer.py:1581-1583。这些分支说明 fit()并非刻板脚本,而是高层 dataflow 的编排位置。

_compute_*:controller 和 worker 的桥梁

fit()中会调用几类短函数:

self._compute_old_log_prob(batch)
self._compute_ref_log_prob(batch)
self._compute_values(batch)
self._update_actor(batch)
self._update_critic(batch)

这些函数并非重型计算本身。它们的职责是将 controller 侧的 DataProto转换为 worker 侧更适合处理的 TensorDict,设置元信息,调用 WorkerGroup,再将返回结果封装回 DataProto

下面这张图请关注数据类型和职责变化:fit()持有 DataProto,_compute_*将其转为 TensorDict,WorkerGroup 返回 logprob/value/metrics,结果再回到 DataProto。

\

_compute_old_log_prob()为例,源码路径为:batch.to_tensordict()verl/trainer/ppo/ray_trainer.py:1171);left_right_2_no_padding(...)verl/trainer/ppo/ray_trainer.py:1173);tu.assign_non_tensor(...)verl/trainer/ppo/ray_trainer.py:1176-1181);self.actor_rollout_wg.compute_log_prob(batch_td)verl/trainer/ppo/ray_trainer.py:1182);DataProto.from_tensordict(...)verl/trainer/ppo/ray_trainer.py:1201-1202)。同样的桥接模式也出现在 _compute_ref_log_prob()_compute_values()_update_actor()_update_critic(),集中在 verl/trainer/ppo/ray_trainer.py:1130-1272。这说明 controller 不做 actor forward/backward,也不做 critic forward/backward。controller 的任务是准备 batch、选择阶段、调用对应的 WorkerGroup、合并返回结果。

@register:普通方法调用背后的分布式协议

桥梁的另一端是 worker 方法上的 @registerActorRolloutRefWorker中有三个关键方法:

@register(dispatch_mode=make_nd_compute_dataproto_dispatch_fn(mesh_name="ref"))
def compute_ref_log_prob(...)
@register(dispatch_mode=make_nd_compute_dataproto_dispatch_fn(mesh_name="actor"))
def compute_log_prob(...)
@register(dispatch_mode=make_nd_compute_dataproto_dispatch_fn(mesh_name="actor"))
def update_actor(...)

对应 verl/workers/engine_workers.py:631-650。下面这张图请关注三层关系:worker 源码上有装饰器,装饰器将 dispatch/execute/blocking 元信息挂在方法上,controller 侧看到的只是一个普通方法调用。

\

图后的关键源码为:register()verl/single_controller/base/decorator.py:398-444,它为方法挂上 dispatch_modeexecute_modeblockingmake_nd_compute_dataproto_dispatch_fn(mesh_name)verl/single_controller/base/decorator.py:300-304,它返回 mesh-specific dispatch/collect 函数;func_generator()verl/single_controller/ray/base.py:48-66,它实际执行 dispatch、Ray remote execute、collect 和可选 unpad。

因此 controller 写:self.actor_rollout_wg.compute_log_prob(batch_td),背后发生的是:dispatch_fn-> execute_fn(method_name, ...)-> optional ray.get-> collect_fn-> unpad if needed。下一篇讲 single_controller,就是要拆解这条链路。

DataProto:高层 dataflow 中流动的是训练证据

DataProto是 controller 侧 dataflow 的标准容器。其定义在 verl/protocol.py:318-328,由三部分组成:

batch: TensorDict
non_tensor_batch: dict
meta_info: dict

下面这张图需与 fit()中的 batch.union(...)一起理解。DataProto 不仅存放 tensor;它还需让 uid、reward extra info、temperature、global step、metrics 等上下文与 tensor 字段保持在同一 batch 轴。

\

在第二篇中,需要记住 DataProto 的五个操作:from_single_dict()将 dataloader 输出转换为 DataProto,见 verl/protocol.py:480-493repeat()支持 rollout.n多采样,见 verl/protocol.py:971-1100union()将每个阶段的新字段合并回 batch,见 verl/protocol.py:781-798chunk()/ concat()支持切分和合并,见 verl/protocol.py:864-961to_tensordict()支持进入 worker 前变换视图,见 verl/protocol.py:1102-1126。这就是为什么我们将 DataProto 称为“训练证据”的载体。rollout 后有 response;reward 后有 score;logprob/value 后有 policy/value 信号;advantage 后才有 actor/critic update 所需的字段。

取舍:可读算法流换来中心化数据往返

到这里可以回到设计取舍。HybridFlow 的收益是:算法顺序集中在 fit(),计算后端被 worker 抽象隔离,角色和资源可以通过 mapping 重放置。docs/hybrid_flow.rst:203-206也将“更换 computation backend”和“更改 WorkerGroup/ResourcePool placement”列为此范式的重要 takeaways。但收益并非免费。本地文档 docs/hybrid_flow.rst:75-79已指出,分离 control flow 与 computation flow 会带来额外的数据通信开销。

下面这张图请关注四类成本:字段增长、object columns、序列化、对齐风险。它并非说明 DataProto 设计错误,而是提醒读者:统一协议越有用,它经过 controller 时也可能变得越重。

源码中的表现很具体:rollout 结果回到 controller,再通过 batch.union(gen_batch_output)合并,见 verl/trainer/ppo/ray_trainer.py:1404-1406;reward、old logprob、ref logprob、values 都会将结果合并回 batch,见 verl/trainer/ppo/ray_trainer.py:1426-1494;advantage 在 driver/controller 上计算,见 verl/trainer/ppo/ray_trainer.py:1496-1541;actor/critic update 的 metrics 回到 controller,见 verl/trainer/ppo/ray_trainer.py:1543-1586;actor 更新后还要同步权重给 rollout replicas,见 verl/trainer/ppo/ray_trainer.py:1581-1583。因此本文的判断可以更精确地表述为:HybridFlow 用 controller 上的可读算法流程,换取了更多的跨 worker 数据往返、collect 等待和协议字段管理成本。这不是缺点清单,而是理解 verl 后续文章的入口。第三篇讲 single_controller,就是要看这种“普通方法调用”的成本与收益如何在 dispatch/collect 层落地。

小结:第二篇只需记住这条边界

读完第二篇,应记住三句话:

RayPPOTrainer.fit()写 RLHF/PPO 的高层 dataflow
WorkerGroup   @register将 high-level operator 变成分布式执行
DataProto / TensorDict在 controller 和 worker 之间携带训练证据

回到本系列地图,第一篇解释了为何 AI 后训练不是一个训练脚本;第二篇将系统拆解为 dataflow、controller、worker 和协议边界。下一篇将进入 Single Controller:一次看似普通的 Python 方法调用,如何变成一组 Ray worker 上的 dispatch、execute、collect。

本文源码索引

概念文档:

  • docs/hybrid_flow.rst:45-79:control flow / computation flow 以及 verl 的分离策略。
  • docs/hybrid_flow.rst:160-177:dispatch、collect 与 @register的文档解释。
  • docs/hybrid_flow.rst:180-206:PPO 主循环示意和 takeaways。

入口和装配:

  • verl/trainer/main_ppo.py:48-98run_ppo()初始化 Ray 并启动远程 TaskRunner
  • verl/trainer/main_ppo.py:107-187TaskRunner的 role mapping 与 resource pool mapping。
  • verl/trainer/main_ppo.py:219-311TaskRunner.run()装配 dataset、resource pool、trainer,并调用 init_workers()/ fit()
  • verl/trainer/ppo/ray_trainer.py:688-884init_workers()创建 WorkerGroup、reward loop、LLM server、agent loop 和 checkpoint manager。

PPO 主循环:

  • verl/trainer/ppo/ray_trainer.py:1274-1279fit()docstring 对 PPO dataflow 的说明。
  • verl/trainer/ppo/ray_trainer.py:1330-1406:dataloader batch、uid、rollout、repeat、union。
  • verl/trainer/ppo/ray_trainer.py:1426-1541:reward、old logprob、ref logprob、values、advantage。
  • verl/trainer/ppo/ray_trainer.py:1543-1583:critic update、actor update、weight sync。

controller 到 worker:

  • verl/trainer/ppo/ray_trainer.py:1130-1272_compute_values()_compute_ref_log_prob()_compute_old_log_prob()_update_actor()_update_critic()
  • verl/workers/engine_workers.py:76-81TrainingWorker的定位。
  • verl/workers/engine_workers.py:238-385TrainingWorkertrain_mini_batch()train_batch()infer_batch()
  • verl/workers/engine_workers.py:439-650ActorRolloutRefWorker的 actor/ref/rollout 组合及 high-level methods。

协议和分发:

  • verl/protocol.py:318-328:DataProto 三层结构。
  • verl/protocol.py:480-493from_single_dict()
  • verl/protocol.py:781-798union()
  • verl/protocol.py:864-961chunk()/ concat()
  • verl/protocol.py:971-1100repeat()
  • verl/protocol.py:1102-1126to_tensordict()
  • verl/single_controller/base/decorator.py:300-304:mesh-specific DataProto dispatch/collect 函数生成。
  • verl/single_controller/base/decorator.py:398-444register()为 worker method 挂分布式元信息。
  • verl/single_controller/ray/base.py:48-66func_generator()执行 dispatch、remote、collect。
来源:https://cloud.tencent.com.cn/developer/article/2701605
上一篇单一控制器像大脑一样调度GPU工人集群的原理 下一篇企业如何利用AI招聘全方位提升校招效率
本站内容用于信息整理与展示,如有侵权或内容问题请及时联系处理。

相关推荐

补充同频道和同主题内容,方便继续浏览更多相关内容。

同类最新

继续查看同栏目最近更新的文章。

更多
内网RPA离线部署从依赖打包到7×24无人值守踩坑与避坑方案
AI教程 · 2026-07-02

内网RPA离线部署从依赖打包到7×24无人值守踩坑与避坑方案

这三年,内网RPA项目接了不下二十个。每次开局都像闯关——断网、缺依赖、多机同步、定时执行、批量分发、源码保护、AI离线化,八个坑一个比一个深。今天把这些实战经验整理出来,希望能帮正在内网搞自动化的兄弟们少踩点雷。 一、内网无网络环境怎么部署RPA流程:先搞清楚什么叫“真离线” 很多工具宣传“支持本

水利工程师用WorkBuddy写洪水报告效率提升3倍
AI教程 · 2026-07-02

水利工程师用WorkBuddy写洪水报告效率提升3倍

WorkBuddy开发者分享季 水利工程师AI提效实战:用WorkBuddy撰写洪水影响评价报告,效率提升3倍 WorkBuddy 效率 人工智能 开发工具 一、我是谁,为什么需要AI 先介绍一下自己——我是一名水利工程师,在湖南长沙的一家小型水利设计公司任职。当前行业环境不太

日志服务数据加工规则洞察仪表盘使用指南
AI教程 · 2026-07-02

日志服务数据加工规则洞察仪表盘使用指南

数据加工诊断仪表盘 想实时掌握日志服务加工功能的运行状态?直接从加工列表页点击那个“规则洞察”按钮,仪表盘就会立刻呈现出来。入口就在那儿,不绕弯子。 跳转后,你可以按作业名称、实例ID或源LogStore来筛选任务状态。比如下边这张图,展示的是当前实例ID(90c9d47714dbb807d47c1

基于RFID的固定资产管理系统技术架构与工程实践
AI教程 · 2026-07-02

基于RFID的固定资产管理系统技术架构与工程实践

固定资产管理难题是众多企事业单位的普遍困扰,资产数量动辄数千件,且广泛分布于不同部门、楼层乃至园区。传统人工盘点方式在工程维度上始终面临三大关键瓶颈:采集效率低下、数据闭环中断、状态同步滞后。使用条码枪逐一扫描标签,识别距离通常不超过30厘米,操作人员需逐个寻找并扫描,盘点效率完全受限于人力。面对5

WorkBuddy实战用AI搭建A股智能盯盘助手省心高效
AI教程 · 2026-07-02

WorkBuddy实战用AI搭建A股智能盯盘助手省心高效

炒股的朋友们想必都深有体会——每天重复盯盘、查行情、分析板块轮动,这一整套流程下来耗费大量精力。手动翻查数据不仅身心俱疲,还很容易错过关键买卖节点。今天我们就来聊聊如何打造一款趁手的盯盘工具,借助AI替你分担这些重复性工作。 背景:盯盘的核心痛点 股民都有同感——每天不只要查询单只股票的实时行情,还