从零开始掌握 OpenAI Agents Python:构建智能 AI Agent 的完整指南

从零开始掌握 OpenAI Agents Python:构建智能 AI Agent 的完整指南

从零开始掌握 OpenAI Agents Python:构建智能 AI Agent 的完整指南

引言

在人工智能快速发展的今天,构建智能代理(Agent)系统已成为开发者必备的技能之一。OpenAI 推出的 openai-agents-python 库为我们提供了一个强大而灵活的框架,能够轻松创建具有复杂推理能力、多工具调用和Agent协作的AI应用。

本教程将带你从零开始,深入理解这个框架的每一个核心概念,通过详尽的代码示例和实战演练,让你能够独立构建出功能完备的AI Agent系统。无论你是刚入门的新手,还是希望深入了解框架细节的开发者,都能从中获得有价值的知识。

第一部分:环境搭建与项目初始化

1.1 系统要求与前置条件

在开始之前,我们需要确保开发环境满足基本要求。首先,你需要安装 Python 3.10 或更高版本,因为这个框架使用了许多现代Python特性,如类型提示的增强功能和异步编程支持。其次,你需要拥有一个 OpenAI API Key,可以从 OpenAI 官网平台获取。

在安装依赖之前,建议创建一个独立的虚拟环境来管理项目依赖,这样可以避免与其他项目产生冲突。你可以使用 Python 自带的 venv 模块或者更现代的 uv 工具来创建虚拟环境。虚拟环境能够确保项目依赖的隔离性和可重现性,这在团队协作和部署时尤为重要。

# 创建虚拟环境的示例命令(在终端中执行)
# python -m venv agents-env
# source agents-env/bin/activate  # Linux/Mac
# agents-env\Scripts\activate     # Windows

1.2 安装框架与依赖

安装 openai-agents-python 框架非常简单,只需要使用 pip 包管理器即可完成。官方建议在安装时包含所有可选依赖,以获得完整的开发体验:

pip install openai-agents-python

如果你想安装包含所有可选依赖的版本,可以使用以下命令:

pip install "openai-agents-python[instruments]"

这个命令会额外安装用于追踪(Tracing)和监控的依赖包,对于调试和优化 Agent 行为非常有帮助。安装完成后,我们可以通过一个简单的导入测试来确认安装成功:

# 验证安装的代码
import inspect
from agents import Agent, Runner

# 检查核心类是否正确导入
print("Agent class imported successfully")
print(f"Runner class available: {Runner is not None}")

# 查看版本信息
import agents
print(f"Framework version: {agents.__version__}")

1.3 配置 API Key

为了让框架能够与 OpenAI 的 API 进行通信,我们需要配置 API Key。有多种方式可以完成这个配置,你可以根据项目的实际需求选择最适合的方法。

第一种方式是通过环境变量设置,这是最推荐的做法,特别适合生产环境和团队协作场景:

# 在终端中设置环境变量
# export OPENAI_API_KEY="sk-..."

第二种方式是在代码中直接设置,这种方式适合快速测试和学习,但在生产环境中应该避免使用:

import os
from openai import AsyncOpenAI

# 直接设置 API Key(仅用于测试)
os.environ["OPENAI_API_KEY"] = "sk-your-api-key-here"

# 创建客户端实例
client = AsyncOpenAI()

第三种方式是通过配置文件管理,支持 YAML 或 JSON 格式的配置文件,适合需要管理多个环境配置的大型项目:

# config.yaml 示例
# openai:
#   api_key: ${OPENAI_API_KEY}
#   organization: ${OPENAI_ORG_ID}

1.4 开发环境配置建议

为了获得最佳的开发体验,建议配置以下工具和设置。代码编辑器方面,VS Code 或 PyCharm 都是不错的选择,它们都提供了良好的 Python 支持和调试功能。类型检查工具如 mypy 可以帮助你在运行时之前发现类型错误,这对于构建复杂的 Agent 系统非常有价值。

# pyproject.toml 配置示例
[tool.agents]
name = "my-agent-project"
version = "0.1.0"

