图的构建

2026年4月18日 · 658 字 · 4 分钟

使用Eino框架的compose包构建复杂的有向图工作流,实现灵活的任务编排

什么是图编排

在构建AI应用时,我们经常需要将多个处理步骤组合成一个工作流。图编排(Graph Orchestration)是一种将任务节点通过边连接成有向图的方式,可以:

  • 表达复杂流程:支持分支、并行、循环等复杂逻辑
  • 类型安全:编译时检查节点间的数据类型是否匹配
  • 可视化:图的拓扑结构清晰,便于理解和调试

核心概念

节点(Node)

节点是图中的处理单元,每个节点接收输入并产生输出。

边(Edge)

边定义了节点之间的执行顺序:

  • 普通边:固定从一个节点到另一个节点
  • 分支边:根据条件选择下一个节点

图(Graph)

图是节点和边的容器,定义了整个工作流的拓扑结构。

架构图

Graph架构图

代码实现

依赖安装

go get github.com/cloudwego/eino/compose

定义数据类型

首先定义图中流转的数据类型:

package enio_graph

// 输入类型
type TA struct {
	Data string
}

// 中间类型
type TB struct {
	Data int
}
type TC struct {
	Data float64
}

// 输出类型
type TD struct {
	Data bool
}

创建节点

使用compose.InvokableLambda创建Lambda节点:

import (
	"context"
	"github.com/cloudwego/eino/compose"
)

// node1: 将字符串长度转为整数
node1 := compose.InvokableLambda[*TA, *TB](func(ctx context.Context, req *TA) (*TB, error) {
	return &TB{Data: len(req.Data)}, nil
})

// node2: 将整数转为浮点数
node2 := compose.InvokableLambda[*TB, *TC](func(ctx context.Context, req *TB) (*TC, error) {
	return &TC{Data: float64(req.Data)}, nil
})

// node3: 判断整数是否大于5
node3 := compose.InvokableLambda[*TB, *TD](func(ctx context.Context, req *TB) (*TD, error) {
	return &TD{Data: req.Data > 5}, nil
})

// node4: 判断浮点数是否小于等于5
node4 := compose.InvokableLambda[*TC, *TD](func(ctx context.Context, input *TC) (output *TD, err error) {
	return &TD{input.Data <= 5}, nil
})

构建图

创建图实例

// 创建图,指定输入类型*TA,输出类型*TD
graph := compose.NewGraph[*TA, *TD]()

添加节点

graph.AddLambdaNode("node1", node1)
graph.AddLambdaNode("node2", node2)
graph.AddLambdaNode("node3", node3)
graph.AddLambdaNode("node4", node4)

添加普通边

// 从START到node1
graph.AddEdge(compose.START, "node1")

// node2到node4
graph.AddEdge("node2", "node4")

// node3到END
graph.AddEdge("node3", compose.END)

// node4到END
graph.AddEdge("node4", compose.END)

添加分支边

分支边根据条件动态选择下一个节点:

graph.AddBranch("node1", compose.NewGraphBranch[*TB](
	func(ctx context.Context, in *TB) (string, error) {
		// 根据数据奇偶性选择分支
		if in.Data%2 == 0 {
			return "node2", nil  // 偶数走node2
		} else {
			return "node3", nil  // 奇数走node3
		}
	},
	map[string]bool{
		"node2": true,  // 声明所有可能的目标节点
		"node3": true,
	},
))

编译和运行

编译图

编译时检查节点间数据类型是否匹配:

runnable, err := graph.Compile(ctx)
if err != nil {
	log.Fatal(err)
}

运行图

input := &TA{Data: "hello, how are you"}
result, err := runnable.Invoke(ctx, input)
if err != nil {
	log.Fatal(err)
}
fmt.Println(result.Data)  // 输出: false 或 true

完整示例

完整代码

package enio_graph

import (
	"context"
	"fmt"
	"log"

	"github.com/cloudwego/eino/compose"
)

