图的构建
2026年4月18日 · 658 字 · 4 分钟
使用Eino框架的compose包构建复杂的有向图工作流,实现灵活的任务编排 在构建AI应用时,我们经常需要将多个处理步骤组合成一个工作流。图编排(Graph Orchestration)是一种将任务节点通过边连接成有向图的方式,可以: 节点是图中的处理单元,每个节点接收输入并产生输出。 边定义了节点之间的执行顺序: 图是节点和边的容器,定义了整个工作流的拓扑结构。 首先定义图中流转的数据类型: 使用 分支边根据条件动态选择下一个节点: 编译时检查节点间数据类型是否匹配: 输出: 执行流程详解: 第一个输入(长度19): 第二个输入(长度20): 编译时检查上下游节点的数据类型: 如果类型不匹配,编译时会报错。 分支函数根据输入动态选择路径: 在 这确保了编译器能够进行完整的类型检查。 每个节点应该: 分支条件应该: 数据类型应该: 节点函数应该:什么是图编排
核心概念
节点(Node)
边(Edge)
图(Graph)
架构图
代码实现
依赖安装
go get github.com/cloudwego/eino/compose
定义数据类型
package enio_graph
// 输入类型
type TA struct {
Data string
}
// 中间类型
type TB struct {
Data int
}
type TC struct {
Data float64
}
// 输出类型
type TD struct {
Data bool
}
创建节点
compose.InvokableLambda创建Lambda节点:import (
"context"
"github.com/cloudwego/eino/compose"
)
// node1: 将字符串长度转为整数
node1 := compose.InvokableLambda[*TA, *TB](func(ctx context.Context, req *TA) (*TB, error) {
return &TB{Data: len(req.Data)}, nil
})
// node2: 将整数转为浮点数
node2 := compose.InvokableLambda[*TB, *TC](func(ctx context.Context, req *TB) (*TC, error) {
return &TC{Data: float64(req.Data)}, nil
})
// node3: 判断整数是否大于5
node3 := compose.InvokableLambda[*TB, *TD](func(ctx context.Context, req *TB) (*TD, error) {
return &TD{Data: req.Data > 5}, nil
})
// node4: 判断浮点数是否小于等于5
node4 := compose.InvokableLambda[*TC, *TD](func(ctx context.Context, input *TC) (output *TD, err error) {
return &TD{input.Data <= 5}, nil
})
构建图
创建图实例
// 创建图,指定输入类型*TA,输出类型*TD
graph := compose.NewGraph[*TA, *TD]()
添加节点
graph.AddLambdaNode("node1", node1)
graph.AddLambdaNode("node2", node2)
graph.AddLambdaNode("node3", node3)
graph.AddLambdaNode("node4", node4)
添加普通边
// 从START到node1
graph.AddEdge(compose.START, "node1")
// node2到node4
graph.AddEdge("node2", "node4")
// node3到END
graph.AddEdge("node3", compose.END)
// node4到END
graph.AddEdge("node4", compose.END)
添加分支边
graph.AddBranch("node1", compose.NewGraphBranch[*TB](
func(ctx context.Context, in *TB) (string, error) {
// 根据数据奇偶性选择分支
if in.Data%2 == 0 {
return "node2", nil // 偶数走node2
} else {
return "node3", nil // 奇数走node3
}
},
map[string]bool{
"node2": true, // 声明所有可能的目标节点
"node3": true,
},
))
编译和运行
编译图
runnable, err := graph.Compile(ctx)
if err != nil {
log.Fatal(err)
}
运行图
input := &TA{Data: "hello, how are you"}
result, err := runnable.Invoke(ctx, input)
if err != nil {
log.Fatal(err)
}
fmt.Println(result.Data) // 输出: false 或 true
完整示例
完整代码
package enio_graph
import (
"context"
"fmt"
"log"
"github.com/cloudwego/eino/compose"
)
func BuildGraph(msg string) {
ctx := context.Background()
// 构建节点
node1 := compose.InvokableLambda[*TA, *TB](func(ctx context.Context, req *TA) (*TB, error) {
return &TB{Data: len(req.Data)}, nil
})
node2 := compose.InvokableLambda[*TB, *TC](func(ctx context.Context, req *TB) (*TC, error) {
return &TC{Data: float64(req.Data)}, nil
})
node3 := compose.InvokableLambda[*TB, *TD](func(ctx context.Context, req *TB) (*TD, error) {
return &TD{Data: req.Data > 5}, nil
})
node4 := compose.InvokableLambda[*TC, *TD](func(ctx context.Context, input *TC) (output *TD, err error) {
return &TD{input.Data <= 5}, nil
})
// 构建图
graph := compose.NewGraph[*TA, *TD]()
graph.AddLambdaNode("node1", node1)
graph.AddLambdaNode("node2", node2)
graph.AddLambdaNode("node3", node3)
graph.AddLambdaNode("node4", node4)
// 添加边
graph.AddEdge(compose.START, "node1")
graph.AddEdge("node2", "node4")
graph.AddEdge("node3", compose.END)
graph.AddEdge("node4", compose.END)
// 添加分支
graph.AddBranch("node1", compose.NewGraphBranch[*TB](
func(ctx context.Context, in *TB) (string, error) {
if in.Data%2 == 0 {
return "node2", nil
} else {
return "node3", nil
}
},
map[string]bool{"node2": true, "node3": true},
))
// 编译
runnable, err := graph.Compile(ctx)
if err != nil {
log.Fatal(err)
}
// 运行
input := &TA{msg}
result, err := runnable.Invoke(ctx, input)
if err != nil {
log.Fatal(err)
}
fmt.Println(result.Data)
}
测试代码
func TestComposeGraph(t *testing.T) {
BuildGraph("hello, how are you") // 长度19,奇数
BuildGraph("hello, how are you!") // 长度20,偶数
}
运行结果
go test -v -run TestComposeGraph ./enio_graph/
=== RUN TestComposeGraph
false
true
--- PASS: TestComposeGraph (0.00s)
PASS
结果分析
输入
长度
奇偶性
执行路径
输出
“hello, how are you”
19
奇数
node1 → node3 → END
true (19 > 5)
“hello, how are you!”
20
偶数
node1 → node2 → node4 → END
false (20.0 <= 5)
关键特性
类型安全
node1: *TA → *TB
node2: *TB → *TC ✓ 输入*TB与node1输出匹配
node3: *TB → *TD ✓ 输入*TB与node1输出匹配
node4: *TC → *TD ✓ 输入*TC与node2输出匹配
分支条件
func(ctx context.Context, in *TB) (string, error) {
// 返回下一个节点的名称
if condition {
return "nodeA", nil
}
return "nodeB", nil
}
声明式目标
AddBranch中必须声明所有可能的目标节点:map[string]bool{
"node2": true,
"node3": true,
}
应用场景
1. 数据处理管道
输入 → 验证 → 转换 → 格式化 → 输出
2. AI Agent工作流
用户问题 → 意图识别 ──┬── 信息检索 → 生成回答
└── 工具调用 → 返回结果
3. 条件分支处理
输入数据 → 分类 ──┬── 类型A处理
├── 类型B处理
└── 类型C处理
最佳实践
1. 合理划分节点
2. 分支逻辑清晰
3. 类型设计
4. 错误处理