[tool.mypy]
python_version = "3.10"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = false

第二部分:核心概念详解

2.1 Agent(智能体)— 框架的核心构建块

Agent 是整个框架的核心概念,它代表了一个具有特定能力、行为规则和目标的 AI 实体。理解 Agent 的内部结构对于构建复杂的 AI 系统至关重要。

一个 Agent 主要由以下几个部分组成:

指令(Instructions) 定义了 Agent 的角色定位、行为规范和输出格式要求。良好的指令设计能够让 Agent 准确理解自己的职责和边界。

模型(Model) 指定了 Agent 使用的 AI 模型,可以是 GPT-4、GPT-4 Turbo 或其他 OpenAI 支持的模型。不同模型在能力、成本和响应速度上有差异,选择时需要权衡。

工具(Tools) 赋予 Agent 执行特定任务的能力,如搜索信息、执行代码、访问API等。

示例(Examples) 提供了一些输入-输出对,帮助模型理解期望的行为模式。

from agents import Agent

# 创建你的第一个 Agent
first_agent = Agent(
    name="问候助手",
    instructions="你是一个友善的客服助手,专门负责向用户问好并提供基本帮助。\
                  请始终保持热情、专业的态度,使用简洁明了的语言。",
    model="gpt-4-turbo",
)

print(f"Agent 创建成功: {first_agent.name}")
print(f"使用的模型: {first_agent.model}")

2.2 Runner(运行器)— 执行与状态管理

Runner 是负责执行 Agent 的核心类,它处理用户输入、协调 Agent 行为、管理上下文状态,并返回最终结果。框架提供了同步和异步两种 Runner 接口,以适应不同的应用场景。

import asyncio
from agents import Agent, Runner

# 简单的同步执行
async def run_basic_agent():
    agent = Agent(
        name="简单助手",
        instructions="用一句话介绍自己",
    )

    # 使用 run 方法执行
    result = await Runner.run(agent, "你好")

    # 访问结果
    print(f"Agent 响应: {result.final_output}")
    print(f"消耗的 Token 数: {result.last_agent.memory.usage_tokens}")

    return result

# 运行异步函数
asyncio.run(run_basic_agent())

Runner 提供了丰富的方法和属性来处理各种场景:

run() 方法接受一个 Agent 和输入,启动 Agent 的执行流程。

run_stream() 方法返回流式响应,适合需要实时展示输出的应用。

get_last_agent_memory() 方法可以访问 Agent 的记忆,用于实现上下文持久化。

2.3 Function(函数工具)— 扩展 Agent 能力

Function 是 Agent 与外部世界交互的桥梁。通过定义函数工具,Agent 可以执行代码、查询数据库、调用 API 等复杂操作。框架提供了优雅的装饰器语法来声明函数工具。

from agents import Agent, Runner, function

# 使用 @function 装饰器定义工具
@function
def get_weather(location: str) -> str:
    """
    获取指定位置的天气信息

    参数:
        location: 城市名称或地点描述

    返回:
        天气描述字符串
    """
    # 这里是你真实的天气 API 调用逻辑
    # 为了演示,我们返回一个模拟结果
    weather_data = {
        "北京": "晴,温度 15-25°C,适合出行",
        "上海": "多云,温度 18-28°C,偶有小雨",
        "广州": "雷阵雨,温度 25-33°C,请带伞",
    }

    return weather_data.get(location, f"抱歉,未找到 {location} 的天气信息")

# 创建一个使用工具的 Agent
weather_agent = Agent(
    name="天气预报员",
    instructions="你是一个专业的天气预报员。使用 get_weather 工具\
                  来回答用户关于天气的问题。如果用户没有指定地点,\
                  请询问他们想查询哪个城市的天气。",
    tools=[get_weather],
)

# 执行 Agent
async def main():
    result = await Runner.run(weather_agent, "北京今天天气怎么样?")
    print(result.final_output)

asyncio.run(main())

2.4 Handoff(交接)— 多 Agent 协作机制

Handoff 机制允许 Agent 之间进行交接和协作,这是构建复杂 AI 系统的重要特性。通过 handoff,多个专门化的 Agent 可以协同工作,每个 Agent 负责处理其擅长的任务。

