自行实现ChainAgent

2026年4月18日 · 679 字 · 4 分钟

使用Eino框架的图编排能力,自行实现一个简单的Chain Agent

什么是Chain Agent

Chain Agent(链式代理)是一种最简单的Agent模式,它按照固定的顺序执行:

  1. ChatModel:调用大语言模型,模型决定是直接回答还是调用工具
  2. Tool:如果模型决定调用工具,执行工具调用
  3. ChatModel:再次调用模型,基于工具结果生成最终答案

Chain的局限性

Chain Agent的调用次数是固定的(ChatModel → Tool → ChatModel),无法像ReAct Agent那样动态循环。但它的优点是实现简单、执行流程可预测。

架构设计

┌─────────────────────────────────────────────────────────────┐
│                    Chain Agent 架构                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   [START]                                                   │
│      │                                                      │
│      ▼                                                      │
│   ┌─────────────┐                                           │
│   │ ChatModel1  │  分析用户问题,决定是否调用工具            │
│   └─────────────┘                                           │
│      │                                                      │
│      ▼                                                      │
│   ┌─────────────┐                                           │
│   │    Tool     │  执行工具调用(可能调用多个工具)          │
│   └─────────────┘                                           │
│      │                                                      │
│      ▼                                                      │
│   ┌─────────────┐                                           │
│   │ HistoryNode │  管理历史消息,构建完整对话上下文          │
│   └─────────────┘                                           │
│      │                                                      │
│      ▼                                                      │
│   ┌─────────────┐                                           │
│   │ ChatModel2  │  基于工具结果生成最终答案                  │
│   └─────────────┘                                           │
│      │                                                      │
│      ▼                                                      │
│   [END]                                                     │
│                                                             │
└─────────────────────────────────────────────────────────────┘

代码实现

定义图状态

图状态用于管理对话历史:

type HistoryMessage struct {
	History       []*schema.Message  // 历史消息
	SystemMessage string              // 系统消息
	UserMessage   string              // 用户消息
}

创建ChatModel和工具

// 创建ChatModel
config := config.NewConfig("../config/config.json")
model, _ := openai.NewChatModel(ctx, &openai.ChatModelConfig{
	APIKey:  config.APIKey,
	Model:   config.Model,
	BaseURL: config.Url,
})

// 创建工具
timeTool := tools.NewTimeTool()   // 获取时间
poemTool := tools.NewSayTool()    // 获取古诗
tools := []tool.BaseTool{timeTool, poemTool}

// 绑定工具到ChatModel
toolInfos := make([]*schema.ToolInfo, 0, len(tools))
for _, tool := range tools {
	toolInfo, _ := tool.Info(ctx)
	toolInfos = append(toolInfos, toolInfo)
}
model.BindTools(toolInfos)

关键方法说明:

  • BindTools:模型自主决定是直接回答还是调用工具
  • BindForcedTools:模型必须调用一个或多个工具

创建图

初始化图和状态

graph := compose.NewGraph[[]*schema.Message, *schema.Message](
	compose.WithGenLocalState(func(ctx context.Context) *HistoryMessage {
		return &HistoryMessage{
			History:       make([]*schema.Message, 0, 4),
			UserMessage:   "",
			SystemMessage: "",
		}
	}),
)

添加节点

// 历史消息管理节点
historyNode := compose.InvokableLambda(
	func(ctx context.Context, input []*schema.Message) ([]*schema.Message, error) {
		return // 逻辑通过图状态实现
	},
)

graph.AddLambdaNode("history_node", historyNode, 
	compose.WithStatePostHandler(
		func(ctx context.Context, out []*schema.Message, state *HistoryMessage) ([]*schema.Message, error) {
			result := []*schema.Message{
				{Role: schema.System, Content: state.SystemMessage},
			}
			result = append(result, state.History...)
			return result, nil
		},
	))

// 第一个ChatModel节点
graph.AddChatModelNode("chatmodel1", model,
	compose.WithStatePreHandler(func(ctx context.Context, input []*schema.Message, state *HistoryMessage) ([]*schema.Message, error) {
		// 提取系统消息和用户消息
		for _, msg := range input {
			if msg.Role == schema.System {
				state.SystemMessage = msg.Content
			} else if msg.Role == schema.User {
				state.UserMessage = msg.Content
			}
		}
		return input, nil
	}),
	compose.WithStatePostHandler(func(ctx context.Context, output *schema.Message, state *HistoryMessage) (*schema.Message, error) {
		// 将模型输出加入历史
		state.History = append(state.History, output)
		return output, nil
	}))

