游乐游手机版
首页/AI教程/文章详情

为Agent管道安装Callback监听系统

时间:2026-07-01 15:25
全面掌握 Callback 系统:从原理到实战 一、为什么需要 Callback 一个 ReAct Agent 在运行时,可能需要依次调用多个节点、多个工具以及一个模型——每次调用耗费了多少 Token?哪个节点延迟最高?工具返回了什么结果?如果每个节点都手动添加日志代码,业务逻辑将与监控逻辑紧紧耦

全面掌握 Callback 系统:从原理到实战

一、为什么需要 Callback

一个 ReAct Agent 在运行时,可能需要依次调用多个节点、多个工具以及一个模型——每次调用耗费了多少 Token?哪个节点延迟最高?工具返回了什么结果?如果每个节点都手动添加日志代码,业务逻辑将与监控逻辑紧紧耦合,一旦更换监控平台(例如从 LangSmith 切换到 CozeLoop 再到 APMPlus),就必须同步修改业务代码,这显然令人头疼。 Callback 的设计理念是将观测逻辑与执行逻辑彻底解耦:框架在固定的时刻触发回调,你只需注册一个 Handler 即可,业务代码完全不需要感知 Handler 的存在。这相当于给 Agent 管道安装了一个“监听器”,它只负责监听,不干扰管道的正常运行。 Callback 系统:给 Agent 管道装上“监听器“

二、接口:五个关键时机

``` // callbacks/interface.go type Handler interface { OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context OnError(ctx context.Context, info *RunInfo, err error) context.Context OnStartWithStreamInput(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context OnEndWithStreamOutput(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context } ``` | 时机 | 触发点 | 输入参数 | |------|--------|----------| | `OnStart` | 组件开始处理(非流式输入场景) | `CallbackInput`(具体类型由组件定义) | | `OnEnd` | 组件成功返回(非流式输出场景) | `CallbackOutput` | | `OnError` | 组件返回 error | `error` | | `OnStartWithStreamInput` | 组件接收流式输入(Collect / Transform) | `*schema.StreamReader[CallbackInput]` | | `OnEndWithStreamOutput` | 组件产生流式输出(Stream / Transform) | `*schema.StreamReader[CallbackOutput]` | 使用这些接口时,有几个重要约束需要格外留意: - **OnEnd 和 OnError 互斥**:同一次组件调用只会触发其中一个,不会同时触发。 - 流式 Handler 收到的是框架已复制好的独立 StreamReader,处理完毕后必须调用 `Close()`,否则会导致 goroutine 泄漏,这是最容易踩的坑。 - 同一个 Handler 的 context 在 OnStart → OnEnd 之间是连续的,可用于传递状态;但不同 Handler 之间的 context 互不连通,不要指望跨 Handler 共享数据。

三、RunInfo:清楚知道是哪个组件在说话

每次回调都会携带一个 `*RunInfo` 结构体,告知 Handler 当前是哪个组件、哪个节点触发的回调: ``` type RunInfo struct { Name string // 节点名(通过 compose.WithNodeName 设置) Type string // 实现类型,如 "OpenAI"、"DeepSeek" Component components.Component // 组件类别常量 } ``` 三个字段的来源: - **Name**:Graph 中的节点名;独立组件需要手动调用 `InitCallbacks` 进行设置。 - **Type**:组件自己实现 `Typer` 接口返回的字符串;若未实现,则通过反射获取结构体名。 - **Component**:框架定义的常量,如 `ComponentOfChatModel`、`ComponentOfToolsNode`、`"Lambda"` 等。 为什么需要 Component?因为 `CallbackInput` 和 `CallbackOutput` 的类型是 `any`(`interface{}`),你需要先判断是哪种组件,再执行类型断言。例如: ``` handler := callbacks.NewHandlerBuilder(). OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context { // 只处理 ChatModel 的回调 modelInput := model.ConvCallbackInput(input) if modelInput == nil { return ctx // 不是 ChatModel,跳过 } log.Printf("[%s] 发送 %d 条消息给模型", info.Name, len(modelInput.Messages)) return ctx }). Build() ```

四、HandlerBuilder:按需注册关心的回调时机

实现全部五个接口方法有时过于繁琐。`HandlerBuilder` 允许你只注册需要的回调方法,其余方法自动跳过,大大简化代码: ``` handler := callbacks.NewHandlerBuilder(). OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context { log.Printf("[%s/%s] 开始", info.Component, info.Name) return context.WithValue(ctx, startTimeKey{}, time.Now()) // 存储开始时间 }). OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context { start, _ := ctx.Value(startTimeKey{}).(time.Time) log.Printf("[%s/%s] 完成,耗时 %v", info.Component, info.Name, time.Since(start)) return ctx }). OnErrorFn(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context { log.Printf("[%s/%s] 出错: %v", info.Component, info.Name, err) return ctx }). Build() ``` 框架还做了一个小优化:`HandlerBuilder` 构建的 Handler 会自动实现 `TimingChecker` 接口,未注册的时机返回 `false`,框架就会跳过该时机的 stream 复制和 goroutine 分配——如果没注册 `OnEndWithStreamOutput`,就不会为其开销,性能上更加干净。