from agents import Agent, Runner, handoff

# 定义处理不同语言的 Agent
english_agent = Agent(
    name="English Assistant",
    instructions="你是一个专业的英语助手。帮助用户进行英语翻译、\
                  写作润色和语法纠错。只使用英语交流。",
)

chinese_agent = Agent(
    name="中文助手",
    instructions="你是一个专业的中文助手。帮助用户进行中文翻译、\
                  写作润色和文字润色。只使用中文交流。",
)

spanish_agent = Agent(
    name="Asistente de Español",
    instructions="You are a professional Spanish assistant. Help users \
                  with Spanish translation, writing, and grammar. \
                  Only communicate in Spanish.",
)

# 创建转接器
language_handoff_es = handoff(spanish_agent)

# 创建主 Agent,能够根据语言转接到不同的 Agent
router_agent = Agent(
    name="多语言助手",
    instructions="你是一个多语言助手路由器。根据用户使用的语言,\
                  将请求转接到相应的语言专家 Agent:\
                  - 用户使用英语 → 转接到 English Assistant\
                  - 用户使用中文 → 转接到 中文助手\
                  - 用户使用西班牙语 → 转接到 Asistente de Español",
    handoffs=[english_agent, chinese_agent, language_handoff_es],
)

2.5 Guardrails(护栏)— 输入输出安全保障

Guardrails 提供了对 Agent 输入输出的验证和保护机制。它们可以在用户输入到达 Agent 之前或 Agent 输出返回给用户之前进行检查和过滤,确保系统的安全性和可靠性。

from agents import Agent, Runner, GuardrailFunctionOutput, guardrail
from pydantic import BaseModel

# 定义输出结构
class OutputCheck(BaseModel):
    is_safe: bool
    reason: str
    category: str = "safe"

# 定义内容安全检查函数
@guardrail
def content_safety_check(user_input: str) -> GuardrailFunctionOutput:
    """
    检查用户输入的安全性

    这个函数会在用户输入到达 Agent 之前被调用,
    如果返回 is_safe=False,Agent 将不会处理该输入
    """
    # 定义敏感词列表
    sensitive_keywords = ["hack", "exploit", "bypass"]

    # 检查是否包含敏感内容
    input_lower = user_input.lower()
    for keyword in sensitive_keywords:
        if keyword in input_lower:
            return GuardrailFunctionOutput(
                is_safe=False,
                reason=f"输入包含敏感词: {keyword}",
                category="safety_violation",
            )

    return GuardrailFunctionOutput(
        is_safe=True,
        reason="输入通过安全检查",
        category="safe",
    )

# 定义输出结构验证
class TranslationOutput(BaseModel):
    translation: str
    confidence: float

# 创建带护栏的 Agent
safe_translator = Agent(
    name="安全翻译助手",
    instructions="将用户输入翻译成英文,只返回翻译结果。",
    input_guardrails=[content_safety_check],
)

2.6 Tracing(追踪)— 调试与监控

Tracing 功能提供了对 Agent 执行过程的完整追踪能力,包括每个步骤的输入输出、函数调用、Token 消耗等信息。这对于调试、监控和优化 Agent 行为非常有用。

from agents import Agent, Runner
from agents.tracing import AsyncAnalytics, tracer

# 配置追踪
analytics = AsyncAnalytics()

# 使用追踪运行 Agent
async def run_with_tracing():
    agent = Agent(
        name="追踪测试 Agent",
        instructions="解释量子计算的基本原理",
    )

    # 启动追踪
    with tracer.start_as_current_span("agent-execution") as span:
        span.set_attribute("agent.name", agent.name)

        result = await Runner.run(agent, "什么是量子纠缠?")

        # 记录结果
        span.set_attribute("result.length", len(result.final_output))
        print(f"追踪结果: {result.final_output[:100]}...")

# 运行带追踪的代码
asyncio.run(run_with_tracing())

第三部分:一步步实战教程

3.1 第一个 Agent — 打造简单的问答助手

