Eino 框架实战指南:四种 AI 集成方式完全解析

发表信息: by

Eino 框架实战指南:四种 AI 集成方式完全解析

📖 前言

在 AI 应用开发中,如何优雅地编排 LLM(大语言模型)与各种组件的交互流程是一个关键问题。Eino 框架提供了四种强大的编排方式,从简单的线性流程到复杂的智能决策,满足不同场景的需求。

本文将通过实战代码和运行结果,带你深入理解这四种编排方式:

  • Chain(链):线性流水线,简单直接
  • Graph(图):支持分支的图结构,灵活路由
  • Workflow(工作流):字段级数据映射,精细控制
  • ReAct Agent(智能体):LLM 自主决策,动态执行

🔗 一、Chain:线性编排的艺术

核心概念

Chain 是 Eino 中最基础的编排方式,它将多个组件按照固定顺序串联起来,形成一条数据处理流水线。就像工厂的生产线一样,每个工位(节点)完成自己的任务后,将结果传递给下一个工位。

Chain 的特点:

  • ✅ 类型安全:编译时检查输入输出类型
  • ✅ 自动流式处理:无需手动处理流式转换
  • ✅ 声明式编程:通过 Append 方法链式构建
  • ✅ 简单直观:适合线性处理流程

示例 1:简单的内容总结 Chain

这个示例展示了如何创建一个基本的 Chain,将 ChatTemplate 和 ChatModel 串联起来,实现文本总结功能。

package main

import (
        "context"
        "fmt"
        "log"

        "github.com/cloudwego/eino/components/prompt"
        "github.com/cloudwego/eino/compose"
        "github.com/cloudwego/eino/schema"

        "eino-deepseek-demo/pkg/deepseek"
)

func main() {
        fmt.Println("=== Chain 示例:内容总结 ===\n")

        // 创建 ChatModel
        apiKey := "sk-xxx"
        chatModel, err := deepseek.NewChatModel(apiKey, "")
        if err != nil {
                log.Fatalf("创建 ChatModel 失败: %v", err)
        }

        ctx := context.Background()

        // 创建 Prompt Template
        summarizeTemplate := prompt.FromMessages(
                schema.FString,
                schema.SystemMessage("你是一个专业的内容总结助手。"),
                schema.UserMessage("请用一句话总结以下内容:\n{content}"),
        )

        // 创建 Chain: ChatTemplate -> ChatModel
        chain := compose.NewChain[map[string]any, *schema.Message]()
        chain.AppendChatTemplate(summarizeTemplate)
        chain.AppendChatModel(chatModel)

        // 编译 Chain
        runnable, err := chain.Compile(ctx)
        if err != nil {
                log.Fatalf("编译 Chain 失败: %v", err)
        }

        // 准备输入
        input := map[string]any{
                "content": `Go 语言是 Google 开发的一种静态强类型、编译型语言。
它具有简洁的语法、高效的并发处理能力和快速的编译速度。
Go 语言特别适合构建微服务、云原生应用和高性能网络服务。`,
        }

        fmt.Println("📥 输入内容:")
        fmt.Println(input["content"])
        fmt.Println()

        // 执行 Chain
        result, err := runnable.Invoke(ctx, input)
        if err != nil {
                log.Fatalf("执行 Chain 失败: %v", err)
        }

        fmt.Println("📤 总结结果:")
        fmt.Println(result.Content)
}

运行结果:

=== Chain 示例:内容总结 ===

📥 输入内容:
Go 语言是 Google 开发的一种静态强类型、编译型语言。
它具有简洁的语法、高效的并发处理能力和快速的编译速度。
Go 语言特别适合构建微服务、云原生应用和高性能网络服务。

📤 总结结果:
Go 语言是由 Google 开发的一种静态强类型编译型语言,以其简洁语法、高效并发和快速编译而著称,特别适合构建微服务、云原生应用和高性能网络服务。

示例 2:多步骤 Chain(翻译 → 总结)

更复杂的场景可能需要多个步骤,这个示例展示如何将两个 Chain 串联起来,先翻译再总结。

// 第一步:翻译
translateTemplate := prompt.FromMessages(
        schema.FString,
        schema.SystemMessage("你是一个专业的翻译助手。"),
        schema.UserMessage("请将以下英文翻译成中文:\n{text}"),
)

translateChain := compose.NewChain[map[string]any, *schema.Message]()
translateChain.AppendChatTemplate(translateTemplate)
translateChain.AppendChatModel(chatModel)

translateRunnable, err := translateChain.Compile(ctx)
if err != nil {
        log.Fatalf("编译翻译 Chain 失败: %v", err)
}

// 英文输入
englishInput := map[string]any{
        "text": `Artificial Intelligence is transforming how we work and live.
Machine learning algorithms can now process vast amounts of data,
enabling applications from self-driving cars to personalized medicine.`,
}

// 执行翻译
translateResult, err := translateRunnable.Invoke(ctx, englishInput)
if err != nil {
        log.Fatalf("翻译失败: %v", err)
}

// 第二步:总结翻译结果
summaryTemplate := prompt.FromMessages(
        schema.FString,
        schema.SystemMessage("你是一个专业的内容总结助手。"),
        schema.UserMessage("请用一句话总结以下中文内容:\n{text}"),
)

