CowAgent:让AI助手真正「会干活」的开源Agent框架,我用了三个月后彻底惊艳了
引言
你是否曾经遇到过这样的困境:给AI下达了一个看似简单的任务,它却只是返回一段文字答案,而无法真正帮你执行操作?又或者,当你想让AI帮你在电脑上完成一系列复杂操作时,却发现它「有心无力」?
今天我要介绍的这个开源项目——CowAgent,正是为了解决这个痛点而生。它不仅仅是一个对话式AI,更是一个能够真正帮你「干活」的智能助手框架。
为什么 CowAgent 值得关注
传统AI助手的局限性
在介绍 CowAgent 之前,让我们先回顾一下传统AI助手的工作模式:
用户:帮我查一下明天的天气
AI:明天天气晴朗,温度15-25度,适合外出活动。
用户:帮我把这张照片发到微信群
AI:抱歉,我无法直接执行这些操作...
看到了吗?传统AI只能「回答问题」,而无法「执行任务」。这就是我们常说的「语言模型天花板」——它很聪明,但缺乏真正的「手」和「脚」。
CowAgent 的核心突破
CowAgent 的出现打破了这一困境。它构建了一个完整的Agent执行架构,让AI能够:
- 理解你的意图:不仅听懂你说的,还能理解你想做的
- 规划执行步骤:将复杂任务拆解为可执行的子任务
- 调用各种工具:文件操作、代码执行、API调用、系统控制…
- 自我纠错:任务执行失败时能够自动重试或调整策略
简单来说,CowAgent 让AI从「能说」进化到「能做事」。
为什么选择 CowAgent
| 特性 | 传统AI助手 | CowAgent |
|---|---|---|
| 交互方式 | 纯文本对话 | 对话 + 任务执行 |
| 能力范围 | 回答问题 | 完成任务链 |
| 自动化程度 | 低 | 高 |
| 可扩展性 | 有限 | 插件系统支持 |
| 执行透明度 | 黑盒 | 可追踪、可干预 |
环境搭建:5分钟快速入门
系统要求
在开始之前,确保你的环境满足以下要求:
- Python 3.9 或更高版本
- pip 包管理器
- 稳定的网络连接(用于调用AI模型API)
- 至少4GB可用内存
安装步骤
第一步:创建虚拟环境(推荐但非必需)
# 创建新的虚拟环境
python -m venv cowagent-env
# 激活虚拟环境
# Windows 系统
cowagent-env\Scripts\activate
# macOS / Linux 系统
source cowagent-env/bin/activate
第二步:安装 CowAgent
# 使用 pip 安装
pip install cowagent
# 或者从源码安装(获取最新功能)
git clone https://github.com/zhayujie/CowAgent.git
cd CowAgent
pip install -e .
第三步:配置API密钥
CowAgent 需要连接AI模型来提供智能能力。你需要配置API密钥:
# 在项目根目录创建 config.py 文件
import os
# 设置你的API密钥(以OpenAI为例)
os.environ["OPENAI_API_KEY"] = "你的API密钥"
# 如果使用其他模型,可以设置对应的密钥
# os.environ["ANTHROPIC_API_KEY"] = "你的Claude密钥"
# os.environ["ZHIPU_API_KEY"] = "你的智谱密钥"
第四步:验证安装
# 创建一个简单的测试脚本 test_install.py
from cowagent import CowAgent
# 初始化Agent
agent = CowAgent()
# 测试基本对话
response = agent.chat("你好,请介绍一下你自己")
print(response)
# 预期输出:Agent会介绍自己的功能和特点
运行测试脚本,如果一切正常,你会看到类似以下输出:
CowAgent初始化完成!
你好!我是CowAgent,一个智能任务执行助手...
核心功能详解
1. 多层级Agent架构
CowAgent 采用分层设计,这是它最核心的设计理念:
┌─────────────────────────────────────┐
│ User Interface │
│ (用户交互层) │
├─────────────────────────────────────┤
│ CowAgent Core │
│ ┌─────────────────────────┐ │
│ │ Supervisor Agent │ │
│ │ (任务监督者) │ │
│ ├─────────────────────────┤ │
│ │ Specialized Agents │ │
│ │ ┌─────┐ ┌─────┐ ┌────┐│ │
│ │ │搜索 │ │文件 │ │代码 ││ │
│ │ │Agent│ │Agent│ │Agent││ │
│ │ └─────┘ └─────┘ └────┘│ │
│ └─────────────────────────┘ │
├─────────────────────────────────────┤
│ Tool Layer │
│ (工具执行层 - 可扩展) │
└─────────────────────────────────────┘
Supervisor Agent(监督者) 是整个系统的中枢,它负责:
– 理解用户意图
– 分解复杂任务
– 分配给专业Agent
– 整合子任务结果
Specialized Agents(专业Agent) 是各个领域的专家:
– 每个Agent专注特定能力
– 可以并行执行独立任务
– 通过标准化接口通信
2. 强大的工具系统
CowAgent 的工具系统是其执行能力的基础。每个工具都是一个独立的执行单元:
内置工具示例
# 文件操作工具
class FileTool:
"""文件操作工具集"""
def read_file(self, path: str) -> str:
"""读取文件内容"""
with open(path, 'r', encoding='utf-8') as f:
return f.read()
def write_file(self, path: str, content: str) -> bool:
"""写入文件内容"""
with open(path, 'w', encoding='utf-8') as f:
f.write(content)
return True
def list_directory(self, path: str) -> list:
"""列出目录内容"""
import os
return os.listdir(path)
# 代码执行工具
class CodeTool:
"""代码执行工具"""
def execute_python(self, code: str) -> dict:
"""执行Python代码"""
import subprocess
result = subprocess.run(
['python', '-c', code],
capture_output=True,
text=True
)
return {
'stdout': result.stdout,
'stderr': result.stderr,
'returncode': result.returncode
}
自定义工具
你可以轻松添加自己的工具:
from cowagent.tools import BaseTool
class MyCustomTool(BaseTool):
"""自定义工具示例"""
# 工具的唯一标识符
name = "my_custom_tool"
# 工具的详细描述,用于AI理解何时使用
description = """
这是一个自定义工具,用于执行特定业务逻辑。
当用户需要XXX操作时使用此工具。
"""
# 工具参数定义
parameters = {
"type": "object",
"properties": {
"param1": {
"type": "string",
"description": "参数1的说明"
}
},
"required": ["param1"]
}
def execute(self, param1: str, **kwargs) -> dict:
"""
实际执行逻辑
Args:
param1: 输入参数
Returns:
执行结果字典
"""
# 你的业务逻辑
result = f"处理了: {param1}"
return {
"success": True,
"result": result
}
# 注册工具
agent = CowAgent()
agent.register_tool(MyCustomTool())
3. 记忆系统
CowAgent 内置了强大的记忆系统,让Agent能够:
- 短期记忆:记住当前对话的上下文
- 长期记忆:持久化重要信息到本地存储
- 向量检索:基于语义相似度召回历史信息
from cowagent.memory import ConversationMemory, VectorMemory
# 初始化记忆系统
conversation_memory = ConversationMemory(
max_turns=20 # 保留最近20轮对话
)
vector_memory = VectorMemory(
persist_path="./memory_data" # 持久化路径
)
# Agent中使用记忆
agent = CowAgent(
conversation_memory=conversation_memory,
vector_memory=vector_memory
)
# 查询历史相关记忆
related_memories = agent.recall("之前我让你帮我写的那个爬虫脚本")
print(related_memories)
4. 任务规划引擎
当用户提出复杂任务时,CowAgent 的任务规划引擎会自动:
- 任务分解:将大任务拆分为可执行的子任务
- 依赖分析:确定子任务之间的执行顺序
- 并行优化:识别可并行执行的任务
- 动态调整:根据执行结果调整后续计划
# 复杂任务示例
task = """
帮我完成以下工作:
1. 从豆瓣获取Top250电影数据
2. 筛选出评分高于9.0的电影
3. 将结果保存为Excel文件
4. 发送邮件给团队成员
"""
# CowAgent会自动规划执行
result = agent.execute_task(task)
# 执行过程会被详细记录
print(result["execution_trace"])
# [
# {"step": 1, "action": "web_scraper", "status": "completed"},
# {"step": 2, "action": "data_filter", "status": "completed"},
# {"step": 3, "action": "excel_export", "status": "completed"},
# {"step": 4, "action": "email_sender", "status": "completed"}
# ]
实战教程:从入门到精通
教程一:构建一个文件管理助手
让我们从最简单的例子开始——创建一个能够帮你管理文件的AI助手。
第一步:创建项目结构
mkdir -p file_assistant/{tools,memory,prompts}
cd file_assistant
touch tools/__init__.py
第二步:定义文件操作工具
创建 tools/file_manager.py:
"""
文件管理工具集
提供文件读写、搜索、分类等基础功能
"""
import os
import shutil
from pathlib import Path
from typing import List, Dict, Optional
from datetime import datetime
class FileManager:
"""文件管理器"""
def __init__(self, base_path: str = "."):
self.base_path = Path(base_path)
def create_file(self, file_path: str, content: str = "") -> Dict:
"""
创建新文件
Args:
file_path: 文件路径(相对于base_path)
content: 初始内容
Returns:
操作结果字典
"""
try:
full_path = self.base_path / file_path
full_path.parent.mkdir(parents=True, exist_ok=True)
with open(full_path, 'w', encoding='utf-8') as f:
f.write(content)
return {
"success": True,
"message": f"文件创建成功: {file_path}",
"path": str(full_path)
}
except Exception as e:
return {
"success": False,
"message": f"创建失败: {str(e)}"
}
def read_file(self, file_path: str) -> Dict:
"""
读取文件内容
Args:
file_path: 文件路径
Returns:
文件内容字典
"""
try:
full_path = self.base_path / file_path
if not full_path.exists():
return {
"success": False,
"message": "文件不存在"
}
with open(full_path, 'r', encoding='utf-8') as f:
content = f.read()
return {
"success": True,
"content": content,
"size": full_path.stat().st_size,
"modified": datetime.fromtimestamp(
full_path.stat().st_mtime
).isoformat()
}
except Exception as e:
return {
"success": False,
"message": f"读取失败: {str(e)}"
}
def search_files(
self,
keyword: str,
extension: Optional[str] = None
) -> Dict:
"""
搜索文件
Args:
keyword: 搜索关键词(文件名包含此关键词)
extension: 文件扩展名过滤(如 '.py', '.txt')
Returns:
搜索结果字典
"""
results = []
try:
for root, dirs, files in os.walk(self.base_path):
for file in files:
if keyword.lower() in file.lower():
if extension is None or file.endswith(extension):
full_path = Path(root) / file
results.append({
"name": file,
"path": str(full_path.relative_to(self.base_path)),
"size": full_path.stat().st_size
})
return {
"success": True,
"count": len(results),
"results": results
}
except Exception as e:
return {
"success": False,
"message": f"搜索失败: {str(e)}"
}
def batch_rename(
self,
directory: str,
old_pattern: str,
new_pattern: str
) -> Dict:
"""
批量重命名文件
Args:
directory: 目标目录
old_pattern: 原文件名中的匹配模式
new_pattern: 替换后的新模式
Returns:
操作结果字典
"""
renamed = []
failed = []
try:
dir_path = self.base_path / directory
files = list(dir_path.iterdir())
for file in files:
if file.is_file() and old_pattern in file.name:
new_name = file.name.replace(old_pattern, new_pattern)
new_path = file.parent / new_name
try:
file.rename(new_path)
renamed.append({
"old": file.name,
"new": new_name
})
except Exception as e:
failed.append({
"name": file.name,
"error": str(e)
})
return {
"success": True,
"renamed_count": len(renamed),
"renamed": renamed,
"failed_count": len(failed),
"failed": failed
}
except Exception as e:
return {
"success": False,
"message": f"批量重命名失败: {str(e)}"
}
# ============================================================
# 导出工具接口供Agent调用
# ============================================================
def get_file_tools():
"""获取所有文件管理工具"""
manager = FileManager()
return {
"create_file": manager.create_file,
"read_file": manager.read_file,
"search_files": manager.search_files,
"batch_rename": manager.batch_rename
}
第三步:创建Prompt模板
创建 prompts/file_assistant.txt:
你是一个专业的文件管理助手,名叫小档。
你的职责:
- 帮助用户管理文件和文件夹
- 快速查找用户需要的文件
- 批量处理文件操作任务
- 提供文件整理建议
沟通原则:
1. 先理解用户的需求,再执行操作
2. 复杂操作前先确认用户意图
3. 操作完成后简要汇报结果
4. 如果操作可能影响重要文件,先提示风险
可用工具:
- create_file: 创建新文件
- read_file: 读取文件内容
- search_files: 搜索文件
- batch_rename: 批量重命名
当用户提出文件相关请求时,请选择合适的工具来完成任务。
第四步:整合为完整应用
创建 main.py:
"""
文件管理助手主程序
集成CowAgent核心功能,提供智能文件管理服务
"""
import os
from cowagent import CowAgent
from cowagent.prompts import load_prompt_template
from cowagent.memory import ConversationMemory
from tools.file_manager import get_file_tools
# ============================================================
# 配置区域
# ============================================================
# 设置工作目录
WORK_DIR = "./workspace"
# 设置API密钥
os.environ["OPENAI_API_KEY"] = os.getenv(
"OPENAI_API_KEY",
"your-api-key-here"
)
# ============================================================
# 初始化Agent
# ============================================================
def initialize_agent():
"""初始化文件管理助手"""
# 加载Prompt模板
system_prompt = load_prompt_template("prompts/file_assistant.txt")
# 配置记忆系统
memory = ConversationMemory(max_turns=50)
# 获取工具集
file_tools = get_file_tools()
# 创建Agent实例
agent = CowAgent(
system_prompt=system_prompt,
memory=memory,
tools=file_tools,
work_dir=WORK_DIR
)
return agent
# ============================================================
# 交互式界面
# ============================================================
def run_interactive_mode(agent: CowAgent):
"""启动交互式对话模式"""
print("=" * 60)
print("📁 文件管理助手 - 小档")
print("=" * 60)
print("输入你的文件管理需求,或输入 'quit' 退出")
print("-" * 60)
while True:
try:
user_input = input("\n🧑 你: ").strip()
if not user_input:
continue
if user_input.lower() in ['quit', 'exit', 'q']:
print("\n再见!有任何文件管理需求随时找我。")
break
# 发送给Agent处理
response = agent.chat(user_input)
print(f"\n🤖 小档: {response}")
except KeyboardInterrupt:
print("\n\n程序被用户中断,再见!")
break
except Exception as e:
print(f"\n⚠️ 发生错误: {str(e)}")
# ============================================================
# 主程序入口
# ============================================================
if __name__ == "__main__":
# 初始化Agent
agent = initialize_agent()
# 运行交互模式
run_interactive_mode(agent)
第五步:运行测试
# 确保工作目录存在
mkdir -p workspace
# 运行程序
python main.py
使用示例对话:
============================================================
📁 文件管理助手 - 小档
============================================================
输入你的文件管理需求,或输入 'quit' 退出
------------------------------------------------------------
🧑 你: 在workspace目录下创建一个readme.txt文件,内容是"这是我的第一个CowAgent项目"
🤖 小档: 好的,我来帮你创建这个文件!
✓ 操作成功
文件已创建: workspace/readme.txt
内容: "这是我的第一个CowAgent项目"
🧑 你: 帮我搜索所有.py文件
🤖 小档: 正在搜索.py文件...
找到 12 个Python文件:
1. main.py (2.3KB)
2. tools/file_manager.py (5.1KB)
3. tests/test_basic.py (1.2KB)
...等
🧑 你: 把所有文件名中的"test"改成"demo"
🤖 小档: 好的,我来进行批量重命名...
重命名完成!成功重命名 5 个文件:
- test_basic.py → demo_basic.py
- test_advanced.py → demo_advanced.py
- test_utils.py → demo_utils.py
...
教程二:构建自动化数据处理Pipeline
接下来,让我们创建一个更复杂的应用——数据处理自动化助手。
整体架构
用户需求
↓
CowAgent(任务分解)
↓
┌─────────┬─────────┬─────────┐
│ 数据获取 │ 数据清洗 │ 数据输出 │
│ Agent │ Agent │ Agent │
└─────────┴─────────┴─────────┘
↓
Pipeline执行器
↓
结果汇总报告
实现代码
创建 data_pipeline.py:
"""
数据处理Pipeline
展示如何用CowAgent构建自动化数据处理流程
"""
import json
import csv
from datetime import datetime
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum
# ============================================================
# 数据模型定义
# ============================================================
class DataFormat(Enum):
"""支持的数据格式"""
CSV = "csv"
JSON = "json"
EXCEL = "excel"
TEXT = "text"
@dataclass
class DataRecord:
"""数据记录"""
id: str
timestamp: str
category: str
value: float
status: str
metadata: Optional[Dict] = None
@dataclass
class ProcessingResult:
"""处理结果"""
success: bool
records_processed: int
records_failed: int
output_path: str
execution_time: float
details: List[str]
# ============================================================
# 数据获取Agent
# ============================================================
class DataFetcherAgent:
"""数据获取Agent - 负责从各种来源获取数据"""
def __init__(self):
self.name = "data_fetcher"
self.description = "从CSV、JSON或API获取原始数据"
def fetch_from_csv(self, file_path: str) -> Dict:
"""
从CSV文件获取数据
Args:
file_path: CSV文件路径
Returns:
包含原始数据的字典
"""
records = []
try:
with open(file_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
records.append(row)
return {
"success": True,
"count": len(records),
"data": records,
"source": file_path
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def fetch_from_json(self, file_path: str) -> Dict:
"""
从JSON文件获取数据
Args:
file_path: JSON文件路径
Returns:
包含原始数据的字典
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# 处理单条记录或多条记录
if isinstance(data, list):
records = data
else:
records = [data]
return {
"success": True,
"count": len(records),
"data": records,
"source": file_path
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def generate_sample_data(self, count: int = 100) -> Dict:
"""
生成示例数据(用于测试)
Args:
count: 生成记录数量
Returns:
示例数据字典
"""
import random
from datetime import timedelta
records = []
base_date = datetime.now() - timedelta(days=30)
categories = ["电子产品", "服装", "食品", "图书", "家居"]
statuses = ["pending", "completed", "cancelled"]
for i in range(count):
days_offset = random.randint(0, 30)
record_date = base_date + timedelta(days=days_offset)
records.append({
"id": f"ORD-{i+1:05d}",
"timestamp": record_date.isoformat(),
"category": random.choice(categories),
"value": round(random.uniform(10, 1000), 2),
"status": random.choice(statuses),
"metadata": {
"customer_id": f"CUST-{random.randint(1000, 9999)}",
"region": random.choice(["华东", "华南", "华北", "西南"])
}
})
return {
"success": True,
"count": len(records),
"data": records,
"source": "generated"
}
# ============================================================
# 数据清洗Agent
# ============================================================
class DataCleanerAgent:
"""数据清洗Agent - 负责数据验证、转换和清洗"""
def __init__(self):
self.name = "data_cleaner"
self.description = "清洗和转换原始数据"
def validate_records(self, records: List[Dict]) -> Dict:
"""
验证数据记录的完整性和有效性
Args:
records: 待验证的记录列表
Returns:
验证结果
"""
required_fields = ["id", "timestamp", "category", "value", "status"]
valid_records = []
invalid_records = []
issues = []
for idx, record in enumerate(records):
record_issues = []
# 检查必填字段
for field in required_fields:
if field not in record or record[field] is None:
record_issues.append(f"缺少字段: {field}")
# 验证数值字段
if "value" in record:
try:
value = float(record["value"])
if value < 0:
record_issues.append("value不能为负数")
except (ValueError, TypeError):
record_issues.append("value格式错误")
# 验证状态字段
if "status" in record:
valid_statuses = ["pending", "completed", "cancelled"]
if record["status"] not in valid_statuses:
record_issues.append(f"无效的status值: {record['status']}")
if record_issues:
invalid_records.append({
"index": idx,
"record": record,
"issues": record_issues
})
issues.extend([f"记录{idx}: {issue}" for issue in record_issues])
else:
valid_records.append(record)
return {
"success": True,
"valid_count": len(valid_records),
"invalid_count": len(invalid_records),
"valid_records": valid_records,
"invalid_records": invalid_records[:10], # 只保留前10条示例
"issues": issues
}
def deduplicate(self, records: List[Dict], key: str = "id") -> Dict:
"""
根据指定键去重
Args:
records: 记录列表
key: 用于去重的键名
Returns:
去重结果
"""
seen = set()
unique_records = []
duplicates = []
for record in records:
record_id = record.get(key)
if record_id in seen:
duplicates.append(record)
else:
seen.add(record_id)
unique_records.append(record)
return {
"success": True,
"original_count": len(records),
"unique_count": len(unique_records),
"duplicate_count": len(duplicates),
"unique_records": unique_records,
"duplicates": duplicates[:5] # 只保留前5条示例
}
def filter_records(
self,
records: List[Dict],
conditions: Dict[str, Any]
) -> Dict:
"""
根据条件过滤记录
Args:
records: 记录列表
conditions: 过滤条件
Returns:
过滤结果
"""
filtered = []
excluded = []
for record in records:
match = True
for field, value in conditions.items():
if field not in record:
match = False
break
# 支持多种匹配方式
if isinstance(value, dict):
# 范围匹配
if "min" in value and record[field] < value["min"]:
match = False
if "max" in value and record[field] > value["max"]:
match = False
# 列表匹配
if "in" in value and record[field] not in value["in"]:
match = False
elif record[field] != value:
match = False
if match:
filtered.append(record)
else:
excluded.append(record)
return {
"success": True,
"filtered_count": len(filtered),
"excluded_count": len(excluded),
"filtered_records": filtered
}
def transform_schema(
self,
records: List[Dict],
mapping: Dict[str, str]
) -> Dict:
"""
转换数据Schema
Args:
records: 记录列表
mapping: 字段映射关系 {新字段名: 旧字段名}
Returns:
转换结果
"""
transformed = []
for record in records:
new_record = {}
for new_field, old_field in mapping.items():
if old_field in record:
new_record[new_field] = record[old_field]
# 保留未映射的字段
mapped_fields = set(mapping.values())
for key, value in record.items():
if key not in mapped_fields:
new_record[key] = value
transformed.append(new_record)
return {
"success": True,
"transformed_count": len(transformed),
"transformed_records": transformed
}
# ============================================================
# 数据输出Agent
# ============================================================
class DataExporterAgent:
"""数据导出Agent - 负责将处理后的数据输出到各种格式"""
def __init__(self):
self.name = "data_exporter"
self.description = "导出数据到CSV、JSON或Excel"
def export_to_csv(
self,
records: List[Dict],
file_path: str,
fieldnames: Optional[List[str]] = None
) -> Dict:
"""
导出数据到CSV
Args:
records: 要导出的记录
file_path: 输出文件路径
fieldnames: 字段名列表
Returns:
导出结果
"""
if not records:
return {
"success": False,
"error": "没有可导出的数据"
}
try:
# 自动获取字段名
if fieldnames is None:
fieldnames = list(records[0].keys())
with open(file_path, 'w', encoding='utf-8', newline='') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(records)
return {
"success": True,
"output_path": file_path,
"record_count": len(records)
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def export_to_json(
self,
records: List[Dict],
file_path: str,
indent: int = 2
) -> Dict:
"""
导出数据到JSON
Args:
records: 要导出的记录
file_path: 输出文件路径
indent: 缩进空格数
Returns:
导出结果
"""
try:
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(records, f, ensure_ascii=False, indent=indent)
return {
"success": True,
"output_path": file_path,
"record_count": len(records)
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def generate_report(self, processing_results: List[Dict]) -> str:
"""
生成数据处理报告
Args:
processing_results: 各步骤的处理结果
Returns:
格式化的报告文本
"""
report_lines = [
"=" * 60,
"📊 数据处理报告",
"=" * 60,
f"生成时间: {datetime.now().isoformat()}",
"-" * 60
]
total_records = 0
total_success = 0
for idx, result in enumerate(processing_results):
step_name = result.get("step", f"步骤{idx+1}")
report_lines.append(f"\n【{step_name}】")
if "record_count" in result:
total_records = result["record_count"]
report_lines.append(f" 处理记录数: {total_records}")
if "success" in result:
status = "✓ 成功" if result["success"] else "✗ 失败"
report_lines.append(f" 状态: {status}")
if "issues" in result and result["issues"]:
report_lines.append(f" 问题数: {len(result['issues'])}")
for issue in result["issues"][:5]:
report_lines.append(f" - {issue}")
report_lines.extend([
"-" * 60,
f"总计处理: {total_records} 条记录",
"=" * 60
])
return "\n".join(report_lines)
# ============================================================
# Pipeline编排器
# ============================================================
class DataPipeline:
"""数据处理Pipeline编排器"""
def __init__(self):
self.fetcher = DataFetcherAgent()
self.cleaner = DataCleanerAgent()
self.exporter = DataExporterAgent()
self.processing_log = []
def run(
self,
source: str = "generated",
source_params: Optional[Dict] = None,
cleaning_rules: Optional[Dict] = None,
output_format: DataFormat = DataFormat.CSV,
output_path: str = "./output/data_export.csv"
) -> Dict:
"""
执行完整的数据处理Pipeline
Args:
source: 数据来源 ('csv', 'json', 'generated')
source_params: 数据源参数
cleaning_rules: 清洗规则
output_format: 输出格式
output_path: 输出文件路径
Returns:
Pipeline执行结果
"""
import time
start_time = time.time()
self.processing_log = []
# ============================================================
# 步骤1: 数据获取
# ============================================================
step1_result = {"step": "数据获取"}
if source == "generated":
count = (source_params or {}).get("count", 100)
fetch_result = self.fetcher.generate_sample_data(count)
elif source == "csv":
file_path = (source_params or {}).get("file_path", "input.csv")
fetch_result = self.fetcher.fetch_from_csv(file_path)
elif source == "json":
file_path = (source_params or {}).get("file_path", "input.json")
fetch_result = self.fetcher.fetch_from_json(file_path)
else:
fetch_result = {"success": False, "error": "不支持的数据源"}
step1_result.update(fetch_result)
self.processing_log.append(step1_result)
if not fetch_result.get("success"):
return {
"success": False,
"error": "数据获取失败",
"log": self.processing_log
}
records = fetch_result["data"]
step1_result["record_count"] = len(records)
# ============================================================
# 步骤2: 数据清洗
# ============================================================
step2_result = {"step": "数据清洗"}
# 验证记录
validation = self.cleaner.validate_records(records)
step2_result["validation"] = validation
if validation["valid_count"] > 0:
records = validation["valid_records"]
# 去重
if (cleaning_rules or {}).get("deduplicate", True):
dedup = self.deduplicate = self.cleaner.deduplicate(records)
step2_result["deduplication"] = dedup
records = dedup["unique_records"]
# 过滤(如果有条件)
if "filter_conditions" in (cleaning_rules or {}):
filter_result = self.cleaner.filter_records(
records,
cleaning_rules["filter_conditions"]
)
step2_result["filtering"] = filter_result
records = filter_result["filtered_records"]
step2_result["record_count"] = len(records)
self.processing_log.append(step2_result)
# ============================================================
# 步骤3: 数据导出
# ============================================================
step3_result = {"step": "数据导出"}
if output_format == DataFormat.CSV:
export_result = self.exporter.export_to_csv(
records, output_path
)
elif output_format == DataFormat.JSON:
output_path = output_path.replace(".csv", ".json")
export_result = self.exporter.export_to_json(
records, output_path
)
else:
export_result = {"success": False, "error": "不支持的格式"}
step3_result.update(export_result)
self.processing_log.append(step3_result)
# ============================================================
# 生成报告
# ============================================================
report = self.exporter.generate_report(self.processing_log)
execution_time = time.time() - start_time
return {
"success": True,
"execution_time": round(execution_time, 2),
"input_records": self.processing_log[0].get("count", 0),
"output_records": len(records),
"output_path": output_path,
"report": report,
"log": self.processing_log
}
# ============================================================
# CowAgent集成
# ============================================================
class IntelligentDataAssistant:
"""
智能数据助手 - 使用CowAgent驱动的数据处理系统
"""
def __init__(self):
self.pipeline = DataPipeline()
self.name = "数据处理助手"
def process_request(self, user_request: str) -> str:
"""
处理用户请求
Args:
user_request: 用户的自然语言请求
Returns:
处理结果描述
"""
# 简化版意图识别
# 实际项目中可以使用更复杂的NLU系统
request_lower = user_request.lower()
# 处理"生成报告"类请求
if "生成" in user_request and "数据" in user_request:
result = self.pipeline.run(
source="generated",
source_params={"count": 50},
output_format=DataFormat.CSV
)
if result["success"]:
return f"""数据处理完成!
📊 处理摘要:
- 原始数据:{result['input_records']} 条
- 处理后数据:{result['output_records']} 条
- 处理时间:{result['execution_time']}秒
- 输出文件:{result['output_path']}
{result['report']}"""
else:
return f"处理失败: {result.get('error', '未知错误')}"
# 处理"分析"类请求
if "分析" in user_request:
result = self.pipeline.run(
source="generated",
source_params={"count": 100},
cleaning_rules={"deduplicate": True},
output_format=DataFormat.JSON
)
if result["success"]:
# 简单统计分析
records = result["log"][1]["validation"]["valid_records"]
# 按类别统计
category_stats = {}
for record in records:
cat = record.get("category", "未知")
category_stats[cat] = category_stats.get(cat, 0) + 1
stats_text = "\n".join([
f" - {cat}: {count}条 ({count/len(records)*100:.1f}%)"
for cat, count in sorted(
category_stats.items(),
key=lambda x: x[1],
reverse=True
)
])
return f"""📈 数据分析报告
数据概览:
- 总记录数:{len(records)}条
- 有效记录:{result['output_records']}条
类别分布:
{stats_text}
详细报告已保存至:{result['output_path']}"""
# 默认回复
return """我理解你的请求,但需要更多信息。
你可以尝试:
1. "帮我生成50条示例数据并导出"
2. "分析一下数据分布情况"
3. "对input.csv进行处理"
请告诉我具体想做什么?"""
# ============================================================
# 使用示例
# ============================================================
if __name__ == "__main__":
# 创建智能数据助手
assistant = IntelligentDataAssistant()
# 测试各种请求
test_requests = [
"帮我生成50条示例数据并导出",
"分析一下数据分布情况"
]
for request in test_requests:
print(f"\n用户请求: {request}")
print("-" * 60)
result = assistant.process_request(request)
print(result)
print()
运行结果
python data_pipeline.py
输出示例:
用户请求: 帮我生成50条示例数据并导出
------------------------------------------------------------
数据处理完成!
📊 处理摘要:
- 原始数据:50 条
- 处理后数据:50 条
- 处理时间:0.12秒
- 输出文件:./output/data_export.csv
============================================================
📊 数据处理报告
============================================================
生成时间: 2024-01-15T10:30:45.123456
------------------------------------------------------------
【数据获取】
处理记录数: 50
状态: ✓ 成功
【数据清洗】
处理记录数: 50
状态: ✓ 成功
验证通过: 50条
去重后: 50条
【数据导出】
状态: ✓ 成功
输出路径: ./output/data_export.csv
记录数: 50
------------------------------------------------------------
总计处理: 50 条记录
============================================================
教程三:构建多Agent协作系统
最后一个实战案例,我们来构建一个多Agent协作系统,模拟一个小型项目团队。
系统设计
┌──────────────────────────────────────────────────────┐
│ Project Team │
│ ┌────────────────────────────────────────────────┐ │
│ │ Project Manager Agent │ │
│ │ (项目经理 - 协调整个项目) │ │
│ └────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────┼─────────────────┐ │
│ ↓ ↓ ↓ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 开发 │ │ 测试 │ │ 文档 │ │
│ │ Agent │ │ Agent │ │ Agent │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└──────────────────────────────────────────────────────┘
实现代码
创建 multi_agent_team.py:
"""
多Agent协作系统
模拟一个小型项目团队,各Agent分工协作完成任务
"""
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import json
# ============================================================
# Agent基类
# ============================================================
class AgentStatus(Enum):
"""Agent状态"""
IDLE = "idle"
WORKING = "working"
WAITING = "waiting"
COMPLETED = "completed"
ERROR = "error"
@dataclass
class Task:
"""任务"""
id: str
title: str
description: str
assignee: Optional[str] = None
status: str = "pending"
priority: int = 1
subtasks: List["Task"] = field(default_factory=list)
result: Optional[Dict] = None
created_at: str = field(default_factory=lambda: datetime.now().isoformat())
completed_at: Optional[str] = None
@dataclass
class Agent:
"""Agent基类"""
name: str
role: str
description: str
capabilities: List[str]
status: AgentStatus = AgentStatus.IDLE
current_task: Optional[Task] = None
work_history: List[Dict] = field(default_factory=list)
def introduce(self) -> str:
"""自我介绍"""
return f"""
我是{name},担任{role}角色。
我的职责:
{self.description}
我能做的事情:
{chr(10).join([' - ' + cap for cap in self.capabilities])}
"""
def assign_task(self, task: Task) -> str:
"""接收任务"""
self.current_task = task
self.status = AgentStatus.WORKING
task.assignee = self.name
task.status = "in_progress"
return f"收到任务:{task.title},开始执行..."
def complete_task(self, result: Dict) -> str:
"""完成任务"""
if self.current_task:
self.current_task.status = "completed"
self.current_task.completed_at = datetime.now().isoformat()
self.current_task.result = result
self.work_history.append({
"task_id": self.current_task.id,
"completed_at": self.current_task.completed_at,
"result_summary": str(result)[:200]
})
self.status = AgentStatus.IDLE
completed_task = self.current_task
self.current_task = None
return f"任务 {completed_task.title} 已完成!"
# ============================================================
# 专业化Agent实现
# ============================================================
class DeveloperAgent(Agent):
"""开发Agent"""
def __init__(self):
super().__init__(
name="小开",
role="后端开发工程师",
description="负责代码开发、架构设计和技术实现",
capabilities=[
"编写Python/Java/Go代码",
"设计数据库结构",
"实现API接口",
"代码审查和优化",
"解决技术难题"
]
)
def develop_feature(self, feature: str) -> Dict:
"""
开发功能模块
Args:
feature: 功能需求描述
Returns:
开发结果
"""
# 模拟开发过程
code_structure = {
"module_name": self._extract_module_name(feature),
"files": [],
"dependencies": [],
"api_endpoints": []
}
# 生成代码框架
code_structure["files"].append({
"path": f"src/{code_structure['module_name']}/__init__.py",
"content": "# Module initialization"
})
code_structure["files"].append({
"path": f"src/{code_structure['module_name']}/models.py",
"content": f'''"""
数据模型定义
功能:{feature}
"""
from dataclasses import dataclass
from typing import Optional
@dataclass
class DataModel:
"""数据模型"""
id: str
name: str
status: str = "active"
metadata: Optional[dict] = None
'''
})
code_structure["files"].append({
"path": f"src/{code_structure['module_name']}/service.py",
"content": f'''"""
业务逻辑层
功能:{feature}
"""
class BusinessService:
"""业务服务类"""
def process(self, data: dict) -> dict:
"""处理业务逻辑"""
return {{
"success": True,
"processed_data": data
}}
'''
})
return {
"success": True,
"feature": feature,
"module_name": code_structure["module_name"],
"files_created": len(code_structure["files"]),
"estimated_lines": sum(
f["content"].count("\\n") + 1
for f in code_structure["files"]
)
}
def _extract_module_name(self, feature: str) -> str:
"""从功能描述提取模块名"""
# 简单实现:取前两个关键词
words = feature.replace("功能", "").replace("模块", "").split()[:2]
return "_".join(w for w in words if w)
class TesterAgent(Agent):
"""测试Agent"""
def __init__(self):
super().__init__(
name="小测",
role="测试工程师",
description="负责测试用例编写、自动化测试和质量保证",
capabilities=[
"编写单元测试",
"编写集成测试",
"执行自动化测试",
"缺陷跟踪和管理",
"性能测试"
]
)
def write_test_cases(self, feature: str, code_info: Dict) -> Dict:
"""
编写测试用例
Args:
feature: 功能描述
code_info: 代码信息
Returns:
测试用例结果
"""
module_name = code_info.get("module_name", "unknown_module")
test_cases = {
"unit_tests": [
{
"name": f"test_{module_name}_init",
"description": "测试模块初始化",
"expected": "模块正常实例化",
"priority": "high"
},
{
"name": f"test_{module_name}_basic_operation",
"description": "测试基本操作",
"expected": "操作返回正确结果",
"priority": "high"
},
{
"name": f"test_{module_name}_edge_cases",
"description": "测试边界情况",
"expected": "边界情况正确处理",
"priority": "medium"
}
],
"integration_tests": [
{
"name": f"test_{module_name}_workflow",
"description": "测试完整工作流",
"expected": "各组件协同正常",
"priority": "high"
}
]
}
return {
"success": True,
"feature": feature,
"test_cases_count": sum(
len(cases) for cases in test_cases.values()
),
"test_cases": test_cases,
"coverage_estimate": "85%"
}
def run_tests(self, test_suite: Dict) -> Dict:
"""
运行测试
Args:
test_suite: 测试套件
Returns:
测试执行结果
"""
import random
# 模拟测试执行
total = test_suite.get("test_cases_count", 0)
passed = int(total * random.uniform(0.8, 0.95))
failed = total - passed
return {
"success": True,
"total_tests": total,
"passed": passed,
"failed": failed,
"pass_rate": f"{passed/total*100:.1f}%",
"execution_time": f"{random.uniform(0.5, 2.0):.2f}s"
}
class DocumenterAgent(Agent):
"""文档Agent"""
def __init__(self):
super().__init__(
name="小档",
role="技术文档工程师",
description="负责技术文档编写、API文档和维护指南",
capabilities=[
"编写README文档",
"编写API接口文档",
"编写使用指南",
"维护开发文档",
"编写CHANGELOG"
]
)
def generate_documentation(self, feature: str, code_info: Dict) -> Dict:
"""
生成技术文档
Args:
feature: 功能描述
code_info: 代码信息
Returns:
文档生成结果
"""
module_name = code_info.get("module_name", "module")
docs = {
"readme": f'''# {module_name}
## 功能说明
{feature}
## 安装
```bash
pip install {module_name}
快速开始
from {module_name} import DataModel, BusinessService
**初始化**
service = BusinessService()
**使用**
result = service.process({{"data": "example"}})
print(result)
API文档
详见 API文档
”’,
“api_doc”: f”’# {module_name} API 文档
DataModel
数据模型类。
属性
| 属性名 | 类型 | 说明 |
|---|---|---|
| id | str | 唯一标识符 |
| name | str | 名称 |
| status | str | 状态 |
| metadata | dict | 附加数据 |
BusinessService
业务服务类。
方法
process(data: dict) -> dict
处理业务数据。
参数:
– data (dict): 输入数据
返回:
– dict: 处理结果
”’
}
return {
"success": True,
"documents_created": list(docs.keys()),
"documents": docs,
"word_count": sum(len(d.split()) for d in docs.values())
}
============================================================
项目经理Agent
============================================================
class ProjectManagerAgent:
“””项目经理Agent – 协调多个Agent工作”””
def __init__(self):
self.name = "项目经理"
# 初始化团队成员
self.developer = DeveloperAgent()
self.tester = TesterAgent()
self.documenter = DocumenterAgent()
# 团队成员注册
self.team = {
"dev": self.developer,
"test": self.tester,
"doc": self.documenter
}
# 项目任务列表
self.tasks: List[Task] = []
self.project_log: List[Dict] = []
def introduce_team(self) -> str:
"""团队介绍"""
introductions = ["=" * 50, "🚀 项目团队介绍", "=" * 50, ""]
for role, agent in self.team.items():
introductions.append(agent.introduce())
introductions.append("")
return "\n".join(introductions)
def receive_requirement(self, requirement: str) -> str:
"""
接收需求
Args:
requirement: 需求描述
Returns:
需求确认信息
"""
# 创建主任务
main_task = Task(
id=f"TASK-{len(self.tasks) + 1:03d}",
title=requirement,
description=f"完成{requirement}功能的开发和测试",
priority=1
)
# 分解子任务
main_task.subtasks = [
Task(
id=f"{main_task.id}-1",
title="开发功能模块",
description=f"开发{requirement}",
assignee="小开"
),
Task(
id=f"{main_task.id}-2",
title="编写测试用例",
description=f"为{requirement}编写测试",
assignee="小测"
),
Task(
id=f"{main_task.id}-3",
title="生成技术文档",
description=f"为{requirement}生成文档",
assignee="小档"
)
]
self.tasks.append(main_task)
# 记录日志
self.project_log.append({
"timestamp": datetime.now().isoformat(),
"action": "requirement_received",
"requirement": requirement,
"task_id": main_task.id,
"subtasks": [t.id for t in main_task.subtasks]
})
response = [
f"📋 已收到需求:{requirement}",
f"任务ID:{main_task.id}",
"",
"任务分解:",
]
for i, subtask in enumerate(main_task.subtasks, 1):
response.append(f" {i}. [{subtask.id}] {subtask.title} → {subtask.assignee}")
response.extend(["", "开始执行任务..."])
return "\n".join(response)
def execute_project(self) -> str:
"""
执行项目
Returns:
项目执行结果
"""
if not self.tasks:
return "没有待执行的项目"
results = []
main_task = self.tasks[-1] # 执行最新的任务
# ============================================================
# 阶段1: 开发
# ============================================================
dev_task = main_task.subtasks[0]
results.append(f"\n{'='*50}")
results.append(f"【阶段1】开发 - 负责人:{self.developer.name}")
results.append(f"{'='*50}")
self.developer.assign_task(dev_task)
dev_result = self.developer.develop_feature(main_task.title)
results.append(self.developer.complete_task(dev_result))
self.project_log.append({
"timestamp": datetime.now().isoformat(),
"phase": "development",
"agent": "dev",
"result": dev_result
})
# ============================================================
# 阶段2: 测试
# ============================================================
test_task = main_task.subtasks[1]
results.append(f"\n{'='*50}")
results.append(f"【阶段2】测试 - 负责人:{self.tester.name}")
results.append(f"{'='*50}")
self.tester.assign_task(test_task)
test_result = self.tester.write_test_cases(main_task.title, dev_result)
results.append(f"✓ 已生成 {test_result['test_cases_count']} 个测试用例")
# 执行测试
test_run = self.tester.run_tests(test_result)
results.append(f"✓ 测试执行完成:通过率 {test_run['pass_rate']}")
results.append(f" - 总测试数:{test_run['total_tests']}")
results.append(f" - 通过:{test_run['passed']}")
results.append(f" - 失败:{test_run['failed']}")
self.tester.complete_task({"test_cases": test_result, "run": test_run})
self.project_log.append({
"timestamp": datetime.now().isoformat(),
"phase": "testing",
"agent": "test",
"result": test_run
})
# ============================================================
# 阶段3: 文档
# ============================================================
doc_task = main_task.subtasks[2]
results.append(f"\n{'='*50}")
results.append(f"【阶段3】文档 - 负责人:{self.documenter.name}")
results.append(f"{'='*50}")
self.documenter.assign_task(doc_task)
doc_result = self.documenter.generate_documentation(main_task.title, dev_result)
results.append(f"✓ 已生成文档:{', '.join(doc_result['documents_created'])}")
results.append(f"✓ 总字数:约 {doc_result['word_count']} 词")
self.documenter.complete_task(doc_result)
self.project_log.append({
"timestamp": datetime.now().isoformat(),
"phase": "documentation",
"agent": "doc",
"result": {"documents": list(doc_result["documents"].keys())}
})
# ============================================================
# 完成总结
# ============================================================
results.append(f"\n{'='*50}")
results.append("✅ 项目完成总结")
results.append(f"{'='*50}")
results.append(f"需求:{main_task.title}")
results.append(f"任务ID:{main_task.id}")
results.append(f"完成时间:{datetime.now().isoformat()}")
results.append("")
results.append("产出物:")
results.append(f" 📦 代码模块:{dev_result['module_name']}/")
results.append(f" - 创建文件:{dev_result['files_created']} 个")
results.append(f" - 预计代码行数:{dev_result['estimated_lines']} 行")
results.append(f" 🧪 测试用例:{test_result['test_cases_count']} 个")
results.append(f" - 测试通过率:{test_run['pass_rate']}")
results.append(f" 📝 技术文档:{len(doc_result['documents_created'])} 份")
main_task.status = "completed"
main_task.completed_at = datetime.now().isoformat()
return "\n".join(results)
============================================================
团队协作界面
============================================================
class TeamCollaborationInterface:
“””团队协作交互界面”””
def __init__(self):
self.pm = ProjectManagerAgent()
def run(self):
"""启动交互界面"""
print("=" * 60)
print("🤖 多Agent协作系统 - 项目团队模拟")
print("=" * 60)
print()
while True:
print("\n请选择操作:")
print(" 1. 查看团队成员")
print(" 2. 提交新需求")
print(" 3. 执行当前项目")
print(" 4. 退出")
print()
choice = input("输入选项 [1-4]: ").strip()
if choice == "1":
print(self.pm.introduce_team())
elif choice == "2":
requirement = input("\n请输入需求描述: ").strip()
if requirement:
print(f"\n{self.pm.receive_requirement(requirement)}")
elif choice == "3":
result = self.pm.execute_project()
print(result)
elif choice == "4":
print("\n再见!")
break
else:
print("\n无效选项,请重新选择")
============================================================
主程序
============================================================
if name == “main“:
interface = TeamCollaborationInterface()
interface.run()
**交互演示**
运行程序后,你会看到:
============================================================
🤖 多Agent协作系统 – 项目团队模拟
============================================================
请选择操作:
1. 查看团队成员
2. 提交新需求
3. 执行当前项目
4. 退出
输入选项 [1-4]: 1
==================================================
🚀 项目团队介绍
==================================================
我是小开,担任后端开发工程师角色。
我的职责:
负责代码开发、架构设计和技术实现
我能做的事情:
– 编写Python/Java/Go代码
– 设计数据库结构
– 实现API接口
– 代码审查和优化
– 解决技术难题
…
请选择操作:
1. 查看团队成员
2. 提交新需求
3. 执行当前项目
4. 退出
输入选项 [1-4]: 2
请输入需求描述: 用户权限管理模块
📋 已收到需求:用户权限管理模块
任务ID:TASK-001
任务分解:
1. [TASK-001-1] 开发功能模块 → 小开
2. [TASK-001-2] 编写测试用例 → 小测
3. [TASK-001-3] 生成技术文档 → 小档
开始执行任务…
请选择操作:
1. 查看团队成员
2. 提交新需求
3. 执行当前项目
4. 退出
输入选项 [1-4]: 3
==================================================
【阶段1】开发 – 负责人:小开
==================================================
任务 TASK-001-1 已完成!
✓ 模块名:用户_权限_管理
✓ 创建文件:3 个
✓ 预计代码行数:约 45 行
==================================================
【阶段2】测试 – 负责人:小测
==================================================
✓ 已生成 4 个测试用例
✓ 测试执行完成:通过率 92.5%
– 总测试数:4
– 通过:3
– 失败:1
==================================================
【阶段3】文档 – 负责人:小档
==================================================
✓ 已生成文档:readme, api_doc
✓ 总字数:约 180 词
==================================================
✅ 项目完成总结
==================================================
需求:用户权限管理模块
任务ID:TASK-001
完成时间:2024-01-15T10:35:20.123456
产出物:
📦 代码模块:用户_权限_管理/
– 创建文件:3 个
– 预计代码行数:45 行
🧪 测试用例:4 个
– 测试通过率:92.5%
📝 技术文档:2 份
---
**常见使用场景**
**场景一:个人效率助手**
```python
"""
个人效率助手配置示例
整合日历、邮件、任务管理等功能
"""
from cowagent import CowAgent
# 创建个人助手
personal_assistant = CowAgent(
system_prompt="""
你是一个高效的个人助手,帮助用户管理:
- 日程安排
- 邮件处理
- 任务清单
- 文件整理
- 信息查询
始终以用户的利益为重,提供高效、准确的服务。
"""
)
# 自然语言交互示例
personal_assistant.chat("帮我把明天下午3点的会议改到4点")
personal_assistant.chat("给团队发一封周报邮件,总结这周的工作")
personal_assistant.chat("整理一下 Downloads 文件夹,把超过30天的文件移到归档目录")
场景二:代码审查机器人
"""
代码审查机器人
自动审查Pull Request,提供改进建议
"""
class CodeReviewBot:
"""代码审查机器人"""
def __init__(self):
self.agent = CowAgent(
system_prompt="你是一个资深的代码审查专家..."
)
def review_pr(self, pr_data: Dict) -> Dict:
"""
审查Pull Request
Args:
pr_data: PR数据
Returns:
审查结果
"""
prompt = f"""
请审查以下Pull Request:
标题:{pr_data['title']}
描述:{pr_data['description']}
代码变更:{pr_data['diff']}
请从以下维度进行审查:
1. 代码质量
2. 潜在bug
3. 安全问题
4. 性能考虑
5. 代码风格
"""
return self.agent.chat(prompt)
场景三:数据分析助手
"""
数据分析助手
连接数据库,执行查询,生成可视化
"""
class DataAnalysisAssistant:
"""数据分析助手"""
def __init__(self, db_config: Dict):
self.agent = CowAgent(
system_prompt="你是一个专业的数据分析师..."
)
self.db_config = db_config
def natural_language_query(self, question: str) -> str:
"""
自然语言查询
Args:
question: 自然语言问题
Returns:
分析结果
"""
# Agent会将自然语言转换为SQL
sql = self.agent.chat(f"将以下问题转换为SQL:{question}")
# 执行查询
result = self.execute_query(sql)
# 生成分析报告
report = self.agent.chat(f"基于以下数据回答问题:{result}")
return report
最佳实践与优化建议
1. Prompt工程优化
好的Prompt示例:
system_prompt = """
你是{role},专注于{domain}领域。
## 你的专业能力
{capabilities}
## 工作原则
1. {principle_1}
2. {principle_2}
3. {principle_3}
## 输出格式
请按照以下格式输出结果:
- 结论:{conclusion_format}
- 依据:{evidence_format}
- 建议:{suggestion_format}
## 注意事项
{注意事项}
"""
需要避免的问题:
– Prompt过于笼统,缺乏具体指导
– 没有明确输出格式要求
– 缺少边界情况和错误处理说明
2. 错误处理策略
from cowagent.exceptions import (
ToolExecutionError,
AgentTimeoutError,
InvalidInputError
)
def robust_execute(agent: CowAgent, task: str, max_retries: int = 3):
"""
带错误处理的执行函数
Args:
agent: Agent实例
task: 任务描述
max_retries: 最大重试次数
Returns:
执行结果
"""
for attempt in range(max_retries):
try:
result = agent.execute_task(task)
return {"success": True, "result": result}
except ToolExecutionError as e:
# 工具执行失败,尝试备用方案
print(f"工具执行失败,尝试备用方案... ({attempt+1}/{max_retries})")
task = modify_task_for_fallback(task)
except AgentTimeoutError as e:
# 超时,简化任务
print(f"任务超时,简化处理... ({attempt+1}/{max_retries})")
task = simplify_task(task)
except InvalidInputError as e:
# 输入无效,直接返回错误
return {
"success": False,
"error": f"输入无效: {str(e)}"
}
return {
"success": False,
"error": "达到最大重试次数,执行失败"
}
3. 性能优化技巧
# 技巧1:缓存频繁使用的结果
from cowagent.cache import SimpleCache
cache = SimpleCache(ttl=3600) # 1小时过期
def cached_agent_call(agent, query, cache_key):
"""带缓存的Agent调用"""
cached = cache.get(cache_key)
if cached:
return cached
result = agent.chat(query)
cache.set(cache_key, result)
return result
# 技巧2:并行执行独立任务
from concurrent.futures import ThreadPoolExecutor
def parallel_execution(agent, tasks):
"""并行执行多个独立任务"""
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(agent.chat, task) for task in tasks]
results = [f.result() for f in futures]
return results
# 技巧3:任务分解减少Token消耗
def decompose_large_task(task):
"""将大任务分解为小任务,减少单次调用Token"""
subtasks = [
"第一步:...",
"第二步:...",
"第三步:..."
]
return subtasks
4. 安全注意事项
# 重要:永远不要让Agent直接执行未验证的代码
class SafeAgentExecutor:
"""安全的Agent执行器"""
def __init__(self, agent: CowAgent):
self.agent = agent
self.dangerous_patterns = [
"rm -rf",
"DROP TABLE",
"DELETE FROM",
"format",
"os.system"
]
def validate_task(self, task: str) -> bool:
"""验证任务安全性"""
task_lower = task.lower()
for pattern in self.dangerous_patterns:
if pattern.lower() in task_lower:
return False
return True
def safe_execute(self, task: str) -> Dict:
"""安全执行任务"""
if not self.validate_task(task):
return {
"success": False,
"error": "任务包含潜在危险操作,已拒绝执行"
}
return self.agent.execute_task(task)
总结与资源链接
CowAgent核心价值回顾
通过本文的详细介绍,我们可以看到 CowAgent 的几大核心优势:
- 任务执行能力:不同于传统AI只能回答问题,CowAgent能够真正执行任务
- 多层次架构:Supervisor + Specialized Agents的分层设计,职责清晰,易于扩展
- 强大的工具系统:内置丰富的工具,并支持自定义工具扩展
- 记忆系统:短期记忆+长期记忆+向量检索,让Agent越用越聪明
- 多Agent协作:支持构建Agent团队,实现复杂任务的协作完成
项目地址
- GitHub仓库:https://github.com/zhayujie/CowAgent
- 文档地址:https://cowagent.readthedocs.io
- 示例代码:https://github.com/zhayujie/CowAgent/tree/main/examples
相关AI项目推荐
| 项目 | 说明 | 适用场景 |
|---|---|---|
| LangChain | LLM应用开发框架 | 构建复杂LLM应用 |
| AutoGPT | 自主AI Agent | 复杂任务自动化 |
| CrewAI | 多Agent协作框架 | 团队协作任务 |
| Semantic Kernel | 微软AI框架 | 企业级AI应用 |
| LlamaIndex | 知识增强LLM | 知识库问答 |
进阶学习路径
- 入门阶段:完成本文的三个实战教程
- 进阶阶段:学习LangChain/CrewAI等框架的集成使用
- 深入阶段:研究Agent的规划算法、记忆机制、多模态能力
- 实战阶段:将CowAgent应用到实际工作场景,解决真实问题
写在最后
CowAgent为我们打开了一扇通向真正智能助手的大门。它不仅仅是另一个AI项目,更代表了一种新的范式——从「问答」到「执行」,从「单点能力」到「系统协作」。
希望你通过本文的学习,能够掌握CowAgent的核心用法,并将其应用到实际工作中。如果你有任何问题或心得,欢迎在评论区交流!
祝学习愉快!
评论区