让我们从最简单的例子开始,创建一个能够回答基本问题的问答助手。这个例子将帮助你理解框架的基本使用流程。

import asyncio
from agents import Agent, Runner

async def create_qa_assistant():
    # 第一步:定义 Agent
    qa_agent = Agent(
        name="智能问答助手",
        instructions="""
        你是一个知识渊博的问答助手,名叫"小智"。

        你的特点:
        - 回答简洁明了,通常不超过三句话
        - 如遇到不确定的问题,诚实表示不知道
        - 适当使用比喻帮助解释复杂概念
        - 在回答结束时,可以根据话题适当推荐相关内容
        """,
        model="gpt-4-turbo",
    )

    # 第二步:准备问题
    questions = [
        "什么是 Python 编程语言?",
        "请解释一下什么是机器学习",
        "人工智能和机器学习有什么区别?",
    ]

    # 第三步:运行 Agent 并收集结果
    print("=" * 50)
    print("开始问答测试")
    print("=" * 50)

    for question in questions:
        print(f"\n问题: {question}")
        print("-" * 40)

        # 运行 Agent
        result = await Runner.run(qa_agent, question)

        # 输出结果
        print(f"回答: {result.final_output}")

        # 获取 token 消耗信息
        if result.last_agent and result.last_agent.memory:
            usage = result.last_agent.memory.usage_tokens
            print(f"(本次对话 Token 消耗: {usage})")

# 执行
asyncio.run(create_qa_assistant())

运行上述代码后,你将看到 Agent 逐一回答问题。这个例子展示了基本的使用模式:创建 Agent、运行 Runner、处理结果。

3.2 构建多功能工具 Agent

现在让我们创建一个更复杂的 Agent,它能够使用多种工具来完成复杂任务。这个 Agent 将能够搜索信息、执行数学计算和处理文件。

import asyncio
import math
from agents import Agent, Runner, function
from typing import List, Dict

# 定义搜索工具(模拟)
@function
def web_search(query: str, max_results: int = 5) -> str:
    """
    在互联网上搜索相关信息

    参数:
        query: 搜索关键词
        max_results: 最大返回结果数

    返回:
        格式化的搜索结果
    """
    # 这里是你的真实搜索 API 调用
    # 演示中返回模拟数据
    results = f"关于'{query}'的搜索结果(模拟):\n"
    results += f"1. 相关文章A - 包含{query}的详细介绍\n"
    results += f"2. 相关文章B - {query}的实际应用案例\n"
    results += f"3. 相关文章C - {query}的最新研究进展\n"
    return results

# 定义数学计算工具
@function
def calculate(expression: str) -> str:
    """
    执行数学表达式计算

    参数:
        expression: 数学表达式,如 "2 + 2" 或 "sqrt(16)"

    返回:
        计算结果
    """
    try:
        # 安全评估数学表达式
        # 注意:实际应用中应该使用更安全的评估方法
        allowed_functions = {
            "sqrt": math.sqrt,
            "sin": math.sin,
            "cos": math.cos,
            "tan": math.tan,
            "log": math.log,
            "abs": abs,
            "pow": pow,
        }

        # 简单的表达式解析
        expression = expression.lower().replace(" ", "")

        # 处理常见函数
        for func_name, func in allowed_functions.items():
            if func_name in expression:
                # 提取参数
                start = expression.find(f"{func_name}(") + len(func_name) + 1
                end = expression.find(")", start)
                if end != -1:
                    arg = float(expression[start:end])
                    result = func(arg)
                    return f"{expression} = {result}"

        # 处理简单算术(演示用,生产环境应使用 ast 模块解析)
        result = eval(expression, {"__builtins__": {}}, {})
        return f"{expression} = {result}"

    except Exception as e:
        return f"计算错误: {str(e)}"

