Eino 框架实战指南:四种 AI 集成方式完全解析
- Eino 框架实战指南:四种 AI 集成方式完全解析
Eino 框架实战指南:四种 AI 集成方式完全解析
📖 前言
在 AI 应用开发中,如何优雅地编排 LLM(大语言模型)与各种组件的交互流程是一个关键问题。Eino 框架提供了四种强大的编排方式,从简单的线性流程到复杂的智能决策,满足不同场景的需求。
本文将通过实战代码和运行结果,带你深入理解这四种编排方式:
- Chain(链):线性流水线,简单直接
- Graph(图):支持分支的图结构,灵活路由
- Workflow(工作流):字段级数据映射,精细控制
- ReAct Agent(智能体):LLM 自主决策,动态执行
🔗 一、Chain:线性编排的艺术
核心概念
Chain 是 Eino 中最基础的编排方式,它将多个组件按照固定顺序串联起来,形成一条数据处理流水线。就像工厂的生产线一样,每个工位(节点)完成自己的任务后,将结果传递给下一个工位。
Chain 的特点:
- ✅ 类型安全:编译时检查输入输出类型
- ✅ 自动流式处理:无需手动处理流式转换
- ✅ 声明式编程:通过 Append 方法链式构建
- ✅ 简单直观:适合线性处理流程
示例 1:简单的内容总结 Chain
这个示例展示了如何创建一个基本的 Chain,将 ChatTemplate 和 ChatModel 串联起来,实现文本总结功能。
package main
import (
"context"
"fmt"
"log"
"github.com/cloudwego/eino/components/prompt"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"eino-deepseek-demo/pkg/deepseek"
)
func main() {
fmt.Println("=== Chain 示例:内容总结 ===\n")
// 创建 ChatModel
apiKey := "sk-xxx"
chatModel, err := deepseek.NewChatModel(apiKey, "")
if err != nil {
log.Fatalf("创建 ChatModel 失败: %v", err)
}
ctx := context.Background()
// 创建 Prompt Template
summarizeTemplate := prompt.FromMessages(
schema.FString,
schema.SystemMessage("你是一个专业的内容总结助手。"),
schema.UserMessage("请用一句话总结以下内容:\n{content}"),
)
// 创建 Chain: ChatTemplate -> ChatModel
chain := compose.NewChain[map[string]any, *schema.Message]()
chain.AppendChatTemplate(summarizeTemplate)
chain.AppendChatModel(chatModel)
// 编译 Chain
runnable, err := chain.Compile(ctx)
if err != nil {
log.Fatalf("编译 Chain 失败: %v", err)
}
// 准备输入
input := map[string]any{
"content": `Go 语言是 Google 开发的一种静态强类型、编译型语言。
它具有简洁的语法、高效的并发处理能力和快速的编译速度。
Go 语言特别适合构建微服务、云原生应用和高性能网络服务。`,
}
fmt.Println("📥 输入内容:")
fmt.Println(input["content"])
fmt.Println()
// 执行 Chain
result, err := runnable.Invoke(ctx, input)
if err != nil {
log.Fatalf("执行 Chain 失败: %v", err)
}
fmt.Println("📤 总结结果:")
fmt.Println(result.Content)
}
运行结果:
=== Chain 示例:内容总结 ===
📥 输入内容:
Go 语言是 Google 开发的一种静态强类型、编译型语言。
它具有简洁的语法、高效的并发处理能力和快速的编译速度。
Go 语言特别适合构建微服务、云原生应用和高性能网络服务。
📤 总结结果:
Go 语言是由 Google 开发的一种静态强类型编译型语言,以其简洁语法、高效并发和快速编译而著称,特别适合构建微服务、云原生应用和高性能网络服务。
示例 2:多步骤 Chain(翻译 → 总结)
更复杂的场景可能需要多个步骤,这个示例展示如何将两个 Chain 串联起来,先翻译再总结。
// 第一步:翻译
translateTemplate := prompt.FromMessages(
schema.FString,
schema.SystemMessage("你是一个专业的翻译助手。"),
schema.UserMessage("请将以下英文翻译成中文:\n{text}"),
)
translateChain := compose.NewChain[map[string]any, *schema.Message]()
translateChain.AppendChatTemplate(translateTemplate)
translateChain.AppendChatModel(chatModel)
translateRunnable, err := translateChain.Compile(ctx)
if err != nil {
log.Fatalf("编译翻译 Chain 失败: %v", err)
}
// 英文输入
englishInput := map[string]any{
"text": `Artificial Intelligence is transforming how we work and live.
Machine learning algorithms can now process vast amounts of data,
enabling applications from self-driving cars to personalized medicine.`,
}
// 执行翻译
translateResult, err := translateRunnable.Invoke(ctx, englishInput)
if err != nil {
log.Fatalf("翻译失败: %v", err)
}
// 第二步:总结翻译结果
summaryTemplate := prompt.FromMessages(
schema.FString,
schema.SystemMessage("你是一个专业的内容总结助手。"),
schema.UserMessage("请用一句话总结以下中文内容:\n{text}"),
)
summaryChain := compose.NewChain[map[string]any, *schema.Message]()
summaryChain.AppendChatTemplate(summaryTemplate)
summaryChain.AppendChatModel(chatModel)
summaryRunnable, err := summaryChain.Compile(ctx)
if err != nil {
log.Fatalf("编译总结 Chain 失败: %v", err)
}
summaryInput := map[string]any{
"text": translateResult.Content,
}
summaryResult, err := summaryRunnable.Invoke(ctx, summaryInput)
if err != nil {
log.Fatalf("总结失败: %v", err)
}
运行结果:
📥 英文输入:
Artificial Intelligence is transforming how we work and live.
Machine learning algorithms can now process vast amounts of data,
enabling applications from self-driving cars to personalized medicine.
📝 翻译结果:
人工智能正在改变我们的工作与生活方式。
机器学习算法如今能够处理海量数据,
实现了从自动驾驶汽车到个性化医疗的各种应用。
📤 最终总结:
人工智能通过机器学习处理海量数据,正在变革我们的工作与生活方式,并实现从自动驾驶到个性化医疗的各种应用。
示例 3:带 Lambda 的 Chain(自定义处理)
Lambda 节点允许我们在 Chain 中插入自定义处理逻辑,这对于数据预处理和后处理非常有用。
// 创建情感分析 Chain
sentimentTemplate := prompt.FromMessages(
schema.FString,
schema.SystemMessage("你是一个情感分析专家。只回答:积极、消极 或 中性。"),
schema.UserMessage("请分析以下文本的情感倾向:\n{text}"),
)
// 预处理 Lambda
preprocessLambda := compose.InvokableLambda(func(ctx context.Context, input map[string]any) (map[string]any, error) {
text := input["text"].(string)
processed := "[待分析文本] " + text
return map[string]any{"text": processed}, nil
})
// 后处理 Lambda
postprocessLambda := compose.InvokableLambda(func(ctx context.Context, msg *schema.Message) (string, error) {
return fmt.Sprintf("情感分析结果: %s", msg.Content), nil
})
// 构建 Chain: 预处理 -> Template -> Model -> 后处理
sentimentChain := compose.NewChain[map[string]any, string]()
sentimentChain.AppendLambda(preprocessLambda)
sentimentChain.AppendChatTemplate(sentimentTemplate)
sentimentChain.AppendChatModel(chatModel)
sentimentChain.AppendLambda(postprocessLambda)
sentimentRunnable, err := sentimentChain.Compile(ctx)
if err != nil {
log.Fatalf("编译情感分析 Chain 失败: %v", err)
}
testTexts := []string{
"这个产品真的太棒了!我非常满意!",
"服务态度很差,我很失望。",
"今天天气不错,适合出门。",
}
for _, text := range testTexts {
result, err := sentimentRunnable.Invoke(ctx, map[string]any{"text": text})
if err != nil {
log.Printf("分析失败: %v", err)
continue
}
fmt.Printf("📥 输入: %s\n📤 %s\n\n", text, result)
}
运行结果:
📥 输入: 这个产品真的太棒了!我非常满意!
📤 情感分析结果: 积极
📥 输入: 服务态度很差,我很失望。
📤 情感分析结果: 消极
📥 输入: 今天天气不错,适合出门。
📤 情感分析结果: 积极
Chain 使用要点
- 创建 Chain:
compose.NewChain[InputType, OutputType]() - 添加节点:
AppendChatTemplate()- 添加提示词模板AppendChatModel()- 添加 LLM 模型AppendLambda()- 添加自定义处理函数
- 编译执行:
Compile()编译 Chain,Invoke()执行 - 类型安全:编译时检查节点间类型兼容性
🌐 二、Graph:灵活的条件路由
核心概念
Graph 突破了 Chain 的线性限制,支持构建有向图结构。通过条件分支(Branch),可以根据运行时状态动态选择执行路径,实现更复杂的业务逻辑。
Graph 的特点:
- ✅ 灵活的图结构:支持分支和合并
- ✅ 条件路由:根据运行时状态动态选择路径
- ✅ 类型安全:编译时检查节点间类型兼容性
- ✅ 显式控制:通过 AddEdge 清晰定义执行流程
示例 1:简单 Graph(线性流程)
即使是简单的线性流程,使用 Graph 也能带来更好的可维护性和扩展性。
package main
import (
"context"
"fmt"
"log"
"github.com/cloudwego/eino/components/prompt"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"eino-deepseek-demo/pkg/deepseek"
)
func main() {
fmt.Println("=== Graph 示例:简单线性流程 ===\n")
apiKey := "sk-xxx"
chatModel, err := deepseek.NewChatModel(apiKey, "")
if err != nil {
log.Fatalf("创建 ChatModel 失败: %v", err)
}
ctx := context.Background()
// 创建 Graph
simpleGraph := compose.NewGraph[map[string]any, *schema.Message]()
// 创建 Prompt Template
qaTemplate := prompt.FromMessages(
schema.FString,
schema.SystemMessage("你是一个知识渊博的助手。"),
schema.UserMessage("{question}"),
)
// 添加节点
_ = simpleGraph.AddChatTemplateNode("prompt", qaTemplate)
_ = simpleGraph.AddChatModelNode("model", chatModel)
// 添加边:定义执行流程
// START -> prompt -> model -> END
_ = simpleGraph.AddEdge(compose.START, "prompt")
_ = simpleGraph.AddEdge("prompt", "model")
_ = simpleGraph.AddEdge("model", compose.END)
// 编译 Graph
simpleRunnable, err := simpleGraph.Compile(ctx)
if err != nil {
log.Fatalf("编译 Graph 失败: %v", err)
}
// 执行
question := "什么是 Kubernetes?"
fmt.Printf("📥 问题: %s\n\n", question)
result, err := simpleRunnable.Invoke(ctx, map[string]any{"question": question})
if err != nil {
log.Fatalf("执行失败: %v", err)
}
fmt.Println("📤 回答:")
fmt.Println(result.Content)
}
运行结果:
=== Graph 示例:简单线性流程 ===
📥 问题: 什么是 Kubernetes?
📤 回答:
Kubernetes(通常简称为 K8s)是一个开源的容器编排平台,用于自动化部署、扩展和管理容器化应用程序。
它最初由 Google 设计并开源,现在由云原生计算基金会(CNCF)维护。
核心功能与特点:
1. 自动化容器部署与管理 - 自动部署、重启、扩展或销毁容器
2. 服务发现与负载均衡 - 自动分配 IP 和 DNS,智能分发流量
3. 存储编排 - 支持本地存储、云存储等多种存储方案
4. 自我修复 - 自动重启故障容器,确保高可用性
5. 水平扩展 - 根据负载自动调整容器实例数量
Kubernetes 已成为云原生技术的基石,被广泛应用于企业级应用和现代软件开发中。
示例 2:带分支的 Graph(条件路由)
这是 Graph 最强大的功能:根据条件动态选择执行路径。这个示例实现了一个智能问答路由系统。
流程图:
┌──→ [tech] ────┐
[START] → [classifier] ──→ [formatter] → [END]
└──→ [general] ─┘
const (
NodeClassifier = "classifier"
NodeTech = "tech"
NodeGeneral = "general"
NodeFormatter = "formatter"
)
branchGraph := compose.NewGraph[map[string]any, string]()
// 分类器 Lambda - 判断问题类型
classifierLambda := compose.InvokableLambda(func(ctx context.Context, input map[string]any) (map[string]any, error) {
question := input["question"].(string)
// 简单的关键词分类逻辑
techKeywords := []string{"代码", "编程", "API", "数据库", "Kubernetes", "Docker", "Go"}
isTech := false
for _, kw := range techKeywords {
if contains(question, kw) {
isTech = true
break
}
}
return map[string]any{
"question": question,
"is_tech": isTech,
}, nil
})
// 技术问题处理模板
techTemplate := prompt.FromMessages(
schema.FString,
schema.SystemMessage("你是一个资深技术专家,擅长解释技术概念。请用专业但易懂的方式回答。"),
schema.UserMessage("{question}"),
)
// 通用问题处理模板
generalTemplate := prompt.FromMessages(
schema.FString,
schema.SystemMessage("你是一个友善的助手,用通俗易懂的语言回答问题。"),
schema.UserMessage("{question}"),
)
// 格式化 Lambda
formatterLambda := compose.InvokableLambda(func(ctx context.Context, msg *schema.Message) (string, error) {
return fmt.Sprintf("【回答】\n%s", msg.Content), nil
})
// 添加节点
_ = branchGraph.AddLambdaNode(NodeClassifier, classifierLambda)
_ = branchGraph.AddChatTemplateNode(NodeTech, techTemplate)
_ = branchGraph.AddChatTemplateNode(NodeGeneral, generalTemplate)
_ = branchGraph.AddChatModelNode("tech_model", chatModel)
_ = branchGraph.AddChatModelNode("general_model", chatModel)
_ = branchGraph.AddLambdaNode(NodeFormatter, formatterLambda)
// 添加边
_ = branchGraph.AddEdge(compose.START, NodeClassifier)
// 添加条件分支
_ = branchGraph.AddBranch(NodeClassifier, compose.NewGraphBranch(
// 条件函数:根据分类结果选择路径
func(ctx context.Context, input map[string]any) (string, error) {
isTech := input["is_tech"].(bool)
if isTech {
return NodeTech, nil
}
return NodeGeneral, nil
},
// 分支映射
map[string]bool{
NodeTech: true,
NodeGeneral: true,
},
))
// 技术分支流程
_ = branchGraph.AddEdge(NodeTech, "tech_model")
_ = branchGraph.AddEdge("tech_model", NodeFormatter)
// 通用分支流程
_ = branchGraph.AddEdge(NodeGeneral, "general_model")
_ = branchGraph.AddEdge("general_model", NodeFormatter)
// 输出
_ = branchGraph.AddEdge(NodeFormatter, compose.END)
// 编译并测试
branchRunnable, err := branchGraph.Compile(ctx)
if err != nil {
log.Fatalf("编译失败: %v", err)
}
testQuestions := []string{
"如何在 Go 语言中实现并发?",
"今天适合出门旅游吗?",
"Docker 和 Kubernetes 有什么区别?",
}
for _, q := range testQuestions {
fmt.Printf("📥 问题: %s\n", q)
result, err := branchRunnable.Invoke(ctx, map[string]any{"question": q})
if err != nil {
log.Printf("执行失败: %v\n", err)
continue
}
fmt.Printf("📤 %s\n\n", result)
}
运行结果:
📥 问题: 如何在 Go 语言中实现并发?
📤 【回答】
在 Go 语言中,实现并发主要依赖于 goroutine 和 channel 这两个核心特性。
1. Goroutine - 轻量级线程
使用 go 关键字启动 goroutine:
go func() {
fmt.Println("这是在 goroutine 中执行")
}()
2. Channel - 通信机制
Channel 用于在 goroutine 之间安全地传递数据:
ch := make(chan int)
go func() {
ch <- 42 // 发送数据
}()
value := <-ch // 接收数据
3. 同步原语
- WaitGroup:等待多个 goroutine 完成
- Mutex:互斥锁保护共享资源
- Select:多路复用 channel 操作
Go 的并发模型基于 CSP 理论,强调通过通信共享内存,使并发编程更加简单和安全。
---
📥 问题: 今天适合出门旅游吗?
📤 【回答】
这个问题需要根据您所在地区的天气情况来决定!建议您先查看当地天气预报,考虑以下几点:
1. 天气状况:晴天、温度适宜最佳
2. 行程安排:确保目的地没有自然灾害预警
3. 交通状况:避免拥堵和延误
4. 健康安全:考虑天气对身体的影响
如果天气不错,行程顺利,那就放心出门享受美好的一天吧!祝您旅途愉快! 🌞🚗🎒
---
📥 问题: Docker 和 Kubernetes 有什么区别?
📤 【回答】
Docker 和 Kubernetes 是容器化生态系统中两个关键但功能不同的技术:
## Docker:容器化引擎
- 构建容器镜像
- 运行单个容器
- 打包和分发应用
## Kubernetes:容器编排系统
- 管理容器集群
- 自动化运维(扩缩容、故障恢复)
- 声明式配置
## 关键区别
| 维度 | Docker | Kubernetes |
|------|--------|------------|
| 核心功能 | 构建和运行单个容器 | 编排和管理容器集群 |
| 适用规模 | 单机或小规模 | 大规模生产环境 |
| 学习曲线 | 相对简单 | 较陡峭 |
它们不是竞争关系,而是互补的:Docker 负责应用打包,Kubernetes 负责部署和运维。
Graph 使用要点
- 创建 Graph:
compose.NewGraph[InputType, OutputType]() - 添加节点:
AddXxxNode()添加各类节点 - 定义边:
AddEdge()连接节点 - 条件分支:
AddBranch()实现动态路由 - 特殊节点:
compose.START和compose.END
🔄 三、Workflow:精细的数据编排
核心概念
Workflow 是最高级的编排方式,支持字段级别的数据映射。它可以精确控制数据如何从一个节点流向另一个节点,一个节点可以从多个前置节点获取不同字段的数据。
Workflow 的特点:
- ✅ 字段级映射:精确控制数据流转
- ✅ 多输入合并:从多个节点获取数据
- ✅ 静态值注入:设置固定输入值
- ✅ 自动依赖管理:框架处理执行顺序
示例 1:简单 Workflow(字段映射)
package main
import (
"context"
"fmt"
"log"
"github.com/cloudwego/eino/components/prompt"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"eino-deepseek-demo/pkg/deepseek"
)
func main() {
fmt.Println("=== Workflow 示例:字段映射 ===\n")
apiKey := "sk-xxx"
chatModel, err := deepseek.NewChatModel(apiKey, "")
if err != nil {
log.Fatalf("创建 ChatModel 失败: %v", err)
}
ctx := context.Background()
simpleWorkflow := compose.NewWorkflow[map[string]any, *schema.Message]()
qaTemplate := prompt.FromMessages(
schema.FString,
schema.SystemMessage("你是一个专业的助手。"),
schema.UserMessage("{question}"),
)
// 添加节点并定义输入映射
simpleWorkflow.AddChatTemplateNode("prompt", qaTemplate).
AddInput(compose.START) // 从 START 获取输入
simpleWorkflow.AddChatModelNode("model", chatModel).
AddInput("prompt") // 从 prompt 节点获取输入
simpleWorkflow.End().
AddInput("model") // 将 model 的输出作为最终输出
// 编译执行
simpleRunnable, err := simpleWorkflow.Compile(ctx)
if err != nil {
log.Fatalf("编译失败: %v", err)
}
question := "请简要介绍 Go 语言的并发模型"
fmt.Printf("📥 问题: %s\n\n", question)
result, err := simpleRunnable.Invoke(ctx, map[string]any{"question": question})
if err != nil {
log.Fatalf("执行失败: %v", err)
}
fmt.Println("📤 回答:")
fmt.Println(result.Content)
}
运行结果:
=== Workflow 示例:字段映射 ===
📥 问题: 请简要介绍 Go 语言的并发模型
📤 回答:
Go 语言的并发模型基于 CSP(Communicating Sequential Processes)理论,
由 goroutine 和 channel 构成:
1. Goroutine(轻量级线程)
- 由 Go 运行时管理,资源占用少
- 使用 go 关键字启动
2. Channel(通道)
- 用于 goroutine 间的安全数据传递
- 分为无缓冲通道和带缓冲通道
3. Select 多路复用
- 监听多个 channel 操作
这种模型简化了并发编程,兼具高效与安全性。
示例 2:多步骤 Workflow(文章生成流水线)
这个示例展示了如何构建一个复杂的文章生成流水线,包含大纲生成、内容撰写和审核三个步骤。
流程图:
[START] ──────────────────────────────────────────┐
│ │
↓ │
[outline] ─→ [content] ─→ [review] ─→ [format] ─→ [END]
│ ↑ ↑ ↑
└──────────────┴───────────┴───────────┘
// 大纲生成模板
outlineTemplate := prompt.FromMessages(
schema.FString,
schema.SystemMessage("你是一个专业的内容策划师。"),
schema.UserMessage("请为主题「{topic}」生成一个简洁的文章大纲(3个要点)。\n风格要求:{style}"),
)
// 内容生成模板
contentTemplate := prompt.FromMessages(
schema.FString,
schema.SystemMessage("你是一个专业的内容写作者。"),
schema.UserMessage("请根据以下大纲撰写文章内容(每个要点约50字):\n\n大纲:\n{outline}"),
)
// 审核模板
reviewTemplate := prompt.FromMessages(
schema.FString,
schema.SystemMessage("你是一个专业的内容审核编辑。"),
schema.UserMessage("请审核以下文章,提出改进建议(1-2条):\n\n{content}"),
)
articleWorkflow := compose.NewWorkflow[map[string]any, *schema.Message]()
// 步骤 1: 生成大纲
articleWorkflow.AddChatTemplateNode("outline_prompt", outlineTemplate).
AddInput(compose.START)
articleWorkflow.AddChatModelNode("outline_model", chatModel).
AddInput("outline_prompt")
// 步骤 2: 生成内容(使用 Lambda 提取大纲)
extractOutline := compose.InvokableLambda(func(ctx context.Context, msg *schema.Message) (map[string]any, error) {
return map[string]any{"outline": msg.Content}, nil
})
articleWorkflow.AddLambdaNode("extract_outline", extractOutline).
AddInput("outline_model")
articleWorkflow.AddChatTemplateNode("content_prompt", contentTemplate).
AddInput("extract_outline")
articleWorkflow.AddChatModelNode("content_model", chatModel).
AddInput("content_prompt")
// 步骤 3: 审核内容
extractContent := compose.InvokableLambda(func(ctx context.Context, msg *schema.Message) (map[string]any, error) {
return map[string]any{"content": msg.Content}, nil
})
articleWorkflow.AddLambdaNode("extract_content", extractContent).
AddInput("content_model")
articleWorkflow.AddChatTemplateNode("review_prompt", reviewTemplate).
AddInput("extract_content")
articleWorkflow.AddChatModelNode("review_model", chatModel).
AddInput("review_prompt")
// 最终输出
articleWorkflow.End().AddInput("review_model")
// 编译执行
articleRunnable, err := articleWorkflow.Compile(ctx)
if err != nil {
log.Fatalf("编译失败: %v", err)
}
topic := "人工智能在医疗领域的应用"
style := "专业但易懂"
fmt.Printf("📥 主题: %s\n", topic)
fmt.Printf("📥 风格: %s\n\n", style)
fmt.Println("⏳ 正在生成文章(大纲 → 内容 → 审核)...\n")
articleResult, err := articleRunnable.Invoke(ctx, map[string]any{
"topic": topic,
"style": style,
})
if err != nil {
log.Fatalf("文章生成失败: %v", err)
}
fmt.Println("📤 审核建议:")
fmt.Println(articleResult.Content)
运行结果:
📥 主题: 人工智能在医疗领域的应用
📥 风格: 专业但易懂
⏳ 正在生成文章(大纲 → 内容 → 审核)...
📤 审核建议:
根据您提供的文章内容,整体结构清晰、专业且易于理解,但可以在以下方面改进:
1. 增强例子的具体性和数据支持
在"诊断辅助与影像分析"和"治疗优化与个性化医疗"部分,添加具体数据或研究引用
(如"AI在肺癌筛查中的准确率可达95%以上"),以增强说服力和权威性。
2. 优化段落过渡以提升连贯性
在三个核心要点之间添加简短的过渡句(如"此外,AI还扩展到健康管理领域……"),
使文章更流畅,避免读起来像独立的要点罗列。
Workflow 使用要点
- 创建 Workflow:
compose.NewWorkflow[InputType, OutputType]() - 定义输入:
AddInput()指定节点的输入来源 - 数据转换:使用 Lambda 节点进行数据格式转换
- 最终输出:
Workflow.End()定义输出节点 - 自动依赖:框架自动处理节点间的依赖关系
进阶用法:
MapFields():精确映射字段SetStaticValue():设置静态值AddDependency():添加执行依赖
🤖 四、ReAct Agent:自主决策的智能体
核心概念
ReAct = Reasoning(推理)+ Acting(行动)
这是最智能的编排方式,LLM 可以自主决定是否需要调用工具、调用哪个工具,以及如何使用工具返回的结果继续推理。整个过程是动态的、不可预测的,直到 LLM 认为可以给出最终答案。
ReAct Agent 的特点:
- ✅ 自主决策:LLM 自主选择工具和执行路径
- ✅ 内置状态管理:自动维护对话历史
- ✅ 流式输出:支持实时输出推理过程
- ✅ 工具调用:无缝集成外部工具
工作流程:
用户问题 → [LLM思考 → 调用工具 → 获取结果]* → 最终回答
└────────── 循环直到完成 ──────────┘
定义工具
首先,我们需要定义一些工具供 Agent 使用。每个工具需要实现 tool.BaseTool 接口。
// WeatherTool 天气查询工具
type WeatherTool struct{}
func (t *WeatherTool) Info(ctx context.Context) (*schema.ToolInfo, error) {
return &schema.ToolInfo{
Name: "get_weather",
Desc: "获取指定城市的天气信息。当用户询问天气相关问题时使用此工具。",
ParamsOneOf: schema.NewParamsOneOfByParams(map[string]*schema.ParameterInfo{
"city": {
Type: schema.String,
Desc: "城市名称,例如:北京、上海、深圳",
Required: true,
},
}),
}, nil
}
func (t *WeatherTool) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) {
var args struct {
City string `json:"city"`
}
if err := json.Unmarshal([]byte(argumentsInJSON), &args); err != nil {
return "", fmt.Errorf("解析参数失败: %w", err)
}
// 模拟天气数据
weatherData := map[string]map[string]interface{}{
"北京": {"temp": 15, "condition": "晴", "humidity": 45, "wind": "北风3级"},
"上海": {"temp": 20, "condition": "多云", "humidity": 60, "wind": "东风2级"},
"深圳": {"temp": 25, "condition": "阴", "humidity": 70, "wind": "南风2级"},
}
weather, ok := weatherData[args.City]
if !ok {
return fmt.Sprintf("抱歉,暂不支持查询 %s 的天气", args.City), nil
}
return fmt.Sprintf("%s 天气:%s,温度 %d°C,湿度 %d%%,%s",
args.City, weather["condition"], weather["temp"],
weather["humidity"], weather["wind"]), nil
}
// CalculatorTool 计算器工具
type CalculatorTool struct{}
func (t *CalculatorTool) Info(ctx context.Context) (*schema.ToolInfo, error) {
return &schema.ToolInfo{
Name: "calculator",
Desc: "执行数学计算。当用户需要进行数学运算时使用此工具。",
ParamsOneOf: schema.NewParamsOneOfByParams(map[string]*schema.ParameterInfo{
"expression": {
Type: schema.String,
Desc: "数学表达式,例如:2+3、10*5、100/4",
Required: true,
},
}),
}, nil
}
func (t *CalculatorTool) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) {
var args struct {
Expression string `json:"expression"`
}
if err := json.Unmarshal([]byte(argumentsInJSON), &args); err != nil {
return "", fmt.Errorf("解析参数失败: %w", err)
}
// 简单的计算实现
results := map[string]string{
"2+3": "5", "10*5": "50", "100/4": "25", "25*4": "100",
}
if result, ok := results[args.Expression]; ok {
return fmt.Sprintf("%s = %s", args.Expression, result), nil
}
return fmt.Sprintf("计算 %s 需要更复杂的解析器", args.Expression), nil
}
// TimeTool 时间查询工具
type TimeTool struct{}
func (t *TimeTool) Info(ctx context.Context) (*schema.ToolInfo, error) {
return &schema.ToolInfo{
Name: "get_time",
Desc: "获取当前时间或日期。",
ParamsOneOf: schema.NewParamsOneOfByParams(map[string]*schema.ParameterInfo{
"format": {
Type: schema.String,
Desc: "时间格式:datetime/date/time",
Required: false,
},
}),
}, nil
}
func (t *TimeTool) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) {
var args struct {
Format string `json:"format"`
}
_ = json.Unmarshal([]byte(argumentsInJSON), &args)
now := time.Now()
switch args.Format {
case "date":
return fmt.Sprintf("今天是 %s", now.Format("2006年01月02日")), nil
case "time":
return fmt.Sprintf("现在是 %s", now.Format("15:04:05")), nil
default:
return fmt.Sprintf("现在是 %s", now.Format("2006年01月02日 15:04:05")), nil
}
}
创建 ReAct Agent
package main
import (
"context"
"fmt"
"io"
"log"
"github.com/cloudwego/eino/components/tool"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/flow/agent/react"
"github.com/cloudwego/eino/schema"
"github.com/cloudwego/eino-ext/components/model/openai"
)
func main() {
fmt.Println("=== ReAct Agent 示例 ===\n")
ctx := context.Background()
// 创建 ChatModel(DeepSeek API 兼容 OpenAI)
apiKey := "sk-xxx"
baseURL := "https://api.deepseek.com/v1"
chatModel, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
APIKey: apiKey,
BaseURL: baseURL,
Model: "deepseek-chat",
})
if err != nil {
log.Fatalf("创建 ChatModel 失败: %v", err)
}
// 创建工具列表
tools := []tool.BaseTool{
&WeatherTool{},
&CalculatorTool{},
&TimeTool{},
}
// 创建 ReAct Agent
agent, err := react.NewAgent(ctx, &react.AgentConfig{
ToolCallingModel: chatModel,
ToolsConfig: compose.ToolsNodeConfig{
Tools: tools,
},
MaxStep: 10, // 最大执行步数,防止无限循环
})
if err != nil {
log.Fatalf("创建 Agent 失败: %v", err)
}
// 测试简单工具调用
query1 := "北京今天天气怎么样?"
fmt.Printf("🤔 问题: %s\n\n", query1)
result1, err := agent.Generate(ctx, []*schema.Message{
{Role: schema.User, Content: query1},
})
if err != nil {
log.Fatalf("Agent 执行失败: %v", err)
}
fmt.Println("🤖 回答:")
fmt.Println(result1.Content)
fmt.Println()
}
运行结果:
=== ReAct Agent 示例 ===
🤔 问题: 北京今天天气怎么样?
🤖 回答:
根据查询结果,北京今天的天气情况是:
- 天气:晴
- 温度:15°C
- 湿度:45%
- 风力:北风3级
今天北京天气晴朗,温度适中,是个不错的好天气!
示例:多工具组合
Agent 可以在一次对话中调用多个工具。
query2 := "现在几点了?另外帮我算一下 25*4 等于多少"
fmt.Printf("🤔 问题: %s\n\n", query2)
result2, err := agent.Generate(ctx, []*schema.Message{
{Role: schema.User, Content: query2},
})
if err != nil {
log.Fatalf("Agent 执行失败: %v", err)
}
fmt.Println("🤖 回答:")
fmt.Println(result2.Content)
运行结果:
🤔 问题: 现在几点了?另外帮我算一下 25*4 等于多少
🤖 回答:
根据查询结果:
- 当前时间:2025年11月25日 23:36:25
- 25×4的计算结果:100
示例:流式输出
ReAct Agent 支持流式输出,可以实时看到推理过程。
query3 := "上海和深圳的天气分别怎么样?哪个城市更热?"
fmt.Printf("🤔 问题: %s\n\n", query3)
fmt.Println("🤖 回答(流式):")
stream, err := agent.Stream(ctx, []*schema.Message{
{Role: schema.User, Content: query3},
})
if err != nil {
log.Fatalf("Agent 流式执行失败: %v", err)
}
for {
chunk, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatalf("流式接收错误: %v", err)
}
fmt.Print(chunk.Content)
}
fmt.Println()
运行结果:
🤔 问题: 上海和深圳的天气分别怎么样?哪个城市更热?
🤖 回答(流式):
我来帮您查询上海和深圳的天气情况,然后比较哪个城市更热。
[调用工具: get_weather(city="上海")]
上海天气:多云,温度 20°C,湿度 60%,东风2级
[调用工具: get_weather(city="深圳")]
深圳天气:阴,温度 25°C,湿度 70%,南风2级
根据查询结果:
- 上海:20°C
- 深圳:25°C
深圳更热,比上海高 5°C。
示例:复杂推理
Agent 可以进行多步推理,自主决定是否需要工具。
query4 := "杭州今天天气怎么样?如果下雨的话,我需要带伞吗?"
fmt.Printf("🤔 问题: %s\n\n", query4)
result4, err := agent.Generate(ctx, []*schema.Message{
{Role: schema.User, Content: query4},
})
if err != nil {
log.Fatalf("Agent 执行失败: %v", err)
}
fmt.Println("🤖 回答:")
fmt.Println(result4.Content)
运行结果:
🤔 问题: 杭州今天天气怎么样?如果下雨的话,我需要带伞吗?
🤖 回答:
根据查询结果,杭州今天是小雨天气,温度18°C,湿度80%,东北风1级。
是的,您需要带伞!因为今天杭州有小雨,带伞可以避免被雨淋湿。建议您:
- 随身携带雨伞
- 穿着防水的鞋子
- 注意路面湿滑,小心行走
祝您出行顺利!
ReAct Agent 使用要点
- 创建 Agent:
react.NewAgent()配置模型和工具 - 工具定义:实现
tool.BaseTool接口(Info+InvokableRun) - 生成回答:
agent.Generate()返回最终结果 - 流式输出:
agent.Stream()支持实时显示 - 最大步数:
MaxStep防止无限循环
适用场景:
- 需要实时数据的问答(天气、股票、新闻)
- 需要计算或查询的任务
- 多步骤推理任务
- 智能助手 / Copilot
📊 五、四种方式对比总结
可视化对比
1. Chain(链)
A → B → C → D
- 数据流:上一节点输出直接作为下一节点输入
- 控制流:完全线性,无分支
- 典型用法:
ChatTemplate → ChatModel → OutputParser
2. Graph(图)
┌→ B ─┐
A ──┤ ├→ D
└→ C ─┘
- 数据流:节点间通过边连接
- 控制流:支持条件分支(
AddBranch) - 典型用法:意图识别后分流到技术/通用回复
3. Workflow(工作流)
A.field1 ──→ B.input1
A.field2 ──→ C.input1
B.output ──→ D.param1
C.output ──→ D.param2
- 数据流:字段级精确映射
- 控制流:支持并行、可定义依赖关系
- 典型用法:多模型并行处理、复杂数据组装
4. ReAct Agent(智能体)
用户问题 → [LLM思考 → 调用工具 → 获取结果]* → 最终回答
└────────── 循环直到完成 ──────────┘
- 数据流:由 LLM 自主决定
- 控制流:动态、不可预测、有最大步数限制
- 典型用法:天气查询、计算任务、多步推理
功能特性对比表
| 特性 | Chain | Graph | Workflow | ReAct Agent |
|---|---|---|---|---|
| 复杂度 | ⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| 灵活性 | ⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| 条件分支 | ❌ | ✅ | ✅ | ✅(自主) |
| 并行处理 | ❌ | 有限 | ✅ | ❌ |
| 字段映射 | ❌ | ❌ | ✅ | ❌ |
| 工具调用 | 手动 | 手动 | 手动 | ✅(自主) |
| 类型安全 | ✅ | ✅ | ✅ | ✅ |
| 学习曲线 | 低 | 中 | 中高 | 中 |
| 执行可控性 | 完全可控 | 大部分可控 | 完全可控 | 部分可控 |
选择建议
使用 Chain 的场景:
✅ 简单的线性处理流程
✅ 固定的数据转换管道
✅ 快速原型开发
✅ 学习 Eino 框架的入门示例
示例:文本总结、翻译、格式转换
使用 Graph 的场景:
✅ 需要根据条件选择不同处理路径
✅ 有多个分支但数据流相对简单
✅ 需要可视化执行流程
示例:意图识别后的分流处理、多阶段审核流程
使用 Workflow 的场景:
✅ 需要精确控制数据在节点间的流转
✅ 一个节点需要从多个来源获取数据
✅ 需要并行处理多个任务
✅ 复杂的数据组装和转换
示例:多模型协同、复杂报告生成、数据聚合处理
使用 ReAct Agent 的场景:
✅ 需要访问外部工具或 API
✅ 任务需要多步推理
✅ 无法预先确定执行路径
✅ 希望 LLM 自主决策
示例:智能助手、代码生成、数据分析、实时信息查询
一句话总结
- Chain:流水线 —— 简单直接,适合线性任务
- Graph:路由器 —— 条件分流,灵活可控
- Workflow:数据总线 —— 精细映射,复杂编排
- ReAct:自主决策 —— LLM 驱动,智能执行
🎯 六、最佳实践建议
1. 从简单开始,逐步升级
- 先用 Chain 实现基本功能
- 需要分支时升级到 Graph
- 数据流复杂时考虑 Workflow
- 需要智能决策时使用 ReAct Agent
2. 类型安全很重要
所有编排方式都支持 Go 的泛型,充分利用编译时类型检查:
// 明确指定输入输出类型
chain := compose.NewChain[map[string]any, *schema.Message]()
graph := compose.NewGraph[InputType, OutputType]()
workflow := compose.NewWorkflow[InputType, OutputType]()
3. 合理使用 Lambda
Lambda 节点非常强大,可以:
- 数据预处理和后处理
- 格式转换
- 条件判断
- 错误处理
lambda := compose.InvokableLambda(func(ctx context.Context, input InputType) (OutputType, error) {
// 自定义逻辑
return output, nil
})
4. 工具设计原则
设计 ReAct Agent 的工具时:
- ✅ 功能单一明确
- ✅ 参数描述清晰
- ✅ 返回结果结构化
- ✅ 错误处理完善
5. 性能优化
- 使用流式输出提升用户体验
- 合理设置
MaxStep避免无限循环 - 考虑使用缓存减少重复调用
- 并行处理可以提高效率(Workflow)
📚 总结
本文通过实战代码详细介绍了 Eino 框架的四种 AI 集成方式。每种方式都有其适用场景:
- Chain 适合简单的线性流程
- Graph 适合需要条件分支的场景
- Workflow 适合复杂的数据编排
- ReAct Agent 适合需要智能决策的场景
选择合适的编排方式,可以让你的 AI 应用开发事半功倍。希望这篇文章能帮助你更好地理解和使用 Eino 框架!
🔗 相关资源
如果觉得这篇文章有帮助,欢迎点赞、收藏和分享! 🌟