Callback和图状态

2026年4月18日 · 1085 字 · 6 分钟

使用Eino框架的Callback机制实现图的监控,以及图状态在节点间的共享

什么是Callback

Callback(回调)是一种无侵入式的监控机制,可以在不修改业务代码的情况下,注入日志、追踪、指标等功能。在图的执行过程中,Callback可以:

  • 记录输入输出:追踪每个节点的输入和输出数据
  • 性能监控:统计节点执行时间
  • 错误追踪:捕获并记录异常信息
  • 调试支持:帮助定位问题所在

什么是图状态

图状态(Graph State)是在图执行过程中,所有节点共享的数据结构。它可以用来:

  • 跨节点传递数据:在节点间共享计算结果
  • 统计执行信息:记录节点执行次数、路径长度等
  • 实现复杂逻辑:支持条件判断和动态行为

代码实现

定义图状态

首先定义一个状态结构体:

type GraphStat struct {
	Length int  // 记录执行路径长度
}

创建图时初始化状态

使用WithGenLocalState在创建图时初始化状态:

graph := compose.NewGraph[*TA, *TD](compose.WithGenLocalState(
	func(ctx context.Context) *GraphStat {
		return &GraphStat{
			Length: 0,
		}
	}))

定义Pre和Post Handler

// Pre Handler: 节点执行前调用
func NodeStatePreHandler[I any](ctx context.Context, in I, state *GraphStat) (I, error) {
	state.Length += 1  // 增加路径长度
	return in, nil
}

// Post Handler: 节点执行后调用
func NodeStatePostHandler[O any](ctx context.Context, out O, state *GraphStat) (O, error) {
	log.Printf("grpath length %d", state.Length)  // 打印当前路径长度
	return out, nil
}

添加节点时绑定Handler

graph.AddLambdaNode("node1", node1, 
	compose.WithNodeName("node1"),                              // 指定节点名称
	compose.WithStatePreHandler(NodeStatePreHandler[*TA]),      // Pre Handler
	compose.WithStatePostHandler(NodeStatePostHandler[*TB]))    // Post Handler

graph.AddLambdaNode("node2", node2, 
	compose.WithNodeName("node2"),
	compose.WithStatePreHandler(NodeStatePreHandler[*TB]),
	compose.WithStatePostHandler(NodeStatePostHandler[*TC]))

graph.AddLambdaNode("node3", node3, 
	compose.WithNodeName("node3"),
	compose.WithStatePreHandler(NodeStatePreHandler[*TB]),
	compose.WithStatePostHandler(NodeStatePostHandler[*TD]))

graph.AddLambdaNode("node4", node4, 
	compose.WithNodeName("node4"),
	compose.WithStatePreHandler(NodeStatePreHandler[*TC]),
	compose.WithStatePostHandler(NodeStatePostHandler[*TD]))

实现Callback Handler

方式一:完整实现

实现callbacks.Handler接口的所有方法:

type LoggerCallbacks struct{}

func (l *LoggerCallbacks) OnStart(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
	log.Printf("[INPUT] name: %v, type: %v, component: %v, input: %v", info.Name, info.Type, info.Component, input)
	return ctx
}

func (l *LoggerCallbacks) OnEnd(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
	log.Printf("[OUTPUT] name: %v, type: %v, component: %v, output: %v", info.Name, info.Type, info.Component, output)
	return ctx
}

func (l *LoggerCallbacks) OnError(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
	log.Printf("[ERROR] name: %v, type: %v, component: %v, error: %v", info.Name, info.Type, info.Component, err)
	return ctx
}

func (l *LoggerCallbacks) OnStartWithStreamInput(ctx context.Context, info *callbacks.RunInfo, input *schema.StreamReader[callbacks.CallbackInput]) context.Context {
	return ctx
}

func (l *LoggerCallbacks) OnEndWithStreamOutput(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
	return ctx
}

方式二:Builder模式

只关注需要的回调时机:

func GetStartAndEndCallback() callbacks.Handler {
	return callbacks.NewHandlerBuilder().
		OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
			log.Printf("[INPUT] name: %v, type: %v, component: %v, input: %v", info.Name, info.Type, info.Component, input)
			return ctx
		}).
		OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
			log.Printf("[OUTPUT] name: %v, type: %v, component: %v, output: %v", info.Name, info.Type, info.Component, output)
			return ctx
		}).
		Build()
}

运行时注入Callback

在调用Invoke时传入Callback:

result, err := runnable.Invoke(ctx, input, 
	compose.WithCallbacks(callbacktool.GetStartAndEndCallback()))

这种方式只在本次调用中生效,不会影响其他调用。

全局Callback

除了运行时注入,还可以注册全局Callback,对所有图的执行生效:

import "github.com/cloudwego/eino/callbacks"

// 方式一:使用完整实现的LoggerCallbacks
callbacks.AppendGlobalHandlers(&callbacktool.LoggerCallbacks{})

