Callback和图状态
2026年4月18日 · 1085 字 · 6 分钟
使用Eino框架的Callback机制实现图的监控,以及图状态在节点间的共享 Callback(回调)是一种无侵入式的监控机制,可以在不修改业务代码的情况下,注入日志、追踪、指标等功能。在图的执行过程中,Callback可以: 图状态(Graph State)是在图执行过程中,所有节点共享的数据结构。它可以用来: 首先定义一个状态结构体: 使用 实现 只关注需要的回调时机: 在调用 这种方式只在本次调用中生效,不会影响其他调用。 除了运行时注入,还可以注册全局Callback,对所有图的执行生效: 全局Callback vs 运行时Callback: 注意:全局Callback会影响性能,建议只在调试时开启,生产环境慎用。 输出: 执行路径:node1 → node2 → node4,路径长度=3 执行路径:node1 → node3,路径长度=2 Callback不修改业务代码,通过 所有节点共享同一个GraphState实例: Pre/Post Handler使用泛型保证类型安全: 支持三种配置方式:
什么是Callback
什么是图状态
代码实现
定义图状态
// GraphStat is the graph state struct used in this example.
type GraphStat struct {
	Length int // records the execution path length
}
创建图时初始化状态
使用 WithGenLocalState 在创建图时初始化状态:
graph := compose.NewGraph[*TA, *TD](compose.WithGenLocalState(
func(ctx context.Context) *GraphStat {
return &GraphStat{
Length: 0,
}
}))
定义Pre和Post Handler
// NodeStatePreHandler is a pre handler: it is called before a node executes.
// It increments state.Length by one and returns the input unchanged together
// with a nil error.
func NodeStatePreHandler[I any](ctx context.Context, in I, state *GraphStat) (I, error) {
	state.Length++ // extend the path length by one
	return in, nil
}
// NodeStatePostHandler is a post handler: it is called after a node executes.
// It logs the path length currently held in state and returns the output
// unchanged together with a nil error.
func NodeStatePostHandler[O any](ctx context.Context, out O, state *GraphStat) (O, error) {
	pathLen := state.Length
	log.Printf("grpath length %d", pathLen) // print the current path length
	return out, nil
}
添加节点时绑定Handler
graph.AddLambdaNode("node1", node1,
	compose.WithNodeName("node1"), // set the node name
	compose.WithStatePreHandler(NodeStatePreHandler[*TA]), // Pre Handler
	compose.WithStatePostHandler(NodeStatePostHandler[*TB])) // Post Handler
graph.AddLambdaNode("node2", node2,
	compose.WithNodeName("node2"),
	compose.WithStatePreHandler(NodeStatePreHandler[*TB]),
	compose.WithStatePostHandler(NodeStatePostHandler[*TC]))
graph.AddLambdaNode("node3", node3,
	compose.WithNodeName("node3"),
	compose.WithStatePreHandler(NodeStatePreHandler[*TB]),
	compose.WithStatePostHandler(NodeStatePostHandler[*TD]))
graph.AddLambdaNode("node4", node4,
	compose.WithNodeName("node4"),
	compose.WithStatePreHandler(NodeStatePreHandler[*TC]),
	compose.WithStatePostHandler(NodeStatePostHandler[*TD]))
