从”无法触达”到”全域连接”:Agent-Reach如何让AI智能体真正打通所有工具与平台
一、项目概述:为什么这个项目值得关注
1.1 当前的AI Agent困境
当我们谈论AI Agent时,一个核心问题始终困扰着开发者和技术爱好者:“我的AI助手无法与外部世界真正交互”。想象一下,你构建了一个看似强大的AI Agent,它可以回答问题、生成内容,但它却无法帮你发送邮件、无法查询实时数据、无法操作你的本地文件、无法调用第三方API——这样的Agent真的有实用价值吗?
大多数现有的AI Agent框架都存在”信息孤岛”问题:模型能力强大,但缺乏与外部系统通信的桥梁。就像一个天才被锁在房间里,无法获取外界信息,无法执行任何实际任务。
1.2 Agent-Reach的破局之道
Agent-Reach(项目地址:https://github.com/Panniantong/Agent-Reach)正是为解决这一痛点而生的开源项目。它的核心理念可以用一句话概括:让AI Agent具备真正的”触达”能力,能够跨越系统边界、操作任意工具、访问任意数据源。
这个项目代表了AI Agent发展的一个重要方向:不是让模型本身变得更强大,而是让模型与外部世界的连接变得更加顺畅和可靠。它提供了一套完整的方法论和工具集,帮助开发者快速构建能够真正”行动”的AI Agent。
1.3 项目核心价值
┌─────────────────────────────────────────────────────────────┐
│ Agent-Reach 核心价值 │
├─────────────────────────────────────────────────────────────┤
│ 🔗 工具集成 │ 标准化接口,快速接入数百种工具和服务 │
│ 🌐 平台覆盖 │ 支持Web、桌面应用、API、本地系统等全平台 │
│ 🔄 任务编排 │ 智能调度多步骤复杂任务,自动处理依赖关系 │
│ 🛡️ 安全控制 │ 细粒度权限管理,防止Agent执行危险操作 │
│ 📊 可观测性 │ 完整的日志和监控,追踪每个操作的执行过程 │
└─────────────────────────────────────────────────────────────┘
对于希望构建实用AI应用的开发者而言,Agent-Reach提供了三条核心优势:
第一,降低接入成本。 传统方式下,将AI模型与外部系统对接需要编写大量胶水代码,理解各种API的细节。Agent-Reach提供了统一的抽象层,让这一过程变得简单可控。
第二,提升可靠性。 AI Agent在执行任务时可能出错,特别是涉及外部系统调用时。Agent-Reach内置了错误处理、重试机制和回滚策略,大幅提升了系统的稳定性。
第三,保障安全性。 当我们赋予AI Agent操作外部系统的能力时,安全性成为首要考量。Agent-Reach采用了沙箱隔离、权限控制、操作审计等多重安全机制,确保AI的行为在可控范围内。
二、环境搭建:快速启动Agent-Reach
2.1 前置要求
在开始之前,确保你的开发环境满足以下要求:
环境要求检查清单:
├── Python 版本:3.9 或更高
├── 内存:建议 8GB 以上(运行复杂Agent任务)
├── 磁盘空间:至少 2GB 可用空间
├── 网络:需要访问外部API(部分功能需要代理)
└── 操作系统:支持 Windows、macOS、Linux
2.2 安装步骤
第一步:创建虚拟环境(强烈推荐)
为避免依赖冲突,建议在独立的环境中安装Agent-Reach:
# 创建新的虚拟环境
python -m venv agent-reach-env
# 激活虚拟环境(Windows系统)
agent-reach-env\Scripts\activate
# 激活虚拟环境(macOS/Linux系统)
source agent-reach-env/bin/activate
# 确认激活成功,命令行前会出现环境名称
# (agent-reach-env) $
第二步:安装核心依赖
# 使用pip安装Agent-Reach核心包
pip install agent-reach
# 如果需要使用额外的连接器,可以安装扩展包
pip install agent-reach[connectors] # 包含常用连接器
pip install agent-reach[web] # Web相关功能
pip install agent-reach[desktop] # 桌面应用控制功能
pip install agent-reach[all] # 安装所有功能
# 如果安装速度较慢,可以使用国内镜像源
pip install agent-reach -i https://pypi.tuna.tsinghua.edu.cn/simple
第三步:验证安装
# 创建一个简单的验证脚本
# 文件名:verify_installation.py
import agent_reach
# 打印版本信息
print("Agent-Reach 版本:", agent_reach.__version__)
# 检查核心模块是否可用
print("\n核心模块检查:")
print("- Core 模块:", hasattr(agent_reach, 'core'))
print("- Connector 模块:", hasattr(agent_reach, 'connectors'))
print("- Monitor 模块:", hasattr(agent_reach, 'monitor'))
# 运行基础连接测试
try:
from agent_reach.core import AgentReach
test_agent = AgentReach()
print("\n✓ Agent-Reach 安装成功!")
except Exception as e:
print(f"\n✗ 安装验证失败: {e}")
运行验证脚本:
python verify_installation.py
如果看到”Agent-Reach 安装成功!”的消息,说明环境搭建完成。
2.3 配置文件的创建与初始化
Agent-Reach使用YAML格式的配置文件来管理各种设置。让我们创建一个基础配置文件:
# 文件名:agent_reach_config.yaml
# 保存位置:项目根目录或用户目录 ~/.agent_reach/config.yaml
# ============ 基础配置 ============
version: "1.0"
project_name: "my-first-agent"
# ============ Agent 配置 ============
agent:
name: "MyAssistant"
description: "我的第一个AI助手"
# 选择后端模型
model:
provider: "openai" # 可选:openai, anthropic, local, huggingface
model_name: "gpt-4"
api_key: "${OPENAI_API_KEY}" # 使用环境变量
# 如果使用本地模型
# provider: "local"
# model_path: "./models/llama-7b"
# ============ 连接器配置 ============
connectors:
# 启用Web搜索连接器
web_search:
enabled: true
default_engine: "duckduckgo" # 可选:google, bing, duckduckgo
rate_limit: 10 # 每分钟请求数限制
# 启用文件系统连接器
file_system:
enabled: true
allowed_paths:
- "./workspace"
- "/tmp/agent_workspace"
max_file_size: "10MB"
# 启用API连接器
api:
enabled: true
timeout: 30
retry_count: 3
# ============ 安全配置 ============
security:
# 操作白名单
allowed_operations:
- "read"
- "search"
- "write"
# 禁止的操作
forbidden_operations:
- "delete_system"
- "execute_remote"
# 敏感操作需要确认
confirm_sensitive: true
# ============ 日志配置 ============
logging:
level: "INFO" # DEBUG, INFO, WARNING, ERROR
log_file: "./logs/agent_reach.log"
max_file_size: "10MB"
backup_count: 5
初始化Agent-Reach时指定配置文件:
# 文件名:init_agent.py
from agent_reach import AgentReach
# 方式一:使用默认配置
agent = AgentReach()
# 方式二:指定配置文件路径
agent = AgentReach(config_path="./agent_reach_config.yaml")
# 方式三:直接传递配置字典
config = {
"agent": {
"name": "TestAgent",
"model": {
"provider": "openai",
"model_name": "gpt-3.5-turbo",
"api_key": "your-api-key"
}
}
}
agent = AgentReach(config=config)
print("Agent初始化完成!")
三、核心功能详解
3.1 Agent核心架构
Agent-Reach采用了模块化的核心架构,理解这个架构对于深入使用项目至关重要:
┌─────────────────────────────────────────────────────────────────┐
│ Agent-Reach 架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ │
│ │ User │ │
│ │ Input │ │
│ └──────┬───────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Intent │────▶│ Planner │ │
│ │ Parser │ │ Module │ │
│ └──────────────┘ └──────┬───────┘ │
│ │ │
│ ┌────────────────────┼────────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Executor │ │ Executor │ │ Executor │ │
│ │ (Connector) │ │ (Connector) │ │ (Connector) │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Web Search │ │ File System │ │ APIs │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Memory Module │ │
│ │ (上下文管理 / 历史记录 / 知识检索) │ │
│ └──────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
意图解析器(Intent Parser)负责理解用户的真实需求。它不仅仅解析字面意思,还会结合上下文和对话历史,推断用户的深层意图。例如,当用户说”帮我查一下天气”,意图解析器会识别出这是一个信息查询请求,并提取出关键参数(地点、时间等)。
规划模块(Planner)是Agent的”大脑”。它根据解析出的意图,规划出一系列需要执行的操作序列。对于复杂任务,规划模块会将大任务分解为小步骤,并确定步骤之间的依赖关系和执行顺序。
执行器(Executor)负责具体操作的执行。每个执行器对应一种连接器(Connector),通过标准化的接口与外部系统交互。Agent-Reach内置了多种常用连接器,同时也支持自定义连接器。
记忆模块(Memory)管理Agent的上下文信息。它包括对话历史、提取的关键信息、用户偏好等。良好的记忆管理是实现连续对话的基础。
3.2 连接器系统
连接器是Agent-Reach与外部世界交互的桥梁。系统提供了丰富的内置连接器,涵盖了常见的应用场景:
网络搜索连接器
from agent_reach.connectors import WebSearchConnector
# 创建搜索连接器
search_connector = WebSearchConnector(
engine="duckduckgo", # 选择搜索引擎
language="zh-CN", # 设置搜索语言
region="cn" # 设置搜索区域
)
# 执行搜索
results = search_connector.search(
query="Python异步编程最佳实践",
max_results=10
)
# 访问搜索结果
for result in results:
print(f"标题: {result.title}")
print(f"链接: {result.url}")
print(f"摘要: {result.snippet}")
print("---")
文件系统连接器
from agent_reach.connectors import FileSystemConnector
# 创建文件系统连接器
fs_connector = FileSystemConnector(
base_path="./workspace",
allowed_extensions=[".txt", ".md", ".py", ".json", ".yaml"],
max_file_size="5MB"
)
# 读取文件
content = fs_connector.read("documents/readme.md")
print(content)
# 写入文件
fs_connector.write(
"output/summary.txt",
"这是自动生成的分析报告内容..."
)
# 列出目录
files = fs_connector.list_directory("documents")
for f in files:
print(f"- {f.name} ({f.size} bytes)")
# 搜索文件
py_files = fs_connector.search_files(
pattern="*.py",
recursive=True
)
API连接器
from agent_reach.connectors import APIConnector
# 创建API连接器
api_connector = APIConnector(
base_url="https://api.example.com",
timeout=30,
headers={
"Authorization": "Bearer your-token-here",
"Content-Type": "application/json"
}
)
# GET请求
response = api_connector.get("/users/123")
user_data = response.json()
# POST请求
new_data = {
"name": "张三",
"email": "zhangsan@example.com"
}
response = api_connector.post("/users", json_data=new_data)
# 带参数查询
response = api_connector.get(
"/search",
params={"q": "关键词", "limit": 20}
)
3.3 任务编排系统
对于复杂的多步骤任务,Agent-Reach提供了强大的任务编排功能:
from agent_reach import TaskOrchestrator
# 创建任务编排器
orchestrator = TaskOrchestrator()
# 定义任务流程
workflow = orchestrator.create_workflow("research_task")
# 添加任务节点
workflow.add_task(
task_id="search_papers",
connector="web_search",
params={"query": "machine learning recent advances"},
output_key="papers"
)
workflow.add_task(
task_id="filter_papers",
connector="builtin", # 内置处理任务
operation="filter",
params={
"source": "papers", # 引用上一个任务的输出
"condition": "year >= 2023"
},
output_key="recent_papers"
)
workflow.add_task(
task_id="summarize",
connector="builtin",
operation="transform",
params={
"source": "recent_papers",
"operation": "summarize",
"max_length": 200
},
output_key="summary",
depends_on=["filter_papers"] # 明确依赖关系
)
workflow.add_task(
task_id="save_report",
connector="file_system",
params={
"operation": "write",
"path": "report.md",
"content": "outputs.summary" # 引用任务输出
},
depends_on=["summarize"] # 等待summarize完成后执行
)
# 执行工作流
result = orchestrator.execute(workflow)
# 获取执行结果
print("执行状态:", result.status)
print("报告已保存至:", result.outputs.get("report_path"))
3.4 记忆与上下文管理
Agent-Reach的记忆模块支持多种存储后端,便于在不同场景下选择合适的方案:
from agent_reach.memory import ConversationMemory, KnowledgeBase
# 创建对话记忆
conversation = ConversationMemory(
max_history=50, # 保留最近50轮对话
summary_threshold=20 # 对话超过20轮时自动生成摘要
)
# 添加用户消息
conversation.add_user_message("帮我分析一下明天的天气")
conversation.add_user_message("顺便查一下穿衣建议")
# 添加助手回复
conversation.add_assistant_message(
"明天天气晴朗,气温15-25度,适宜户外活动。"
)
# 获取对话历史(支持自动摘要)
history = conversation.get_history(include_summary=True)
for item in history:
print(f"{item.role}: {item.content}")
# 创建知识库(支持向量检索)
kb = KnowledgeBase(
storage="vector", # 使用向量数据库
embed_model="text-embedding-ada-002"
)
# 添加知识条目
kb.add(
text="Python是一种高级编程语言,由Guido van Rossum创建。",
metadata={"source": "wiki", "category": "programming"}
)
kb.add(
text="异步编程允许程序在等待I/O操作时继续执行其他任务。",
metadata={"source": "tutorial", "category": "python"}
)
# 语义检索
results = kb.search("谁创造了Python?", top_k=3)
for r in results:
print(f"相关度: {r.score:.2f}")
print(f"内容: {r.text}")
四、实战教程:构建一个多功能AI助手
4.1 项目目标
在这一部分,我们将从头开始构建一个多功能AI助手,它能够:
项目目标清单:
├── 📰 实时信息查询:搜索新闻、查询天气、获取股票行情
├── 📁 文件管理:读取、创建、搜索本地文件
├── 📧 邮件处理:读取和发送邮件(通过IMAP/SMTP)
├── 📊 数据分析:读取CSV/Excel文件并生成简单报告
└── 🔄 任务编排:将多个操作组合成自动化工作流
4.2 第一步:创建Agent实例
# 文件名:my_assistant.py
import os
from agent_reach import AgentReach
# 设置API密钥(建议使用环境变量)
os.environ["OPENAI_API_KEY"] = "your-api-key-here"
# 初始化Agent
assistant = AgentReach(
name="全能助手",
description="一个可以帮助你完成多种任务的多功能AI助手",
config_path="./agent_reach_config.yaml"
)
# 配置启用的连接器
assistant.enable_connector("web_search")
assistant.enable_connector("file_system")
assistant.enable_connector("email")
assistant.enable_connector("data_analysis")
print("✓ Agent初始化完成!")
4.3 第二步:实现实时信息查询功能
让我们为Agent添加网络搜索能力:
# 文件名:search_capability.py
from agent_reach.connectors import WebSearchConnector
from agent_reach.core import Tool
# 创建搜索工具
web_search = WebSearchConnector(
engine="duckduckgo",
language="zh-CN",
rate_limit=5 # 避免请求过于频繁
)
# 定义搜索工具的描述(Agent据此决定何时调用)
search_tool = Tool(
name="web_search",
description="搜索互联网获取最新信息",
parameters={
"query": {
"type": "string",
"description": "搜索关键词",
"required": True
},
"max_results": {
"type": "integer",
"description": "返回结果数量,默认5条",
"default": 5
}
},
handler=web_search
)
# 将工具注册到Agent
assistant.register_tool(search_tool)
# 封装便捷的搜索方法
def search(query: str, max_results: int = 5):
"""
执行网络搜索的便捷方法
参数:
query: 搜索关键词
max_results: 最大结果数
返回:
搜索结果列表
"""
return web_search.search(query=query, max_results=max_results)
# 使用示例
if __name__ == "__main__":
# 搜索最新新闻
news = search("人工智能最新进展 2024", max_results=3)
print("=== 最新AI新闻 ===")
for i, item in enumerate(news, 1):
print(f"{i}. {item.title}")
print(f" 来源: {item.source}")
print(f" 链接: {item.url}")
print()
4.4 第三步:实现文件管理功能
创建一个完整的文件管理模块:
# 文件名:file_manager.py
import os
from datetime import datetime
from agent_reach.connectors import FileSystemConnector
from agent_reach.core import Tool
class FileManager:
"""文件管理器类"""
def __init__(self, workspace_path: str = "./workspace"):
"""
初始化文件管理器
参数:
workspace_path: 工作目录路径
"""
self.connector = FileSystemConnector(
base_path=workspace_path,
allowed_extensions=[
".txt", ".md", ".py", ".json",
".yaml", ".yml", ".csv", ".xlsx"
],
max_file_size="50MB"
)
# 确保工作目录存在
os.makedirs(workspace_path, exist_ok=True)
def create_tool(self) -> Tool:
"""创建文件管理工具"""
return Tool(
name="file_manager",
description="管理本地文件和目录",
parameters={
"operation": {
"type": "string",
"description": "操作类型:read, write, list, search, info",
"required": True,
"enum": ["read", "write", "list", "search", "info"]
},
"path": {
"type": "string",
"description": "文件或目录路径",
"required": True
},
"content": {
"type": "string",
"description": "写入文件的内容(仅write操作需要)",
"required": False
},
"pattern": {
"type": "string",
"description": "搜索模式(仅search操作需要)",
"required": False
}
},
handler=self._handle_operation
)
def _handle_operation(self, operation: str, path: str, **kwargs):
"""处理文件操作"""
operations = {
"read": self._read_file,
"write": self._write_file,
"list": self._list_directory,
"search": self._search_files,
"info": self._get_file_info
}
if operation not in operations:
raise ValueError(f"不支持的操作: {operation}")
return operations[operation](path, **kwargs)
def _read_file(self, path: str, **kwargs) -> dict:
"""读取文件内容"""
content = self.connector.read(path)
return {
"status": "success",
"path": path,
"content": content,
"size": len(content),
"lines": len(content.split('\n'))
}
def _write_file(self, path: str, content: str = "", **kwargs) -> dict:
"""写入文件内容"""
self.connector.write(path, content)
return {
"status": "success",
"path": path,
"bytes_written": len(content.encode('utf-8'))
}
def _list_directory(self, path: str, **kwargs) -> dict:
"""列出目录内容"""
items = self.connector.list_directory(path)
return {
"status": "success",
"path": path,
"items": [
{
"name": item.name,
"type": "directory" if item.is_dir else "file",
"size": item.size
}
for item in items
],
"total": len(items)
}
def _search_files(self, path: str, pattern: str = "*", **kwargs) -> dict:
"""搜索文件"""
files = self.connector.search_files(path, pattern, recursive=True)
return {
"status": "success",
"pattern": pattern,
"matches": [f.name for f in files],
"count": len(files)
}
def _get_file_info(self, path: str, **kwargs) -> dict:
"""获取文件信息"""
info = self.connector.get_info(path)
return {
"status": "success",
"name": info.name,
"size": info.size,
"created": info.created_time,
"modified": info.modified_time,
"extension": info.extension
}
def create_notes(self, title: str, content: str) -> dict:
"""
便捷方法:创建笔记
参数:
title: 笔记标题
content: 笔记内容
返回:
创建结果
"""
# 生成安全的文件名
safe_title = "".join(c if c.isalnum() or c in (' ', '-', '_') else '_' for c in title)
filename = f"notes/{safe_title}_{datetime.now().strftime('%Y%m%d')}.md"
# 添加元信息
full_content = f"""---
title: {title}
created: {datetime.now().isoformat()}
tags: []
---
# {title}
{content}
"""
return self._write_file(filename, full_content)
# 使用示例
if __name__ == "__main__":
fm = FileManager("./workspace")
# 注册工具
assistant.register_tool(fm.create_tool())
# 创建笔记
result = fm.create_notes(
title="会议纪要",
content="讨论了项目进度和下一步计划..."
)
print(f"笔记创建结果: {result}")
4.5 第四步:实现数据分析功能
为Agent添加读取和分析数据文件的能力:
# 文件名:data_analysis.py
import pandas as pd
import json
from io import StringIO
from agent_reach.connectors import FileSystemConnector
from agent_reach.core import Tool
class DataAnalyzer:
"""数据分析器类"""
def __init__(self, workspace_path: str = "./workspace"):
"""初始化数据分析器"""
self.fs = FileSystemConnector(base_path=workspace_path)
self.supported_formats = [".csv", ".xlsx", ".json", ".txt"]
def create_tool(self) -> Tool:
"""创建数据分析工具"""
return Tool(
name="data_analysis",
description="读取和分析数据文件",
parameters={
"operation": {
"type": "string",
"description": "操作类型",
"required": True,
"enum": ["load", "summary", "filter", "aggregate", "export"]
},
"file_path": {
"type": "string",
"description": "数据文件路径",
"required": True
},
"options": {
"type": "object",
"description": "操作选项",
"required": False
}
},
handler=self._handle_operation
)
def _handle_operation(self, operation: str, file_path: str, **kwargs):
"""处理数据分析操作"""
operations = {
"load": self._load_data,
"summary": self._generate_summary,
"filter": self._filter_data,
"aggregate": self._aggregate_data,
"export": self._export_data
}
if operation not in operations:
raise ValueError(f"不支持的操作: {operation}")
return operations[operation](file_path, **kwargs)
def _load_data(self, file_path: str, **kwargs) -> dict:
"""加载数据文件"""
content = self.fs.read(file_path)
ext = file_path.lower().split('.')[-1]
if ext == 'csv':
df = pd.read_csv(StringIO(content))
elif ext in ['xlsx', 'xls']:
# 对于Excel文件,需要先保存再读取
df = pd.read_excel(file_path)
elif ext == 'json':
data = json.loads(content)
if isinstance(data, list):
df = pd.DataFrame(data)
else:
return {"status": "success", "data": data, "type": "object"}
else:
return {
"status": "success",
"content": content[:1000], # 限制返回长度
"type": "text"
}
return {
"status": "success",
"rows": len(df),
"columns": list(df.columns),
"dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()}
}
def _generate_summary(self, file_path: str, **kwargs) -> dict:
"""生成数据摘要"""
content = self.fs.read(file_path)
df = pd.read_csv(StringIO(content))
# 计算基本统计信息
numeric_cols = df.select_dtypes(include=['number']).columns
summary = {
"total_rows": len(df),
"total_columns": len(df.columns),
"column_names": list(df.columns),
"numeric_columns": list(numeric_cols),
"missing_values": df.isnull().sum().to_dict(),
"statistics": {}
}
# 对数值列计算统计量
for col in numeric_cols:
summary["statistics"][col] = {
"mean": float(df[col].mean()),
"median": float(df[col].median()),
"std": float(df[col].std()),
"min": float(df[col].min()),
"max": float(df[col].max())
}
# 对分类列计算频次
categorical_cols = df.select_dtypes(include=['object']).columns
summary["top_values"] = {}
for col in categorical_cols:
top5 = df[col].value_counts().head(5).to_dict()
summary["top_values"][col] = top5
return summary
def _filter_data(self, file_path: str, **kwargs) -> dict:
"""筛选数据"""
options = kwargs.get('options', {})
column = options.get('column')
value = options.get('value')
operator = options.get('operator', '==')
content = self.fs.read(file_path)
df = pd.read_csv(StringIO(content))
if column not in df.columns:
return {"status": "error", "message": f"列 {column} 不存在"}
# 执行筛选
if operator == '==':
filtered = df[df[column] == value]
elif operator == '!=':
filtered = df[df[column] != value]
elif operator == '>':
filtered = df[df[column] > value]
elif operator == '<':
filtered = df[df[column] < value]
elif operator == 'contains':
filtered = df[df[column].str.contains(value, na=False)]
else:
return {"status": "error", "message": f"不支持的操作符: {operator}"}
return {
"status": "success",
"total_rows": len(df),
"filtered_rows": len(filtered),
"data": filtered.head(100).to_dict('records') # 限制返回行数
}
def _aggregate_data(self, file_path: str, **kwargs) -> dict:
"""数据聚合分析"""
options = kwargs.get('options', {})
group_by = options.get('group_by')
agg_col = options.get('agg_column')
agg_func = options.get('agg_function', 'mean')
content = self.fs.read(file_path)
df = pd.read_csv(StringIO(content))
if group_by not in df.columns:
return {"status": "error", "message": f"分组列 {group_by} 不存在"}
# 执行聚合
if agg_col and agg_col in df.columns:
result = df.groupby(group_by)[agg_col].agg(agg_func)
else:
result = df.groupby(group_by).size()
return {
"status": "success",
"group_by": group_by,
"aggregation": result.to_dict()
}
def _export_data(self, file_path: str, **kwargs) -> dict:
"""导出数据"""
options = kwargs.get('options', {})
output_format = options.get('format', 'csv')
output_path = options.get('output_path')
if not output_path:
return {"status": "error", "message": "必须指定输出路径"}
content = self.fs.read(file_path)
df = pd.read_csv(StringIO(content))
# 导出
if output_format == 'csv':
df.to_csv(output_path, index=False)
elif output_format == 'json':
df.to_json(output_path, orient='records', force_ascii=False, indent=2)
elif output_format == 'excel':
df.to_excel(output_path, index=False)
return {
"status": "success",
"output_path": output_path,
"rows_exported": len(df)
}
# 使用示例
if __name__ == "__main__":
analyzer = DataAnalyzer("./workspace")
# 加载数据
result = analyzer._load_data("data/sales.csv")
print("数据加载结果:", result)
# 生成摘要
summary = analyzer._generate_summary("data/sales.csv")
print("\n数据摘要:")
print(f"- 总行数: {summary['total_rows']}")
print(f"- 总列数: {summary['total_columns']}")
print(f"- 数值列统计:")
for col, stats in summary['statistics'].items():
print(f" {col}: 均值={stats['mean']:.2f}, 最大={stats['max']:.2f}")
4.6 第五步:集成所有功能
现在让我们把所有组件整合在一起,创建一个完整的多功能助手:
# 文件名:full_assistant.py
import os
from agent_reach import AgentReach
from agent_reach.memory import ConversationMemory
# 导入自定义模块
from search_capability import search
from file_manager import FileManager
from data_analysis import DataAnalyzer
class MultiFunctionalAssistant:
"""多功能AI助手主类"""
def __init__(self, config_path: str = None):
"""
初始化多功能助手
参数:
config_path: 配置文件路径
"""
# 初始化核心Agent
self.agent = AgentReach(
name="全能助手",
description="帮助用户完成信息查询、文件管理、数据分析等任务",
config_path=config_path
)
# 初始化组件
self.memory = ConversationMemory(max_history=100)
self.file_manager = FileManager("./workspace")
self.data_analyzer = DataAnalyzer("./workspace")
# 注册所有工具
self._register_tools()
def _register_tools(self):
"""注册所有可用工具"""
# 注册文件管理工具
self.agent.register_tool(self.file_manager.create_tool())
# 注册数据分析工具
self.agent.register_tool(self.data_analyzer.create_tool())
print("✓ 所有工具注册完成")
def process(self, user_input: str) -> str:
"""
处理用户输入
参数:
user_input: 用户的自然语言输入
返回:
Agent的响应
"""
# 保存用户消息
self.memory.add_user_message(user_input)
# 判断用户意图并路由
response = self._route_request(user_input)
# 保存助手回复
self.memory.add_assistant_message(response)
return response
def _route_request(self, user_input: str) -> str:
"""根据用户输入路由到相应处理函数"""
user_lower = user_input.lower()
# 搜索请求
if any(kw in user_lower for kw in ['搜索', '查询', '查找', '找一下']):
return self._handle_search(user_input)
# 文件操作请求
if any(kw in user_lower for kw in ['读取', '打开', '查看文件']):
return self._handle_file_read(user_input)
if any(kw in user_lower for kw in ['写入', '保存', '创建文件']):
return self._handle_file_write(user_input)
if any(kw in user_lower for kw in ['列出', '目录', '文件夹']):
return self._handle_list_files(user_input)
# 数据分析请求
if any(kw in user_lower for kw in ['分析', '统计', '汇总']):
return self._handle_data_analysis(user_input)
if any(kw in user_lower for kw in ['筛选', '过滤', '查找记录']):
return self._handle_data_filter(user_input)
# 默认:使用Agent的通用处理能力
return self._handle_general(user_input)
def _handle_search(self, user_input: str) -> str:
"""处理搜索请求"""
# 提取搜索关键词(简化版)
keywords = user_input.replace('搜索', '').replace('查询', '').replace('查找', '').strip()
if not keywords:
return "请告诉我您想搜索什么内容?"
try:
results = search(keywords, max_results=5)
if not results:
return f"没有找到与'{keywords}'相关的搜索结果"
response = f"为您找到以下与'{keywords}'相关的信息:\n\n"
for i, r in enumerate(results, 1):
response += f"{i}. **{r.title}**\n"
response += f" {r.snippet[:100]}...\n"
response += f" 来源: {r.source} | 链接: {r.url}\n\n"
return response
except Exception as e:
return f"搜索时出现错误: {str(e)}"
def _handle_file_read(self, user_input: str) -> str:
"""处理文件读取请求"""
# 提取文件路径(简化版,实际应用中需要NLU)
import re
paths = re.findall(r'["\'](.+?)["\']|([\w./]+\.\w+)', user_input)
if not paths:
return "请告诉我您想读取哪个文件?"
file_path = paths[0][0] or paths[0][1]
try:
result = self.file_manager._read_file(file_path)
return f"文件内容 ({file_path}):\n\n{result['content'][:2000]}"
except Exception as e:
return f"读取文件时出现错误: {str(e)}"
def _handle_file_write(self, user_input: str) -> str:
"""处理文件写入请求"""
# 简化实现:写入到notes目录
import re
paths = re.findall(r'["\'](.+?)["\']', user_input)
if not paths:
return "请告诉我文件名和内容"
file_path = paths[0]
# 实际应用中需要从用户输入提取内容
content = "用户创建的文件内容"
try:
result = self.file_manager._write_file(file_path, content)
return f"✓ 文件已创建: {file_path}"
except Exception as e:
return f"创建文件时出现错误: {str(e)}"
def _handle_list_files(self, user_input: str) -> str:
"""处理列出文件请求"""
import re
paths = re.findall(r'["\'](.+?)["\']|([\w./]+)', user_input)
dir_path = paths[0][0] or paths[0][1] if paths else "."
try:
result = self.file_manager._list_directory(dir_path)
response = f"目录内容 ({dir_path}):\n\n"
for item in result['items'][:20]: # 限制显示数量
icon = "📁" if item['type'] == 'directory' else "📄"
response += f"{icon} {item['name']} ({item['type']})\n"
response += f"\n共 {result['total']} 个项目"
return response
except Exception as e:
return f"列出目录时出现错误: {str(e)}"
def _handle_data_analysis(self, user_input: str) -> str:
"""处理数据分析请求"""
import re
paths = re.findall(r'["\'](.+?)["\']|([\w./]+\.\w+)', user_input)
if not paths:
return "请告诉我您想分析哪个数据文件?"
file_path = paths[0][0] or paths[0][1]
try:
summary = self.data_analyzer._generate_summary(file_path)
response = f"📊 数据分析报告: {file_path}\n\n"
response += f"**基本信息**\n"
response += f"- 总行数: {summary['total_rows']}\n"
response += f"- 总列数: {summary['total_columns']}\n"
response += f"- 列名: {', '.join(summary['column_names'])}\n\n"
if summary['statistics']:
response += f"**数值列统计**\n"
for col, stats in summary['statistics'].items():
response += f"- {col}: 均值={stats['mean']:.2f}, 范围=[{stats['min']:.2f}, {stats['max']:.2f}]\n"
return response
except Exception as e:
return f"分析数据时出现错误: {str(e)}"
def _handle_data_filter(self, user_input: str) -> str:
"""处理数据筛选请求"""
# 简化实现
return "请提供筛选条件,例如:筛选 column='value'"
def _handle_general(self, user_input: str) -> str:
"""处理通用请求(使用AI模型)"""
# 这里可以调用大语言模型处理
# 为了演示,我们返回一个提示
return f"我收到了您的消息:'{user_input}'\n\n作为全能助手,我可以帮您:\n" \
"- 🔍 搜索网络信息\n" \
"- 📁 管理本地文件\n" \
"- 📊 分析数据文件\n\n请告诉我您具体想做什么?"
# 运行助手
if __name__ == "__main__":
# 初始化助手
assistant = MultiFunctionalAssistant()
# 示例对话
print("\n" + "="*50)
print("欢迎使用全能助手!")
print("="*50 + "\n")
# 示例交互
test_inputs = [
"帮我搜索一下人工智能的最新进展",
"列出workspace目录下的所有文件",
"分析一下data/sales.csv文件"
]
for user_input in test_inputs:
print(f"👤 用户: {user_input}")
response = assistant.process(user_input)
print(f"🤖 助手: {response}\n")
4.7 第六步:创建高级工作流
现在让我们创建一个更复杂的工作流示例,展示如何将多个操作组合:
# 文件名:advanced_workflow.py
from agent_reach import TaskOrchestrator, AgentReach
class ResearchWorkflow:
"""研究工作流:自动完成从搜索到报告生成的完整流程"""
def __init__(self):
"""初始化工作流"""
self.orchestrator = TaskOrchestrator()
def create_research_workflow(self, topic: str) -> dict:
"""
创建研究工作流
参数:
topic: 研究主题
返回:
工作流定义
"""
workflow = self.orchestrator.create_workflow(f"research_{topic}")
# 任务1:搜索相关信息
workflow.add_task(
task_id="search_info",
connector="web_search",
params={
"query": f"{topic} 最新动态",
"max_results": 10
},
output_key="search_results"
)
# 任务2:搜索学术论文
workflow.add_task(
task_id="search_papers",
connector="web_search",
params={
"query": f"{topic} academic papers 2024",
"max_results": 5
},
output_key="papers"
)
# 任务3:合并和整理信息
workflow.add_task(
task_id="compile_info",
connector="builtin",
operation="merge",
params={
"sources": ["search_results", "papers"]
},
output_key="compiled_data",
depends_on=["search_info", "search_papers"]
)
# 任务4:生成报告
workflow.add_task(
task_id="generate_report",
connector="file_system",
params={
"operation": "write",
"path": f"reports/{topic}_report.md",
"content_template": """
# {topic} 研究报告
生成时间: {timestamp}
## 概述
{overview}
## 最新动态
{recent_news}
## 相关论文
{papers_summary}
## 结论
{conclusion}
"""
},
output_key="report_path",
depends_on=["compile_info"]
)
return workflow
def execute(self, topic: str) -> dict:
"""执行研究工作流"""
workflow = self.create_research_workflow(topic)
result = self.orchestrator.execute(workflow)
return {
"status": result.status,
"report_path": result.outputs.get("report_path"),
"sources_found": len(result.outputs.get("search_results", []))
}
class AutomatedReportingWorkflow:
"""自动化报告工作流"""
def __init__(self, data_dir: str = "./data"):
"""初始化工作流"""
self.data_dir = data_dir
self.orchestrator = TaskOrchestrator()
def create_daily_report_workflow(self, date: str) -> dict:
"""
创建日报生成工作流
参数:
date: 日期(格式:YYYY-MM-DD)
返回:
工作流定义
"""
workflow = self.orchestrator.create_workflow(f"daily_report_{date}")
# 任务1:收集今日数据文件
workflow.add_task(
task_id="collect_files",
connector="file_system",
params={
"operation": "list",
"path": self.data_dir,
"pattern": f"*{date}*"
},
output_key="data_files"
)
# 任务2:分析每个数据文件
workflow.add_task(
task_id="analyze_data",
connector="builtin",
operation="batch_process",
params={
"source": "data_files",
"operation": "summary"
},
output_key="analysis_results",
depends_on=["collect_files"]
)
# 任务3:生成可视化配置
workflow.add_task(
task_id="create_charts",
connector="builtin",
operation="generate_charts",
params={
"data": "analysis_results",
"chart_types": ["line", "bar", "pie"]
},
output_key="chart_config",
depends_on=["analyze_data"]
)
# 任务4:生成日报文档
workflow.add_task(
task_id="generate_report",
connector="file_system",
params={
"operation": "write",
"path": f"reports/daily_report_{date}.md"
},
output_key="report_path",
depends_on=["analyze_data", "create_charts"]
)
# 任务5:发送报告(可选)
workflow.add_task(
task_id="send_report",
connector="email",
params={
"to": "team@company.com",
"subject": f"每日报告 - {date}",
"attachment": "report_path"
},
depends_on=["generate_report"]
)
return workflow
def execute(self, date: str) -> dict:
"""执行日报工作流"""
workflow = self.create_daily_report_workflow(date)
result = self.orchestrator.execute(workflow)
return {
"status": result.status,
"report_path": result.outputs.get("report_path"),
"charts_generated": len(result.outputs.get("chart_config", {}).get("charts", []))
}
# 使用示例
if __name__ == "__main__":
# 创建研究工作流实例
research = ResearchWorkflow()
# 执行研究工作流
print("正在执行研究工作流...")
result = research.execute("Python异步编程")
print(f"工作流状态: {result['status']}")
print(f"报告路径: {result['report_path']}")
print(f"找到的信息源: {result['sources_found']}")
# 创建日报工作流实例
reporting = AutomatedReportingWorkflow()
# 执行日报工作流
print("\n正在执行日报工作流...")
result = reporting.execute("2024-01-15")
print(f"工作流状态: {result['status']}")
print(f"报告路径: {result['report_path']}")
print(f"生成的图表数: {result['charts_generated']}")
五、常见使用场景
5.1 场景一:智能客服系统
Agent-Reach可以快速搭建一个智能客服系统,处理客户的常见问题:
# 文件名:customer_service.py
from agent_reach import AgentReach
from agent_reach.connectors import APIConnector
class CustomerServiceAgent:
"""客服Agent"""
def __init__(self):
"""初始化客服Agent"""
self.agent = AgentReach(
name="在线客服",
description="帮助客户解答问题和处理请求"
)
# 连接知识库API
self.knowledge_base = APIConnector(
base_url="https://api.company.com/knowledge",
headers={"Authorization": "Bearer xxx"}
)
# 连接工单系统API
self.ticket_system = APIConnector(
base_url="https://api.company.com/tickets",
headers={"Authorization": "Bearer xxx"}
)
self._setup_handlers()
def _setup_handlers(self):
"""设置问题处理函数"""
self.intent_handlers = {
"查询订单": self._handle_order_query,
"产品咨询": self._handle_product_inquiry,
"投诉建议": self._handle_complaint,
"技术支持": self._handle_tech_support,
"退货退款": self._handle_refund
}
def process(self, customer_id: str, message: str) -> str:
"""
处理客户消息
参数:
customer_id: 客户ID
message: 客户消息
返回:
回复内容
"""
# 分析客户意图
intent = self._analyze_intent(message)
# 调用相应的处理函数
if intent in self.intent_handlers:
response = self.intent_handlers[intent](customer_id, message)
else:
# 无法识别意图时,搜索知识库
response = self._search_knowledge_base(message)
return response
def _analyze_intent(self, message: str) -> str:
"""分析客户意图"""
# 简化实现:关键词匹配
keywords = {
"查询订单": ["订单", "快递", "发货", "物流"],
"产品咨询": ["产品", "功能", "规格", "价格"],
"投诉建议": ["投诉", "不满", "建议", "反馈"],
"技术支持": ["无法", "问题", "故障", "报错"],
"退货退款": ["退货", "退款", "取消"]
}
message_lower = message.lower()
for intent, kws in keywords.items():
if any(kw in message_lower for kw in kws):
return intent
return "未知"
def _handle_order_query(self, customer_id: str, message: str) -> str:
"""处理订单查询"""
# 从工单系统获取客户订单
orders = self.ticket_system.get(
f"/customer/{customer_id}/orders"
).json()
if not orders:
return "您好,暂时没有查询到您的订单信息。"
# 提取订单号
order_no = self._extract_order_no(message)
if order_no:
# 查询指定订单
for order in orders:
if order['order_no'] == order_no:
return self._format_order_info(order)
return f"未找到订单号 {order_no} 的信息"
else:
# 返回最近订单
latest = orders[0]
return f"您最近的订单:\n{self._format_order_info(latest)}"
def _handle_product_inquiry(self, customer_id: str, message: str) -> str:
"""处理产品咨询"""
# 搜索知识库
return self._search_knowledge_base(message)
def _handle_complaint(self, customer_id: str, message: str) -> str:
"""处理投诉"""
# 创建工单
ticket = self.ticket_system.post(
"/tickets",
json_data={
"customer_id": customer_id,
"type": "complaint",
"content": message,
"status": "open"
}
).json()
return f"感谢您的反馈!您的投诉已受理,工单号:{ticket['id']}," \
f"我们的工作人员将在24小时内与您联系。"
def _handle_tech_support(self, customer_id: str, message: str) -> str:
"""处理技术支持"""
# 搜索技术支持知识库
articles = self.knowledge_base.get(
"/search",
params={"q": message, "category": "tech_support"}
).json()
if articles:
return f"根据您的问题,为您推荐以下解决方案:\n" + \
"\n".join(f"- {a['title']}: {a['url']}" for a in articles[:3])
else:
return "抱歉,暂未找到相关解决方案。" \
"是否需要创建技术支持工单?"
def _handle_refund(self, customer_id: str, message: str) -> str:
"""处理退货退款"""
return "退货退款申请已收到,请在7天内将商品寄回," \
"收到商品后我们将尽快处理退款。"
def _search_knowledge_base(self, query: str) -> str:
"""搜索知识库"""
articles = self.knowledge_base.get(
"/search",
params={"q": query}
).json()
if not articles:
return "抱歉,暂未找到相关信息。" \
"您可以尝试联系人工客服获得帮助。"
response = "根据您的问题,为您找到以下相关信息:\n\n"
for article in articles[:3]:
response += f"📄 {article['title']}\n"
response += f" {article['summary'][:100]}...\n"
response += f" 链接: {article['url']}\n\n"
return response
def _extract_order_no(self, message: str) -> str:
"""从消息中提取订单号"""
import re
match = re.search(r'\d{10,}', message)
return match.group(0) if match else None
def _format_order_info(self, order: dict) -> str:
"""格式化订单信息"""
return f"""订单号: {order['order_no']}
商品: {order['product_name']}
金额: ¥{order['amount']}
状态: {order['status']}
下单时间: {order['created_at']}
"""
# 使用示例
if __name__ == "__main__":
service = CustomerServiceAgent()
# 处理客户咨询
responses = [
("C001", "我的订单什么时候发货?订单号:202401150001"),
("C001", "产品质量有问题,我要投诉"),
("C002", "这个产品的功能有哪些?")
]
for customer_id, message in responses:
print(f"客户: {message}")
response = service.process(customer_id, message)
print(f"客服: {response}\n")
5.2 场景二:个人知识管理系统
构建一个能够自动收集、整理和检索知识的系统:
# 文件名:knowledge_management.py
import os
from datetime import datetime
from agent_reach import AgentReach
from agent_reach.connectors import WebSearchConnector, FileSystemConnector
from agent_reach.memory import KnowledgeBase
class PersonalKnowledgeManager:
"""个人知识管理系统"""
def __init__(self, knowledge_dir: str = "./knowledge"):
"""
初始化知识管理系统
参数:
knowledge_dir: 知识库存储目录
"""
self.knowledge_dir = knowledge_dir
os.makedirs(knowledge_dir, exist_ok=True)
# 初始化组件
self.search = WebSearchConnector(engine="duckduckgo")
self.fs = FileSystemConnector(base_path=knowledge_dir)
# 初始化向量知识库
self.kb = KnowledgeBase(
storage="vector",
embed_model="text-embedding-ada-002"
)
# 知识分类
self.categories = {
"技术": ["编程", "框架", "工具", "代码"],
"产品": ["功能", "设计", "用户", "市场"],
"商业": ["战略", "运营", "财务", "投资"],
"生活": ["健康", "旅行", "美食", "兴趣"]
}
def collect_from_web(self, topic: str, max_results: int = 10) -> dict:
"""
从网络收集知识
参数:
topic: 主题
max_results: 最大结果数
返回:
收集结果
"""
# 搜索相关信息
results = self.search.search(topic, max_results=max_results)
collected = []
for result in results:
item = {
"title": result.title,
"url": result.url,
"snippet": result.snippet,
"source": result.source,
"topic": topic,
"collected_at": datetime.now().isoformat(),
"category": self._categorize(topic)
}
# 保存到文件
filename = f"web/{self._sanitize_filename(result.title)}.md"
content = self._format_knowledge_item(item)
self.fs.write(filename, content)
# 添加到向量知识库
self.kb.add(
text=f"{item['title']}\n{item['snippet']}",
metadata=item
)
collected.append(item)
return {
"collected": len(collected),
"items": collected
}
def add_note(self, content: str, tags: list = None, category: str = None) -> dict:
"""
添加笔记
参数:
content: 笔记内容
tags: 标签列表
category: 分类
返回:
添加结果
"""
# 自动分类
if not category:
category = self._categorize(content)
# 自动生成标签
if not tags:
tags = self._extract_tags(content)
# 创建笔记文件
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"notes/{timestamp}.md"
note_content = f"""---
title: {content[:50]}...
created: {datetime.now().isoformat()}
category: {category}
tags: {', '.join(tags)}
---
# 笔记
{content}
---
"""
self.fs.write(filename, note_content)
# 添加到知识库
self.kb.add(
text=content,
metadata={
"type": "note",
"category": category,
"tags": tags,
"filename": filename
}
)
return {
"status": "success",
"filename": filename,
"category": category,
"tags": tags
}
def search_knowledge(self, query: str, top_k: int = 5) -> list:
"""
搜索知识
参数:
query: 搜索查询
top_k: 返回结果数量
返回:
搜索结果列表
"""
results = self.kb.search(query, top_k=top_k)
formatted_results = []
for r in results:
formatted_results.append({
"content": r.text[:200] + "...",
"metadata": r.metadata,
"relevance": round(r.score, 3)
})
return formatted_results
def get_category_summary(self, category: str) -> dict:
"""
获取分类摘要
参数:
category: 分类名称
返回:
分类统计信息
"""
# 列出分类目录
notes_dir = f"notes"
try:
items = self.fs.list_directory(notes_dir)
# 统计该分类的笔记
category_notes = []
for item in items:
if item.is_file:
content = self.fs.read(f"{notes_dir}/{item.name}")
# 简化检查:实际应用中应解析metadata
if category in content:
category_notes.append(item.name)
return {
"category": category,
"note_count": len(category_notes),
"notes": category_notes[:10] # 返回前10条
}
except Exception as e:
return {"error": str(e)}
def _categorize(self, text: str) -> str:
"""自动分类文本"""
text_lower = text.lower()
scores = {}
for category, keywords in self.categories.items():
score = sum(1 for kw in keywords if kw in text_lower)
scores[category] = score
if max(scores.values()) > 0:
return max(scores, key=scores.get)
return "其他"
def _extract_tags(self, content: str) -> list:
"""提取标签(简化实现)"""
# 实际应用中应使用NLP提取关键词
words = content.split()
# 返回前5个较长的词作为标签
tags = [w for w in words if len(w) > 3][:5]
return tags
def _sanitize_filename(self, title: str) -> str:
"""生成安全的文件名"""
import re
safe = re.sub(r'[^\w\s-]', '', title)
safe = re.sub(r'[-\s]+', '_', safe)
return safe[:50]
def _format_knowledge_item(self, item: dict) -> str:
"""格式化知识条目"""
return f"""---
title: {item['title']}
url: {item['url']}
source: {item['source']}
topic: {item['topic']}
category: {item['category']}
collected_at: {item['collected_at']}
---
# {item['title']}
{item['snippet']}
来源: {item['url']}
"""
# 使用示例
if __name__ == "__main__":
# 初始化知识管理器
km = PersonalKnowledgeManager()
# 收集网络知识
print("正在收集网络知识...")
result = km.collect_from_web("Python异步编程", max_results=5)
print(f"已收集 {result['collected']} 条知识")
# 添加笔记
print("\n正在添加笔记...")
note_result = km.add_note(
content="今天学习了Agent-Reach框架,这是一个用于构建AI Agent的工具包...",
tags=["AI", "Agent", "学习"]
)
print(f"笔记已保存: {note_result['filename']}")
print(f"分类: {note_result['category']}")
print(f"标签: {note_result['tags']}")
# 搜索知识
print("\n正在搜索知识...")
results = km.search_knowledge("Python 异步")
for i, r in enumerate(results, 1):
print(f"\n{i}. 相关度: {r['relevance']}")
print(f" {r['content']}")
5.3 场景三:自动化数据处理管道
创建自动化处理数据的管道:
# 文件名:data_pipeline.py
import os
from datetime import datetime, timedelta
from agent_reach import TaskOrchestrator
from agent_reach.connectors import FileSystemConnector, APIConnector
class DataPipeline:
"""数据处理管道"""
def __init__(self, config: dict = None):
"""
初始化数据管道
参数:
config: 管道配置
"""
self.config = config or {}
self.fs = FileSystemConnector(base_path="./data")
self.orchestrator = TaskOrchestrator()
def create_etl_pipeline(self, source_type: str = "api") -> dict:
"""
创建ETL(抽取-转换-加载)管道
参数:
source_type: 数据源类型(api, file, database)
返回:
管道定义
"""
pipeline = self.orchestrator.create_workflow("etl_pipeline")
# 抽取阶段
if source_type == "api":
pipeline.add_task(
task_id="extract_from_api",
connector="api",
params={
"endpoint": "/data/export",
"method": "GET"
},
output_key="raw_data"
)
elif source_type == "file":
pipeline.add_task(
task_id="extract_from_file",
connector="file_system",
params={
"operation": "read",
"path": "input/data.csv"
},
output_key="raw_data"
)
# 转换阶段
pipeline.add_task(
task_id="clean_data",
connector="builtin",
operation="transform",
params={
"source": "raw_data",
"transformations": [
{"type": "remove_nulls"},
{"type": "deduplicate", "key": "id"},
{"type": "normalize", "fields": ["name", "email"]}
]
},
output_key="cleaned_data",
depends_on=["extract_from_api", "extract_from_file"]
)
pipeline.add_task(
task_id="enrich_data",
connector="builtin",
operation="transform",
params={
"source": "cleaned_data",
"transformations": [
{"type": "add_field", "field": "processed_at", "value": "now"},
{"type": "add_field", "field": "status", "value": "processed"}
]
},
output_key="enriched_data",
depends_on=["clean_data"]
)
# 加载阶段
pipeline.add_task(
task_id="load_to_database",
connector="api",
params={
"endpoint": "/data/import",
"method": "POST",
"body": "enriched_data"
},
output_key="import_result",
depends_on=["enrich_data"]
)
pipeline.add_task(
task_id="archive_raw_data",
connector="file_system",
params={
"operation": "move",
"source": "input/data.csv",
"destination": "archive/{timestamp}/data.csv"
},
depends_on=["load_to_database"]
)
return pipeline
class ReportGenerator:
"""自动化报告生成器"""
def __init__(self):
"""初始化报告生成器"""
self.fs = FileSystemConnector(base_path="./reports")
def generate_daily_report(self, date: str = None) -> str:
"""
生成日报
参数:
date: 日期,默认为今天
返回:
报告文件路径
"""
if not date:
date = datetime.now().strftime("%Y-%m-%d")
# 收集数据
metrics = self._collect_metrics(date)
alerts = self._get_alerts(date)
# 生成报告
report_content = self._format_report(date, metrics, alerts)
# 保存报告
report_path = f"daily_{date}.md"
self.fs.write(report_path, report_content)
return report_path
def generate_weekly_report(self, end_date: str = None) -> str:
"""
生成周报
参数:
end_date: 周结束日期,默认为今天
返回:
报告文件路径
"""
if not end_date:
end_date = datetime.now()
else:
end_date = datetime.strptime(end_date, "%Y-%m-%d")
start_date = end_date - timedelta(days=7)
# 收集周数据
daily_reports = []
current = start_date
while current <= end_date:
date_str = current.strftime("%Y-%m-%d")
try:
report = self.fs.read(f"daily_{date_str}.md")
daily_reports.append((date_str, report))
except:
pass
current += timedelta(days=1)
# 生成周报
week_summary = self._summarize_week(daily_reports)
report_path = f"weekly_{end_date.strftime('%Y-%m-%d')}.md"
self.fs.write(report_path, week_summary)
return report_path
def _collect_metrics(self, date: str) -> dict:
"""收集日指标"""
# 模拟数据收集
return {
"users": {"total": 1000, "active": 850, "new": 50},
"revenue": {"total": 50000, "avg_order": 200},
"performance": {"response_time": 120, "uptime": 99.9}
}
def _get_alerts(self, date: str) -> list:
"""获取告警"""
# 模拟告警获取
return []
def _format_report(self, date: str, metrics: dict, alerts: list) -> str:
"""格式化日报"""
return f"""# 日报 - {date}
## 概览
- 日期: {date}
- 状态: {'正常' if not alerts else '有告警'}
## 关键指标
### 用户指标
- 总用户数: {metrics['users']['total']}
- 活跃用户: {metrics['users']['active']}
- 新增用户: {metrics['users']['new']}
### 收入指标
- 总收入: ¥{metrics['revenue']['total']}
- 平均订单: ¥{metrics['revenue']['avg_order']}
### 性能指标
- 响应时间: {metrics['performance']['response_time']}ms
- 可用性: {metrics['performance']['uptime']}%
## 告警信息
{'无' if not alerts else chr(10).join(f'- {a}' for a in alerts)}
---
生成时间: {datetime.now().isoformat()}
"""
def _summarize_week(self, daily_reports: list) -> str:
"""生成周总结"""
if not daily_reports:
return "# 周报\n\n本周无数据"
total_users = sum(r[1].count('活跃用户:') for r in daily_reports)
return f"""# 周报
## 本周概述
- 报告天数: {len(daily_reports)}
- 累计活跃用户: {total_users}
## 每日详情
{chr(10).join(f'- {date}: 查看详情' for date, _ in daily_reports)}
---
生成时间: {datetime.now().isoformat()}
"""
# 使用示例
if __name__ == "__main__":
# 创建数据管道
pipeline = DataPipeline()
etl_workflow = pipeline.create_etl_pipeline(source_type="api")
print("ETL管道已创建")
# 创建报告生成器
generator = ReportGenerator()
# 生成日报
report_path = generator.generate_daily_report()
print(f"日报已生成: {report_path}")
# 生成周报
weekly_path = generator.generate_weekly_report()
print(f"周报已生成: {weekly_path}")
六、技巧与最佳实践
6.1 提升Agent响应质量的技巧
技巧一:精心设计工具描述
工具描述是Agent理解何时应该调用工具的关键。好的描述应该包含:
# 不推荐的描述(过于简单)
tool_bad = Tool(
name="search",
description="搜索"
)
# 推荐的描述(详细且具体)
tool_good = Tool(
name="web_search",
description="""在互联网上搜索最新信息。
使用场景:
- 用户询问实时信息(天气、新闻、股价等)
- 用户询问需要最新数据的问题
- 用户要求查找某些资料或教程
参数说明:
- query: 搜索关键词,尽量具体
- max_results: 返回结果数量,默认5条
返回格式:
- title: 结果标题
- url: 来源链接
- snippet: 内容摘要
- source: 来源网站
""",
parameters={...}
)
技巧二:实现智能意图判断
# 文件名:intent_classifier.py
from typing import List, Dict, Tuple
class IntentClassifier:
"""意图分类器"""
def __init__(self):
"""初始化分类器"""
# 定义意图及其关键词
self.intents = {
"search": {
"keywords": ["搜索", "查询", "查找", "找一下", "有没有"],
"weight": 1.0
},
"file_operation": {
"keywords": ["读取", "写入", "创建", "删除", "列出", "打开", "保存"],
"weight": 1.0
},
"data_analysis": {
"keywords": ["分析", "统计", "汇总", "计算", "对比", "趋势"],
"weight": 1.2 # 数据分析关键词权重更高
},
"task_automation": {
"keywords": ["定时", "自动", "批量", "循环", "每天", "每周"],
"weight": 1.1
}
}
def classify(self, user_input: str) -> List[Tuple[str, float]]:
"""
分类用户意图
参数:
user_input: 用户输入
返回:
按匹配度排序的意图列表 [(intent, score), ...]
"""
scores = {}
for intent, config in self.intents.items():
score = 0.0
keywords = config["keywords"]
weight = config["weight"]
for keyword in keywords:
if keyword in user_input:
# 考虑关键词位置(前面的词权重更高)
position = user_input.find(keyword)
position_bonus = 1.0 - (position / len(user_input)) * 0.3
score += weight * position_bonus
if score > 0:
scores[intent] = score
# 按分数排序
sorted_intents = sorted(
scores.items(),
key=lambda x: x[1],
reverse=True
)
return sorted_intents
def get_primary_intent(self, user_input: str) -> str:
"""
获取主要意图
参数:
user_input: 用户输入
返回:
最可能的意图
"""
classified = self.classify(user_input)
return classified[0][0] if classified else "general"
技巧三:实现上下文感知的回复
# 文件名:context_aware_response.py
from agent_reach.memory import ConversationMemory
class ContextAwareResponse:
"""上下文感知回复生成器"""
def __init__(self):
"""初始化"""
self.memory = ConversationMemory(max_history=50)
def generate_response(self, user_input: str, agent_context: dict) -> str:
"""
生成上下文感知的回复
参数:
user_input: 用户输入
agent_context: Agent上下文信息
返回:
生成的回复
"""
# 获取对话历史
history = self.memory.get_history(include_summary=True)
# 分析当前话题
current_topic = self._extract_topic(user_input)
# 检查话题是否延续
last_topic = self._get_last_topic(history)
# 如果是延续话题,使用更简短的回复
if current_topic == last_topic and last_topic is not None:
return self._generate_continuation_response(
user_input, agent_context, history
)
else:
# 新话题,使用完整回复
return self._generate_new_topic_response(
user_input, agent_context, history
)
def _extract_topic(self, text: str) -> str:
"""提取话题关键词"""
# 简化实现
words = text.split()
# 返回第一个实词
for word in words:
if len(word) > 2:
return word
return "unknown"
def _get_last_topic(self, history: list) -> str:
"""获取上一个话题"""
if len(history) >= 2:
# 扫描最近几轮对话
for item in reversed(history[-4:]):
topic = self._extract_topic(item.content)
if topic != "unknown":
return topic
return None
def _generate_continuation_response(
self,
user_input: str,
agent_context: dict,
history: list
) -> str:
"""生成延续话题的回复"""
# 使用简短、连贯的回复风格
return f"好的,继续。关于{agent_context.get('topic', '这个话题')}," \
f"您还有什么想了解的吗?"
def _generate_new_topic_response(
self,
user_input: str,
agent_context: dict,
history: list
) -> str:
"""生成新话题的回复"""
# 使用完整、详细的回复风格
return f"好的,我来帮您处理:{user_input}\n\n" \
f"这是新的任务,我会立即开始处理。"
6.2 性能优化建议
建议一:使用缓存减少重复查询
# 文件名:smart_cache.py
import time
from functools import wraps
from typing import Any, Callable, Optional
class SmartCache:
"""智能缓存管理器"""
def __init__(self, default_ttl: int = 300):
"""
初始化缓存
参数:
default_ttl: 默认过期时间(秒)
"""
self.cache = {}
self.default_ttl = default_ttl
def get(self, key: str) -> Optional[Any]:
"""获取缓存值"""
if key in self.cache:
entry = self.cache[key]
if time.time() < entry["expires_at"]:
return entry["value"]
else:
# 过期,删除
del self.cache[key]
return None
def set(self, key: str, value: Any, ttl: int = None):
"""设置缓存值"""
ttl = ttl or self.default_ttl
self.cache[key] = {
"value": value,
"expires_at": time.time() + ttl,
"created_at": time.time()
}
def invalidate(self, key: str):
"""使缓存失效"""
if key in self.cache:
del self.cache[key]
def clear(self):
"""清空所有缓存"""
self.cache.clear()
def cached(self, ttl: int = None, key_func: Callable = None):
"""
缓存装饰器
参数:
ttl: 过期时间
key_func: 生成缓存键的函数
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存键
if key_func:
cache_key = key_func(*args, **kwargs)
else:
cache_key = f"{func.__name__}:{str(args)}:{str(kwargs)}"
# 尝试从缓存获取
cached_value = self.get(cache_key)
if cached_value is not None:
return cached_value
# 执行函数
result = func(*args, **kwargs)
# 存入缓存
self.set(cache_key, result, ttl)
return result
return wrapper
return decorator
# 使用示例
cache = SmartCache(default_ttl=600)
@cache.cached(ttl=300, key_func=lambda q: f"search:{q}")
def cached_search(query: str):
"""带缓存的搜索函数"""
# 实际搜索逻辑
...
建议二:实现异步执行提高响应速度
# 文件名:async_executor.py
import asyncio
from typing import List, Dict, Any
from concurrent.futures import ThreadPoolExecutor
class AsyncExecutor:
"""异步任务执行器"""
def __init__(self, max_workers: int = 5):
"""
初始化执行器
参数:
max_workers: 最大并发数
"""
self.executor = ThreadPoolExecutor(max_workers=max_workers)
async def execute_parallel(self, tasks: List[Dict]) -> List[Any]:
"""
并行执行多个任务
参数:
tasks: 任务列表,每个任务包含name和func
返回:
任务结果列表
"""
loop = asyncio.get_event_loop()
# 创建所有任务
futures = []
for task in tasks:
future = loop.run_in_executor(
self.executor,
task["func"],
*task.get("args", []),
**task.get("kwargs", {})
)
futures.append((task.get("name", str(i)), future))
# 等待所有任务完成
results = {}
for name, future in futures:
try:
results[name] = await future
except Exception as e:
results[name] = {"error": str(e)}
return results
async def execute_with_timeout(
self,
func: Callable,
timeout: float,
*args,
**kwargs
) -> Any:
"""
带超时的执行
参数:
func: 要执行的函数
timeout: 超时时间(秒)
*args, **kwargs: 函数参数
返回:
函数结果
"""
loop = asyncio.get_event_loop()
try:
result = await asyncio.wait_for(
loop.run_in_executor(self.executor, func, *args, **kwargs),
timeout=timeout
)
return {"status": "success", "result": result}
except asyncio.TimeoutError:
return {"status": "timeout", "message": f"执行超时({timeout}秒)"}
except Exception as e:
return {"status": "error", "message": str(e)}
# 使用示例
async def main():
executor = AsyncExecutor(max_workers=3)
tasks = [
{"name": "search", "func": search_function},
{"name": "analyze", "func": analyze_function},
{"name": "save", "func": save_function}
]
results = await executor.execute_parallel(tasks)
print(results)
# 运行
asyncio.run(main())
6.3 安全最佳实践
实践一:实现操作审计日志
# 文件名:audit_logger.py
import json
from datetime import datetime
from typing import Optional
from enum import Enum
class OperationType(Enum):
"""操作类型枚举"""
READ = "read"
WRITE = "write"
DELETE = "delete"
EXECUTE = "execute"
QUERY = "query"
class AuditLogger:
"""审计日志记录器"""
def __init__(self, log_file: str = "./logs/audit.log"):
"""
初始化审计日志
参数:
log_file: 日志文件路径
"""
self.log_file = log_file
def log(
self,
operation: OperationType,
user_id: str,
resource: str,
details: dict = None,
status: str = "success",
error: str = None
):
"""
记录审计日志
参数:
operation: 操作类型
user_id: 用户ID
resource: 资源标识
details: 详细信息
status: 操作状态
error: 错误信息
"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"operation": operation.value,
"user_id": user_id,
"resource": resource,
"details": details or {},
"status": status,
"error": error
}
# 写入日志文件
with open(self.log_file, "a", encoding="utf-8") as f:
f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
def query_logs(
self,
user_id: str = None,
operation: OperationType = None,
start_time: datetime = None,
end_time: datetime = None
) -> list:
"""
查询审计日志
参数:
user_id: 用户ID过滤
operation: 操作类型过滤
start_time: 开始时间
end_time: 结束时间
返回:
符合条件的日志条目
"""
results = []
with open(self.log_file, "r", encoding="utf-8") as f:
for line in f:
entry = json.loads(line)
# 应用过滤条件
if user_id and entry["user_id"] != user_id:
continue
if operation and entry["operation"] != operation.value:
continue
if start_time:
entry_time = datetime.fromisoformat(entry["timestamp"])
if entry_time < start_time:
continue
if end_time:
entry_time = datetime.fromisoformat(entry["timestamp"])
if entry_time > end_time:
continue
results.append(entry)
return results
# 使用示例
audit = AuditLogger()
# 记录操作
audit.log(
operation=OperationType.READ,
user_id="user123",
resource="file://documents/report.pdf",
details={"size": 1024}
)
# 查询用户的所有操作
logs = audit.query_logs(user_id="user123")
实践二:实现权限检查装饰器
# 文件名:permission_check.py
from functools import wraps
from typing import List, Set
class Permission:
"""权限定义"""
READ = "read"
WRITE = "write"
DELETE = "delete"
EXECUTE = "execute"
ADMIN = "admin"
class UserRole:
"""用户角色"""
GUEST = "guest"
USER = "user"
ADMIN = "admin"
# 角色权限映射
PERMISSIONS = {
GUEST: {Permission.READ},
USER: {Permission.READ, Permission.WRITE},
ADMIN: {Permission.READ, Permission.WRITE, Permission.DELETE, Permission.EXECUTE, Permission.ADMIN}
}
class PermissionChecker:
"""权限检查器"""
def __init__(self):
"""初始化"""
self.user_permissions = {}
def grant_permission(self, user_id: str, permission: str):
"""授予权限"""
if user_id not in self.user_permissions:
self.user_permissions[user_id] = set()
self.user_permissions[user_id].add(permission)
def revoke_permission(self, user_id: str, permission: str):
"""撤销权限"""
if user_id in self.user_permissions:
self.user_permissions[user_id].discard(permission)
def get_permissions(self, user_id: str, role: str = UserRole.GUEST) -> Set[str]:
"""获取用户权限"""
# 基础权限(来自角色)
base_permissions = UserRole.PERMISSIONS.get(role, set())
# 额外权限
extra_permissions = self.user_permissions.get(user_id, set())
return base_permissions | extra_permissions
def has_permission(self, user_id: str, permission: str, role: str = UserRole.GUEST) -> bool:
"""检查是否有权限"""
user_permissions = self.get_permissions(user_id, role)
return permission in user_permissions
def require_permission(self, permission: str):
"""
权限检查装饰器
参数:
permission: 需要的权限
"""
def decorator(func):
@wraps(func)
def wrapper(self, user_id: str, *args, **kwargs):
role = kwargs.get("role", UserRole.GUEST)
if not self.has_permission(user_id, permission, role):
raise PermissionError(
f"用户 {user_id} 缺少权限: {permission}"
)
return func(self, user_id, *args, **kwargs)
return wrapper
return decorator
# 使用示例
checker = PermissionChecker()
@checker.require_permission(Permission.WRITE)
def write_file(user_id: str, content: str, role: str = UserRole.GUEST):
"""写入文件(需要WRITE权限)"""
print(f"用户 {user_id} 正在写入文件...")
...
# 测试
try:
write_file("user1", "Hello", role=UserRole.USER)
# 成功
except PermissionError as e:
# 权限不足
print(e)
七、项目总结与延伸学习
7.1 核心要点回顾
通过这篇教程,我们学习了如何全面使用Agent-Reach构建AI应用:
┌─────────────────────────────────────────────────────────────┐
│ 核心学习要点 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ✓ 环境搭建 │
│ └── 虚拟环境、依赖安装、配置文件 │
│ │
│ ✓ 架构理解 │
│ └── 意图解析、规划模块、执行器、记忆模块 │
│ │
│ ✓ 连接器系统 │
│ └── 网络搜索、文件管理、API调用、数据分析 │
│ │
│ ✓ 任务编排 │
│ └── 工作流定义、任务依赖、执行调度 │
│ │
│ ✓ 实际应用 │
│ └── 智能客服、知识管理、数据处理管道 │
│ │
│ ✓ 最佳实践 │
│ └── 工具描述、性能优化、安全审计 │
│ │
└─────────────────────────────────────────────────────────────┘
7.2 相关AI项目推荐
Agent-Reach为构建AI Agent提供了强大的基础能力。如果你对AI应用开发感兴趣,以下项目也值得关注:
LangChain – 广泛使用的LLM应用开发框架,与Agent-Reach可以互补使用
– GitHub: https://github.com/langchain-ai/langchain
– 特点:丰富的Chain组件、工具集成、内存管理
AutoGPT – 自主AI Agent的标杆项目
– GitHub: https://github.com/Significant-Gravitas/AutoGPT
– 特点:目标驱动的自主执行、任务分解与反思
LlamaIndex – 专注于知识检索的框架
– GitHub: https://github.com/emptycrown/llama-hub
– 特点:强大的数据连接器、知识图谱构建
Semantic Kernel – 微软出品的AI编排框架
– GitHub: https://github.com/microsoft/semantic-kernel
– 特点:企业级支持、多语言SDK、与Azure深度集成
CrewAI – 多智能体协作框架
– GitHub: https://github.com/joaomdmoura/crewAI
– 特点:多Agent角色定义、任务协作流程
7.3 进一步学习路径
学习路径建议:
┌─────────────────┐
│ 基础知识 │
│ - Python编程 │
│ - API理解 │
│ - 异步编程 │
└────────┬────────┘
│
┌────────▼────────┐
│ Agent-Reach │
│ 入门指南 │
│ - 本教程内容 │
│ - 基础应用开发 │
└────────┬────────┘
│
┌──────────────┼──────────────┐
│ │ │
┌────────▼─────┐ ┌──────▼──────┐ ┌────▼─────────┐
│ 高级特性 │ │ 性能优化 │ │ 生产部署 │
│ - 自定义连接器│ │ - 缓存策略 │ │ - 容器化 │
│ - 复杂工作流 │ │ - 异步执行 │ │ - 监控告警 │
│ - 多Agent协作│ │ - 资源管理 │ │ - CI/CD │
└──────────────┘ └─────────────┘ └──────────────┘
7.4 参与项目贡献
Agent-Reach是一个开源项目,欢迎大家参与贡献:
贡献方式:
├── 🐛 报告Bug - 在GitHub Issues中提交问题报告
├── 💡 提出建议 - 分享你的使用场景和功能需求
├── 📝 完善文档 - 帮助改进教程和使用文档
├── 🔧 提交代码 - 修复Bug或开发新功能
├── 🧪 测试反馈 - 分享你的使用测试结果
└── 📢 社区推广 - 帮助宣传项目,影响更多人
7.5 资源链接
官方资源:
├── GitHub仓库: https://github.com/Panniantong/Agent-Reach
├── 官方文档: https://agent-reach.readthedocs.io
├── 示例代码: https://github.com/Panniantong/Agent-Reach/tree/main/examples
├── 问题反馈: https://github.com/Panniantong/Agent-Reach/issues
└── 讨论交流: https://github.com/Panniantong/Agent-Reach/discussions
学习资源:
├── Python官方文档: https://docs.python.org
├── 异步编程指南: https://docs.python.org/3/library/asyncio.html
├── REST API设计最佳实践: https://restfulapi.net
└── 向量数据库介绍: https://www.pinecone.io/learn/vector-database
结语
Agent-Reach代表了一种重要的技术趋势:让AI从”能说会道”走向”能做事”。它不仅仅是一个工具库,更是一种构建AI应用的理念——通过标准化的接口和模块化的设计,让AI Agent能够真正与外部世界交互,完成实际任务。
通过本教程的学习,你应该已经掌握了Agent-Reach的核心概念和使用方法。接下来,建议你:
- 动手实践:从最简单的示例开始,逐步构建自己的应用
- 深入探索:阅读项目源码,理解底层实现原理
- 社区互动:加入社区,与其他开发者交流经验
- 持续学习:关注AI Agent领域的最新发展,保持知识更新
AI Agent的潜力才刚刚开始显现。期待看到你用Agent-Reach构建出令人惊叹的应用!
祝你学习愉快,开发顺利!
本文档最后更新于2024年,基于Agent-Reach最新版本编写。如有任何问题或建议,欢迎在项目仓库中提交Issue。
评论区