summaryChain := compose.NewChain[map[string]any, *schema.Message]()
summaryChain.AppendChatTemplate(summaryTemplate)
summaryChain.AppendChatModel(chatModel)

summaryRunnable, err := summaryChain.Compile(ctx)
if err != nil {
        log.Fatalf("编译总结 Chain 失败: %v", err)
}

summaryInput := map[string]any{
        "text": translateResult.Content,
}

summaryResult, err := summaryRunnable.Invoke(ctx, summaryInput)
if err != nil {
        log.Fatalf("总结失败: %v", err)
}

运行结果:

📥 英文输入:
Artificial Intelligence is transforming how we work and live.
Machine learning algorithms can now process vast amounts of data,
enabling applications from self-driving cars to personalized medicine.

📝 翻译结果:
人工智能正在改变我们的工作与生活方式。
机器学习算法如今能够处理海量数据,
实现了从自动驾驶汽车到个性化医疗的各种应用。

📤 最终总结:
人工智能通过机器学习处理海量数据,正在变革我们的工作与生活方式,并实现从自动驾驶到个性化医疗的各种应用。

示例 3:带 Lambda 的 Chain(自定义处理)

Lambda 节点允许我们在 Chain 中插入自定义处理逻辑,这对于数据预处理和后处理非常有用。

// 创建情感分析 Chain
sentimentTemplate := prompt.FromMessages(
        schema.FString,
        schema.SystemMessage("你是一个情感分析专家。只回答:积极、消极 或 中性。"),
        schema.UserMessage("请分析以下文本的情感倾向:\n{text}"),
)

// 预处理 Lambda
preprocessLambda := compose.InvokableLambda(func(ctx context.Context, input map[string]any) (map[string]any, error) {
        text := input["text"].(string)
        processed := "[待分析文本] " + text
        return map[string]any{"text": processed}, nil
})

// 后处理 Lambda
postprocessLambda := compose.InvokableLambda(func(ctx context.Context, msg *schema.Message) (string, error) {
        return fmt.Sprintf("情感分析结果: %s", msg.Content), nil
})

// 构建 Chain: 预处理 -> Template -> Model -> 后处理
sentimentChain := compose.NewChain[map[string]any, string]()
sentimentChain.AppendLambda(preprocessLambda)
sentimentChain.AppendChatTemplate(sentimentTemplate)
sentimentChain.AppendChatModel(chatModel)
sentimentChain.AppendLambda(postprocessLambda)

sentimentRunnable, err := sentimentChain.Compile(ctx)
if err != nil {
        log.Fatalf("编译情感分析 Chain 失败: %v", err)
}

testTexts := []string{
        "这个产品真的太棒了!我非常满意!",
        "服务态度很差,我很失望。",
        "今天天气不错,适合出门。",
}

for _, text := range testTexts {
        result, err := sentimentRunnable.Invoke(ctx, map[string]any{"text": text})
        if err != nil {
                log.Printf("分析失败: %v", err)
                continue
        }
        fmt.Printf("📥 输入: %s\n📤 %s\n\n", text, result)
}

运行结果:

📥 输入: 这个产品真的太棒了!我非常满意!
📤 情感分析结果: 积极

📥 输入: 服务态度很差,我很失望。
📤 情感分析结果: 消极

📥 输入: 今天天气不错,适合出门。
📤 情感分析结果: 积极

Chain 使用要点

  1. 创建 Chaincompose.NewChain[InputType, OutputType]()
  2. 添加节点
    • AppendChatTemplate() - 添加提示词模板
    • AppendChatModel() - 添加 LLM 模型
    • AppendLambda() - 添加自定义处理函数
  3. 编译执行Compile() 编译 Chain,Invoke() 执行
  4. 类型安全:编译时检查节点间类型兼容性

🌐 二、Graph:灵活的条件路由

核心概念

Graph 突破了 Chain 的线性限制,支持构建有向图结构。通过条件分支(Branch),可以根据运行时状态动态选择执行路径,实现更复杂的业务逻辑。

Graph 的特点:

  • ✅ 灵活的图结构:支持分支和合并
  • ✅ 条件路由:根据运行时状态动态选择路径
  • ✅ 类型安全:编译时检查节点间类型兼容性
  • ✅ 显式控制:通过 AddEdge 清晰定义执行流程

示例 1:简单 Graph(线性流程)

即使是简单的线性流程,使用 Graph 也能带来更好的可维护性和扩展性。

package main

import (
        "context"
        "fmt"
        "log"

        "github.com/cloudwego/eino/components/prompt"
        "github.com/cloudwego/eino/compose"
        "github.com/cloudwego/eino/schema"

        "eino-deepseek-demo/pkg/deepseek"
)