五、注入方式:全局 vs. 单次

全局 Handler(进程级)

``` // 在 main.go 里调用一次,进程生命周期内所有调用都会生效 callbacks.AppendGlobalHandlers(metricsHandler, tracingHandler) ``` 全局 Handler 适用于“监控一切”的场景,例如分布式追踪、Token 计费等。调用时序上,全局 Handler 先于单次 Handler 执行。

单次 Handler(调用级)

``` // 每次 Invoke 时按需注入 runner.Invoke(ctx, input, compose.WithCallbacks(progressHandler)) // 或者编译期绑定(对所有节点生效) graph.Compile(ctx, compose.WithGraphCompileCallbacks(debugHandler)) ``` 单次 Handler 只对当次调用生效,适合调试、用户级进度推送等场景。

六、流式 Handler:切记要关闭

`OnStartWithStreamInput` 和 `OnEndWithStreamOutput` 收到的是框架提前复制好的 StreamReader。如果不关闭,框架无法回收底层 goroutine,直接造成资源泄漏。 错误写法:忘记了 Close,只读取了一个 chunk 就返回,导致 goroutine 泄漏。 ``` OnEndWithStreamOutputFn(func(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context { chunk, _ := output.Recv() log.Printf("first chunk: %v", chunk) return ctx // 只读了一个,没 Close! }) ``` 正确写法:使用 `defer output.Close()` 兜底,即使中途发生 panic 也不会泄漏。 ``` OnEndWithStreamOutputFn(func(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context { go func() { defer output.Close() // 无论如何都要关闭 for { chunk, err := output.Recv() if err != nil { break } // io.EOF 也会在这里返回 // 处理 chunk... } }() return ctx }) ``` LangSmith 的实现正是采用这种模式——在 goroutine 里用 `defer output.Close()` 兜底,值得借鉴。

七、context 传状态:OnStart → OnEnd 的标准模式

同一个 Handler 的 OnStart 返回的 context 会流入它的 OnEnd / OnError。这是跨回调时机传递状态的唯一正确方式。 来看 LangSmith Handler 的完整模式(`langsmith.go:75`): ``` // OnStart:创建 run,把 runID 存入 context func (c *CallbackHandler) OnStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context { runID := uuid.NewString() c.cli.CreateRun(ctx, &Run{ID: runID, Name: info.Name, StartTime: time.Now()}) return context.WithValue(ctx, langsmithStateKey{}, &LangsmithState{ ParentRunID: runID, // 存起来,OnEnd 用 }) } // OnEnd:从 context 取出 runID,更新 run(关联同一次调用) func (c *CallbackHandler) OnEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context { state, _ := ctx.Value(langsmithStateKey{}).(*LangsmithState) endTime := time.Now() c.cli.UpdateRun(ctx, state.ParentRunID, &RunPatch{EndTime: &endTime}) return ctx } ``` 注意:每个 Handler 只能看到自己的 ctx 链,不同 Handler 之间的 ctx 不共享——不要依赖其他 Handler 存入的 context value。

八、组件开发者视角:如何在自己的组件里埋点

如果你在实现自定义组件(比如自定义 ChatModel),可以使用 `callbacks/aspect_inject.go` 提供的工具函数: 非流式组件: ``` func (m *MyModel) Generate(ctx context.Context, input []*schema.Message, ...) (*schema.Message, error) { ctx = callbacks.OnStart(ctx, &model.CallbackInput{Messages: input}) resp, err := m.doGenerate(ctx, input) if err != nil { callbacks.OnError(ctx, err) return nil, err } callbacks.OnEnd(ctx, &model.CallbackOutput{Message: resp}) return resp, nil } ``` 流式组件: ``` func (m *MyModel) Stream(ctx context.Context, input []*schema.Message, ...) (*schema.StreamReader[*schema.Message], error) { ctx = callbacks.OnStart(ctx, &model.CallbackInput{Messages: input}) stream, err := m.doStream(ctx, input) if err != nil { callbacks.OnError(ctx, err) return nil, err } // 框架自动复制 stream 给各 Handler ctx, stream = callbacks.OnEndWithStreamOutput(ctx, stream) return stream, nil } ``` 独立运行(没有 Graph 管理 RunInfo)时,需要先初始化: ``` ctx = callbacks.InitCallbacks(ctx, &callbacks.RunInfo{ Type: "MyModel", Component: components.ComponentOfChatModel, Name: "my-model", }, myHandler) ```