// 工具节点
toolsNode, _ := compose.NewToolNode(ctx, &compose.ToolsNodeConfig{
	Tools: tools,
})
graph.AddToolsNode("tool", toolsNode,
	compose.WithStatePostHandler(func(ctx context.Context, out []*schema.Message, state *HistoryMessage) ([]*schema.Message, error) {
		// 将工具输出加入历史
		state.History = append(state.History, out...)
		return out, nil
	}))

// 第二个ChatModel节点
graph.AddChatModelNode("chatmodel2", model)

连接节点

graph.AddEdge(compose.START, "chatmodel1")
graph.AddEdge("chatmodel1", "tool")
graph.AddEdge("tool", "history_node")
graph.AddEdge("history_node", "chatmodel2")
graph.AddEdge("chatmodel2", compose.END)

编译和运行

编译图

runnable, err := graph.Compile(ctx)
if err != nil {
	log.Fatal(err)
}

非流式调用

input := []*schema.Message{
	{Role: schema.User, Content: "我在泰国可以告诉我几点了吗,并且和我说和节日有关的古诗?"},
}
msg, err := runnable.Invoke(ctx, input)
if err != nil {
	log.Fatal(err)
}
fmt.Println(msg.Content)

流式调用

sr, err := runnable.Stream(ctx, input)
if err != nil {
	log.Fatal(err)
}
defer sr.Close()

for {
	msg, err := sr.Recv()
	if err != nil {
		if err == io.EOF {
			break
		}
		log.Println(err)
		break
	}
	fmt.Print(msg.Content)
}

完整示例

测试代码

func TestChainAgent(t *testing.T) {
	BuildChainWithGraph()
}

运行结果

go test -v -run TestChainAgent ./agent/

输出:

=== RUN   TestChainAgent
【非流式输出】

I'll help you get the current time in Bangkok and a festive Chinese poem! Here's what I found:

🕐 **Current Time in Bangkok (Asia/Bangkok):** 2026-04-18 19:45:12

📜 **Poem of the Moment:**
> "才过清明,渐觉伤春暮。"  
> — *蝶恋花·春暮* by 李冠  
> *(Category: Festival - Qingming Festival)*

The poem reflects on the feeling after the Qingming Festival (Tomb-Sweeping Day), sensing the lingering melancholy of spring's departure.

【流式输出】

您好!让我为您查询了一些信息:

📅 **当前时间**(曼谷时区)
2026年4月18日 19:45:20

📖 **今日诗词**
> "独写菖蒲竹叶杯,蓬城芳草踏初回。"

— 汤显祖《午日处州禁竞渡》

这首诗写的是端午节的情境,描绘了诗人在端午节饮用菖蒲酒、踏青归来的情景。

--- PASS: TestChainAgent (20.51s)
PASS

执行流程分析

  1. ChatModel1

    • 输入:“我在泰国可以告诉我几点了吗,并且和我说和节日有关的古诗?”
    • 模型决定调用两个工具:get_timeget_quote
  2. Tool

    • 调用 get_time,参数:{"time_zone": "Asia/Bangkok"}
    • 调用 get_quote,参数:{"category": "jieri"}
    • 返回两个工具的结果
  3. HistoryNode

    • 构建完整的对话历史
    • 包含系统消息、用户消息、工具调用和工具结果
  4. ChatModel2

    • 输入:完整的对话历史
    • 生成最终答案,整合时间信息和古诗内容

关键技术点

1. 工具绑定

model.BindTools(toolInfos)

让模型理解每个工具的功能和使用方式,模型自主决定是否调用工具。

2. 状态管理

通过图状态管理历史消息:

// Pre Handler: 提取消息
compose.WithStatePreHandler(func(ctx context.Context, input []*schema.Message, state *HistoryMessage) {
    // 提取系统消息和用户消息
})

// Post Handler: 更新历史
compose.WithStatePostHandler(func(ctx context.Context, output *schema.Message, state *HistoryMessage) {
    state.History = append(state.History, output)
})

3. 多工具调用

模型可能同时调用多个工具:

state.History = append(state.History, out...)  // out是[]*schema.Message

4. 流式输出

支持流式和非流式两种调用方式:

// 非流式
msg, err := runnable.Invoke(ctx, input)

// 流式
sr, err := runnable.Stream(ctx, input)

Chain vs ReAct

特性 Chain Agent ReAct Agent
调用次数 固定(2次ChatModel) 动态(可能多次循环)
实现复杂度 简单 复杂
适用场景 简单任务 复杂推理任务
执行流程 可预测 不可预测
性能 更快 可能更慢

应用场景

1. 信息查询

用户提问 → 调用查询工具 → 返回结果

2. 数据处理

用户输入 → 调用处理工具 → 返回处理结果

3. 简单任务

用户指令 → 调用执行工具 → 返回执行结果