func main() {
        fmt.Println("=== Graph 示例:简单线性流程 ===\n")

        apiKey := "sk-xxx"
        chatModel, err := deepseek.NewChatModel(apiKey, "")
        if err != nil {
                log.Fatalf("创建 ChatModel 失败: %v", err)
        }

        ctx := context.Background()

        // 创建 Graph
        simpleGraph := compose.NewGraph[map[string]any, *schema.Message]()

        // 创建 Prompt Template
        qaTemplate := prompt.FromMessages(
                schema.FString,
                schema.SystemMessage("你是一个知识渊博的助手。"),
                schema.UserMessage("{question}"),
        )

        // 添加节点
        _ = simpleGraph.AddChatTemplateNode("prompt", qaTemplate)
        _ = simpleGraph.AddChatModelNode("model", chatModel)

        // 添加边:定义执行流程
        // START -> prompt -> model -> END
        _ = simpleGraph.AddEdge(compose.START, "prompt")
        _ = simpleGraph.AddEdge("prompt", "model")
        _ = simpleGraph.AddEdge("model", compose.END)

        // 编译 Graph
        simpleRunnable, err := simpleGraph.Compile(ctx)
        if err != nil {
                log.Fatalf("编译 Graph 失败: %v", err)
        }

        // 执行
        question := "什么是 Kubernetes?"
        fmt.Printf("📥 问题: %s\n\n", question)

        result, err := simpleRunnable.Invoke(ctx, map[string]any{"question": question})
        if err != nil {
                log.Fatalf("执行失败: %v", err)
        }

        fmt.Println("📤 回答:")
        fmt.Println(result.Content)
}

运行结果:

=== Graph 示例:简单线性流程 ===

📥 问题: 什么是 Kubernetes?

📤 回答:
Kubernetes(通常简称为 K8s)是一个开源的容器编排平台,用于自动化部署、扩展和管理容器化应用程序。
它最初由 Google 设计并开源,现在由云原生计算基金会(CNCF)维护。

核心功能与特点:
1. 自动化容器部署与管理 - 自动部署、重启、扩展或销毁容器
2. 服务发现与负载均衡 - 自动分配 IP 和 DNS,智能分发流量
3. 存储编排 - 支持本地存储、云存储等多种存储方案
4. 自我修复 - 自动重启故障容器,确保高可用性
5. 水平扩展 - 根据负载自动调整容器实例数量

Kubernetes 已成为云原生技术的基石,被广泛应用于企业级应用和现代软件开发中。

示例 2:带分支的 Graph(条件路由)

这是 Graph 最强大的功能:根据条件动态选择执行路径。这个示例实现了一个智能问答路由系统。

流程图:

                    ┌──→ [tech] ────┐
[START] → [classifier]             ──→ [formatter] → [END]
                    └──→ [general] ─┘
const (
        NodeClassifier = "classifier"
        NodeTech       = "tech"
        NodeGeneral    = "general"
        NodeFormatter  = "formatter"
)

branchGraph := compose.NewGraph[map[string]any, string]()

// 分类器 Lambda - 判断问题类型
classifierLambda := compose.InvokableLambda(func(ctx context.Context, input map[string]any) (map[string]any, error) {
        question := input["question"].(string)
        
        // 简单的关键词分类逻辑
        techKeywords := []string{"代码", "编程", "API", "数据库", "Kubernetes", "Docker", "Go"}
        isTech := false
        for _, kw := range techKeywords {
                if contains(question, kw) {
                        isTech = true
                        break
                }
        }
        
        return map[string]any{
                "question": question,
                "is_tech":  isTech,
        }, nil
})

// 技术问题处理模板
techTemplate := prompt.FromMessages(
        schema.FString,
        schema.SystemMessage("你是一个资深技术专家,擅长解释技术概念。请用专业但易懂的方式回答。"),
        schema.UserMessage("{question}"),
)

// 通用问题处理模板
generalTemplate := prompt.FromMessages(
        schema.FString,
        schema.SystemMessage("你是一个友善的助手,用通俗易懂的语言回答问题。"),
        schema.UserMessage("{question}"),
)

// 格式化 Lambda
formatterLambda := compose.InvokableLambda(func(ctx context.Context, msg *schema.Message) (string, error) {
        return fmt.Sprintf("【回答】\n%s", msg.Content), nil
})

// 添加节点
_ = branchGraph.AddLambdaNode(NodeClassifier, classifierLambda)
_ = branchGraph.AddChatTemplateNode(NodeTech, techTemplate)
_ = branchGraph.AddChatTemplateNode(NodeGeneral, generalTemplate)
_ = branchGraph.AddChatModelNode("tech_model", chatModel)
_ = branchGraph.AddChatModelNode("general_model", chatModel)
_ = branchGraph.AddLambdaNode(NodeFormatter, formatterLambda)

// 添加边
_ = branchGraph.AddEdge(compose.START, NodeClassifier)

// 添加条件分支
_ = branchGraph.AddBranch(NodeClassifier, compose.NewGraphBranch(
        // 条件函数:根据分类结果选择路径
        func(ctx context.Context, input map[string]any) (string, error) {
                isTech := input["is_tech"].(bool)
                if isTech {
                        return NodeTech, nil
                }
                return NodeGeneral, nil
        },
        // 分支映射
        map[string]bool{
                NodeTech:    true,
                NodeGeneral: true,
        },
))

// 技术分支流程
_ = branchGraph.AddEdge(NodeTech, "tech_model")
_ = branchGraph.AddEdge("tech_model", NodeFormatter)

// 通用分支流程
_ = branchGraph.AddEdge(NodeGeneral, "general_model")
_ = branchGraph.AddEdge("general_model", NodeFormatter)

