别再让你的AI Agent”停在Demo”了,这套框架让它们真正具备生产级能力
揭秘NirDiamant/agents-towards-production:让LangChain Agent从玩具走向工业级应用的完整实践指南
当你在本地跑通第一个LangChain Agent时,是否有过这样的兴奋感?几行代码就能让AI”思考”、调用工具、回答复杂问题。然而,当你试图把它部署到实际项目中时,问题接踵而至:性能不稳定、错误处理缺失、日志难以追踪、并发能力不足……仿佛所有的”魔法”都只存在于精心设计的演示中。
这正是NirDiamant/agents-towards-production项目诞生的初衷。它不是另一个抽象的AI框架,而是一套专注于解决”最后一公里”问题的实战手册——从代码结构设计到错误恢复机制,从性能优化到可观测性建设,每一处细节都指向一个目标:让你的Agent真正ready for production。
本文将带你从零开始,系统掌握这套实践指南的全部精华。无论你是刚刚接触AI Agent开发的新手,还是已经在生产环境中摸爬滚打过的工程师,都能从中获得可落地的思路和可直接复用的代码。
为什么这个项目值得关注
AI Agent开发的现状与挑战
过去一年,大语言模型(LLM)的能力跃升让”Agent”概念炙手可热。AutoGPT、BabyAGI等项目的爆火让人们看到了AI自主执行任务的曙光。然而,热潮退去后,真实的生产环境给开发者泼了一盆冷水:
现实中的Agent开发面临着多维度的挑战。首先是架构层面的问题:大多数教程和示例代码都是”单文件主义”,所有的prompt、工具定义、逻辑判断都堆在一个文件里,既无法测试,也难以维护。其次是可靠性问题:LLM的输出不稳定,当Agent需要连续执行多个步骤时,任何一步的失败都可能导致整个流程崩溃,而现有教程几乎不涉及错误恢复机制。第三是工程化问题:如何设计Agent的日志系统、如何实现有意义的监控、如何在不影响性能的前提下添加安全审计——这些生产环境必备的能力,在学习资料中严重缺失。
NirDiamant/agents-towards-production正是针对这些痛点而生的。它汇集了LangChain Agent开发中的最佳实践,涵盖了从基础架构到高级特性的完整技术栈。更难得的是,这些实践都经过了真实生产环境的验证,不是纸上谈兵的设想。
这个项目的核心价值在于它填补了”Demo到Production”之间的鸿沟。它不会教你什么是ReAct、什么是Tool Use——这些基础概念你需要提前了解。它专注的是:当你的Agent需要服务1000个并发用户时,当你的Agent需要7×24小时稳定运行时,当你的Agent出现bug需要排查时,你应该怎么做。
项目的主要特色
深入了解这个项目后,你会发现它解决的不是某一个点的问题,而是构建了一套完整的Agent生产化方法论。
在代码组织方面,项目倡导模块化的Agent架构。它不要求你使用特定的框架,而是提供了一套思路:如何将Agent的核心逻辑与外部工具解耦,如何设计可复用的工具封装层,如何让prompt模板易于管理和版本控制。这种设计让你即使切换底层框架(如从LangChain迁移到其他方案),也能保持核心业务逻辑的稳定。
在可靠性保障方面,项目提供了多层级的错误处理策略。从LLM输出解析失败的容错,到工具调用超时后的重试机制,再到整个执行链路崩溃时的优雅降级,每一层都有对应的实现方案。这套策略不是简单的try-except捕获异常,而是基于Agent执行特性的专门设计。
在可观测性方面,项目演示了如何为Agent添加结构化日志、追踪执行路径、记录每一步的输入输出。这些能力对于排查”Agent为什么会做出这个决定”这类问题至关重要,也是生产环境调试的基础设施。
在性能优化方面,项目展示了批处理、缓存、异步执行等技术的应用场景和实现方式。这些优化不是炫技,而是在真实场景中必须面对的问题——当你的Agent需要处理大量请求时,每一处性能瓶颈都可能造成用户体验的断崖式下降。
环境搭建:从零构建Agent开发环境
基础依赖安装
在开始之前,我们需要准备好Python环境和必要的依赖库。建议使用Python 3.10或更高版本,因为较新的Python版本在类型提示和异步支持方面有更好的体验。
# 创建虚拟环境(推荐做法)
python -m venv agent-env
# 激活虚拟环境
# Linux/Mac
source agent-env/bin/activate
# Windows
agent-env\Scripts\activate
# 安装核心依赖
pip install langchain>=0.1.0
pip install langchain-openai # 或其他你选择的LLM提供方
pip install langchain-community # 社区维护的丰富工具集成
# 安装可选依赖
pip install requests # HTTP请求库,常用于自定义工具
pip install python-dotenv # 环境变量管理
pip install loguru # 现代化的日志库
pip install tenacity # 重试机制库
环境变量配置
为了安全地管理API密钥和配置信息,推荐使用环境变量。项目中使用python-dotenv来加载.env文件中的配置。
# .env 文件内容示例
OPENAI_API_KEY=sk-your-api-key-here
OPENAI_API_BASE=https://api.openai.com/v1 # 如需代理或使用第三方接口
SERPAPI_API_KEY=your-serpapi-key # 如使用SerpAPI工具
# 在代码中加载环境变量
from dotenv import load_dotenv
load_dotenv() # 自动查找并加载 .env 文件
项目初始化
创建一个标准化的项目结构,可以让你的Agent开发更加规范和可维护。以下是推荐的目录布局:
my-agent-project/
├── src/
│ ├── __init__.py
│ ├── agent/ # Agent核心逻辑
│ │ ├── __init__.py
│ │ ├── base.py # Agent基类
│ │ ├── executor.py # 执行器
│ │ └── memory.py # 记忆管理
│ ├── tools/ # 工具定义
│ │ ├── __init__.py
│ │ ├── search.py # 搜索工具
│ │ ├── calculator.py # 计算工具
│ │ └── custom.py # 自定义工具
│ ├── prompts/ # Prompt模板
│ │ ├── __init__.py
│ │ └── agent_prompts.py
│ └── utils/ # 工具函数
│ ├── __init__.py
│ ├── logging.py
│ └── retry.py
├── tests/ # 测试文件
├── config/ # 配置文件
├── logs/ # 日志输出目录
├── .env # 环境变量(不提交到版本控制)
├── .gitignore
├── requirements.txt
└── README.md
初始化项目文件:
# src/__init__.py
"""My Agent Project - A production-ready AI Agent."""
__version__ = "0.1.0"
# src/agent/__init__.py
"""Agent core module."""
from .base import BaseAgent
from .executor import AgentExecutor
__all__ = ["BaseAgent", "AgentExecutor"]
核心概念解析:理解Agent的架构哲学
Agent的三大组件
在深入代码之前,我们需要理解Agent的基本架构。一个功能完整的Agent通常由三个核心组件构成:
语言模型(LLM) 是Agent的”大脑”,负责理解用户意图、规划执行步骤、生成响应内容。选择合适的LLM是Agent设计的第一步。OpenAI的GPT-4在复杂推理任务上表现出色,但成本较高;Claude系列在长上下文处理上有优势;开源模型如Llama可以在本地部署,但能力相对有限。你需要根据具体场景在能力、成本、隐私之间做出权衡。
工具集(Tools) 是Agent的”四肢”。没有工具的Agent只能”纸上谈兵”,无法真正与外部世界交互。工具可以是简单的计算器、天气查询API,也可以是复杂的代码执行环境、数据库查询接口。工具设计的好坏直接影响Agent的能力边界。
编排层(Orchestration) 是Agent的”神经系统”,负责协调LLM和工具之间的交互。编排层决定了Agent如何理解用户请求、如何决定调用哪些工具、如何处理工具返回的结果、以及何时结束执行。LangChain的Agent架构就建立在这一层之上。
ReAct范式的深度理解
ReAct(Reasoning + Acting)是当前最流行的Agent编排范式之一。它的核心思想是让Agent在”思考”和”行动”之间交替进行:
思考(Reason):分析当前状态,理解需要完成什么任务,决定下一步行动。
行动(Act):执行选定的工具调用,获取外部信息。
观察(Observe):处理工具返回的结果,更新Agent的认知状态。
这个循环持续进行,直到Agent判断任务已经完成。
理解ReAct对于正确使用本项目至关重要。agents-towards-production中的很多设计都是围绕优化ReAct循环展开的:如何让思考过程更可控、如何让工具调用更可靠、如何让观察结果更可解释。
Agent状态机设计
在生产环境中,我们推荐将Agent的执行流程建模为状态机。这不是过度设计,而是应对复杂场景的必要手段。
from enum import Enum
from typing import Optional
from dataclasses import dataclass, field
class AgentState(Enum):
"""Agent执行状态枚举."""
IDLE = "idle" # 空闲状态,等待输入
REASONING = "reasoning" # 正在推理
ACTION = "action" # 正在执行工具调用
OBSERVING = "observing" # 正在处理观察结果
RESPONDING = "responding" # 正在生成响应
ERROR = "error" # 错误状态
COMPLETED = "completed" # 任务完成
WAITING_INPUT = "waiting_input" # 等待用户额外输入
@dataclass
class ExecutionContext:
"""Agent执行上下文."""
state: AgentState = AgentState.IDLE
user_input: str = ""
current_reasoning: str = "" # 当前推理过程
action_history: list = field(default_factory=list) # 操作历史
observation_history: list = field(default_factory=list) # 观察历史
tool_results: dict = field(default_factory=dict) # 工具结果缓存
error_count: int = 0 # 连续错误计数
max_iterations: int = 10 # 最大迭代次数
metadata: dict = field(default_factory=dict) # 扩展元数据
def can_continue(self) -> bool:
"""判断是否可以继续执行."""
return (
self.state not in [AgentState.COMPLETED, AgentState.ERROR]
and self.error_count < 3
and len(self.action_history) < self.max_iterations
)
def add_action(self, action: str, tool: str, input_data: any) -> None:
"""记录一次工具调用."""
self.action_history.append({
"action": action,
"tool": tool,
"input": input_data
})
这种状态机设计有几个好处:首先,它让Agent的执行流程变得透明——你可以精确知道Agent在任何时刻在做什么;其次,它为错误处理提供了明确的边界——不同的状态可以有不同的错误恢复策略;第三,它为监控和日志提供了清晰的切入点。
实战教程:构建一个生产级Agent
第一阶段:基础架构搭建
让我们从最基础的Agent框架开始,逐步构建一个具备生产级能力的系统。
首先是工具抽象层的设计。良好的工具抽象应该具备以下特性:统一的接口定义、完善的错误处理、详细的执行日志、以及灵活的参数验证。
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Type
from pydantic import BaseModel, Field, field_validator
from loguru import logger
import time
class ToolInput(BaseModel):
"""工具输入基类."""
pass
class ToolOutput(BaseModel):
"""工具输出基类."""
success: bool
result: Any = None
error: Optional[str] = None
execution_time: float = 0.0
metadata: Dict[str, Any] = {}
class BaseTool(ABC):
"""
工具基类,定义所有工具的通用接口。
每个具体工具需要:
1. 继承此类
2. 定义输入输出模型
3. 实现execute方法
"""
name: str = "" # 工具唯一标识名
description: str = "" # 工具描述,用于LLM理解何时调用
input_model: Type[ToolInput] = ToolInput # 输入模型类
def __init__(self):
self._call_count = 0
self._total_time = 0.0
@abstractmethod
def _run(self, input_data: ToolInput) -> ToolOutput:
"""
实际执行逻辑,由子类实现。
Args:
input_data: 验证后的输入数据
Returns:
ToolOutput: 执行结果
"""
pass
def execute(self, raw_input: Dict[str, Any]) -> ToolOutput:
"""
工具执行的入口方法,包含通用逻辑如输入验证、日志记录等。
Args:
raw_input: 原始输入字典
Returns:
ToolOutput: 执行结果
"""
self._call_count += 1
start_time = time.time()
logger.info(f"[Tool: {self.name}] Starting execution with input: {raw_input}")
try:
# 输入验证
validated_input = self._validate_input(raw_input)
# 执行核心逻辑
output = self._run(validated_input)
execution_time = time.time() - start_time
self._total_time += execution_time
output.execution_time = execution_time
logger.info(
f"[Tool: {self.name}] Completed in {execution_time:.3f}s, "
f"success={output.success}"
)
return output
except Exception as e:
execution_time = time.time() - start_time
logger.exception(f"[Tool: {self.name}] Execution failed")
return ToolOutput(
success=False,
error=str(e),
execution_time=execution_time
)
def _validate_input(self, raw_input: Dict[str, Any]) -> ToolInput:
"""验证并转换输入数据."""
try:
return self.input_model(**raw_input)
except Exception as e:
raise ValueError(f"Invalid input for tool {self.name}: {e}")
def get_stats(self) -> Dict[str, Any]:
"""获取工具统计信息."""
avg_time = self._total_time / self._call_count if self._call_count > 0 else 0
return {
"name": self.name,
"call_count": self._call_count,
"total_time": self._total_time,
"average_time": avg_time
}
现在让我们实现几个具体的工具示例:
from typing import List
import math
class CalculatorInput(ToolInput):
"""计算器工具输入."""
expression: str = Field(..., description="要计算的数学表达式")
class CalculatorOutput(ToolOutput):
"""计算器工具输出."""
result: float = 0.0
class CalculatorTool(BaseTool):
"""
数学计算器工具。
支持基本运算:加(+)、减(-)、乘(*)、除(/)、幂(**)、
三角函数(sin, cos, tan)、对数(log, ln)、平方根(sqrt)
"""
name = "calculator"
description = "用于执行数学计算。当需要计算数值结果时使用此工具。"
input_model = CalculatorInput
ALLOWED_FUNCTIONS = {
'sin': math.sin,
'cos': math.cos,
'tan': math.tan,
'sqrt': math.sqrt,
'log': math.log10,
'ln': math.log,
'abs': abs,
'pi': math.pi,
'e': math.e
}
def _run(self, input_data: CalculatorInput) -> CalculatorOutput:
"""执行数学计算."""
expression = input_data.expression.lower().strip()
# 安全检查:只允许数学表达式中的安全字符
allowed_chars = set('0123456789+-*/()., e')
if not all(c in allowed_chars or c.isalnum() for c in expression):
return CalculatorOutput(
success=False,
error="表达式包含不支持的字符"
)
# 预处理表达式
expression = self._preprocess_expression(expression)
try:
# 使用eval执行计算(这里做了简化处理)
result = eval(expression, {"__builtins__": {}}, self.ALLOWED_FUNCTIONS)
return CalculatorOutput(success=True, result=float(result))
except Exception as e:
return CalculatorOutput(success=False, error=f"计算错误: {str(e)}")
def _preprocess_expression(self, expression: str) -> str:
"""预处理数学表达式."""
# 移除常见描述性文字
replacements = [
('等于', ''),
('的平方根', '**0.5'),
('的平方', '**2'),
]
for old, new in replacements:
expression = expression.replace(old, new)
return expression
from datetime import datetime
from typing import Optional
class SearchInput(ToolInput):
"""搜索工具输入."""
query: str = Field(..., description="搜索查询关键词")
max_results: int = Field(default=5, ge=1, le=10, description="最大返回结果数")
class SearchOutput(ToolOutput):
"""搜索工具输出."""
results: List[Dict[str, str]] = Field(default_factory=list)
class MockSearchTool(BaseTool):
"""
模拟搜索工具(实际项目中替换为真实搜索API)。
在开发测试阶段使用mock数据,避免API调用限制和费用。
"""
name = "web_search"
description = "用于搜索最新信息、新闻或你不确定的事实。使用搜索工具获取实时信息。"
input_model = SearchInput
# Mock搜索结果数据
MOCK_RESULTS = {
"人工智能": [
{"title": "2024年AI发展十大趋势", "snippet": "生成式AI持续领跑,企业级应用加速落地..."},
{"title": "大语言模型最新研究进展", "snippet": "多模态模型成为新焦点,上下文窗口持续扩大..."},
],
"python": [
{"title": "Python 3.12新特性详解", "snippet": "模式匹配优化、性能提升20%..."},
{"title": "Python异步编程最佳实践", "snippet": "async/await深入理解与性能优化技巧..."},
]
}
def _run(self, input_data: SearchInput) -> SearchOutput:
"""执行搜索(Mock版本)."""
query = input_data.query
# 简单实现:在mock数据中查找相关结果
results = []
for key, items in self.MOCK_RESULTS.items():
if key in query:
results.extend(items[:input_data.max_results])
break
# 如果没有匹配,返回空结果
if not results:
results = [{
"title": f"关于'{query}'的搜索结果",
"snippet": "当前处于Mock模式,请配置真实的搜索API以获取实际结果"
}]
return SearchOutput(success=True, results=results[:input_data.max_results])
第二阶段:Agent核心实现
工具层搭建完成后,我们来实现Agent的核心逻辑。这部分是整个系统的”大脑”,负责协调各个组件的工作。
from typing import List, Dict, Any, Callable, Optional
from langchain.agents import AgentExecutor, create_react_agent
from langchain.prompts import PromptTemplate
from langchain.schema import AgentAction, AgentFinish, BaseOutputParser
from langchain_openai import ChatOpenAI
from loguru import logger
import json
class ReasoningOutputParser(BaseOutputParser):
"""
ReAct风格的输出解析器。
解析LLM输出,提取思考过程、工具调用指令。
"""
def parse(self, text: str) -> Dict[str, Any]:
"""解析LLM输出文本."""
# 移除可能干扰解析的markdown标记
text = text.strip()
# 检查是否为最终答案
if "final_answer" in text.lower() or "最终答案" in text:
return {"type": "finish", "output": self._extract_final_answer(text)}
# 尝试解析工具调用
tool_result = self._parse_tool_call(text)
if tool_result:
return tool_result
# 无法解析,返回错误
logger.warning(f"无法解析LLM输出: {text[:200]}")
return {"type": "error", "output": "无法解析LLM响应"}
def _parse_tool_call(self, text: str) -> Optional[Dict[str, Any]]:
"""解析工具调用指令."""
# 查找工具名称和参数
for tool_marker in ["Action: ", "行动: "]:
if tool_marker in text:
parts = text.split(tool_marker)
if len(parts) >= 2:
tool_info = parts[1].strip().split("\n")[0]
# 尝试分离工具名和参数
if ":" in tool_info:
tool_name = tool_info.split(":")[0].strip()
tool_input = tool_info.split(":", 1)[1].strip()
else:
tool_name = tool_info.strip()
tool_input = ""
return {
"type": "action",
"tool": tool_name,
"tool_input": tool_input
}
return None
def _extract_final_answer(self, text: str) -> str:
"""提取最终答案."""
for marker in ["Final Answer:", "最终答案:", "final_answer:"]:
if marker in text:
return text.split(marker)[1].strip()
return text.strip()
class ProductionAgent:
"""
生产级Agent实现。
特性:
- 结构化日志记录
- 错误恢复机制
- 执行追踪
- 可配置的迭代限制
"""
def __init__(
self,
llm: ChatOpenAI,
tools: List[BaseTool],
prompt_template: str,
max_iterations: int = 10,
max_execution_time: Optional[float] = None,
early_stopping: bool = True
):
self.llm = llm
self.tools = {tool.name: tool for tool in tools}
self.max_iterations = max_iterations
self.max_execution_time = max_execution_time
self.early_stopping = early_stopping
# 构建prompt
self.prompt = PromptTemplate.from_template(prompt_template)
# 执行统计
self.execution_stats = {
"total_calls": 0,
"successful_calls": 0,
"failed_calls": 0,
"tool_usage": {tool.name: 0 for tool in tools}
}
logger.info(f"Agent初始化完成,工具数: {len(tools)}")
def invoke(self, user_input: str, session_id: str = "default") -> Dict[str, Any]:
"""
执行Agent推理。
Args:
user_input: 用户输入
session_id: 会话ID,用于追踪
Returns:
Dict包含执行结果和元数据
"""
import time
start_time = time.time()
self.execution_stats["total_calls"] += 1
logger.info(f"[Session: {session_id}] 开始执行,输入: {user_input[:100]}")
# 初始化执行上下文
context = ExecutionContext(
user_input=user_input,
max_iterations=self.max_iterations
)
intermediate_steps = [] # 记录中间步骤
try:
# ReAct循环
while context.can_continue():
# 1. 推理阶段
context.state = AgentState.REASONING
reasoning_output = self._reason(context, intermediate_steps)
if reasoning_output["type"] == "finish":
# 任务完成
result = reasoning_output["output"]
context.state = AgentState.COMPLETED
break
elif reasoning_output["type"] == "action":
# 2. 行动阶段
context.state = AgentState.ACTION
tool_name = reasoning_output["tool"]
tool_input = reasoning_output["tool_input"]
if tool_name not in self.tools:
# 未知工具,记录错误
context.error_count += 1
intermediate_steps.append({
"error": f"未知工具: {tool_name}"
})
continue
# 执行工具
context.add_action(reasoning_output.get("action", ""), tool_name, tool_input)
self.execution_stats["tool_usage"][tool_name] += 1
# 3. 观察阶段
context.state = AgentState.OBSERVING
tool = self.tools[tool_name]
tool_result = tool.execute(self._parse_tool_input(tool_input))
observation = self._format_observation(tool_result)
context.observation_history.append(observation)
intermediate_steps.append({
"tool": tool_name,
"input": tool_input,
"output": observation
})
else:
# 解析错误
context.error_count += 1
intermediate_steps.append({
"error": f"推理解析失败: {reasoning_output.get('output', '未知错误')}"
})
# 检查是否超时或达到最大迭代
if context.state not in [AgentState.COMPLETED, AgentState.ERROR]:
logger.warning(f"达到最大迭代次数或超时")
context.state = AgentState.ERROR
except Exception as e:
logger.exception("Agent执行异常")
context.state = AgentState.ERROR
context.metadata["error"] = str(e)
# 统计
execution_time = time.time() - start_time
if context.state == AgentState.COMPLETED:
self.execution_stats["successful_calls"] += 1
else:
self.execution_stats["failed_calls"] += 1
return {
"output": context.observation_history[-1] if context.state == AgentState.COMPLETED else None,
"intermediate_steps": intermediate_steps,
"state": context.state.value,
"execution_time": execution_time,
"stats": self.execution_stats.copy()
}
def _reason(self, context: ExecutionContext, intermediate_steps: List[Dict]) -> Dict[str, Any]:
"""执行推理步骤."""
# 构建prompt上下文
history_text = self._format_history(intermediate_steps)
current_prompt = self.prompt.format(
input=context.user_input,
history=history_text,
tools=self._format_tools_description()
)
# 调用LLM
response = self.llm.invoke(current_prompt)
response_text = response.content if hasattr(response, 'content') else str(response)
logger.debug(f"LLM响应: {response_text[:200]}")
# 解析输出
parser = ReasoningOutputParser()
return parser.parse(response_text)
def _format_history(self, intermediate_steps: List[Dict]) -> str:
"""格式化历史记录用于prompt."""
if not intermediate_steps:
return "暂无历史记录"
lines = []
for step in intermediate_steps:
if "tool" in step:
lines.append(f"行动: 使用{step['tool']}工具,输入: {step['input']}")
lines.append(f"观察: {step['output']}")
elif "error" in step:
lines.append(f"错误: {step['error']}")
return "\n".join(lines)
def _format_tools_description(self) -> str:
"""格式化工具描述用于prompt."""
lines = ["可用工具:"]
for name, tool in self.tools.items():
lines.append(f"- {name}: {tool.description}")
return "\n".join(lines)
def _parse_tool_input(self, tool_input_str: str) -> Dict[str, Any]:
"""解析工具输入参数."""
try:
# 尝试JSON格式
return json.loads(tool_input_str)
except:
# 尝试简单key=value格式
result = {}
for pair in tool_input_str.split(","):
if "=" in pair:
key, value = pair.split("=", 1)
result[key.strip()] = value.strip()
return result if result else {"input": tool_input_str}
def _format_observation(self, tool_result: ToolOutput) -> str:
"""格式化工具结果."""
if tool_result.success:
return f"成功: {tool_result.result}"
else:
return f"失败: {tool_result.error}"
第三阶段:错误处理与恢复策略
生产环境的Agent必须具备强大的错误处理能力。用户的输入是多种多样的,外部API可能不稳定,LLM的输出可能不可预测——我们需要为所有这些情况做好准备。
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type
)
import requests
class ErrorRecoveryManager:
"""
错误恢复管理器。
为Agent提供多层次、针对性的错误恢复策略。
"""
def __init__(self):
self.error_counts = {}
self.error_history = []
def register_error(self, error_type: str, context: Dict[str, Any]) -> None:
"""记录错误."""
if error_type not in self.error_counts:
self.error_counts[error_type] = 0
self.error_counts[error_type] += 1
self.error_history.append({
"type": error_type,
"context": context,
"count": self.error_counts[error_type]
})
logger.warning(f"错误登记: {error_type}, 累计次数: {self.error_counts[error_type]}")
def should_retry(self, error_type: str) -> bool:
"""判断是否应该重试."""
# 不同错误类型有不同的重试阈值
retry_thresholds = {
"network_error": 3,
"timeout": 2,
"rate_limit": 5,
"parse_error": 1
}
threshold = retry_thresholds.get(error_type, 2)
return self.error_counts.get(error_type, 0) < threshold
def get_recovery_strategy(self, error_type: str) -> Optional[str]:
"""获取错误恢复策略."""
strategies = {
"network_error": "RETRY_WITH_BACKOFF",
"timeout": "RETRY_WITH_EXTENDED_TIMEOUT",
"rate_limit": "WAIT_AND_RETRY",
"parse_error": "SIMPLIFY_REQUEST",
"llm_error": "RETRY_WITH_SIMPLER_PROMPT",
"unknown_tool": "CLARIFY_INTENT"
}
return strategies.get(error_type)
def format_error_context(self, error: Exception, context: Dict) -> str:
"""格式化错误上下文用于报告."""
return (
f"错误类型: {type(error).__name__}\n"
f"错误信息: {str(error)}\n"
f"发生位置: {context.get('location', '未知')}\n"
f"相关数据: {context.get('data', {})}"
)
class ResilientToolWrapper:
"""
带错误恢复的工具包装器。
为现有工具添加重试、超时等保护机制。
"""
def __init__(
self,
tool: BaseTool,
max_retries: int = 3,
timeout: float = 30.0,
fallback_value: Any = None
):
self.tool = tool
self.max_retries = max_retries
self.timeout = timeout
self.fallback_value = fallback_value
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10),
retry=retry_if_exception_type((requests.Timeout, requests.ConnectionError))
)
def execute_with_retry(self, input_data: Dict[str, Any]) -> ToolOutput:
"""带重试机制的工具执行."""
return self.tool.execute(input_data)
def execute_safe(self, input_data: Dict[str, Any]) -> ToolOutput:
"""
安全的工具执行,集成所有保护机制。
执行顺序:
1. 尝试带重试的执行
2. 如果全部失败,尝试简化参数重试
3. 如果仍然失败,返回fallback值并记录错误
"""
try:
result = self.execute_with_retry(input_data)
if result.success:
return result
# 成功后仍然失败,尝试简化
simplified_input = self._simplify_input(input_data)
if simplified_input != input_data:
logger.info("尝试简化参数重试...")
simplified_result = self.tool.execute(simplified_input)
if simplified_result.success:
return simplified_result
# 最终失败
return result
except Exception as e:
logger.exception(f"工具 {self.tool.name} 执行失败")
if self.fallback_value is not None:
return ToolOutput(
success=True, # 降级执行为"成功"
result=self.fallback_value,
error=f"使用降级值,原因: {str(e)}",
metadata={"fallback": True}
)
return ToolOutput(
success=False,
error=f"工具执行失败: {str(e)}"
)
def _simplify_input(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
"""简化输入参数."""
simplified = input_data.copy()
# 根据不同工具类型进行简化
if self.tool.name == "calculator":
# 移除复杂函数,只保留基本运算
if "expression" in simplified:
expr = simplified["expression"]
# 移除log、sin等复杂函数调用
for func in ["log(", "ln(", "sin(", "cos(", "tan("]:
expr = expr.replace(func, "(")
simplified["expression"] = expr
elif self.tool.name == "web_search":
# 简化搜索查询
if "query" in simplified:
# 只保留前20个字符
simplified["query"] = simplified["query"][:20]
# 减少结果数量
simplified["max_results"] = 3
return simplified
第四阶段:日志与可观测性
在生产环境中,”能运行”只是最低要求,”能追踪、可排查”才是合格标准。让我们为Agent添加完善的日志和监控能力。
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path
from datetime import datetime
import json
class AgentLogger:
"""
Agent专用日志记录器。
特性:
- 分级日志(调试、信息、警告、错误)
- 文件和控制台双输出
- 结构化JSON日志
- 执行链路追踪
"""
def __init__(
self,
log_dir: str = "logs",
log_level: str = "INFO",
max_file_size: int = 10 * 1024 * 1024, # 10MB
backup_count: int = 5
):
self.log_dir = Path(log_dir)
self.log_dir.mkdir(parents=True, exist_ok=True)
# 创建logger
self.logger = logging.getLogger("agent")
self.logger.setLevel(getattr(logging, log_level.upper()))
# 清除已有的handlers
self.logger.handlers.clear()
# 文件handler - JSON格式
self._setup_file_handler(max_file_size, backup_count)
# 控制台handler - 彩色格式
self._setup_console_handler()
def _setup_file_handler(self, max_size: int, backup_count: int) -> None:
"""设置文件日志处理器."""
log_file = self.log_dir / f"agent_{datetime.now():%Y%m%d}.log"
file_handler = RotatingFileHandler(
log_file,
maxBytes=max_size,
backupCount=backup_count,
encoding="utf-8"
)
file_handler.setLevel(logging.DEBUG)
# JSON格式化器
class JSONFormatter(logging.Formatter):
def format(self, record):
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno
}
if record.exc_info:
log_data["exception"] = self.formatException(record.exc_info)
if hasattr(record, "extra"):
log_data.update(record.extra)
return json.dumps(log_data, ensure_ascii=False)
file_handler.setFormatter(JSONFormatter())
self.logger.addHandler(file_handler)
def _setup_console_handler(self) -> None:
"""设置控制台日志处理器."""
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# 人类可读的格式化器
formatter = logging.Formatter(
"%(asctime)s | %(levelname)-8s | %(message)s",
datefmt="%H:%M:%S"
)
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)
def log_agent_start(self, session_id: str, user_input: str) -> None:
"""记录Agent启动."""
self.logger.info(
f"Agent执行开始",
extra={"event": "agent_start", "session_id": session_id}
)
def log_reasoning(
self,
session_id: str,
step: int,
thought: str,
action: Optional[str] = None,
action_input: Optional[str] = None
) -> None:
"""记录推理步骤."""
self.logger.info(
f"[Step {step}] 推理: {thought[:100]}",
extra={
"event": "reasoning",
"session_id": session_id,
"step": step,
"thought": thought,
"action": action,
"action_input": action_input
}
)
def log_tool_execution(
self,
session_id: str,
tool_name: str,
input_data: Any,
result: ToolOutput,
execution_time: float
) -> None:
"""记录工具执行."""
log_level = logging.INFO if result.success else logging.ERROR
self.logger.log(
log_level,
f"工具执行: {tool_name} (耗时: {execution_time:.3f}s)",
extra={
"event": "tool_execution",
"session_id": session_id,
"tool_name": tool_name,
"input": input_data,
"success": result.success,
"result": result.result,
"error": result.error,
"execution_time": execution_time
}
)
def log_agent_complete(
self,
session_id: str,
state: str,
total_steps: int,
total_time: float,
output: Any
) -> None:
"""记录Agent完成."""
self.logger.info(
f"Agent执行完成: 状态={state}, 步骤={total_steps}, "
f"耗时={total_time:.3f}s",
extra={
"event": "agent_complete",
"session_id": session_id,
"state": state,
"total_steps": total_steps,
"total_time": total_time,
"output": str(output)[:500]
}
)
def log_error(
self,
session_id: str,
error_type: str,
error_message: str,
context: Dict[str, Any]
) -> None:
"""记录错误."""
self.logger.error(
f"错误发生: {error_type} - {error_message}",
extra={
"event": "error",
"session_id": session_id,
"error_type": error_type,
"error_message": error_message,
"context": context
}
)
class ExecutionTracer:
"""
执行追踪器。
记录完整的执行链路,用于调试和性能分析。
"""
def __init__(self):
self.traces = {}
def start_trace(self, session_id: str) -> str:
"""开始追踪."""
self.traces[session_id] = {
"session_id": session_id,
"start_time": datetime.now(),
"spans": [],
"events": []
}
return session_id
def add_span(
self,
session_id: str,
name: str,
start_time: datetime,
end_time: datetime,
attributes: Dict[str, Any] = None
) -> None:
"""添加追踪跨度."""
if session_id not in self.traces:
return
span = {
"name": name,
"start_time": start_time.isoformat(),
"end_time": end_time.isoformat(),
"duration_ms": (end_time - start_time).total_seconds() * 1000,
"attributes": attributes or {}
}
self.traces[session_id]["spans"].append(span)
def add_event(
self,
session_id: str,
name: str,
timestamp: datetime = None,
attributes: Dict[str, Any] = None
) -> None:
"""添加追踪事件."""
if session_id not in self.traces:
return
event = {
"name": name,
"timestamp": (timestamp or datetime.now()).isoformat(),
"attributes": attributes or {}
}
self.traces[session_id]["events"].append(event)
def end_trace(self, session_id: str) -> Dict[str, Any]:
"""结束追踪并返回结果."""
if session_id not in self.traces:
return {}
trace = self.traces[session_id]
trace["end_time"] = datetime.now()
# 计算总耗时
total_duration = (trace["end_time"] - trace["start_time"]).total_seconds() * 1000
trace["total_duration_ms"] = total_duration
# 计算各阶段耗时
span_summary = {}
for span in trace["spans"]:
span_name = span["name"]
if span_name not in span_summary:
span_summary[span_name] = {"count": 0, "total_ms": 0}
span_summary[span_name]["count"] += 1
span_summary[span_name]["total_ms"] += span["duration_ms"]
trace["span_summary"] = span_summary
return trace
def get_trace(self, session_id: str) -> Optional[Dict[str, Any]]:
"""获取追踪记录."""
return self.traces.get(session_id)
实际使用示例
让我们用一个完整的示例来演示如何使用上述组件构建一个实际可用的Agent。
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
import os
# 加载环境变量
load_dotenv()
# 初始化LLM
llm = ChatOpenAI(
model="gpt-3.5-turbo", # 生产环境建议使用gpt-4以获得更稳定的输出
temperature=0.7,
api_key=os.getenv("OPENAI_API_KEY")
)
# 初始化工具
calculator = CalculatorTool()
search_tool = MockSearchTool()
# 定义Agent的prompt模板
REACT_PROMPT = """你是一个智能助手,可以帮助用户解决各种问题。
你可以使用以下工具:
{tools}
历史对话:
{history}
当前任务:{input}
请按照以下格式思考和行动:
思考: 你应该做什么,为什么?
行动: 工具名称
行动输入: 工具的输入参数
观察: 工具返回的结果
如果任务完成,使用以下格式回答:
最终答案: [你的完整回答]
开始执行:
"""
# 创建Agent实例
agent = ProductionAgent(
llm=llm,
tools=[calculator, search_tool],
prompt_template=REACT_PROMPT,
max_iterations=10
)
# 初始化日志器
logger = AgentLogger(log_dir="logs", log_level="INFO")
# 执行示例
def run_agent_example():
"""运行Agent示例."""
session_id = "demo_session_001"
logger.log_agent_start(session_id, "帮我计算2的10次方")
result = agent.invoke("帮我计算2的10次方", session_id=session_id)
logger.log_agent_complete(
session_id,
result["state"],
len(result["intermediate_steps"]),
result["execution_time"],
result["output"]
)
print("\n" + "="*50)
print("执行结果:")
print("="*50)
print(f"最终状态: {result['state']}")
print(f"执行步骤: {len(result['intermediate_steps'])}")
print(f"执行耗时: {result['execution_time']:.3f}s")
print(f"工具使用统计: {result['stats']['tool_usage']}")
print("\n中间步骤:")
for i, step in enumerate(result["intermediate_steps"], 1):
print(f" Step {i}: {step}")
# 运行示例
if __name__ == "__main__":
run_agent_example()
高级特性:让Agent更强大
多Agent协作系统
在复杂任务中,单个Agent的能力往往有限。我们可以设计多Agent协作系统,让不同的Agent专注于不同的子任务,通过协作完成复杂目标。
from typing import List, Dict, Any
from dataclasses import dataclass
from enum import Enum
class TaskType(Enum):
"""任务类型枚举."""
RESEARCH = "research" # 信息检索
ANALYSIS = "analysis" # 分析研究
WRITING = "writing" # 内容撰写
CODING = "coding" # 代码编写
REVIEW = "review" # 审查审核
@dataclass
class Task:
"""任务定义."""
task_id: str
task_type: TaskType
description: str
input_data: Dict[str, Any]
status: str = "pending"
result: Any = None
assigned_agent: str = None
class MultiAgentOrchestrator:
"""
多Agent编排器。
负责管理多个专业Agent,分配任务,汇总结果。
"""
def __init__(self):
self.agents: Dict[str, ProductionAgent] = {}
self.task_queue: List[Task] = []
self.completed_tasks: List[Task] = []
def register_agent(
self,
agent_id: str,
agent: ProductionAgent,
supported_tasks: List[TaskType]
) -> None:
"""注册一个专业Agent."""
self.agents[agent_id] = {
"agent": agent,
"supported_tasks": supported_tasks
}
logger.info(f"注册Agent: {agent_id}, 支持任务: {[t.value for t in supported_tasks]}")
def dispatch_task(self, task: Task) -> Dict[str, Any]:
"""分发任务给合适的Agent."""
# 查找支持该任务类型的Agent
suitable_agents = [
(agent_id, info) for agent_id, info in self.agents.items()
if task.task_type in info["supported_tasks"]
]
if not suitable_agents:
return {
"success": False,
"error": f"没有Agent支持任务类型: {task.task_type.value}"
}
# 选择第一个合适的Agent(实际场景中可加入负载均衡等逻辑)
agent_id, agent_info = suitable_agents[0]
agent = agent_info["agent"]
task.assigned_agent = agent_id
task.status = "running"
logger.info(f"任务 {task.task_id} 分配给 Agent {agent_id}")
# 执行任务
result = agent.invoke(task.description)
task.status = "completed" if result["state"] == "completed" else "failed"
task.result = result
self.completed_tasks.append(task)
return {
"success": result["state"] == "completed",
"task_id": task.task_id,
"agent_id": agent_id,
"result": result
}
def create_and_run_workflow(
self,
tasks: List[Task]
) -> Dict[str, Any]:
"""创建并执行工作流."""
results = []
for task in tasks:
result = self.dispatch_task(task)
results.append(result)
# 如果某任务失败,可以选择终止或继续
if not result["success"]:
logger.warning(f"任务 {task.task_id} 执行失败,工作流可能受影响")
return {
"total_tasks": len(tasks),
"completed": sum(1 for r in results if r["success"]),
"failed": sum(1 for r in results if not r["success"]),
"task_results": results
}
记忆与上下文管理
长时间运行的Agent需要记忆系统来维护状态和上下文。有效的记忆管理可以让Agent在多轮对话中保持连贯性。
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
import json
class ConversationMemory:
"""
对话记忆管理器。
管理短期记忆(当前对话)和长期记忆(累积知识)。
"""
def __init__(
self,
max_short_memory: int = 10,
max_context_tokens: int = 4000
):
self.short_memory: List[Dict[str, str]] = [] # 当前对话
self.long_memory: List[Dict[str, Any]] = [] # 长期记忆
self.max_short_memory = max_short_memory
self.max_context_tokens = max_context_tokens
def add_turn(self, role: str, content: str) -> None:
"""添加一轮对话."""
self.short_memory.append({
"role": role,
"content": content,
"timestamp": datetime.now().isoformat()
})
# 保持短期记忆在限制内
if len(self.short_memory) > self.max_short_memory:
self.short_memory.pop(0)
def get_recent_context(self, num_turns: int = 5) -> str:
"""获取最近的对话上下文."""
recent = self.short_memory[-num_turns:]
formatted = []
for turn in recent:
role_name = "用户" if turn["role"] == "user" else "助手"
formatted.append(f"{role_name}: {turn['content']}")
return "\n".join(formatted)
def add_to_long_memory(
self,
key: str,
value: Any,
importance: int = 1
) -> None:
"""
添加长期记忆。
Args:
key: 记忆的标识键
value: 记忆内容
importance: 重要性等级(1-5)
"""
# 检查是否已存在
for memory in self.long_memory:
if memory["key"] == key:
memory["value"] = value
memory["last_access"] = datetime.now().isoformat()
memory["access_count"] += 1
return
# 添加新记忆
self.long_memory.append({
"key": key,
"value": value,
"created_at": datetime.now().isoformat(),
"last_access": datetime.now().isoformat(),
"access_count": 1,
"importance": importance
})
# 按重要性排序,保留最重要的记忆
self._prune_long_memory()
def recall(self, keyword: str) -> List[Dict[str, Any]]:
"""根据关键词召回长期记忆."""
results = []
keyword_lower = keyword.lower()
for memory in self.long_memory:
if keyword_lower in str(memory["value"]).lower():
memory["last_access"] = datetime.now().isoformat()
results.append(memory)
return results
def _prune_long_memory(self) -> None:
"""修剪长期记忆,保留最重要的."""
# 按重要性和访问频率排序
def sort_key(m):
return (m["importance"], m["access_count"], -len(self.long_memory))
self.long_memory.sort(key=sort_key, reverse=True)
# 保留前100条
if len(self.long_memory) > 100:
self.long_memory = self.long_memory[:100]
def get_memory_summary(self) -> Dict[str, Any]:
"""获取记忆摘要."""
return {
"short_memory_count": len(self.short_memory),
"long_memory_count": len(self.long_memory),
"recent_topics": [
m["key"] for m in self.long_memory[-5:]
]
}
class ContextWindowManager:
"""
上下文窗口管理器。
管理LLM的上下文长度,防止超出限制。
"""
def __init__(
self,
max_tokens: int = 8000,
reserved_tokens: int = 1000
):
self.max_tokens = max_tokens
self.reserved_tokens = reserved_tokens
self.available_tokens = max_tokens - reserved_tokens
def estimate_tokens(self, text: str) -> int:
"""粗略估算token数量(中文约2字符/token,英文约4字符/token)."""
chinese_chars = sum(1 for c in text if '\u4e00' <= c <= '\u9fff')
other_chars = len(text) - chinese_chars
return int(chinese_chars / 2 + other_chars / 4)
def truncate_to_fit(
self,
text: str,
additional_tokens: int = 0
) -> str:
"""截断文本以适应上下文窗口."""
budget = self.available_tokens - additional_tokens
estimated = self.estimate_tokens(text)
if estimated <= budget:
return text
# 按比例截断
ratio = budget / estimated
target_length = int(len(text) * ratio)
return text[:target_length] + "...[已截断]"
def build_messages(
self,
system_prompt: str,
history: List[Dict[str, str]],
current_input: str,
max_history_turns: int = 10
) -> List[Dict[str, str]]:
"""构建消息列表,自动管理长度."""
messages = []
# 系统提示
system_tokens = self.estimate_tokens(system_prompt)
messages.append({"role": "system", "content": system_prompt})
# 历史消息(从最近开始,保留足够空间)
remaining_budget = self.available_tokens - system_tokens
current_input_tokens = self.estimate_tokens(current_input)
remaining_budget -= current_input_tokens
included_history = []
for turn in reversed(history[-max_history_turns:]):
turn_tokens = self.estimate_tokens(turn["content"])
if remaining_budget >= turn_tokens:
included_history.insert(0, turn)
remaining_budget -= turn_tokens
else:
break
for turn in included_history:
messages.append({"role": turn["role"], "content": turn["content"]})
# 当前输入
truncated_input = self.truncate_to_fit(current_input, system_tokens)
messages.append({"role": "user", "content": truncated_input})
return messages
常见应用场景
场景一:智能客服系统
class CustomerServiceAgent:
"""
智能客服Agent。
特性:
- FAQ知识库检索
- 订单状态查询
- 问题分类与路由
- 情感分析与满意度跟踪
"""
def __init__(
self,
llm: ChatOpenAI,
knowledge_base: Dict[str, Any],
agent_logger: AgentLogger
):
self.llm = llm
self.knowledge_base = knowledge_base
self.logger = agent_logger
# 初始化工具
self.tools = {
"faq_search": self._create_faq_tool(),
"order_query": self._create_order_tool(),
"sentiment": self._create_sentiment_tool()
}
def _create_faq_tool(self) -> BaseTool:
"""创建FAQ检索工具."""
class FAQSearchInput(ToolInput):
query: str = Field(..., description="用户问题")
class FAQSearchOutput(ToolOutput):
answers: List[str] = Field(default_factory=list)
class FAQSearchTool(BaseTool):
name = "faq_search"
description = "搜索FAQ知识库获取常见问题的标准答案"
input_model = FAQSearchInput
def _run(self, input_data: FAQSearchInput) -> FAQSearchOutput:
query = input_data.query.lower()
results = []
for q, a in self.knowledge_base.get("faq", {}).items():
if any(kw in query for kw in q.split()):
results.append(f"Q: {q}\nA: {a}")
return FAQSearchOutput(
success=True,
result=results,
answers=results
)
return FAQSearchTool()
def handle_customer_query(
self,
customer_id: str,
query: str
) -> Dict[str, Any]:
"""处理客户咨询."""
session_id = f"cs_{customer_id}_{datetime.now():%Y%m%d%H%M%S}"
self.logger.log_agent_start(session_id, query)
# 情感分析
sentiment_result = self._analyze_sentiment(query)
# 根据情感调整响应策略
priority = "normal"
if sentiment_result.get("negative_score", 0) > 0.7:
priority = "high"
query = f"[高优先级] 客户情绪激动: {query}"
# 构建prompt
prompt = self._build_service_prompt(query, sentiment_result)
# 生成回复
response = self.llm.invoke(prompt)
self.logger.log_agent_complete(
session_id, "completed", 1, 0.5, response.content
)
return {
"response": response.content,
"sentiment": sentiment_result,
"priority": priority,
"session_id": session_id
}
def _analyze_sentiment(self, text: str) -> Dict[str, Any]:
"""分析文本情感."""
# 简化实现,实际可用专门的情感分析模型
negative_words = ["不满意", "投诉", "太差", "垃圾", "退款", "退货"]
positive_words = ["谢谢", "满意", "很好", "棒", "优秀"]
text_lower = text.lower()
neg_count = sum(1 for w in negative_words if w in text_lower)
pos_count = sum(1 for w in positive_words if w in text_lower)
total = neg_count + pos_count
if total == 0:
return {"sentiment": "neutral", "negative_score": 0.5}
neg_score = neg_count / total
return {
"sentiment": "negative" if neg_score > 0.5 else "positive",
"negative_score": neg_score,
"positive_score": 1 - neg_score
}
def _build_service_prompt(
self,
query: str,
sentiment: Dict[str, Any]
) -> str:
"""构建服务prompt."""
return f"""你是一个专业的在线客服代表。请用友好、专业的态度回答客户问题。
客户情感分析: {sentiment['sentiment']}
{'注意:客户情绪较为激动,请特别注意语气和态度。' if sentiment['sentiment'] == 'negative' else ''}
客户问题: {query}
请提供有帮助的回复。如果问题超出你的能力范围,请引导客户联系人工客服。
"""
场景二:代码审查助手
class CodeReviewAgent:
"""
代码审查Agent。
自动化代码审查,发现潜在问题和改进建议。
"""
def __init__(self, llm: ChatOpenAI):
self.llm = llm
# 审查规则库
self.review_rules = {
"security": [
"SQL注入风险",
"XSS漏洞",
"敏感信息硬编码",
"不安全的随机数"
],
"performance": [
"循环中的数据库查询",
"大对象重复创建",
"不必要的深度拷贝",
"低效的数据结构"
],
"style": [
"命名不规范",
"过长函数",
"重复代码",
"缺少文档注释"
]
}
def review_code(
self,
code: str,
language: str = "python",
focus_areas: List[str] = None
) -> Dict[str, Any]:
"""审查代码."""
focus_areas = focus_areas or ["security", "performance", "style"]
# 构建审查prompt
prompt = self._build_review_prompt(code, language, focus_areas)
# 获取LLM审查结果
response = self.llm.invoke(prompt)
# 解析结果
review_result = self._parse_review_response(response.content)
# 添加评分
review_result["overall_score"] = self._calculate_score(review_result)
return review_result
def _build_review_prompt(
self,
code: str,
language: str,
focus_areas: List[str]
) -> str:
"""构建审查prompt."""
focus_text = "\n".join([f"- {area}: {', '.join(self.review_rules[area])}"
for area in focus_areas])
return f"""你是一个{language}代码审查专家。请审查以下代码,关注以下方面:
关注领域:
{focus_text}
代码:
```{language}
{code}
请以JSON格式返回审查结果:
{{
“issues”: [
{{
“severity”: “high/medium/low”,
“category”: “问题类别”,
“line”: “行号(如适用)”,
“description”: “问题描述”,
“suggestion”: “修改建议”
}}
],
“summary”: “总体评价”,
“recommendations”: [“改进建议列表”]
}}
“””
def _parse_review_response(self, response: str) -> Dict[str, Any]:
"""解析审查响应."""
# 尝试提取JSON部分
import re
json_match = re.search(r'\{.*\}', response, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group())
except:
pass
# 如果无法解析,返回原始文本
return {
"issues": [],
"summary": response,
"recommendations": [],
"raw_response": response
}
def _calculate_score(self, review_result: Dict[str, Any]) -> float:
"""计算代码评分(满分100)."""
base_score = 100
severity_weights = {
"high": 15,
"medium": 5,
"low": 2
}
for issue in review_result.get("issues", []):
severity = issue.get("severity", "low").lower()
deduction = severity_weights.get(severity, 2)
base_score -= deduction
return max(0, min(100, base_score))
---
最佳实践与优化建议
性能优化建议
在实际生产环境中,性能优化是永恒的话题。以下是针对Agent系统的一些关键优化策略:
**LLM调用优化**:LLM调用通常是整个Agent流程中最耗时的环节。通过缓存相似的请求、使用流式输出、批量处理等手段,可以显著降低延迟和成本。另外,根据任务复杂度选择合适的模型也很重要——简单查询用GPT-3.5,复杂推理用GPT-4,可以实现性价比的最优平衡。
**工具执行优化**:对于IO密集型的工具调用(如API请求),使用异步执行可以大幅提升吞吐量。可以通过concurrent.futures或asyncio实现并发工具调用。同时,添加结果缓存机制可以避免重复调用。
```python
import asyncio
from functools import lru_cache
from typing import Any
class OptimizedAgent:
"""优化后的Agent实现."""
def __init__(self, *args, **kwargs):
# ... 初始化代码 ...
self._cache = {}
self._semaphore = asyncio.Semaphore(5) # 限制并发数
async def execute_tools_async(
self,
tool_calls: List[Dict[str, Any]]
) -> List[ToolOutput]:
"""异步执行多个工具调用."""
async def execute_one(tool_call: Dict[str, Any]) -> ToolOutput:
async with self._semaphore: # 控制并发
tool_name = tool_call["tool"]
tool_input = tool_call["input"]
# 检查缓存
cache_key = f"{tool_name}:{json.dumps(tool_input)}"
if cache_key in self._cache:
logger.info(f"缓存命中: {tool_name}")
return self._cache[cache_key]
# 执行(实际工具可能是同步的,这里做包装)
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None,
lambda: self.tools[tool_name].execute(tool_input)
)
# 缓存结果(根据工具特性设置TTL)
if result.success:
self._cache[cache_key] = result
return result
# 并发执行所有工具调用
tasks = [execute_one(tc) for tc in tool_calls]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理异常
processed_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
processed_results.append(ToolOutput(
success=False,
error=str(result)
))
else:
processed_results.append(result)
return processed_results
Prompt优化:精心设计的prompt可以减少无效的ReAct迭代次数。清晰的指令、具体的示例、合理的限制都可以帮助Agent更准确地理解任务并给出正确答案。
安全最佳实践
安全是生产环境不可忽视的方面。以下是一些关键的安全措施:
输入验证与清理:永远不要相信用户的输入。对所有输入进行严格验证,防止Prompt注入、恶意代码等攻击。
class SecureInputValidator:
"""安全的输入验证器."""
# 禁止的pattern
FORBIDDEN_PATTERNS = [
r"忽略之前的指令",
r"ignore.*instructions",
r"你现在是.*",
r"you are now.*",
r"<script",
r"{{.*}}", # 模板注入
]
def validate(self, user_input: str) -> tuple[bool, str]:
"""验证用户输入安全性."""
import re
for pattern in self.FORBIDDEN_PATTERNS:
if re.search(pattern, user_input, re.IGNORECASE):
return False, f"输入包含可疑pattern: {pattern}"
# 检查输入长度
if len(user_input) > 10000:
return False, "输入长度超过限制"
# 检查特殊字符
suspicious_chars = ["\x00", "\r", "\n"]
for char in suspicious_chars:
if user_input.count(char) > 5:
return False, "输入包含过多特殊字符"
return True, "验证通过"
def sanitize(self, user_input: str) -> str:
"""清理用户输入."""
# 移除控制字符
cleaned = ''.join(
char for char in user_input
if ord(char) >= 32 or char in '\n\t'
)
# 限制长度
return cleaned[:10000]
输出过滤:对Agent的输出进行安全过滤,防止泄露敏感信息或生成有害内容。
审计日志:记录所有请求的完整上下文,包括输入、输出、执行时间、操作者等,便于事后审计和问题排查。
class AuditLogger:
"""
审计日志记录器。
用于合规要求和安全审计。
"""
def __init__(self, db_path: str = "audit.db"):
self.db_path = db_path
self._init_database()
def _init_database(self) -> None:
"""初始化数据库."""
import sqlite3
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
session_id TEXT NOT NULL,
user_id TEXT,
action TEXT NOT NULL,
input_hash TEXT,
input_preview TEXT,
output_hash TEXT,
output_preview TEXT,
metadata TEXT,
status TEXT
)
""")
conn.commit()
conn.close()
def log_action(
self,
session_id: str,
action: str,
input_data: str,
output_data: str,
user_id: str = None,
status: str = "success",
metadata: Dict[str, Any] = None
) -> None:
"""记录审计日志."""
import sqlite3
import hashlib
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
input_hash = hashlib.sha256(input_data.encode()).hexdigest()[:16]
output_hash = hashlib.sha256(str(output_data).encode()).hexdigest()[:16]
cursor.execute("""
INSERT INTO audit_log
(timestamp, session_id, user_id, action, input_hash,
input_preview, output_hash, output_preview, metadata, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
datetime.now().isoformat(),
session_id,
user_id,
action,
input_hash,
input_data[:200] if len(input_data) > 200 else input_data,
output_hash,
str(output_data)[:200] if len(str(output_data)) > 200 else str(output_data),
json.dumps(metadata) if metadata else None,
status
))
conn.commit()
conn.close()
总结与资源链接
通过本文的讲解,你应该已经对agents-towards-production项目有了全面的理解。这个项目的核心价值在于它提供了一套从”Demo”到”Production”的完整方法论,涵盖了:
在架构设计层面,它倡导模块化、可测试的Agent结构,让复杂的Agent逻辑变得可维护。在可靠性保障层面,它提供了多层级的错误处理和恢复策略,确保Agent在各种异常情况下都能体面地降级。在可观测性层面,它实现了完整的日志、追踪和监控能力,让Agent的决策过程变得透明可查。在性能优化层面,它展示了批处理、缓存、异步执行等技术的应用,帮助你在真实负载下保持良好的响应速度。
需要强调的是,这些实践不是教条,而是经过生产环境验证的经验。在实际应用中,你需要根据具体场景进行取舍和调整。例如,小型项目的监控需求可能不需要那么复杂,而大型分布式系统的容错设计则需要更加严谨。
相关资源链接:
- 项目主页:https://github.com/NirDiamant/agents-towards-production
- LangChain官方文档:https://python.langchain.com/docs/get_started/introduction
- OpenAI API文档:https://platform.openai.com/docs/api-reference
推荐的下一步学习方向:
如果你想进一步提升Agent开发能力,可以深入研究以下领域:强化学习与Agent的结合(如RLHF在Agent优化中的应用)、多模态Agent(处理图像、音频等非文本输入)、Agent的隐私与安全(联邦学习、差分隐私在Agent中的应用)、以及Agent的评估与测试(如何科学地衡量Agent的性能)。
AI Agent的生态正在快速发展,今天的最佳实践可能明天就需要更新。保持学习的习惯,关注最新的研究成果和开源项目,才能在这个领域持续进步。
祝你在Agent开发的道路上探索愉快!
评论区