# 定义知识库查询工具
@function
def query_knowledge_base(topic: str) -> str:
    """
    查询本地知识库

    参数:
        topic: 查询主题

    返回:
        知识库中的相关内容
    """
    # 模拟知识库数据
    knowledge_db = {
        "python": "Python 是一种高级编程语言,以其简洁的语法和强大的功能著称。\
                   它支持多种编程范式,广泛用于 Web 开发、数据科学、人工智能等领域。",
        "javascript": "JavaScript 是一种脚本语言,主要用于 Web 前端开发。\
                      它可以在浏览器中运行,实现交互式网页效果。",
        "机器学习": "机器学习是人工智能的一个分支,研究如何让计算机从数据中学习并改进。\
                    主要算法包括监督学习、无监督学习和强化学习。",
    }

    # 模糊匹配
    for key, value in knowledge_db.items():
        if key in topic.lower() or topic.lower() in key:
            return value

    return f"知识库中未找到关于'{topic}'的信息"

# 创建多功能 Agent
tools_agent = Agent(
    name="全能助手",
    instructions="""
    你是一个多功能助手,能够帮助用户完成各种任务。

    可用工具:
    - web_search: 搜索互联网信息
    - calculate: 执行数学计算
    - query_knowledge_base: 查询本地知识库

    使用指南:
    1. 理解用户的问题
    2. 选择合适的工具来回答
    3. 如果需要多个工具,逐步调用
    4. 综合工具返回的结果给出完整回答
    """,
    tools=[web_search, calculate, query_knowledge_base],
)

# 运行多功能 Agent
async def main():
    tasks = [
        "帮我计算一下 2 的 10 次方是多少",
        "请搜索关于 Python 编程语言的最新资讯",
        "我需要了解机器学习的基础知识",
        "请帮我计算 sqrt(144) + 50 的结果",
    ]

    print("=" * 60)
    print("多功能 Agent 测试")
    print("=" * 60)

    for task in tasks:
        print(f"\n任务: {task}")
        print("-" * 50)

        result = await Runner.run(tools_agent, task)
        print(f"结果: {result.final_output}")

asyncio.run(main())

3.3 构建多 Agent 协作系统

现在让我们构建一个复杂的多 Agent 系统,其中包含多个专门化的 Agent,通过 handoff 机制协作处理用户请求。

import asyncio
from agents import Agent, Runner, handoff

# ============================================
# 定义各个专门化的 Agent
# ============================================

# 1. 账户管理 Agent
account_agent = Agent(
    name="账户管理员",
    instructions="""
    你是一个账户管理专家,帮助用户处理账户相关问题。

    可以处理的请求类型:
    - 账户注册和登录问题
    - 密码重置
    - 账户信息更新
    - 账户安全设置

    回答要专业、准确,对于敏感操作要提醒用户验证身份。
    """,
)

# 2. 订单处理 Agent
order_agent = Agent(
    name="订单专员",
    instructions="""
    你是一个订单处理专家,帮助用户管理订单。

    可以处理的请求类型:
    - 订单状态查询
    - 订单修改(地址、联系方式等)
    - 取消订单
    - 退换货申请
    - 物流查询

    处理订单时,要先确认用户身份和订单信息。
    """,
)

# 3. 投诉处理 Agent
complaint_agent = Agent(
    name="投诉处理专员",
    instructions="""
    你是一个投诉处理专家,专注于解决用户的不满和问题。

    工作原则:
    - 耐心倾听用户的诉求
    - 表达理解和歉意
    - 积极寻找解决方案
    - 如无法解决,及时升级到更高层级

    对于严重投诉,需要记录详细信息并上报。
    """,
)

# 4. 产品咨询 Agent
product_agent = Agent(
    name="产品顾问",
    instructions="""
    你是一个产品顾问,帮助用户了解产品信息。

    职责范围:
    - 产品功能介绍
    - 产品对比和推荐
    - 价格和优惠信息
    - 使用方法和技巧

    回答要客观,避免过度推销设。
    """,
)

# ============================================
# 创建转接器
# ============================================

# 为不同 Agent 创建 handoff
account_handoff = handoff(account_agent)
order_handoff = handoff(order_agent)
complaint_handoff = handoff(complaint_agent)
product_handoff = handoff(product_agent)

# ============================================
# 创建主路由 Agent
# ============================================