九、三个开箱即用的 tracing 集成

`eino-ext/callbacks/` 提供了可直接使用的 Handler: | 平台 | 包路径 | 接入代码 | |------|--------|----------| | LangSmith | `callbacks/langsmith` | `langsmith.NewLangsmithHandler(&Config{APIKey: "..."})` | | Langfuse | `callbacks/langfuse` | `langfuse.NewHandler(&Config{...})` | | APMPlus | `callbacks/apmplus` | `apmplus.NewHandler(&Config{...})` | 接入 LangSmith 只需: ``` handler, _ := langsmith.NewLangsmithHandler(&langsmith.Config{ APIKey: os.Getenv("LANGSMITH_API_KEY"), }) callbacks.AppendGlobalHandlers(handler) ``` 之后所有 Graph 中所有节点的调用都会自动上报,无需修改一行业务代码。

小结

Callback 系统的设计哲学是零侵入性:框架统一在五个关键时机触发回调,Handler 只关心自己的逻辑,业务代码完全不感知 Handler 的存在。`HandlerBuilder` 让你跳过不关心的时机(从而节省 stream 复制开销);`RunInfo.Component` 让一个 Handler 同时处理多种组件类型;context 链使 OnStart 到 OnEnd 之间能够安全传递状态。流式 Handler 必须 Close 是最容易踩到的坑——`defer output.Close()` 是标准写法。 下篇继续。 *代码来源:cloudwego/eino · cloudwego/eino-ext · cloudwego/eino-examples*
来源:https://juejin.cn/post/7656023099708243977
上一篇码农AI翻身第六篇:你好,我叫Parameter 下一篇AI到底是理解还是猜测?
本站内容用于信息整理与展示,如有侵权或内容问题请及时联系处理。

相关推荐

补充同频道和同主题内容,方便继续浏览更多相关内容。

同类最新

继续查看同栏目最近更新的文章。

更多
RAG四标融合企业知识资产体系四库协同GEO优化实践
AI教程 · 2026-07-01

RAG四标融合企业知识资产体系四库协同GEO优化实践

生成式AI正在彻底改写信息检索的底层逻辑。传统SEO依赖关键词堆砌和外链建设的策略,在大模型的内容采信规则下已经基本失效。取而代之的,是生成式引擎优化(GEO)。它不再关注外链数量,而是重点衡量你的知识是否结构化、证据链是否坚实、信源是否可靠——这些维度才是RAG(检索增强生成)架构真正看重的核心指

一个普通上班人分享WorkBuddy使用心得与真实体验
AI教程 · 2026-07-01

一个普通上班人分享WorkBuddy使用心得与真实体验

前言 最近我开始使用WorkBuddy——这是腾讯推出的一款AI办公工作台。差不多用了一周时间,趁印象还新鲜,把真实的使用感受记录下来,给还在犹豫的朋友做个参考。不吹不黑,只说实际体验。 初印象:不只是聊天机器人 之前用过不少AI工具,大多数就是个对话框,你问它答,答完就结束了。WorkBuddy不

AI幻觉变真功能实战教程:App Inventor 2视频录制拓展一周开发实录
AI教程 · 2026-07-01

AI幻觉变真功能实战教程:App Inventor 2视频录制拓展一周开发实录

先讲一个颇具戏剧性的开端。 这件事的开端颇显荒诞——有用户前来咨询,称AI Pro版的介绍中提到我们有一款“视频录制拓展”。团队全体成员都感到困惑,翻遍产品列表,发现根本不存在该组件。AI那种“一本正经胡说八道”的能力,这次确实让我们陷入尴尬。 按常理,此事到此便可结束——一句“抱歉,暂时没有这个拓

别再混淆OLAP和SQL-on-Hadoop两者查询本质不同
AI教程 · 2026-07-01

别再混淆OLAP和SQL-on-Hadoop两者查询本质不同

OLAP和SQL-on-Hadoop虽都使用SQL查询数据,但本质不同。SQL-on-Hadoop负责海量数据批量计算与ETL,查询速度秒级至分钟级;OLAP通过预聚合实现毫秒级多维分析,适合BI报表。两者在数据平台分工协作,前者是后厨加工,后者是前台快速服务。

GEO优化深度解析:AI偏好FAQ还是长文内容?
AI教程 · 2026-07-01

GEO优化深度解析:AI偏好FAQ还是长文内容?

在GEO优化中,AI对内容形式无统一偏好:FAQ在简单查询中引用率41%,长文在复杂查询中达58%。内容应基于用户意图选择形式,FAQ适配简单事实类问题,长文建立主题权威,两者互补而非替代。