自行实现ChainAgent
2026年4月18日 · 679 字 · 4 分钟
使用Eino框架的图编排能力,自行实现一个简单的Chain Agent Chain Agent(链式代理)是一种最简单的Agent模式,它按照固定的顺序执行: Chain Agent的调用次数是固定的(ChatModel → Tool → ChatModel),无法像ReAct Agent那样动态循环。但它的优点是实现简单、执行流程可预测。 图状态用于管理对话历史: 关键方法说明: 输出: ChatModel1: Tool: HistoryNode: ChatModel2: 让模型理解每个工具的功能和使用方式,模型自主决定是否调用工具。 通过图状态管理历史消息: 模型可能同时调用多个工具: 支持流式和非流式两种调用方式: 用户提问 → 调用查询工具 → 返回结果 用户输入 → 调用处理工具 → 返回处理结果 用户指令 → 调用执行工具 → 返回执行结果什么是Chain Agent
Chain的局限性
架构设计
┌─────────────────────────────────────────────────────────────┐
│ Chain Agent 架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ [START] │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ ChatModel1 │ 分析用户问题,决定是否调用工具 │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ Tool │ 执行工具调用(可能调用多个工具) │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ HistoryNode │ 管理历史消息,构建完整对话上下文 │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ ChatModel2 │ 基于工具结果生成最终答案 │
│ └─────────────┘ │
│ │ │
│ ▼ │
│ [END] │
│ │
└─────────────────────────────────────────────────────────────┘
代码实现
定义图状态
type HistoryMessage struct {
History []*schema.Message // 历史消息
SystemMessage string // 系统消息
UserMessage string // 用户消息
}
创建ChatModel和工具
// 创建ChatModel
config := config.NewConfig("../config/config.json")
model, _ := openai.NewChatModel(ctx, &openai.ChatModelConfig{
APIKey: config.APIKey,
Model: config.Model,
BaseURL: config.Url,
})
// 创建工具
timeTool := tools.NewTimeTool() // 获取时间
poemTool := tools.NewSayTool() // 获取古诗
tools := []tool.BaseTool{timeTool, poemTool}
// 绑定工具到ChatModel
toolInfos := make([]*schema.ToolInfo, 0, len(tools))
for _, tool := range tools {
toolInfo, _ := tool.Info(ctx)
toolInfos = append(toolInfos, toolInfo)
}
model.BindTools(toolInfos)
BindTools:模型自主决定是直接回答还是调用工具BindForcedTools:模型必须调用一个或多个工具创建图
初始化图和状态
graph := compose.NewGraph[[]*schema.Message, *schema.Message](
compose.WithGenLocalState(func(ctx context.Context) *HistoryMessage {
return &HistoryMessage{
History: make([]*schema.Message, 0, 4),
UserMessage: "",
SystemMessage: "",
}
}),
)
添加节点
// 历史消息管理节点
historyNode := compose.InvokableLambda(
func(ctx context.Context, input []*schema.Message) ([]*schema.Message, error) {
return // 逻辑通过图状态实现
},
)
graph.AddLambdaNode("history_node", historyNode,
compose.WithStatePostHandler(
func(ctx context.Context, out []*schema.Message, state *HistoryMessage) ([]*schema.Message, error) {
result := []*schema.Message{
{Role: schema.System, Content: state.SystemMessage},
}
result = append(result, state.History...)
return result, nil
},
))
// 第一个ChatModel节点
graph.AddChatModelNode("chatmodel1", model,
compose.WithStatePreHandler(func(ctx context.Context, input []*schema.Message, state *HistoryMessage) ([]*schema.Message, error) {
// 提取系统消息和用户消息
for _, msg := range input {
if msg.Role == schema.System {
state.SystemMessage = msg.Content
} else if msg.Role == schema.User {
state.UserMessage = msg.Content
}
}
return input, nil
}),
compose.WithStatePostHandler(func(ctx context.Context, output *schema.Message, state *HistoryMessage) (*schema.Message, error) {
// 将模型输出加入历史
state.History = append(state.History, output)
return output, nil
}))
// 工具节点
toolsNode, _ := compose.NewToolNode(ctx, &compose.ToolsNodeConfig{
Tools: tools,
})
graph.AddToolsNode("tool", toolsNode,
compose.WithStatePostHandler(func(ctx context.Context, out []*schema.Message, state *HistoryMessage) ([]*schema.Message, error) {
// 将工具输出加入历史
state.History = append(state.History, out...)
return out, nil
}))
// 第二个ChatModel节点
graph.AddChatModelNode("chatmodel2", model)
连接节点
graph.AddEdge(compose.START, "chatmodel1")
graph.AddEdge("chatmodel1", "tool")
graph.AddEdge("tool", "history_node")
graph.AddEdge("history_node", "chatmodel2")
graph.AddEdge("chatmodel2", compose.END)
编译和运行
编译图
runnable, err := graph.Compile(ctx)
if err != nil {
log.Fatal(err)
}
非流式调用
input := []*schema.Message{
{Role: schema.User, Content: "我在泰国可以告诉我几点了吗,并且和我说和节日有关的古诗?"},
}
msg, err := runnable.Invoke(ctx, input)
if err != nil {
log.Fatal(err)
}
fmt.Println(msg.Content)
流式调用
sr, err := runnable.Stream(ctx, input)
if err != nil {
log.Fatal(err)
}
defer sr.Close()
for {
msg, err := sr.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Println(err)
break
}
fmt.Print(msg.Content)
}
完整示例
测试代码
func TestChainAgent(t *testing.T) {
BuildChainWithGraph()
}
运行结果
go test -v -run TestChainAgent ./agent/
=== RUN TestChainAgent
【非流式输出】
I'll help you get the current time in Bangkok and a festive Chinese poem! Here's what I found:
🕐 **Current Time in Bangkok (Asia/Bangkok):** 2026-04-18 19:45:12
📜 **Poem of the Moment:**
> "才过清明,渐觉伤春暮。"
> — *蝶恋花·春暮* by 李冠
> *(Category: Festival - Qingming Festival)*
The poem reflects on the feeling after the Qingming Festival (Tomb-Sweeping Day), sensing the lingering melancholy of spring's departure.
【流式输出】
您好!让我为您查询了一些信息:
📅 **当前时间**(曼谷时区)
2026年4月18日 19:45:20
📖 **今日诗词**
> "独写菖蒲竹叶杯,蓬城芳草踏初回。"
— 汤显祖《午日处州禁竞渡》
这首诗写的是端午节的情境,描绘了诗人在端午节饮用菖蒲酒、踏青归来的情景。
--- PASS: TestChainAgent (20.51s)
PASS
执行流程分析
get_time 和 get_quote
get_time,参数:{"time_zone": "Asia/Bangkok"}get_quote,参数:{"category": "jieri"}
关键技术点
1. 工具绑定
model.BindTools(toolInfos)
2. 状态管理
// Pre Handler: 提取消息
compose.WithStatePreHandler(func(ctx context.Context, input []*schema.Message, state *HistoryMessage) {
// 提取系统消息和用户消息
})
// Post Handler: 更新历史
compose.WithStatePostHandler(func(ctx context.Context, output *schema.Message, state *HistoryMessage) {
state.History = append(state.History, output)
})
3. 多工具调用
state.History = append(state.History, out...) // out是[]*schema.Message
4. 流式输出
// 非流式
msg, err := runnable.Invoke(ctx, input)
// 流式
sr, err := runnable.Stream(ctx, input)
Chain vs ReAct
特性
Chain Agent
ReAct Agent
调用次数
固定(2次ChatModel)
动态(可能多次循环)
实现复杂度
简单
复杂
适用场景
简单任务
复杂推理任务
执行流程
可预测
不可预测
性能
更快
可能更慢
应用场景
1. 信息查询
2. 数据处理
3. 简单任务