为Agent管道安装Callback监听系统
时间:2026-07-01 15:25
全面掌握 Callback 系统:从原理到实战 一、为什么需要 Callback 一个 ReAct Agent 在运行时,可能需要依次调用多个节点、多个工具以及一个模型——每次调用耗费了多少 Token?哪个节点延迟最高?工具返回了什么结果?如果每个节点都手动添加日志代码,业务逻辑将与监控逻辑紧紧耦
全面掌握 Callback 系统:从原理到实战
一、为什么需要 Callback
一个 ReAct Agent 在运行时,可能需要依次调用多个节点、多个工具以及一个模型——每次调用耗费了多少 Token?哪个节点延迟最高?工具返回了什么结果?如果每个节点都手动添加日志代码,业务逻辑将与监控逻辑紧紧耦合,一旦更换监控平台(例如从 LangSmith 切换到 CozeLoop 再到 APMPlus),就必须同步修改业务代码,这显然令人头疼。
Callback 的设计理念是将观测逻辑与执行逻辑彻底解耦:框架在固定的时刻触发回调,你只需注册一个 Handler 即可,业务代码完全不需要感知 Handler 的存在。这相当于给 Agent 管道安装了一个“监听器”,它只负责监听,不干扰管道的正常运行。
二、接口:五个关键时机
```
// callbacks/interface.go
type Handler interface {
OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
OnError(ctx context.Context, info *RunInfo, err error) context.Context
OnStartWithStreamInput(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context
OnEndWithStreamOutput(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context
}
```
| 时机 | 触发点 | 输入参数 |
|------|--------|----------|
| `OnStart` | 组件开始处理(非流式输入场景) | `CallbackInput`(具体类型由组件定义) |
| `OnEnd` | 组件成功返回(非流式输出场景) | `CallbackOutput` |
| `OnError` | 组件返回 error | `error` |
| `OnStartWithStreamInput` | 组件接收流式输入(Collect / Transform) | `*schema.StreamReader[CallbackInput]` |
| `OnEndWithStreamOutput` | 组件产生流式输出(Stream / Transform) | `*schema.StreamReader[CallbackOutput]` |
使用这些接口时,有几个重要约束需要格外留意:
- **OnEnd 和 OnError 互斥**:同一次组件调用只会触发其中一个,不会同时触发。
- 流式 Handler 收到的是框架已复制好的独立 StreamReader,处理完毕后必须调用 `Close()`,否则会导致 goroutine 泄漏,这是最容易踩的坑。
- 同一个 Handler 的 context 在 OnStart → OnEnd 之间是连续的,可用于传递状态;但不同 Handler 之间的 context 互不连通,不要指望跨 Handler 共享数据。
三、RunInfo:清楚知道是哪个组件在说话
每次回调都会携带一个 `*RunInfo` 结构体,告知 Handler 当前是哪个组件、哪个节点触发的回调:
```
type RunInfo struct {
Name string // 节点名(通过 compose.WithNodeName 设置)
Type string // 实现类型,如 "OpenAI"、"DeepSeek"
Component components.Component // 组件类别常量
}
```
三个字段的来源:
- **Name**:Graph 中的节点名;独立组件需要手动调用 `InitCallbacks` 进行设置。
- **Type**:组件自己实现 `Typer` 接口返回的字符串;若未实现,则通过反射获取结构体名。
- **Component**:框架定义的常量,如 `ComponentOfChatModel`、`ComponentOfToolsNode`、`"Lambda"` 等。
为什么需要 Component?因为 `CallbackInput` 和 `CallbackOutput` 的类型是 `any`(`interface{}`),你需要先判断是哪种组件,再执行类型断言。例如:
```
handler := callbacks.NewHandlerBuilder().
OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
// 只处理 ChatModel 的回调
modelInput := model.ConvCallbackInput(input)
if modelInput == nil {
return ctx // 不是 ChatModel,跳过
}
log.Printf("[%s] 发送 %d 条消息给模型", info.Name, len(modelInput.Messages))
return ctx
}).
Build()
```
四、HandlerBuilder:按需注册关心的回调时机
实现全部五个接口方法有时过于繁琐。`HandlerBuilder` 允许你只注册需要的回调方法,其余方法自动跳过,大大简化代码:
```
handler := callbacks.NewHandlerBuilder().
OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
log.Printf("[%s/%s] 开始", info.Component, info.Name)
return context.WithValue(ctx, startTimeKey{}, time.Now()) // 存储开始时间
}).
OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
start, _ := ctx.Value(startTimeKey{}).(time.Time)
log.Printf("[%s/%s] 完成,耗时 %v", info.Component, info.Name, time.Since(start))
return ctx
}).
OnErrorFn(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
log.Printf("[%s/%s] 出错: %v", info.Component, info.Name, err)
return ctx
}).
Build()
```
框架还做了一个小优化:`HandlerBuilder` 构建的 Handler 会自动实现 `TimingChecker` 接口,未注册的时机返回 `false`,框架就会跳过该时机的 stream 复制和 goroutine 分配——如果没注册 `OnEndWithStreamOutput`,就不会为其开销,性能上更加干净。
五、注入方式:全局 vs. 单次
全局 Handler(进程级)
```
// 在 main.go 里调用一次,进程生命周期内所有调用都会生效
callbacks.AppendGlobalHandlers(metricsHandler, tracingHandler)
```
全局 Handler 适用于“监控一切”的场景,例如分布式追踪、Token 计费等。调用时序上,全局 Handler 先于单次 Handler 执行。
单次 Handler(调用级)
```
// 每次 Invoke 时按需注入
runner.Invoke(ctx, input, compose.WithCallbacks(progressHandler))
// 或者编译期绑定(对所有节点生效)
graph.Compile(ctx, compose.WithGraphCompileCallbacks(debugHandler))
```
单次 Handler 只对当次调用生效,适合调试、用户级进度推送等场景。
六、流式 Handler:切记要关闭
`OnStartWithStreamInput` 和 `OnEndWithStreamOutput` 收到的是框架提前复制好的 StreamReader。如果不关闭,框架无法回收底层 goroutine,直接造成资源泄漏。
错误写法:忘记了 Close,只读取了一个 chunk 就返回,导致 goroutine 泄漏。
```
OnEndWithStreamOutputFn(func(ctx context.Context, info *callbacks.RunInfo,
output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
chunk, _ := output.Recv()
log.Printf("first chunk: %v", chunk)
return ctx // 只读了一个,没 Close!
})
```
正确写法:使用 `defer output.Close()` 兜底,即使中途发生 panic 也不会泄漏。
```
OnEndWithStreamOutputFn(func(ctx context.Context, info *callbacks.RunInfo,
output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
go func() {
defer output.Close() // 无论如何都要关闭
for {
chunk, err := output.Recv()
if err != nil { break } // io.EOF 也会在这里返回
// 处理 chunk...
}
}()
return ctx
})
```
LangSmith 的实现正是采用这种模式——在 goroutine 里用 `defer output.Close()` 兜底,值得借鉴。
七、context 传状态:OnStart → OnEnd 的标准模式
同一个 Handler 的 OnStart 返回的 context 会流入它的 OnEnd / OnError。这是跨回调时机传递状态的唯一正确方式。
来看 LangSmith Handler 的完整模式(`langsmith.go:75`):
```
// OnStart:创建 run,把 runID 存入 context
func (c *CallbackHandler) OnStart(ctx context.Context, info *callbacks.RunInfo,
input callbacks.CallbackInput) context.Context {
runID := uuid.NewString()
c.cli.CreateRun(ctx, &Run{ID: runID, Name: info.Name, StartTime: time.Now()})
return context.WithValue(ctx, langsmithStateKey{}, &LangsmithState{
ParentRunID: runID, // 存起来,OnEnd 用
})
}
// OnEnd:从 context 取出 runID,更新 run(关联同一次调用)
func (c *CallbackHandler) OnEnd(ctx context.Context, info *callbacks.RunInfo,
output callbacks.CallbackOutput) context.Context {
state, _ := ctx.Value(langsmithStateKey{}).(*LangsmithState)
endTime := time.Now()
c.cli.UpdateRun(ctx, state.ParentRunID, &RunPatch{EndTime: &endTime})
return ctx
}
```
注意:每个 Handler 只能看到自己的 ctx 链,不同 Handler 之间的 ctx 不共享——不要依赖其他 Handler 存入的 context value。
八、组件开发者视角:如何在自己的组件里埋点
如果你在实现自定义组件(比如自定义 ChatModel),可以使用 `callbacks/aspect_inject.go` 提供的工具函数:
非流式组件:
```
func (m *MyModel) Generate(ctx context.Context, input []*schema.Message, ...) (*schema.Message, error) {
ctx = callbacks.OnStart(ctx, &model.CallbackInput{Messages: input})
resp, err := m.doGenerate(ctx, input)
if err != nil {
callbacks.OnError(ctx, err)
return nil, err
}
callbacks.OnEnd(ctx, &model.CallbackOutput{Message: resp})
return resp, nil
}
```
流式组件:
```
func (m *MyModel) Stream(ctx context.Context, input []*schema.Message, ...) (*schema.StreamReader[*schema.Message], error) {
ctx = callbacks.OnStart(ctx, &model.CallbackInput{Messages: input})
stream, err := m.doStream(ctx, input)
if err != nil {
callbacks.OnError(ctx, err)
return nil, err
}
// 框架自动复制 stream 给各 Handler
ctx, stream = callbacks.OnEndWithStreamOutput(ctx, stream)
return stream, nil
}
```
独立运行(没有 Graph 管理 RunInfo)时,需要先初始化:
```
ctx = callbacks.InitCallbacks(ctx, &callbacks.RunInfo{
Type: "MyModel",
Component: components.ComponentOfChatModel,
Name: "my-model",
}, myHandler)
```
九、三个开箱即用的 tracing 集成
`eino-ext/callbacks/` 提供了可直接使用的 Handler:
| 平台 | 包路径 | 接入代码 |
|------|--------|----------|
| LangSmith | `callbacks/langsmith` | `langsmith.NewLangsmithHandler(&Config{APIKey: "..."})` |
| Langfuse | `callbacks/langfuse` | `langfuse.NewHandler(&Config{...})` |
| APMPlus | `callbacks/apmplus` | `apmplus.NewHandler(&Config{...})` |
接入 LangSmith 只需:
```
handler, _ := langsmith.NewLangsmithHandler(&langsmith.Config{
APIKey: os.Getenv("LANGSMITH_API_KEY"),
})
callbacks.AppendGlobalHandlers(handler)
```
之后所有 Graph 中所有节点的调用都会自动上报,无需修改一行业务代码。
小结
Callback 系统的设计哲学是零侵入性:框架统一在五个关键时机触发回调,Handler 只关心自己的逻辑,业务代码完全不感知 Handler 的存在。`HandlerBuilder` 让你跳过不关心的时机(从而节省 stream 复制开销);`RunInfo.Component` 让一个 Handler 同时处理多种组件类型;context 链使 OnStart 到 OnEnd 之间能够安全传递状态。流式 Handler 必须 Close 是最容易踩到的坑——`defer output.Close()` 是标准写法。
下篇继续。
*代码来源:cloudwego/eino · cloudwego/eino-ext · cloudwego/eino-examples*