router_agent = Agent(
    name="智能客服中枢",
    instructions="""
    你是一个智能客服系统的中枢路由器,负责将用户请求转接到最合适的专员。

    转接规则:
    1. 账户、登录、密码、注册 → 账户管理员
    2. 订单、物流、退货、换货 → 订单专员
    3. 投诉、反馈、问题报告 → 投诉处理专员
    4. 产品功能、价格、使用方法 → 产品顾问

    转接流程:
    1. 识别用户请求的核心意图
    2. 选择最合适的专员 Agent
    3. 将请求和上下文完整转交给专员
    4. 专员处理完毕后,可以转回中枢进行其他服务

    始终保持友好、专业的服务态度。
    """,
    handoffs=[account_handoff, order_handoff, complaint_handoff, product_handoff],
)

# ============================================
# 测试多 Agent 协作系统
# ============================================

async def test_multi_agent_system():
    test_cases = [
        "我想重置我的账户密码",
        "查询一下订单号 12345 的物流进度",
        "这个产品有什么功能?和竞品相比有什么优势?",
        "我收到的商品破损了,要求赔偿",
        "我想要取消订单并退款",
    ]

    print("=" * 70)
    print("多 Agent 协作系统测试")
    print("=" * 70)

    for i, query in enumerate(test_cases, 1):
        print(f"\n{'=' * 70}")
        print(f"测试案例 {i}")
        print(f"用户请求: {query}")
        print("-" * 70)

        result = await Runner.run(router_agent, query)
        print(f"系统响应: {result.final_output}")

        # 显示执行轨迹
        if hasattr(result, 'last_agent') and result.last_agent:
            print(f"\n[信息] 最终处理的 Agent: {result.last_agent.name}")

asyncio.run(test_multi_agent_system())

3.4 实现流式输出

对于需要实时展示的场景,流式输出是一个重要的功能。下面展示如何实现流式响应。

import asyncio
from agents import Agent, Runner

async def stream_output_demo():
    # 创建 Agent
    story_agent = Agent(
        name="故事讲述者",
        instructions="""
        你是一个富有想象力的故事讲述者,能够创作各种类型的故事。
        请用生动的语言讲述一个简短的奇幻故事。
        """,
    )

    print("=" * 60)
    print("流式输出演示")
    print("=" * 60)
    print("\n故事开始:\n")

    # 使用 run_stream 实现流式输出
    stream = await Runner.run_stream(story_agent, "请给我讲一个关于勇敢骑士的短故事")

    # 流式处理输出
    collected_output = []
    async for event in stream.stream_events():
        # 处理不同类型的事件
        if event.type == "raw_stream_event":
            # 处理原始流事件
            if hasattr(event.data, 'delta'):
                delta = event.data.delta
                if hasattr(delta, 'content'):
                    print(delta.content, end="", flush=True)
                    collected_output.append(delta.content)

        elif event.type == "agent_updated_stream_event":
            # Agent 状态更新事件
            print(f"\n[Agent 状态更新: {event.data.name}]", end="", flush=True)

    print("\n")

    # 访问完整输出
    final_result = await stream.get_final_output()
    print(f"\n完整故事长度: {len(final_result.final_output)} 字符")
    print(f"Token 消耗: {final_result.last_agent.memory.usage_tokens if final_result.last_agent and final_result.last_agent.memory else 'N/A'}")

asyncio.run(stream_output_demo())

3.5 构建带记忆的对话系统

实现跨对话的记忆功能,可以让 Agent 在多轮对话中保持上下文。

import asyncio
from agents import Agent, Runner
from typing import List, Dict

class ConversationMemory:
    """简单的对话记忆管理器"""

    def __init__(self):
        self.conversations: Dict[str, List[Dict]] = {}

    def add_message(self, session_id: str, role: str, content: str):
        """添加消息到记忆"""
        if session_id not in self.conversations:
            self.conversations[session_id] = []

        self.conversations[session_id].append({
            "role": role,
            "content": content
        })

    def get_context(self, session_id: str, max_turns: int = 10) -> str:
        """获取最近的对话上下文"""
        if session_id not in self.conversations:
            return ""

        messages = self.conversations[session_id][-max_turns:]
        context = "\n".join([f"{msg['role']}: {msg['content']}" for msg in messages])
        return context

    def clear_session(self, session_id: str):
        """清除特定会话的记忆"""
        if session_id in self.conversations:
            del self.conversations[session_id]