实现Callback Handler
方式一:完整实现
实现 callbacks.Handler 接口的所有方法:
type LoggerCallbacks struct{}
// OnStart logs the run info's name, type and component together with the
// callback input, then returns ctx unchanged.
func (l *LoggerCallbacks) OnStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
	name, typ, component := info.Name, info.Type, info.Component
	log.Printf("[INPUT] name: %v, type: %v, component: %v, input: %v", name, typ, component, input)
	return ctx
}
// OnEnd logs the run info's name, type and component together with the
// callback output, then returns ctx unchanged.
func (l *LoggerCallbacks) OnEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
	name, typ, component := info.Name, info.Type, info.Component
	log.Printf("[OUTPUT] name: %v, type: %v, component: %v, output: %v", name, typ, component, output)
	return ctx
}
// OnError logs the run info's name, type and component together with the
// reported error, then returns ctx unchanged.
func (l *LoggerCallbacks) OnError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
	name, typ, component := info.Name, info.Type, info.Component
	log.Printf("[ERROR] name: %v, type: %v, component: %v, error: %v", name, typ, component, err)
	return ctx
}
// OnStartWithStreamInput handles the streaming-input callback and returns ctx
// unchanged. This logger does not read the stream, but it still closes the
// reader it was handed: per the Eino callback manual, each stream callback
// handler gets its own copy of the stream and must close it, otherwise the
// underlying stream cannot be closed and released.
func (l *LoggerCallbacks) OnStartWithStreamInput(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context {
	if input != nil {
		input.Close()
	}
	return ctx
}
// OnEndWithStreamOutput handles the streaming-output callback and returns ctx
// unchanged. This logger does not read the stream, but it still closes the
// reader it was handed: per the Eino callback manual, each stream callback
// handler gets its own copy of the stream and must close it, otherwise the
// underlying stream cannot be closed and released.
func (l *LoggerCallbacks) OnEndWithStreamOutput(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
	if output != nil {
		output.Close()
	}
	return ctx
}
方式二:Builder模式
// GetStartAndEndCallback returns a callbacks.Handler assembled with the
// handler builder that only hooks the start and end timings: each hook logs
// the run info's name, type and component plus the input or output, and
// returns ctx unchanged.
func GetStartAndEndCallback() callbacks.Handler {
	logInput := func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
		log.Printf("[INPUT] name: %v, type: %v, component: %v, input: %v", info.Name, info.Type, info.Component, input)
		return ctx
	}
	logOutput := func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
		log.Printf("[OUTPUT] name: %v, type: %v, component: %v, output: %v", info.Name, info.Type, info.Component, output)
		return ctx
	}

	builder := callbacks.NewHandlerBuilder()
	builder = builder.OnStartFn(logInput)
	builder = builder.OnEndFn(logOutput)
	return builder.Build()
}
运行时注入Callback
在调用 Invoke 时传入 Callback:
result, err := runnable.Invoke(ctx, input,
compose.WithCallbacks(callbacktool.GetStartAndEndCallback()))
全局Callback
import "github.com/cloudwego/eino/callbacks"
// Option 1: use the fully implemented LoggerCallbacks
callbacks.AppendGlobalHandlers(&callbacktool.LoggerCallbacks{})
// Option 2: use the Handler created with the builder pattern
callbacks.AppendGlobalHandlers(callbacktool.GetStartAndEndCallback())
| 特性 | 全局Callback | 运行时Callback |
| --- | --- | --- |
| 作用范围 | 所有图的执行 | 仅当前调用 |
| 注入时机 | 程序启动时 | 运行时 |
| 典型用途 | 日志、监控、追踪 | 调试特定调用 |
完整示例
完整代码
package enio_graph
import (
"context"
callbacktool "eino_study/callback_tool"
"fmt"
"log"
"github.com/cloudwego/eino/compose"
)
// GraphStat is the local state type of the graph built in this file. A fresh
// value is produced by the WithGenLocalState generator, and the node pre/post
// handlers read and update it.
type GraphStat struct {
	// Length is incremented by NodeStatePreHandler and logged by
	// NodeStatePostHandler.
	Length int
}
// BuildGraphWithCallBack builds a four-node lambda graph with a *GraphStat
// local state, compiles it, runs it once on msg with the start/end logging
// callback injected for that single call, and prints the resulting boolean.
//
// node1 maps the message to len(req.Data). A branch then routes even lengths
// to node2 -> node4 and odd lengths to node3. Any error while adding nodes,
// edges or the branch, or while compiling or invoking the graph, terminates
// the process via log.Fatal.
func BuildGraphWithCallBack(msg string) {
	ctx := context.Background()

	// Global callbacks (commented out; enable while debugging).
	// callbacks.AppendGlobalHandlers(&callbacktool.LoggerCallbacks{})
	// callbacks.AppendGlobalHandlers(callbacktool.GetStartAndEndCallback())

	// failOn aborts on the first graph-construction error instead of
	// silently discarding it.
	failOn := func(err error) {
		if err != nil {
			log.Fatal(err)
		}
	}

	// Build the nodes.
	node1 := compose.InvokableLambda[*TA, *TB](func(ctx context.Context, req *TA) (*TB, error) {
		return &TB{Data: len(req.Data)}, nil
	})
	node2 := compose.InvokableLambda[*TB, *TC](func(ctx context.Context, req *TB) (*TC, error) {
		return &TC{Data: float64(req.Data)}, nil
	})
	node3 := compose.InvokableLambda[*TB, *TD](func(ctx context.Context, req *TB) (*TD, error) {
		return &TD{Data: req.Data > 5}, nil
	})
	node4 := compose.InvokableLambda[*TC, *TD](func(ctx context.Context, input *TC) (*TD, error) {
		return &TD{Data: input.Data <= 5}, nil
	})

	// Build the graph (with state).
	graph := compose.NewGraph[*TA, *TD](compose.WithGenLocalState(
		func(ctx context.Context) *GraphStat {
			return &GraphStat{Length: 0}
		}))

	// Add the nodes (with a name and Pre/Post handlers).
	failOn(graph.AddLambdaNode("node1", node1,
		compose.WithNodeName("node1"),
		compose.WithStatePreHandler(NodeStatePreHandler[*TA]),
		compose.WithStatePostHandler(NodeStatePostHandler[*TB])))
	failOn(graph.AddLambdaNode("node2", node2,
		compose.WithNodeName("node2"),
		compose.WithStatePreHandler(NodeStatePreHandler[*TB]),
		compose.WithStatePostHandler(NodeStatePostHandler[*TC])))
	failOn(graph.AddLambdaNode("node3", node3,
		compose.WithNodeName("node3"),
		compose.WithStatePreHandler(NodeStatePreHandler[*TB]),
		compose.WithStatePostHandler(NodeStatePostHandler[*TD])))
	failOn(graph.AddLambdaNode("node4", node4,
		compose.WithNodeName("node4"),
		compose.WithStatePreHandler(NodeStatePreHandler[*TC]),
		compose.WithStatePostHandler(NodeStatePostHandler[*TD])))

	// Add the edges.
	failOn(graph.AddEdge(compose.START, "node1"))
	failOn(graph.AddEdge("node2", "node4"))
	failOn(graph.AddEdge("node3", compose.END))
	failOn(graph.AddEdge("node4", compose.END))

	// Add the branch: even values go to node2, odd values go to node3.
	failOn(graph.AddBranch("node1", compose.NewGraphBranch[*TB](
		func(ctx context.Context, in *TB) (string, error) {
			if in.Data%2 == 0 {
				return "node2", nil
			}
			return "node3", nil
		},
		map[string]bool{"node2": true, "node3": true},
	)))

	// Compile.
	runnable, err := graph.Compile(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Run (with the callback injected for this call only).
	input := &TA{Data: msg}
	result, err := runnable.Invoke(ctx, input,
		compose.WithCallbacks(callbacktool.GetStartAndEndCallback()))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(result.Data)
}
// NodeStatePreHandler is the state pre handler registered on every node in
// this file. It increments state.Length by one and returns the input
// unchanged together with a nil error.
func NodeStatePreHandler[I any](ctx context.Context, in I, state *GraphStat) (I, error) {
	state.Length++
	return in, nil
}
// NodeStatePostHandler is the state post handler registered on every node in
// this file. It logs the path length currently held in state and returns the
// output unchanged together with a nil error.
func NodeStatePostHandler[O any](ctx context.Context, out O, state *GraphStat) (O, error) {
	pathLen := state.Length
	log.Printf("grpath length %d", pathLen)
	return out, nil
}
测试代码
// TestComposeGraphWithCallBack runs BuildGraphWithCallBack on two messages
// that differ only by a trailing "!", in that order.
func TestComposeGraphWithCallBack(t *testing.T) {
	messages := []string{
		"hello, how are you",
		"hello, how are you!",
	}
	for _, msg := range messages {
		BuildGraphWithCallBack(msg)
	}
}
运行结果
go test -v -run TestComposeGraphWithCallBack ./enio_graph/
=== RUN TestComposeGraphWithCallBack
[INPUT] name: , type: , component: Graph, input: &{hello, how are you}
[INPUT] name: node1, type: , component: Lambda, input: &{hello, how are you}
[OUTPUT] name: node1, type: , component: Lambda, output: &{18}
grpath length 1
[INPUT] name: node2, type: , component: Lambda, input: &{18}
[OUTPUT] name: node2, type: , component: Lambda, output: &{18}
grpath length 2
[INPUT] name: node4, type: , component: Lambda, input: &{18}
[OUTPUT] name: node4, type: , component: Lambda, output: &{false}
grpath length 3
[OUTPUT] name: , type: , component: Graph, output: &{false}
false
[INPUT] name: , type: , component: Graph, input: &{hello, how are you!}
[INPUT] name: node1, type: , component: Lambda, input: &{hello, how are you!}
[OUTPUT] name: node1, type: , component: Lambda, output: &{19}
grpath length 1
[INPUT] name: node3, type: , component: Lambda, input: &{19}
[OUTPUT] name: node3, type: , component: Lambda, output: &{true}
grpath length 2
[OUTPUT] name: , type: , component: Graph, output: &{true}
true
--- PASS: TestComposeGraphWithCallBack (0.00s)
PASS
结果分析
第一次执行(长度18,偶数)
Graph START
↓
node1 (Length=1) → 输入: "hello, how are you" → 输出: 18
↓ (偶数分支)
node2 (Length=2) → 输入: 18 → 输出: 18.0
↓
node4 (Length=3) → 输入: 18.0 → 输出: false
↓
Graph END
第二次执行(长度19,奇数)
Graph START
↓
node1 (Length=1) → 输入: "hello, how are you!" → 输出: 19
↓ (奇数分支)
node3 (Length=2) → 输入: 19 → 输出: true
↓
Graph END
Callback触发时机
┌─────────────────────────────────────────────────────────────┐
│ Callback触发时机 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Invoke() 调用 │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ OnStart │ ← Graph开始执行 │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Pre Handler │ ← node1执行前,Length=1 │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ OnStart │ ← node1开始 │
│ │ (Lambda) │ │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ node1执行 │ │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ OnEnd │ ← node1结束 │
│ │ (Lambda) │ │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │Post Handler │ ← node1执行后,打印Length=1 │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ... 后续节点类似 ... │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ OnEnd │ ← Graph结束执行 │
│ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘
关键特性
1. 无侵入式监控
通过 WithCallbacks 在运行时注入:
runnable.Invoke(ctx, input, compose.WithCallbacks(handler))
2. 状态共享
// node1执行后: Length=1
// node2执行后: Length=2
// node4执行后: Length=3
3. 类型安全的Handler
func NodeStatePreHandler[I any](ctx context.Context, in I, state *GraphStat) (I, error)
func NodeStatePostHandler[O any](ctx context.Context, out O, state *GraphStat) (O, error)
4. 灵活的Callback配置
callbacks.AppendGlobalHandlers(),对所有执行生效
compose.WithCallbacks(),仅对本次执行生效
cbutils.NewHandlerHelper().ChatModel(...),只监控特定组件
// 全局Callback(程序启动时注册)
callbacks.AppendGlobalHandlers(&LoggerCallbacks{})
// Runtime callback (injected at call time)
runnable.Invoke(ctx, input, compose.WithCallbacks(handler))
// Component-specific callback (monitors ChatModel only)
handler := cbutils.NewHandlerHelper().ChatModel(&cbutils.ModelCallbackHandler{
	OnStart: func(ctx context.Context, info *callbacks.RunInfo, input *model.CallbackInput) context.Context {
		log.Printf("ChatModel input: %v", input.Messages)
		return ctx
	},
}).Handler()