// 输出
_ = branchGraph.AddEdge(NodeFormatter, compose.END)

// 编译并测试
branchRunnable, err := branchGraph.Compile(ctx)
if err != nil {
        log.Fatalf("编译失败: %v", err)
}

testQuestions := []string{
        "如何在 Go 语言中实现并发?",
        "今天适合出门旅游吗?",
        "Docker 和 Kubernetes 有什么区别?",
}

for _, q := range testQuestions {
        fmt.Printf("📥 问题: %s\n", q)
        result, err := branchRunnable.Invoke(ctx, map[string]any{"question": q})
        if err != nil {
                log.Printf("执行失败: %v\n", err)
                continue
        }
        fmt.Printf("📤 %s\n\n", result)
}

运行结果:

📥 问题: 如何在 Go 语言中实现并发?
📤 【回答】
在 Go 语言中,实现并发主要依赖于 goroutine 和 channel 这两个核心特性。

1. Goroutine - 轻量级线程
使用 go 关键字启动 goroutine:

go func() {
    fmt.Println("这是在 goroutine 中执行")
}()

2. Channel - 通信机制
Channel 用于在 goroutine 之间安全地传递数据:

ch := make(chan int)
go func() {
    ch <- 42  // 发送数据
}()
value := <-ch  // 接收数据

3. 同步原语
- WaitGroup:等待多个 goroutine 完成
- Mutex:互斥锁保护共享资源
- Select:多路复用 channel 操作

Go 的并发模型基于 CSP 理论,强调通过通信共享内存,使并发编程更加简单和安全。

---
📥 问题: 今天适合出门旅游吗?
📤 【回答】
这个问题需要根据您所在地区的天气情况来决定!建议您先查看当地天气预报,考虑以下几点:

1. 天气状况:晴天、温度适宜最佳
2. 行程安排:确保目的地没有自然灾害预警
3. 交通状况:避免拥堵和延误
4. 健康安全:考虑天气对身体的影响

如果天气不错,行程顺利,那就放心出门享受美好的一天吧!祝您旅途愉快! 🌞🚗🎒

---
📥 问题: Docker 和 Kubernetes 有什么区别?
📤 【回答】
Docker 和 Kubernetes 是容器化生态系统中两个关键但功能不同的技术:

## Docker:容器化引擎
- 构建容器镜像
- 运行单个容器
- 打包和分发应用

## Kubernetes:容器编排系统
- 管理容器集群
- 自动化运维(扩缩容、故障恢复)
- 声明式配置

## 关键区别
| 维度 | Docker | Kubernetes |
|------|--------|------------|
| 核心功能 | 构建和运行单个容器 | 编排和管理容器集群 |
| 适用规模 | 单机或小规模 | 大规模生产环境 |
| 学习曲线 | 相对简单 | 较陡峭 |

它们不是竞争关系,而是互补的:Docker 负责应用打包,Kubernetes 负责部署和运维。

Graph 使用要点

  1. 创建 Graphcompose.NewGraph[InputType, OutputType]()
  2. 添加节点AddXxxNode() 添加各类节点
  3. 定义边AddEdge() 连接节点
  4. 条件分支AddBranch() 实现动态路由
  5. 特殊节点compose.STARTcompose.END

🔄 三、Workflow:精细的数据编排

核心概念

Workflow 是最高级的编排方式,支持字段级别的数据映射。它可以精确控制数据如何从一个节点流向另一个节点,一个节点可以从多个前置节点获取不同字段的数据。

Workflow 的特点:

  • ✅ 字段级映射:精确控制数据流转
  • ✅ 多输入合并:从多个节点获取数据
  • ✅ 静态值注入:设置固定输入值
  • ✅ 自动依赖管理:框架处理执行顺序

示例 1:简单 Workflow(字段映射)

package main

import (
        "context"
        "fmt"
        "log"

        "github.com/cloudwego/eino/components/prompt"
        "github.com/cloudwego/eino/compose"
        "github.com/cloudwego/eino/schema"

        "eino-deepseek-demo/pkg/deepseek"
)

func main() {
        fmt.Println("=== Workflow 示例:字段映射 ===\n")

        apiKey := "sk-xxx"
        chatModel, err := deepseek.NewChatModel(apiKey, "")
        if err != nil {
                log.Fatalf("创建 ChatModel 失败: %v", err)
        }

        ctx := context.Background()

        simpleWorkflow := compose.NewWorkflow[map[string]any, *schema.Message]()

        qaTemplate := prompt.FromMessages(
                schema.FString,
                schema.SystemMessage("你是一个专业的助手。"),
                schema.UserMessage("{question}"),
        )

        // 添加节点并定义输入映射
        simpleWorkflow.AddChatTemplateNode("prompt", qaTemplate).
                AddInput(compose.START)  // 从 START 获取输入

        simpleWorkflow.AddChatModelNode("model", chatModel).
                AddInput("prompt")  // 从 prompt 节点获取输入

        simpleWorkflow.End().
                AddInput("model")  // 将 model 的输出作为最终输出

        // 编译执行
        simpleRunnable, err := simpleWorkflow.Compile(ctx)
        if err != nil {
                log.Fatalf("编译失败: %v", err)
        }

        question := "请简要介绍 Go 语言的并发模型"
        fmt.Printf("📥 问题: %s\n\n", question)

        result, err := simpleRunnable.Invoke(ctx, map[string]any{"question": question})
        if err != nil {
                log.Fatalf("执行失败: %v", err)
        }

        fmt.Println("📤 回答:")
        fmt.Println(result.Content)
}