# 创建带记忆的 Agent
def create_memory_agent():
    return Agent(
        name="记忆助手",
        instructions="""
        你是一个有个性的助手,会记住用户的偏好和之前的对话内容。

        行为准则:
        1. 记住用户的名字(如果他们告诉你的话)
        2. 记住用户的偏好设置
        3. 在适当的时候提及之前的对话内容
        4. 如果用户问起之前讨论过的话题,可以使用记忆来回应

        保持友好和亲切的态度。
        """,
    )

async def memory_conversation_demo():
    memory = ConversationMemory()
    agent = create_memory_agent()

    session_id = "user_001"

    print("=" * 60)
    print("带记忆的对话系统演示")
    print("=" * 60)

    # 模拟多轮对话
    queries = [
        "你好!我的名字叫小明,我喜欢编程和音乐。",
        "你知道我喜欢什么吗?",
        "今天天气怎么样?",
        "我叫什么名字?我有什么爱好?",
    ]

    for i, query in enumerate(queries, 1):
        print(f"\n--- 对话轮次 {i} ---")
        print(f"用户: {query}")

        # 获取历史上下文
        context = memory.get_context(session_id)

        # 构建带上下文的输入
        if context:
            full_input = f"之前的对话记忆:\n{context}\n\n当前用户消息:{query}"
        else:
            full_input = query

        # 运行 Agent
        result = await Runner.run(agent, full_input)
        response = result.final_output

        print(f"助手: {response}")

        # 更新记忆
        memory.add_message(session_id, "user", query)
        memory.add_message(session_id, "assistant", response)

    # 展示最终记忆内容
    print("\n" + "=" * 60)
    print("记忆内容展示")
    print("=" * 60)
    print(memory.get_context(session_id))

asyncio.run(memory_conversation_demo())

3.6 实现复杂的工作流管道

在实际应用中,我们经常需要将多个 Agent 组织成复杂的工作流程。下面展示如何构建一个数据处理管道。

import asyncio
from agents import Agent, Runner, function
from typing import List, Dict

# ============================================
# 定义工作流程中的各个处理 Agent
# ============================================

# 1. 数据验证 Agent
validator_agent = Agent(
    name="数据验证员",
    instructions="""
    你负责验证输入数据的完整性和格式正确性。

    检查项目:
    - 必填字段是否存在
    - 数据格式是否正确
    - 数据范围是否合理

    如果数据有问题,详细说明问题所在。
    如果数据正常,返回"验证通过"。
    """,
)

# 2. 数据分析 Agent
analyzer_agent = Agent(
    name="数据分析师",
    instructions="""
    你负责分析输入的数据,提取有价值的洞察。

    分析维度:
    - 数据的基本统计特征
    - 数据的分布情况
    - 异常值的识别
    - 潜在的模式和趋势

    返回结构化的分析报告。
    """,
)

# 3. 报告生成 Agent
reporter_agent = Agent(
    name="报告撰写员",
    instructions="""
    你负责将分析结果整理成易于理解的报告。

    报告要求:
    - 结构清晰,层次分明
    - 突出关键发现
    - 提供可操作的建议
    - 使用简洁明了的语言

    报告应该让非专业读者也能理解。
    """,
)

# ============================================
# 定义数据处理函数
# ============================================

@function
def validate_input_data(data: Dict) -> str:
    """
    验证输入数据

    参数:
        data: 待验证的数据字典

    返回:
        验证结果
    """
    required_fields = ["name", "age", "email"]
    missing_fields = [f for f in required_fields if f not in data]

    if missing_fields:
        return f"缺少必填字段: {', '.join(missing_fields)}"

    # 验证邮箱格式(简化)
    if "@" not in data.get("email", ""):
        return "邮箱格式不正确"

    # 验证年龄范围
    age = data.get("age", 0)
    if not isinstance(age, int) or age < 0 or age > 150:
        return "年龄值超出合理范围"

    return "验证通过"