// 方式二:使用Builder模式创建的Handler
callbacks.AppendGlobalHandlers(callbacktool.GetStartAndEndCallback())

全局Callback vs 运行时Callback:

特性 全局Callback 运行时Callback
作用范围 所有图的执行 仅当前调用
注入时机 程序启动时 运行时
典型用途 日志、监控、追踪 调试特定调用

注意:全局Callback会影响性能,建议只在调试时开启,生产环境慎用。

完整示例

完整代码

package enio_graph

import (
	"context"
	callbacktool "eino_study/callback_tool"
	"fmt"
	"log"

	"github.com/cloudwego/eino/compose"
)

type GraphStat struct {
	Length int
}

func BuildGraphWithCallBack(msg string) {
	ctx := context.Background()

	// 全局Callback(注释掉,调试时可打开)
	// callbacks.AppendGlobalHandlers(&callbacktool.LoggerCallbacks{})
	// callbacks.AppendGlobalHandlers(callbacktool.GetStartAndEndCallback())

	// 构建节点
	node1 := compose.InvokableLambda[*TA, *TB](func(ctx context.Context, req *TA) (*TB, error) {
		return &TB{Data: len(req.Data)}, nil
	})
	node2 := compose.InvokableLambda[*TB, *TC](func(ctx context.Context, req *TB) (*TC, error) {
		return &TC{Data: float64(req.Data)}, nil
	})
	node3 := compose.InvokableLambda[*TB, *TD](func(ctx context.Context, req *TB) (*TD, error) {
		return &TD{Data: req.Data > 5}, nil
	})
	node4 := compose.InvokableLambda[*TC, *TD](func(ctx context.Context, input *TC) (output *TD, err error) {
		return &TD{input.Data <= 5}, nil
	})

	// 构建图(带状态)
	graph := compose.NewGraph[*TA, *TD](compose.WithGenLocalState(
		func(ctx context.Context) *GraphStat {
			return &GraphStat{Length: 0}
		}))

	// 添加节点(带名称、Pre/Post Handler)
	graph.AddLambdaNode("node1", node1, 
		compose.WithNodeName("node1"),
		compose.WithStatePreHandler(NodeStatePreHandler[*TA]),
		compose.WithStatePostHandler(NodeStatePostHandler[*TB]))
	graph.AddLambdaNode("node2", node2, 
		compose.WithNodeName("node2"),
		compose.WithStatePreHandler(NodeStatePreHandler[*TB]),
		compose.WithStatePostHandler(NodeStatePostHandler[*TC]))
	graph.AddLambdaNode("node3", node3, 
		compose.WithNodeName("node3"),
		compose.WithStatePreHandler(NodeStatePreHandler[*TB]),
		compose.WithStatePostHandler(NodeStatePostHandler[*TD]))
	graph.AddLambdaNode("node4", node4, 
		compose.WithNodeName("node4"),
		compose.WithStatePreHandler(NodeStatePreHandler[*TC]),
		compose.WithStatePostHandler(NodeStatePostHandler[*TD]))

	// 添加边
	graph.AddEdge(compose.START, "node1")
	graph.AddEdge("node2", "node4")
	graph.AddEdge("node3", compose.END)
	graph.AddEdge("node4", compose.END)

	// 添加分支
	graph.AddBranch("node1", compose.NewGraphBranch[*TB](
		func(ctx context.Context, in *TB) (string, error) {
			if in.Data%2 == 0 {
				return "node2", nil
			} else {
				return "node3", nil
			}
		},
		map[string]bool{"node2": true, "node3": true},
	))

	// 编译
	runnable, err := graph.Compile(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// 运行(注入Callback)
	input := &TA{msg}
	result, err := runnable.Invoke(ctx, input, 
		compose.WithCallbacks(callbacktool.GetStartAndEndCallback()))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(result.Data)
}

func NodeStatePreHandler[I any](ctx context.Context, in I, state *GraphStat) (I, error) {
	state.Length += 1
	return in, nil
}

func NodeStatePostHandler[O any](ctx context.Context, out O, state *GraphStat) (O, error) {
	log.Printf("grpath length %d", state.Length)
	return out, nil
}

测试代码

func TestComposeGraphWithCallBack(t *testing.T) {
	BuildGraphWithCallBack("hello, how are you")
	BuildGraphWithCallBack("hello, how are you!")
}

运行结果

go test -v -run TestComposeGraphWithCallBack ./enio_graph/

输出:

=== RUN   TestComposeGraphWithCallBack
[INPUT] name: , type: , component: Graph, input: &{hello, how are you}
[INPUT] name: node1, type: , component: Lambda, input: &{hello, how are you}
[OUTPUT] name: node1, type: , component: Lambda, output: &{18}
grpath length 1
[INPUT] name: node2, type: , component: Lambda, input: &{18}
[OUTPUT] name: node2, type: , component: Lambda, output: &{18}
grpath length 2
[INPUT] name: node4, type: , component: Lambda, input: &{18}
[OUTPUT] name: node4, type: , component: Lambda, output: &{false}
grpath length 3
[OUTPUT] name: , type: , component: Graph, output: &{false}
false
[INPUT] name: , type: , component: Graph, input: &{hello, how are you!}
[INPUT] name: node1, type: , component: Lambda, input: &{hello, how are you!}
[OUTPUT] name: node1, type: , component: Lambda, output: &{19}
grpath length 1
[INPUT] name: node3, type: , component: Lambda, input: &{19}
[OUTPUT] name: node3, type: , component: Lambda, output: &{true}
grpath length 2
[OUTPUT] name: , type: , component: Graph, output: &{true}
true
--- PASS: TestComposeGraphWithCallBack (0.00s)
PASS

结果分析

第一次执行(长度18,偶数)

Graph START
    ↓
node1 (Length=1) → 输入: "hello, how are you" → 输出: 18
    ↓ (偶数分支)
node2 (Length=2) → 输入: 18 → 输出: 18.0
    ↓
node4 (Length=3) → 输入: 18.0 → 输出: false
    ↓
Graph END

执行路径:node1 → node2 → node4,路径长度=3

第二次执行(长度19,奇数)

Graph START
    ↓
node1 (Length=1) → 输入: "hello, how are you!" → 输出: 19
    ↓ (奇数分支)
node3 (Length=2) → 输入: 19 → 输出: true
    ↓
Graph END

执行路径:node1 → node3,路径长度=2

Callback触发时机

┌─────────────────────────────────────────────────────────────┐
│                    Callback触发时机                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   Invoke() 调用                                             │
│       │                                                     │
│       ▼                                                     │
│   ┌─────────────┐                                           │
│   │  OnStart    │  ← Graph开始执行                          │
│   └─────────────┘                                           │
│       │                                                     │
│       ▼                                                     │
│   ┌─────────────┐                                           │
│   │ Pre Handler │  ← node1执行前,Length=1                  │
│   └─────────────┘                                           │
│       │                                                     │
│       ▼                                                     │
│   ┌─────────────┐                                           │
│   │  OnStart    │  ← node1开始                              │
│   │   (Lambda)  │                                           │
│   └─────────────┘                                           │
│       │                                                     │
│       ▼                                                     │
│   ┌─────────────┐                                           │
│   │ node1执行   │                                           │
│   └─────────────┘                                           │
│       │                                                     │
│       ▼                                                     │
│   ┌─────────────┐                                           │
│   │  OnEnd      │  ← node1结束                              │
│   │   (Lambda)  │                                           │
│   └─────────────┘                                           │
│       │                                                     │
│       ▼                                                     │
│   ┌─────────────┐                                           │
│   │Post Handler │  ← node1执行后,打印Length=1              │
│   └─────────────┘                                           │
│       │                                                     │
│       ▼                                                     │
│   ... 后续节点类似 ...                                       │
│       │                                                     │
│       ▼                                                     │
│   ┌─────────────┐                                           │
│   │  OnEnd      │  ← Graph结束执行                          │
│   └─────────────┘                                           │
│                                                             │
└─────────────────────────────────────────────────────────────┘

关键特性

1. 无侵入式监控

Callback不修改业务代码,通过WithCallbacks在运行时注入:

runnable.Invoke(ctx, input, compose.WithCallbacks(handler))

2. 状态共享

所有节点共享同一个GraphState实例:

// node1执行后: Length=1
// node2执行后: Length=2
// node4执行后: Length=3

3. 类型安全的Handler

Pre/Post Handler使用泛型保证类型安全:

func NodeStatePreHandler[I any](ctx context.Context, in I, state *GraphStat) (I, error)
func NodeStatePostHandler[O any](ctx context.Context, out O, state *GraphStat) (O, error)

4. 灵活的Callback配置

支持三种配置方式:

  • 全局Callbackcallbacks.AppendGlobalHandlers(),对所有执行生效
  • 运行时Callbackcompose.WithCallbacks(),仅对本次执行生效
  • 特定组件Callbackcbutils.NewHandlerHelper().ChatModel(...),只监控特定组件
// 全局Callback(程序启动时注册)
callbacks.AppendGlobalHandlers(&LoggerCallbacks{})

// 运行时Callback(调用时注入)
runnable.Invoke(ctx, input, compose.WithCallbacks(handler))

// 特定组件Callback(只监控ChatModel)
handler := cbutils.NewHandlerHelper().ChatModel(&cbutils.ModelCallbackHandler{
	OnStart: func(ctx context.Context, info *callbacks.RunInfo, input *model.CallbackInput) context.Context {
		log.Printf("ChatModel input: %v", input.Messages)
		return ctx
	},
}).Handler()