⏱️ 把串行 Agent Pipeline 改成 Temporal 工作流之后,快了 3 倍
背景
先来看一个跨境电商运营助手的真实案例。整个流程是这样运作的:

加载商品 → 翻译 → 搜竞品 → 分析评论 → 定价建议 → 生成总结
每个步骤依次串行执行,前一个完成才能开始下一个。跑完一次完整分析大约需要30到45秒。
听起来似乎还行?但如果上线后遇到100个商品排队等着分析……那速度就有点跟不上了。
当时的代码实现大致长这样:
def orchestrate(product_id: str) -> CrossBorderReport:
product = load_product(product_id)
translated = translate_product(product) # ~3s
competitor_report = analyze_competitors(...) # ~5s(Ta vily 网络请求)
review_analysis = analyze_reviews(product_id) # ~3s
pricing = get_pricing_recommendation(...) # ~3s
summary = generate_summary(...) # ~3s
return report
翻译耗时3秒,搜索竞品5秒,这些步骤之间完全没有数据依赖,却不得不排队等待。看起来有些浪费资源。
核心问题
绘制一下分析依赖图就能看清问题所在:
┌──→ 翻译 ──┐
│ │
加载商品 ────┼──→ 搜竞品 ──┼──→ 竞品分析 ──┐
│ │ │
└──→ 分析评论 ──┘ ├──→ 定价建议 ──→ 总结
│
┌──────────────────────────────┘
翻译和搜竞品之间没有箭头连接——它们完全可以并行执行。分析评论也只是依赖商品数据。真正需要串行的只有:定价建议(依赖翻译结果、竞品分析、评论分析)和生成总结(依赖所有上游步骤)。
原方案被强行写成串行,纯粹是代码结构限制导致的。
为什么选 Temporal
第一时间想到了Python自带的 asyncio.gather(),但仅有并行还不够。我们来对比一下:
| 需求 | asyncio.gather | Temporal |
|---|---|---|
| 并行执行 | ✅ | ✅ |
| 自动重试 | ❌ 需自行编写 | ✅ 内置支持 |
| 超时控制 | ❌ 需自行编写 | ✅ 每个步骤独立设置 |
| 持久化 | ❌ 崩溃后需从头再来 | ✅ 从断点恢复 |
| 可观测性 | ❌ 全靠 print 输出 | ✅ Server Web UI 提供 |
| 分布式 | ❌ 仅限单机 | ✅ 可跨机器部署 |
关键在于:Ta vily API偶尔会超时(卡住5到10秒),asyncio.gather只能被动等待它超时;而Temporal可以为每个Activity单独设置 start_to_close_timeout=30s,超时后自动重试,重试时采用指数退避策略。
改造过程
Step 1:定义 Activity
每个Activity封装一个业务步骤,并加上超时控制:
@activity.defn
async def load_product(product_id: str) -> dict:
"""从 product.json 加载商品信息"""
...
@activity.defn
async def translate_product(product_dict: dict) -> dict:
"""翻译商品信息"""
...
@activity.defn
async def search_competitors(product_dict: dict) -> dict:
"""搜索竞品"""
...
每个Activity独立、无状态、可重试。
Step 2:定义 Workflow 编排
@workflow.defn
class CrossBorderWorkflow:
@workflow.run
async def run(self, product_id: str) -> dict:
# Step 1: 加载商品(超时 10 秒)
product = await workflow.execute_activity(
load_product, args=[product_id],
start_to_close_timeout=timedelta(seconds=10),
)
# Step 2 & 3: 翻译 + 搜索竞品(并行!)
translated, competitors = await asyncio.gather(
workflow.execute_activity(
translate_product, args=[product],
start_to_close_timeout=timedelta(seconds=60),
),
workflow.execute_activity(
search_competitors, args=[product],
start_to_close_timeout=timedelta(seconds=30),
),
)
# Step 4 & 5: 竞品分析 + 评论分析(并行!)
comp_report, review = await asyncio.gather(
workflow.execute_activity(
analyze_competitors, args=[product, competitors],
start_to_close_timeout=timedelta(seconds=60),
),
workflow.execute_activity(
load_and_analyze_reviews, args=[product_id],
start_to_close_timeout=timedelta(seconds=60),
),
)
# Step 6: 定价(必须等上面全部完成)
pricing = await workflow.execute_activity(
get_pricing_recommendation,
args=[product, translated, comp_report, review],
start_to_close_timeout=timedelta(seconds=60),
)
# Step 7: 生成总结
summary = await workflow.execute_activity(
generate_summary,
args=[product, translated, comp_report, review, pricing],
start_to_close_timeout=timedelta(seconds=30),
)
return {"summary": summary, "pricing": pricing, ...}
这段代码的精妙之处在于:使用asyncio.gather将互不依赖的Activity同时提交给Temporal Server,Server会调度到Worker并行执行。
Step 3:编写 Worker
一个常驻进程,持续监听任务队列:
async def main():
client = await Client.connect("localhost:7233")
worker = Worker(
client,
task_queue="cross-border-queue",
activities=[load_product, translate_product, ...],
workflows=[CrossBorderWorkflow],
)
await worker.run()
Step 4:测试
Temporal提供了WorkflowEnvironment.start_local(),无需启动Docker,直接内嵌运行:
async with await WorkflowEnvironment.start_local() as env:
async with Worker(env.client, task_queue="queue", ...):
result = await env.client.execute_workflow(
CrossBorderWorkflow.run, "P001",
id="test-001", task_queue="queue",
)
print(result["summary"])
效果对比
| 方案 | 耗时 | 是否支持重试 | 是否支持超时 | 故障恢复 |
|---|---|---|---|---|
| 同步串行 | ~17 秒 | ❌ | ❌ | ❌ 需从头再来 |
| Temporal 并行 | ~8 秒 | ✅ 指数退避 | ✅ 每步独立 | ✅ 断点恢复 |
具体到每个步骤的时间线对比如下:
同步串行:
0s─ 加载商品 ─ 翻译 ─ 搜竞品 ─ 分析竞品 ─ 分析评论 ─ 定价 ─ 总结 → 17s
Temporal 并行:
0s─ 加载商品 ─┬─ 翻译 ─┬─ 分析竞品 ─┬─ 定价 ┬─ 总结 → 8s
├─ 搜竞品 ┤ │ │
└─ 分析评论 ───────────┘ │
│
并行节省了大约50%的时间。如果Ta vily网络请求卡住超过30秒,Activity会自动超时重试,不会让整个流程卡死。
遇到的坑
1. Temporal SDK 版本 API 变化
execute_activity的参数必须用 args=[...] 关键字传递:
# ✗ 不能用这种方式
await workflow.execute_activity(func, arg1, arg2, timeout=...)
# ✓ 必须用 args
await workflow.execute_activity(func, args=[arg1, arg2], timeout=...)
一开始翻了15分钟的错误堆栈才发现这个细节。
2. Activity 的 return 必须可序列化
Temporal依赖gRPC通信,Activity返回值会被序列化为JSON。因此不能直接返回Pydantic模型,需要先调用.model_dump()转换为普通字典。
@activity.defn
async def translate_product(product_dict: dict) -> dict:
product = Product(**product_dict)
translated = _translate(product)
# 返回 TranslatedProduct 对象
return translated.model_dump() # 转为 dict,Temporal 才能传输
3. Workflow 里的 import
Temporal的Workflow代码会被重放(replay),常规import在重放时可能报错。需要包裹一层:
with workflow.unsafe.imports_passed_through():
from temporal_worker.activities import load_product, ...
什么时候值得用 Temporal
适合的场景:
- 流程超过3个步骤,且步骤间有明确的依赖关系
- 步骤涉及外部API(Ta vily、LLM调用),容易超时或出错
- 需要持久化——进程重启后从断点恢复,不丢失状态
- 需要监控——Temporal Web UI可以查看每个Workflow的执行状态
不适合的场景:
- 简单的2-3步串行——
asyncio.gather就足够了 - 同步CLI脚本——不需要持久化
- 不需要分布式——单机场景下略显厚重
项目复盘
这次改造带来了几点启发:
- 画依赖图比画流程图更重要——先搞清楚哪些步骤有数据依赖、哪些可以并行,再选择技术方案。
- 不要过早优化——串行能跑就先串行,跑通之后再考虑并行。如果项目里只有3个商品,串行的45秒完全可以接受。
- Temporal的学习曲线主要体现在SDK API上——概念本身很直观(Workflow编排、Activity执行),但
unsafe.imports_passed_through、args=[...]这些细节需要踩坑才能记住。 - 生产环境还需要进一步完善——目前只是跑通了流程,如果要上线,还需要:Activity重试策略调优、Workflow超时全局兜底、心跳检测(针对长时间运行的Activity)。