@function
def analyze_data(data: Dict) -> str:
    """
    分析数据

    参数:
        data: 待分析的数据字典

    返回:
        分析结果
    """
    insights = []

    # 基本统计
    insights.append(f"数据包含 {len(data)} 个字段")

    # 字段分析
    for key, value in data.items():
        insights.append(f"- {key}: {value} ({type(value).__name__})")

    # 异常检测
    if isinstance(data.get("age"), int):
        if data["age"] < 18:
            insights.append("注意:用户年龄低于18岁")
        elif data["age"] > 60:
            insights.append("注意:用户年龄超过60岁")

    return "\n".join(insights)

# ============================================
# 创建工作流管道 Agent
# ============================================

class DataPipeline:
    """数据处理管道"""

    def __init__(self):
        self.validator = Agent(
            name="验证节点",
            instructions="验证输入数据,输出验证结果",
            tools=[validate_input_data],
        )

        self.analyzer = Agent(
            name="分析节点", 
            instructions="分析输入数据,输出分析报告",
            tools=[analyze_data],
        )

        self.reporter = Agent(
            name="报告节点",
            instructions="将分析结果转化为易读的最终报告",
        )

    async def run(self, input_data: Dict) -> str:
        """运行完整的处理管道"""
        print("\n" + "=" * 60)
        print("数据处理管道启动")
        print("=" * 60)

        # 阶段 1: 验证
        print("\n[阶段 1/3] 数据验证...")
        validation_result = await Runner.run(
            self.validator,
            f"请验证以下数据:{input_data}"
        )
        validation_output = validation_result.final_output
        print(f"验证结果: {validation_output}")

        if "通过" not in validation_output:
            return f"数据验证失败:{validation_output}"

        # 阶段 2: 分析
        print("\n[阶段 2/3] 数据分析...")
        analysis_result = await Runner.run(
            self.analyzer,
            f"请分析以下数据:{input_data}"
        )
        analysis_output = analysis_result.final_output
        print(f"分析完成: {len(analysis_output)} 字符")

        # 阶段 3: 报告生成
        print("\n[阶段 3/3] 生成报告...")
        report_result = await Runner.run(
            self.reporter,
            f"基于以下分析结果生成最终报告:\n{analysis_output}"
        )
        report_output = report_result.final_output

        print("\n" + "=" * 60)
        print("处理完成!")
        print("=" * 60)

        return report_output

# ============================================
# 测试工作流管道
# ============================================

async def test_pipeline():
    # 准备测试数据
    test_data = {
        "name": "张三",
        "age": 28,
        "email": "zhangsan@example.com",
        "occupation": "软件工程师",
        "experience_years": 5,
        "skills": ["Python", "JavaScript", "Machine Learning"],
    }

    # 创建并运行管道
    pipeline = DataPipeline()
    final_report = await pipeline.run(test_data)

    print("\n最终报告:")
    print("-" * 60)
    print(final_report)

asyncio.run(test_pipeline())

第四部分:常见使用场景与实战案例

4.1 智能客服机器人

智能客服是 Agent 技术最常见的应用场景之一。一个完善的客服系统需要能够处理多种类型的问题,并在必要时转接到人工客服。

“`python
import asyncio
from agents import Agent, Runner, handoff, function
from typing import Optional
import re

============================================

客服系统的核心组件

============================================

1. FAQ 查询 Agent

faq_agent = Agent(
name=”FAQ助手”,
instructions=”””
你是一个 FAQ 查询专家,帮助用户解答常见问题。

知识库内容:
- 账户相关问题:注册、登录、密码重置、账户设置
- 支付相关问题:付款方式、退款政策、发票申请
- 产品使用

Project: https://github.com/openai/openai-agents-python

Stars: 26395

如果内容对您有帮助,欢迎打赏

您的支持是我继续创作的动力

前往打赏页面

评论区

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注