func BuildGraph(msg string) {
	ctx := context.Background()

	// 构建节点
	node1 := compose.InvokableLambda[*TA, *TB](func(ctx context.Context, req *TA) (*TB, error) {
		return &TB{Data: len(req.Data)}, nil
	})
	node2 := compose.InvokableLambda[*TB, *TC](func(ctx context.Context, req *TB) (*TC, error) {
		return &TC{Data: float64(req.Data)}, nil
	})
	node3 := compose.InvokableLambda[*TB, *TD](func(ctx context.Context, req *TB) (*TD, error) {
		return &TD{Data: req.Data > 5}, nil
	})
	node4 := compose.InvokableLambda[*TC, *TD](func(ctx context.Context, input *TC) (output *TD, err error) {
		return &TD{input.Data <= 5}, nil
	})

	// 构建图
	graph := compose.NewGraph[*TA, *TD]()
	graph.AddLambdaNode("node1", node1)
	graph.AddLambdaNode("node2", node2)
	graph.AddLambdaNode("node3", node3)
	graph.AddLambdaNode("node4", node4)

	// 添加边
	graph.AddEdge(compose.START, "node1")
	graph.AddEdge("node2", "node4")
	graph.AddEdge("node3", compose.END)
	graph.AddEdge("node4", compose.END)

	// 添加分支
	graph.AddBranch("node1", compose.NewGraphBranch[*TB](
		func(ctx context.Context, in *TB) (string, error) {
			if in.Data%2 == 0 {
				return "node2", nil
			} else {
				return "node3", nil
			}
		},
		map[string]bool{"node2": true, "node3": true},
	))

	// 编译
	runnable, err := graph.Compile(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// 运行
	input := &TA{msg}
	result, err := runnable.Invoke(ctx, input)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(result.Data)
}

测试代码

func TestComposeGraph(t *testing.T) {
	BuildGraph("hello, how are you")   // 长度19,奇数
	BuildGraph("hello, how are you!")  // 长度20,偶数
}

运行结果

go test -v -run TestComposeGraph ./enio_graph/

输出:

=== RUN   TestComposeGraph
false
true
--- PASS: TestComposeGraph (0.00s)
PASS

结果分析

输入 长度 奇偶性 执行路径 输出
“hello, how are you” 19 奇数 node1 → node3 → END true (19 > 5)
“hello, how are you!” 20 偶数 node1 → node2 → node4 → END false (20.0 <= 5)

执行流程详解:

  1. 第一个输入(长度19):

    • node1: 计算长度 = 19
    • 分支判断: 19%2=1(奇数)→ 走node3
    • node3: 19 > 5 → 返回true
  2. 第二个输入(长度20):

    • node1: 计算长度 = 20
    • 分支判断: 20%2=0(偶数)→ 走node2
    • node2: 转为浮点数 20.0
    • node4: 20.0 <= 5 → 返回false

关键特性

类型安全

编译时检查上下游节点的数据类型:

node1: *TA → *TB
node2: *TB → *TC    ✓ 输入*TB与node1输出匹配
node3: *TB → *TD    ✓ 输入*TB与node1输出匹配
node4: *TC → *TD    ✓ 输入*TC与node2输出匹配

如果类型不匹配,编译时会报错。

分支条件

分支函数根据输入动态选择路径:

func(ctx context.Context, in *TB) (string, error) {
    // 返回下一个节点的名称
    if condition {
        return "nodeA", nil
    }
    return "nodeB", nil
}

声明式目标

AddBranch中必须声明所有可能的目标节点:

map[string]bool{
    "node2": true,
    "node3": true,
}

这确保了编译器能够进行完整的类型检查。

应用场景

1. 数据处理管道

输入 → 验证 → 转换 → 格式化 → 输出

2. AI Agent工作流

用户问题 → 意图识别 ──┬── 信息检索 → 生成回答
                      └── 工具调用 → 返回结果

3. 条件分支处理

输入数据 → 分类 ──┬── 类型A处理
                   ├── 类型B处理
                   └── 类型C处理

最佳实践

1. 合理划分节点

每个节点应该:

  • 单一职责
  • 输入输出类型明确
  • 可独立测试

2. 分支逻辑清晰

分支条件应该:

  • 互斥且完备
  • 简单明了
  • 避免过多分支

3. 类型设计

数据类型应该:

  • 语义明确
  • 只包含必要字段
  • 便于扩展

4. 错误处理

节点函数应该:

  • 返回有意义的错误
  • 处理边界情况
  • 记录关键日志