运行结果:

=== Workflow 示例:字段映射 ===

📥 问题: 请简要介绍 Go 语言的并发模型

📤 回答:
Go 语言的并发模型基于 CSP(Communicating Sequential Processes)理论,
由 goroutine 和 channel 构成:

1. Goroutine(轻量级线程)
   - 由 Go 运行时管理,资源占用少
   - 使用 go 关键字启动

2. Channel(通道)
   - 用于 goroutine 间的安全数据传递
   - 分为无缓冲通道和带缓冲通道

3. Select 多路复用
   - 监听多个 channel 操作

这种模型简化了并发编程,兼具高效与安全性。

示例 2:多步骤 Workflow(文章生成流水线)

这个示例展示了如何构建一个复杂的文章生成流水线,包含大纲生成、内容撰写和审核三个步骤。

流程图:

[START] ──────────────────────────────────────────┐
   │                                              │
   ↓                                              │
[outline] ─→ [content] ─→ [review] ─→ [format] ─→ [END]
   │              ↑           ↑           ↑
   └──────────────┴───────────┴───────────┘
// 大纲生成模板
outlineTemplate := prompt.FromMessages(
        schema.FString,
        schema.SystemMessage("你是一个专业的内容策划师。"),
        schema.UserMessage("请为主题「{topic}」生成一个简洁的文章大纲(3个要点)。\n风格要求:{style}"),
)

// 内容生成模板
contentTemplate := prompt.FromMessages(
        schema.FString,
        schema.SystemMessage("你是一个专业的内容写作者。"),
        schema.UserMessage("请根据以下大纲撰写文章内容(每个要点约50字):\n\n大纲:\n{outline}"),
)

// 审核模板
reviewTemplate := prompt.FromMessages(
        schema.FString,
        schema.SystemMessage("你是一个专业的内容审核编辑。"),
        schema.UserMessage("请审核以下文章,提出改进建议(1-2条):\n\n{content}"),
)

articleWorkflow := compose.NewWorkflow[map[string]any, *schema.Message]()

// 步骤 1: 生成大纲
articleWorkflow.AddChatTemplateNode("outline_prompt", outlineTemplate).
        AddInput(compose.START)
articleWorkflow.AddChatModelNode("outline_model", chatModel).
        AddInput("outline_prompt")

// 步骤 2: 生成内容(使用 Lambda 提取大纲)
extractOutline := compose.InvokableLambda(func(ctx context.Context, msg *schema.Message) (map[string]any, error) {
        return map[string]any{"outline": msg.Content}, nil
})
articleWorkflow.AddLambdaNode("extract_outline", extractOutline).
        AddInput("outline_model")

articleWorkflow.AddChatTemplateNode("content_prompt", contentTemplate).
        AddInput("extract_outline")
articleWorkflow.AddChatModelNode("content_model", chatModel).
        AddInput("content_prompt")

// 步骤 3: 审核内容
extractContent := compose.InvokableLambda(func(ctx context.Context, msg *schema.Message) (map[string]any, error) {
        return map[string]any{"content": msg.Content}, nil
})
articleWorkflow.AddLambdaNode("extract_content", extractContent).
        AddInput("content_model")

articleWorkflow.AddChatTemplateNode("review_prompt", reviewTemplate).
        AddInput("extract_content")
articleWorkflow.AddChatModelNode("review_model", chatModel).
        AddInput("review_prompt")

// 最终输出
articleWorkflow.End().AddInput("review_model")

// 编译执行
articleRunnable, err := articleWorkflow.Compile(ctx)
if err != nil {
        log.Fatalf("编译失败: %v", err)
}

topic := "人工智能在医疗领域的应用"
style := "专业但易懂"

fmt.Printf("📥 主题: %s\n", topic)
fmt.Printf("📥 风格: %s\n\n", style)
fmt.Println("⏳ 正在生成文章(大纲 → 内容 → 审核)...\n")

articleResult, err := articleRunnable.Invoke(ctx, map[string]any{
        "topic": topic,
        "style": style,
})
if err != nil {
        log.Fatalf("文章生成失败: %v", err)
}

fmt.Println("📤 审核建议:")
fmt.Println(articleResult.Content)

运行结果:

📥 主题: 人工智能在医疗领域的应用
📥 风格: 专业但易懂

⏳ 正在生成文章(大纲 → 内容 → 审核)...

📤 审核建议:
根据您提供的文章内容,整体结构清晰、专业且易于理解,但可以在以下方面改进:

1. 增强例子的具体性和数据支持
   在"诊断辅助与影像分析""治疗优化与个性化医疗"部分,添加具体数据或研究引用
   (如"AI在肺癌筛查中的准确率可达95%以上"),以增强说服力和权威性。

2. 优化段落过渡以提升连贯性
   在三个核心要点之间添加简短的过渡句(如"此外,AI还扩展到健康管理领域……"),
   使文章更流畅,避免读起来像独立的要点罗列。

Workflow 使用要点

  1. 创建 Workflowcompose.NewWorkflow[InputType, OutputType]()
  2. 定义输入AddInput() 指定节点的输入来源
  3. 数据转换:使用 Lambda 节点进行数据格式转换
  4. 最终输出Workflow.End() 定义输出节点
  5. 自动依赖:框架自动处理节点间的依赖关系

进阶用法:

  • MapFields():精确映射字段
  • SetStaticValue():设置静态值
  • AddDependency():添加执行依赖

🤖 四、ReAct Agent:自主决策的智能体

核心概念

ReAct = Reasoning(推理)+ Acting(行动)

这是最智能的编排方式,LLM 可以自主决定是否需要调用工具、调用哪个工具,以及如何使用工具返回的结果继续推理。整个过程是动态的、不可预测的,直到 LLM 认为可以给出最终答案。

ReAct Agent 的特点:

  • ✅ 自主决策:LLM 自主选择工具和执行路径
  • ✅ 内置状态管理:自动维护对话历史
  • ✅ 流式输出:支持实时输出推理过程
  • ✅ 工具调用:无缝集成外部工具

工作流程:

用户问题 → [LLM思考 → 调用工具 → 获取结果]* → 最终回答
           └────────── 循环直到完成 ──────────┘

定义工具

首先,我们需要定义一些工具供 Agent 使用。每个工具需要实现 tool.BaseTool 接口。

// WeatherTool 天气查询工具
type WeatherTool struct{}

func (t *WeatherTool) Info(ctx context.Context) (*schema.ToolInfo, error) {
        return &schema.ToolInfo{
                Name: "get_weather",
                Desc: "获取指定城市的天气信息。当用户询问天气相关问题时使用此工具。",
                ParamsOneOf: schema.NewParamsOneOfByParams(map[string]*schema.ParameterInfo{
                        "city": {
                                Type:     schema.String,
                                Desc:     "城市名称,例如:北京、上海、深圳",
                                Required: true,
                        },
                }),
        }, nil
}

func (t *WeatherTool) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) {
        var args struct {
                City string `json:"city"`
        }
        if err := json.Unmarshal([]byte(argumentsInJSON), &args); err != nil {
                return "", fmt.Errorf("解析参数失败: %w", err)
        }

        // 模拟天气数据
        weatherData := map[string]map[string]interface{}{
                "北京": {"temp": 15, "condition": "晴", "humidity": 45, "wind": "北风3级"},
                "上海": {"temp": 20, "condition": "多云", "humidity": 60, "wind": "东风2级"},
                "深圳": {"temp": 25, "condition": "阴", "humidity": 70, "wind": "南风2级"},
        }

        weather, ok := weatherData[args.City]
        if !ok {
                return fmt.Sprintf("抱歉,暂不支持查询 %s 的天气", args.City), nil
        }

        return fmt.Sprintf("%s 天气:%s,温度 %d°C,湿度 %d%%,%s",
                args.City, weather["condition"], weather["temp"], 
                weather["humidity"], weather["wind"]), nil
}

// CalculatorTool 计算器工具
type CalculatorTool struct{}

func (t *CalculatorTool) Info(ctx context.Context) (*schema.ToolInfo, error) {
        return &schema.ToolInfo{
                Name: "calculator",
                Desc: "执行数学计算。当用户需要进行数学运算时使用此工具。",
                ParamsOneOf: schema.NewParamsOneOfByParams(map[string]*schema.ParameterInfo{
                        "expression": {
                                Type:     schema.String,
                                Desc:     "数学表达式,例如:2+3、10*5、100/4",
                                Required: true,
                        },
                }),
        }, nil
}

func (t *CalculatorTool) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) {
        var args struct {
                Expression string `json:"expression"`
        }
        if err := json.Unmarshal([]byte(argumentsInJSON), &args); err != nil {
                return "", fmt.Errorf("解析参数失败: %w", err)
        }

        // 简单的计算实现
        results := map[string]string{
                "2+3": "5", "10*5": "50", "100/4": "25", "25*4": "100",
        }

        if result, ok := results[args.Expression]; ok {
                return fmt.Sprintf("%s = %s", args.Expression, result), nil
        }

        return fmt.Sprintf("计算 %s 需要更复杂的解析器", args.Expression), nil
}

// TimeTool 时间查询工具
type TimeTool struct{}

func (t *TimeTool) Info(ctx context.Context) (*schema.ToolInfo, error) {
        return &schema.ToolInfo{
                Name: "get_time",
                Desc: "获取当前时间或日期。",
                ParamsOneOf: schema.NewParamsOneOfByParams(map[string]*schema.ParameterInfo{
                        "format": {
                                Type:     schema.String,
                                Desc:     "时间格式:datetime/date/time",
                                Required: false,
                        },
                }),
        }, nil
}

func (t *TimeTool) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) {
        var args struct {
                Format string `json:"format"`
        }
        _ = json.Unmarshal([]byte(argumentsInJSON), &args)

        now := time.Now()
        switch args.Format {
        case "date":
                return fmt.Sprintf("今天是 %s", now.Format("2006年01月02日")), nil
        case "time":
                return fmt.Sprintf("现在是 %s", now.Format("15:04:05")), nil
        default:
                return fmt.Sprintf("现在是 %s", now.Format("2006年01月02日 15:04:05")), nil
        }
}

创建 ReAct Agent

package main

import (
        "context"
        "fmt"
        "io"
        "log"

        "github.com/cloudwego/eino/components/tool"
        "github.com/cloudwego/eino/compose"
        "github.com/cloudwego/eino/flow/agent/react"
        "github.com/cloudwego/eino/schema"
        "github.com/cloudwego/eino-ext/components/model/openai"
)

func main() {
        fmt.Println("=== ReAct Agent 示例 ===\n")

        ctx := context.Background()

        // 创建 ChatModel(DeepSeek API 兼容 OpenAI)
        apiKey := "sk-xxx"
        baseURL := "https://api.deepseek.com/v1"

        chatModel, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
                APIKey:  apiKey,
                BaseURL: baseURL,
                Model:   "deepseek-chat",
        })
        if err != nil {
                log.Fatalf("创建 ChatModel 失败: %v", err)
        }

        // 创建工具列表
        tools := []tool.BaseTool{
                &WeatherTool{},
                &CalculatorTool{},
                &TimeTool{},
        }

        // 创建 ReAct Agent
        agent, err := react.NewAgent(ctx, &react.AgentConfig{
                ToolCallingModel: chatModel,
                ToolsConfig: compose.ToolsNodeConfig{
                        Tools: tools,
                },
                MaxStep: 10,  // 最大执行步数,防止无限循环
        })
        if err != nil {
                log.Fatalf("创建 Agent 失败: %v", err)
        }

        // 测试简单工具调用
        query1 := "北京今天天气怎么样?"
        fmt.Printf("🤔 问题: %s\n\n", query1)

        result1, err := agent.Generate(ctx, []*schema.Message{
                {Role: schema.User, Content: query1},
        })
        if err != nil {
                log.Fatalf("Agent 执行失败: %v", err)
        }

        fmt.Println("🤖 回答:")
        fmt.Println(result1.Content)
        fmt.Println()
}

运行结果:

=== ReAct Agent 示例 ===

🤔 问题: 北京今天天气怎么样?

🤖 回答:
根据查询结果,北京今天的天气情况是:
- 天气:晴
- 温度:15°C
- 湿度:45%
- 风力:北风3级

今天北京天气晴朗,温度适中,是个不错的好天气!

示例:多工具组合

Agent 可以在一次对话中调用多个工具。

query2 := "现在几点了?另外帮我算一下 25*4 等于多少"
fmt.Printf("🤔 问题: %s\n\n", query2)

result2, err := agent.Generate(ctx, []*schema.Message{
        {Role: schema.User, Content: query2},
})
if err != nil {
        log.Fatalf("Agent 执行失败: %v", err)
}

fmt.Println("🤖 回答:")
fmt.Println(result2.Content)

运行结果:

🤔 问题: 现在几点了?另外帮我算一下 25*4 等于多少

🤖 回答:
根据查询结果:

- 当前时间:2025年11月25日 23:36:25
- 25×4的计算结果:100

示例:流式输出

ReAct Agent 支持流式输出,可以实时看到推理过程。

query3 := "上海和深圳的天气分别怎么样?哪个城市更热?"
fmt.Printf("🤔 问题: %s\n\n", query3)
fmt.Println("🤖 回答(流式):")

stream, err := agent.Stream(ctx, []*schema.Message{
        {Role: schema.User, Content: query3},
})
if err != nil {
        log.Fatalf("Agent 流式执行失败: %v", err)
}

for {
        chunk, err := stream.Recv()
        if err != nil {
                if err == io.EOF {
                        break
                }
                log.Fatalf("流式接收错误: %v", err)
        }
        fmt.Print(chunk.Content)
}
fmt.Println()

运行结果:

🤔 问题: 上海和深圳的天气分别怎么样?哪个城市更热?

🤖 回答(流式):
我来帮您查询上海和深圳的天气情况,然后比较哪个城市更热。

[调用工具: get_weather(city="上海")]
上海天气:多云,温度 20°C,湿度 60%,东风2级

[调用工具: get_weather(city="深圳")]
深圳天气:阴,温度 25°C,湿度 70%,南风2级

根据查询结果:
- 上海:20°C
- 深圳:25°C

深圳更热,比上海高 5°C。

示例:复杂推理

Agent 可以进行多步推理,自主决定是否需要工具。

query4 := "杭州今天天气怎么样?如果下雨的话,我需要带伞吗?"
fmt.Printf("🤔 问题: %s\n\n", query4)

result4, err := agent.Generate(ctx, []*schema.Message{
        {Role: schema.User, Content: query4},
})
if err != nil {
        log.Fatalf("Agent 执行失败: %v", err)
}

fmt.Println("🤖 回答:")
fmt.Println(result4.Content)

运行结果:

🤔 问题: 杭州今天天气怎么样?如果下雨的话,我需要带伞吗?

🤖 回答:
根据查询结果,杭州今天是小雨天气,温度18°C,湿度80%,东北风1级。

是的,您需要带伞!因为今天杭州有小雨,带伞可以避免被雨淋湿。建议您:

- 随身携带雨伞
- 穿着防水的鞋子
- 注意路面湿滑,小心行走

祝您出行顺利!

ReAct Agent 使用要点

  1. 创建 Agentreact.NewAgent() 配置模型和工具
  2. 工具定义:实现 tool.BaseTool 接口(Info + InvokableRun
  3. 生成回答agent.Generate() 返回最终结果
  4. 流式输出agent.Stream() 支持实时显示
  5. 最大步数MaxStep 防止无限循环

适用场景:

  • 需要实时数据的问答(天气、股票、新闻)
  • 需要计算或查询的任务
  • 多步骤推理任务
  • 智能助手 / Copilot

📊 五、四种方式对比总结

可视化对比

1. Chain(链)

A → B → C → D
  • 数据流:上一节点输出直接作为下一节点输入
  • 控制流:完全线性,无分支
  • 典型用法ChatTemplate → ChatModel → OutputParser

2. Graph(图)

     ┌→ B ─┐
A ──┤      ├→ D
     └→ C ─┘
  • 数据流:节点间通过边连接
  • 控制流:支持条件分支(AddBranch
  • 典型用法:意图识别后分流到技术/通用回复

3. Workflow(工作流)

A.field1 ──→ B.input1
A.field2 ──→ C.input1
B.output ──→ D.param1
C.output ──→ D.param2
  • 数据流:字段级精确映射
  • 控制流:支持并行、可定义依赖关系
  • 典型用法:多模型并行处理、复杂数据组装

4. ReAct Agent(智能体)

用户问题 → [LLM思考 → 调用工具 → 获取结果]* → 最终回答
          └────────── 循环直到完成 ──────────┘
  • 数据流:由 LLM 自主决定
  • 控制流:动态、不可预测、有最大步数限制
  • 典型用法:天气查询、计算任务、多步推理

功能特性对比表

特性 Chain Graph Workflow ReAct Agent
复杂度 ⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐⭐⭐
灵活性 ⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐⭐
条件分支 ✅(自主)
并行处理 有限
字段映射
工具调用 手动 手动 手动 ✅(自主)
类型安全
学习曲线 中高
执行可控性 完全可控 大部分可控 完全可控 部分可控

选择建议

使用 Chain 的场景:

✅ 简单的线性处理流程
✅ 固定的数据转换管道
✅ 快速原型开发
✅ 学习 Eino 框架的入门示例

示例:文本总结、翻译、格式转换

使用 Graph 的场景:

✅ 需要根据条件选择不同处理路径
✅ 有多个分支但数据流相对简单
✅ 需要可视化执行流程

示例:意图识别后的分流处理、多阶段审核流程

使用 Workflow 的场景:

✅ 需要精确控制数据在节点间的流转
✅ 一个节点需要从多个来源获取数据
✅ 需要并行处理多个任务
✅ 复杂的数据组装和转换

示例:多模型协同、复杂报告生成、数据聚合处理

使用 ReAct Agent 的场景:

✅ 需要访问外部工具或 API
✅ 任务需要多步推理
✅ 无法预先确定执行路径
✅ 希望 LLM 自主决策

示例:智能助手、代码生成、数据分析、实时信息查询

一句话总结

  • Chain:流水线 —— 简单直接,适合线性任务
  • Graph:路由器 —— 条件分流,灵活可控
  • Workflow:数据总线 —— 精细映射,复杂编排
  • ReAct:自主决策 —— LLM 驱动,智能执行

🎯 六、最佳实践建议

1. 从简单开始,逐步升级

  • 先用 Chain 实现基本功能
  • 需要分支时升级到 Graph
  • 数据流复杂时考虑 Workflow
  • 需要智能决策时使用 ReAct Agent

2. 类型安全很重要

所有编排方式都支持 Go 的泛型,充分利用编译时类型检查:

// 明确指定输入输出类型
chain := compose.NewChain[map[string]any, *schema.Message]()
graph := compose.NewGraph[InputType, OutputType]()
workflow := compose.NewWorkflow[InputType, OutputType]()

3. 合理使用 Lambda

Lambda 节点非常强大,可以:

  • 数据预处理和后处理
  • 格式转换
  • 条件判断
  • 错误处理
lambda := compose.InvokableLambda(func(ctx context.Context, input InputType) (OutputType, error) {
        // 自定义逻辑
        return output, nil
})

4. 工具设计原则

设计 ReAct Agent 的工具时:

  • ✅ 功能单一明确
  • ✅ 参数描述清晰
  • ✅ 返回结果结构化
  • ✅ 错误处理完善

5. 性能优化

  • 使用流式输出提升用户体验
  • 合理设置 MaxStep 避免无限循环
  • 考虑使用缓存减少重复调用
  • 并行处理可以提高效率(Workflow)

📚 总结

本文通过实战代码详细介绍了 Eino 框架的四种 AI 集成方式。每种方式都有其适用场景:

  • Chain 适合简单的线性流程
  • Graph 适合需要条件分支的场景
  • Workflow 适合复杂的数据编排
  • ReAct Agent 适合需要智能决策的场景

选择合适的编排方式,可以让你的 AI 应用开发事半功倍。希望这篇文章能帮助你更好地理解和使用 Eino 框架!


🔗 相关资源

如果觉得这篇文章有帮助,欢迎点赞、收藏和分享! 🌟