CowAgent:让AI助手真正「会干活」的开源Agent框架,我用了三个月后彻底惊艳了

CowAgent:让AI助手真正「会干活」的开源Agent框架,我用了三个月后彻底惊艳了

CowAgent:让AI助手真正「会干活」的开源Agent框架,我用了三个月后彻底惊艳了

引言

你是否曾经遇到过这样的困境:给AI下达了一个看似简单的任务,它却只是返回一段文字答案,而无法真正帮你执行操作?又或者,当你想让AI帮你在电脑上完成一系列复杂操作时,却发现它「有心无力」?

今天我要介绍的这个开源项目——CowAgent,正是为了解决这个痛点而生。它不仅仅是一个对话式AI,更是一个能够真正帮你「干活」的智能助手框架。


为什么 CowAgent 值得关注

传统AI助手的局限性

在介绍 CowAgent 之前,让我们先回顾一下传统AI助手的工作模式:

用户:帮我查一下明天的天气
AI:明天天气晴朗,温度15-25度,适合外出活动。

用户:帮我把这张照片发到微信群
AI:抱歉,我无法直接执行这些操作...

看到了吗?传统AI只能「回答问题」,而无法「执行任务」。这就是我们常说的「语言模型天花板」——它很聪明,但缺乏真正的「手」和「脚」。

CowAgent 的核心突破

CowAgent 的出现打破了这一困境。它构建了一个完整的Agent执行架构,让AI能够:

  • 理解你的意图:不仅听懂你说的,还能理解你想做的
  • 规划执行步骤:将复杂任务拆解为可执行的子任务
  • 调用各种工具:文件操作、代码执行、API调用、系统控制…
  • 自我纠错:任务执行失败时能够自动重试或调整策略

简单来说,CowAgent 让AI从「能说」进化到「能做事」

为什么选择 CowAgent

特性 传统AI助手 CowAgent
交互方式 纯文本对话 对话 + 任务执行
能力范围 回答问题 完成任务链
自动化程度
可扩展性 有限 插件系统支持
执行透明度 黑盒 可追踪、可干预

环境搭建:5分钟快速入门

系统要求

在开始之前,确保你的环境满足以下要求:

- Python 3.9 或更高版本
- pip 包管理器
- 稳定的网络连接(用于调用AI模型API)
- 至少4GB可用内存

安装步骤

第一步:创建虚拟环境(推荐但非必需)

# 创建新的虚拟环境
python -m venv cowagent-env

# 激活虚拟环境
# Windows 系统
cowagent-env\Scripts\activate

# macOS / Linux 系统
source cowagent-env/bin/activate

第二步:安装 CowAgent

# 使用 pip 安装
pip install cowagent

# 或者从源码安装(获取最新功能)
git clone https://github.com/zhayujie/CowAgent.git
cd CowAgent
pip install -e .

第三步:配置API密钥

CowAgent 需要连接AI模型来提供智能能力。你需要配置API密钥:

# 在项目根目录创建 config.py 文件
import os

# 设置你的API密钥(以OpenAI为例)
os.environ["OPENAI_API_KEY"] = "你的API密钥"

# 如果使用其他模型,可以设置对应的密钥
# os.environ["ANTHROPIC_API_KEY"] = "你的Claude密钥"
# os.environ["ZHIPU_API_KEY"] = "你的智谱密钥"

第四步:验证安装

# 创建一个简单的测试脚本 test_install.py
from cowagent import CowAgent

# 初始化Agent
agent = CowAgent()

# 测试基本对话
response = agent.chat("你好,请介绍一下你自己")
print(response)

# 预期输出:Agent会介绍自己的功能和特点

运行测试脚本,如果一切正常,你会看到类似以下输出:

CowAgent初始化完成!
你好!我是CowAgent,一个智能任务执行助手...

核心功能详解

1. 多层级Agent架构

CowAgent 采用分层设计,这是它最核心的设计理念:

┌─────────────────────────────────────┐
│         User Interface              │
│         (用户交互层)                │
├─────────────────────────────────────┤
│      CowAgent Core                  │
│   ┌─────────────────────────┐       │
│   │   Supervisor Agent      │       │
│   │   (任务监督者)          │       │
│   ├─────────────────────────┤       │
│   │   Specialized Agents    │       │
│   │   ┌─────┐ ┌─────┐ ┌────┐│       │
│   │   │搜索 │ │文件 │ │代码 ││       │
│   │   │Agent│ │Agent│ │Agent││       │
│   │   └─────┘ └─────┘ └────┘│       │
│   └─────────────────────────┘       │
├─────────────────────────────────────┤
│        Tool Layer                   │
│    (工具执行层 - 可扩展)            │
└─────────────────────────────────────┘

Supervisor Agent(监督者) 是整个系统的中枢,它负责:
– 理解用户意图
– 分解复杂任务
– 分配给专业Agent
– 整合子任务结果

Specialized Agents(专业Agent) 是各个领域的专家:
– 每个Agent专注特定能力
– 可以并行执行独立任务
– 通过标准化接口通信

2. 强大的工具系统

CowAgent 的工具系统是其执行能力的基础。每个工具都是一个独立的执行单元:

内置工具示例

# 文件操作工具
class FileTool:
    """文件操作工具集"""

    def read_file(self, path: str) -> str:
        """读取文件内容"""
        with open(path, 'r', encoding='utf-8') as f:
            return f.read()

    def write_file(self, path: str, content: str) -> bool:
        """写入文件内容"""
        with open(path, 'w', encoding='utf-8') as f:
            f.write(content)
        return True

    def list_directory(self, path: str) -> list:
        """列出目录内容"""
        import os
        return os.listdir(path)

# 代码执行工具
class CodeTool:
    """代码执行工具"""

    def execute_python(self, code: str) -> dict:
        """执行Python代码"""
        import subprocess
        result = subprocess.run(
            ['python', '-c', code],
            capture_output=True,
            text=True
        )
        return {
            'stdout': result.stdout,
            'stderr': result.stderr,
            'returncode': result.returncode
        }

自定义工具

你可以轻松添加自己的工具:

from cowagent.tools import BaseTool

class MyCustomTool(BaseTool):
    """自定义工具示例"""

    # 工具的唯一标识符
    name = "my_custom_tool"

    # 工具的详细描述,用于AI理解何时使用
    description = """
    这是一个自定义工具,用于执行特定业务逻辑。
    当用户需要XXX操作时使用此工具。
    """

    # 工具参数定义
    parameters = {
        "type": "object",
        "properties": {
            "param1": {
                "type": "string",
                "description": "参数1的说明"
            }
        },
        "required": ["param1"]
    }

    def execute(self, param1: str, **kwargs) -> dict:
        """
        实际执行逻辑

        Args:
            param1: 输入参数

        Returns:
            执行结果字典
        """
        # 你的业务逻辑
        result = f"处理了: {param1}"

        return {
            "success": True,
            "result": result
        }

# 注册工具
agent = CowAgent()
agent.register_tool(MyCustomTool())

3. 记忆系统

CowAgent 内置了强大的记忆系统,让Agent能够:

  • 短期记忆:记住当前对话的上下文
  • 长期记忆:持久化重要信息到本地存储
  • 向量检索:基于语义相似度召回历史信息
from cowagent.memory import ConversationMemory, VectorMemory

# 初始化记忆系统
conversation_memory = ConversationMemory(
    max_turns=20  # 保留最近20轮对话
)

vector_memory = VectorMemory(
    persist_path="./memory_data"  # 持久化路径
)

# Agent中使用记忆
agent = CowAgent(
    conversation_memory=conversation_memory,
    vector_memory=vector_memory
)

# 查询历史相关记忆
related_memories = agent.recall("之前我让你帮我写的那个爬虫脚本")
print(related_memories)

4. 任务规划引擎

当用户提出复杂任务时,CowAgent 的任务规划引擎会自动:

  1. 任务分解:将大任务拆分为可执行的子任务
  2. 依赖分析:确定子任务之间的执行顺序
  3. 并行优化:识别可并行执行的任务
  4. 动态调整:根据执行结果调整后续计划
# 复杂任务示例
task = """
帮我完成以下工作:
1. 从豆瓣获取Top250电影数据
2. 筛选出评分高于9.0的电影
3. 将结果保存为Excel文件
4. 发送邮件给团队成员
"""

# CowAgent会自动规划执行
result = agent.execute_task(task)

# 执行过程会被详细记录
print(result["execution_trace"])
# [
#   {"step": 1, "action": "web_scraper", "status": "completed"},
#   {"step": 2, "action": "data_filter", "status": "completed"},
#   {"step": 3, "action": "excel_export", "status": "completed"},
#   {"step": 4, "action": "email_sender", "status": "completed"}
# ]

实战教程:从入门到精通

教程一:构建一个文件管理助手

让我们从最简单的例子开始——创建一个能够帮你管理文件的AI助手。

第一步:创建项目结构

mkdir -p file_assistant/{tools,memory,prompts}
cd file_assistant
touch tools/__init__.py

第二步:定义文件操作工具

创建 tools/file_manager.py

"""
文件管理工具集
提供文件读写、搜索、分类等基础功能
"""

import os
import shutil
from pathlib import Path
from typing import List, Dict, Optional
from datetime import datetime

class FileManager:
    """文件管理器"""

    def __init__(self, base_path: str = "."):
        self.base_path = Path(base_path)

    def create_file(self, file_path: str, content: str = "") -> Dict:
        """
        创建新文件

        Args:
            file_path: 文件路径(相对于base_path)
            content: 初始内容

        Returns:
            操作结果字典
        """
        try:
            full_path = self.base_path / file_path
            full_path.parent.mkdir(parents=True, exist_ok=True)

            with open(full_path, 'w', encoding='utf-8') as f:
                f.write(content)

            return {
                "success": True,
                "message": f"文件创建成功: {file_path}",
                "path": str(full_path)
            }
        except Exception as e:
            return {
                "success": False,
                "message": f"创建失败: {str(e)}"
            }

    def read_file(self, file_path: str) -> Dict:
        """
        读取文件内容

        Args:
            file_path: 文件路径

        Returns:
            文件内容字典
        """
        try:
            full_path = self.base_path / file_path

            if not full_path.exists():
                return {
                    "success": False,
                    "message": "文件不存在"
                }

            with open(full_path, 'r', encoding='utf-8') as f:
                content = f.read()

            return {
                "success": True,
                "content": content,
                "size": full_path.stat().st_size,
                "modified": datetime.fromtimestamp(
                    full_path.stat().st_mtime
                ).isoformat()
            }
        except Exception as e:
            return {
                "success": False,
                "message": f"读取失败: {str(e)}"
            }

    def search_files(
        self,
        keyword: str,
        extension: Optional[str] = None
    ) -> Dict:
        """
        搜索文件

        Args:
            keyword: 搜索关键词(文件名包含此关键词)
            extension: 文件扩展名过滤(如 '.py', '.txt')

        Returns:
            搜索结果字典
        """
        results = []

        try:
            for root, dirs, files in os.walk(self.base_path):
                for file in files:
                    if keyword.lower() in file.lower():
                        if extension is None or file.endswith(extension):
                            full_path = Path(root) / file
                            results.append({
                                "name": file,
                                "path": str(full_path.relative_to(self.base_path)),
                                "size": full_path.stat().st_size
                            })

            return {
                "success": True,
                "count": len(results),
                "results": results
            }
        except Exception as e:
            return {
                "success": False,
                "message": f"搜索失败: {str(e)}"
            }

    def batch_rename(
        self,
        directory: str,
        old_pattern: str,
        new_pattern: str
    ) -> Dict:
        """
        批量重命名文件

        Args:
            directory: 目标目录
            old_pattern: 原文件名中的匹配模式
            new_pattern: 替换后的新模式

        Returns:
            操作结果字典
        """
        renamed = []
        failed = []

        try:
            dir_path = self.base_path / directory
            files = list(dir_path.iterdir())

            for file in files:
                if file.is_file() and old_pattern in file.name:
                    new_name = file.name.replace(old_pattern, new_pattern)
                    new_path = file.parent / new_name

                    try:
                        file.rename(new_path)
                        renamed.append({
                            "old": file.name,
                            "new": new_name
                        })
                    except Exception as e:
                        failed.append({
                            "name": file.name,
                            "error": str(e)
                        })

            return {
                "success": True,
                "renamed_count": len(renamed),
                "renamed": renamed,
                "failed_count": len(failed),
                "failed": failed
            }
        except Exception as e:
            return {
                "success": False,
                "message": f"批量重命名失败: {str(e)}"
            }

# ============================================================
# 导出工具接口供Agent调用
# ============================================================

def get_file_tools():
    """获取所有文件管理工具"""
    manager = FileManager()

    return {
        "create_file": manager.create_file,
        "read_file": manager.read_file,
        "search_files": manager.search_files,
        "batch_rename": manager.batch_rename
    }

第三步:创建Prompt模板

创建 prompts/file_assistant.txt

你是一个专业的文件管理助手,名叫小档。

你的职责:
- 帮助用户管理文件和文件夹
- 快速查找用户需要的文件
- 批量处理文件操作任务
- 提供文件整理建议

沟通原则:
1. 先理解用户的需求,再执行操作
2. 复杂操作前先确认用户意图
3. 操作完成后简要汇报结果
4. 如果操作可能影响重要文件,先提示风险

可用工具:
- create_file: 创建新文件
- read_file: 读取文件内容
- search_files: 搜索文件
- batch_rename: 批量重命名

当用户提出文件相关请求时,请选择合适的工具来完成任务。

第四步:整合为完整应用

创建 main.py

"""
文件管理助手主程序
集成CowAgent核心功能,提供智能文件管理服务
"""

import os
from cowagent import CowAgent
from cowagent.prompts import load_prompt_template
from cowagent.memory import ConversationMemory
from tools.file_manager import get_file_tools

# ============================================================
# 配置区域
# ============================================================

# 设置工作目录
WORK_DIR = "./workspace"

# 设置API密钥
os.environ["OPENAI_API_KEY"] = os.getenv(
    "OPENAI_API_KEY", 
    "your-api-key-here"
)

# ============================================================
# 初始化Agent
# ============================================================

def initialize_agent():
    """初始化文件管理助手"""

    # 加载Prompt模板
    system_prompt = load_prompt_template("prompts/file_assistant.txt")

    # 配置记忆系统
    memory = ConversationMemory(max_turns=50)

    # 获取工具集
    file_tools = get_file_tools()

    # 创建Agent实例
    agent = CowAgent(
        system_prompt=system_prompt,
        memory=memory,
        tools=file_tools,
        work_dir=WORK_DIR
    )

    return agent

# ============================================================
# 交互式界面
# ============================================================

def run_interactive_mode(agent: CowAgent):
    """启动交互式对话模式"""

    print("=" * 60)
    print("📁 文件管理助手 - 小档")
    print("=" * 60)
    print("输入你的文件管理需求,或输入 'quit' 退出")
    print("-" * 60)

    while True:
        try:
            user_input = input("\n🧑 你: ").strip()

            if not user_input:
                continue

            if user_input.lower() in ['quit', 'exit', 'q']:
                print("\n再见!有任何文件管理需求随时找我。")
                break

            # 发送给Agent处理
            response = agent.chat(user_input)

            print(f"\n🤖 小档: {response}")

        except KeyboardInterrupt:
            print("\n\n程序被用户中断,再见!")
            break
        except Exception as e:
            print(f"\n⚠️ 发生错误: {str(e)}")

# ============================================================
# 主程序入口
# ============================================================

if __name__ == "__main__":
    # 初始化Agent
    agent = initialize_agent()

    # 运行交互模式
    run_interactive_mode(agent)

第五步:运行测试

# 确保工作目录存在
mkdir -p workspace

# 运行程序
python main.py

使用示例对话:

============================================================
📁 文件管理助手 - 小档
============================================================
输入你的文件管理需求,或输入 'quit' 退出
------------------------------------------------------------

🧑 : workspace目录下创建一个readme.txt文件,内容是"这是我的第一个CowAgent项目"

🤖 小档: 好的,我来帮你创建这个文件!

 操作成功
文件已创建: workspace/readme.txt
内容: "这是我的第一个CowAgent项目"

🧑 : 帮我搜索所有.py文件

🤖 小档: 正在搜索.py文件...

找到 12 Python文件
1. main.py (2.3KB)
2. tools/file_manager.py (5.1KB)
3. tests/test_basic.py (1.2KB)
...

🧑 : 把所有文件名中的"test"改成"demo"

🤖 小档: 好的,我来进行批量重命名...

重命名完成!成功重命名 5 个文件:
- test_basic.py  demo_basic.py
- test_advanced.py  demo_advanced.py
- test_utils.py  demo_utils.py
...

教程二:构建自动化数据处理Pipeline

接下来,让我们创建一个更复杂的应用——数据处理自动化助手。

整体架构

用户需求
    ↓
CowAgent(任务分解)
    ↓
┌─────────┬─────────┬─────────┐
│ 数据获取 │ 数据清洗 │ 数据输出 │
│  Agent  │  Agent  │  Agent  │
└─────────┴─────────┴─────────┘
    ↓
Pipeline执行器
    ↓
结果汇总报告

实现代码

创建 data_pipeline.py

"""
数据处理Pipeline
展示如何用CowAgent构建自动化数据处理流程
"""

import json
import csv
from datetime import datetime
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, asdict
from enum import Enum

# ============================================================
# 数据模型定义
# ============================================================

class DataFormat(Enum):
    """支持的数据格式"""
    CSV = "csv"
    JSON = "json"
    EXCEL = "excel"
    TEXT = "text"

@dataclass
class DataRecord:
    """数据记录"""
    id: str
    timestamp: str
    category: str
    value: float
    status: str
    metadata: Optional[Dict] = None

@dataclass
class ProcessingResult:
    """处理结果"""
    success: bool
    records_processed: int
    records_failed: int
    output_path: str
    execution_time: float
    details: List[str]

# ============================================================
# 数据获取Agent
# ============================================================

class DataFetcherAgent:
    """数据获取Agent - 负责从各种来源获取数据"""

    def __init__(self):
        self.name = "data_fetcher"
        self.description = "从CSV、JSON或API获取原始数据"

    def fetch_from_csv(self, file_path: str) -> Dict:
        """
        从CSV文件获取数据

        Args:
            file_path: CSV文件路径

        Returns:
            包含原始数据的字典
        """
        records = []

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    records.append(row)

            return {
                "success": True,
                "count": len(records),
                "data": records,
                "source": file_path
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }

    def fetch_from_json(self, file_path: str) -> Dict:
        """
        从JSON文件获取数据

        Args:
            file_path: JSON文件路径

        Returns:
            包含原始数据的字典
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            # 处理单条记录或多条记录
            if isinstance(data, list):
                records = data
            else:
                records = [data]

            return {
                "success": True,
                "count": len(records),
                "data": records,
                "source": file_path
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }

    def generate_sample_data(self, count: int = 100) -> Dict:
        """
        生成示例数据(用于测试)

        Args:
            count: 生成记录数量

        Returns:
            示例数据字典
        """
        import random
        from datetime import timedelta

        records = []
        base_date = datetime.now() - timedelta(days=30)
        categories = ["电子产品", "服装", "食品", "图书", "家居"]
        statuses = ["pending", "completed", "cancelled"]

        for i in range(count):
            days_offset = random.randint(0, 30)
            record_date = base_date + timedelta(days=days_offset)

            records.append({
                "id": f"ORD-{i+1:05d}",
                "timestamp": record_date.isoformat(),
                "category": random.choice(categories),
                "value": round(random.uniform(10, 1000), 2),
                "status": random.choice(statuses),
                "metadata": {
                    "customer_id": f"CUST-{random.randint(1000, 9999)}",
                    "region": random.choice(["华东", "华南", "华北", "西南"])
                }
            })

        return {
            "success": True,
            "count": len(records),
            "data": records,
            "source": "generated"
        }

# ============================================================
# 数据清洗Agent
# ============================================================

class DataCleanerAgent:
    """数据清洗Agent - 负责数据验证、转换和清洗"""

    def __init__(self):
        self.name = "data_cleaner"
        self.description = "清洗和转换原始数据"

    def validate_records(self, records: List[Dict]) -> Dict:
        """
        验证数据记录的完整性和有效性

        Args:
            records: 待验证的记录列表

        Returns:
            验证结果
        """
        required_fields = ["id", "timestamp", "category", "value", "status"]
        valid_records = []
        invalid_records = []
        issues = []

        for idx, record in enumerate(records):
            record_issues = []

            # 检查必填字段
            for field in required_fields:
                if field not in record or record[field] is None:
                    record_issues.append(f"缺少字段: {field}")

            # 验证数值字段
            if "value" in record:
                try:
                    value = float(record["value"])
                    if value < 0:
                        record_issues.append("value不能为负数")
                except (ValueError, TypeError):
                    record_issues.append("value格式错误")

            # 验证状态字段
            if "status" in record:
                valid_statuses = ["pending", "completed", "cancelled"]
                if record["status"] not in valid_statuses:
                    record_issues.append(f"无效的status值: {record['status']}")

            if record_issues:
                invalid_records.append({
                    "index": idx,
                    "record": record,
                    "issues": record_issues
                })
                issues.extend([f"记录{idx}: {issue}" for issue in record_issues])
            else:
                valid_records.append(record)

        return {
            "success": True,
            "valid_count": len(valid_records),
            "invalid_count": len(invalid_records),
            "valid_records": valid_records,
            "invalid_records": invalid_records[:10],  # 只保留前10条示例
            "issues": issues
        }

    def deduplicate(self, records: List[Dict], key: str = "id") -> Dict:
        """
        根据指定键去重

        Args:
            records: 记录列表
            key: 用于去重的键名

        Returns:
            去重结果
        """
        seen = set()
        unique_records = []
        duplicates = []

        for record in records:
            record_id = record.get(key)

            if record_id in seen:
                duplicates.append(record)
            else:
                seen.add(record_id)
                unique_records.append(record)

        return {
            "success": True,
            "original_count": len(records),
            "unique_count": len(unique_records),
            "duplicate_count": len(duplicates),
            "unique_records": unique_records,
            "duplicates": duplicates[:5]  # 只保留前5条示例
        }

    def filter_records(
        self,
        records: List[Dict],
        conditions: Dict[str, Any]
    ) -> Dict:
        """
        根据条件过滤记录

        Args:
            records: 记录列表
            conditions: 过滤条件

        Returns:
            过滤结果
        """
        filtered = []
        excluded = []

        for record in records:
            match = True

            for field, value in conditions.items():
                if field not in record:
                    match = False
                    break

                # 支持多种匹配方式
                if isinstance(value, dict):
                    # 范围匹配
                    if "min" in value and record[field] < value["min"]:
                        match = False
                    if "max" in value and record[field] > value["max"]:
                        match = False
                    # 列表匹配
                    if "in" in value and record[field] not in value["in"]:
                        match = False
                elif record[field] != value:
                    match = False

            if match:
                filtered.append(record)
            else:
                excluded.append(record)

        return {
            "success": True,
            "filtered_count": len(filtered),
            "excluded_count": len(excluded),
            "filtered_records": filtered
        }

    def transform_schema(
        self,
        records: List[Dict],
        mapping: Dict[str, str]
    ) -> Dict:
        """
        转换数据Schema

        Args:
            records: 记录列表
            mapping: 字段映射关系 {新字段名: 旧字段名}

        Returns:
            转换结果
        """
        transformed = []

        for record in records:
            new_record = {}

            for new_field, old_field in mapping.items():
                if old_field in record:
                    new_record[new_field] = record[old_field]

            # 保留未映射的字段
            mapped_fields = set(mapping.values())
            for key, value in record.items():
                if key not in mapped_fields:
                    new_record[key] = value

            transformed.append(new_record)

        return {
            "success": True,
            "transformed_count": len(transformed),
            "transformed_records": transformed
        }

# ============================================================
# 数据输出Agent
# ============================================================

class DataExporterAgent:
    """数据导出Agent - 负责将处理后的数据输出到各种格式"""

    def __init__(self):
        self.name = "data_exporter"
        self.description = "导出数据到CSV、JSON或Excel"

    def export_to_csv(
        self,
        records: List[Dict],
        file_path: str,
        fieldnames: Optional[List[str]] = None
    ) -> Dict:
        """
        导出数据到CSV

        Args:
            records: 要导出的记录
            file_path: 输出文件路径
            fieldnames: 字段名列表

        Returns:
            导出结果
        """
        if not records:
            return {
                "success": False,
                "error": "没有可导出的数据"
            }

        try:
            # 自动获取字段名
            if fieldnames is None:
                fieldnames = list(records[0].keys())

            with open(file_path, 'w', encoding='utf-8', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(records)

            return {
                "success": True,
                "output_path": file_path,
                "record_count": len(records)
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }

    def export_to_json(
        self,
        records: List[Dict],
        file_path: str,
        indent: int = 2
    ) -> Dict:
        """
        导出数据到JSON

        Args:
            records: 要导出的记录
            file_path: 输出文件路径
            indent: 缩进空格数

        Returns:
            导出结果
        """
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(records, f, ensure_ascii=False, indent=indent)

            return {
                "success": True,
                "output_path": file_path,
                "record_count": len(records)
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e)
            }

    def generate_report(self, processing_results: List[Dict]) -> str:
        """
        生成数据处理报告

        Args:
            processing_results: 各步骤的处理结果

        Returns:
            格式化的报告文本
        """
        report_lines = [
            "=" * 60,
            "📊 数据处理报告",
            "=" * 60,
            f"生成时间: {datetime.now().isoformat()}",
            "-" * 60
        ]

        total_records = 0
        total_success = 0

        for idx, result in enumerate(processing_results):
            step_name = result.get("step", f"步骤{idx+1}")
            report_lines.append(f"\n{step_name}】")

            if "record_count" in result:
                total_records = result["record_count"]
                report_lines.append(f"  处理记录数: {total_records}")

            if "success" in result:
                status = "✓ 成功" if result["success"] else "✗ 失败"
                report_lines.append(f"  状态: {status}")

            if "issues" in result and result["issues"]:
                report_lines.append(f"  问题数: {len(result['issues'])}")
                for issue in result["issues"][:5]:
                    report_lines.append(f"    - {issue}")

        report_lines.extend([
            "-" * 60,
            f"总计处理: {total_records} 条记录",
            "=" * 60
        ])

        return "\n".join(report_lines)

# ============================================================
# Pipeline编排器
# ============================================================

class DataPipeline:
    """数据处理Pipeline编排器"""

    def __init__(self):
        self.fetcher = DataFetcherAgent()
        self.cleaner = DataCleanerAgent()
        self.exporter = DataExporterAgent()
        self.processing_log = []

    def run(
        self,
        source: str = "generated",
        source_params: Optional[Dict] = None,
        cleaning_rules: Optional[Dict] = None,
        output_format: DataFormat = DataFormat.CSV,
        output_path: str = "./output/data_export.csv"
    ) -> Dict:
        """
        执行完整的数据处理Pipeline

        Args:
            source: 数据来源 ('csv', 'json', 'generated')
            source_params: 数据源参数
            cleaning_rules: 清洗规则
            output_format: 输出格式
            output_path: 输出文件路径

        Returns:
            Pipeline执行结果
        """
        import time
        start_time = time.time()

        self.processing_log = []

        # ============================================================
        # 步骤1: 数据获取
        # ============================================================
        step1_result = {"step": "数据获取"}

        if source == "generated":
            count = (source_params or {}).get("count", 100)
            fetch_result = self.fetcher.generate_sample_data(count)
        elif source == "csv":
            file_path = (source_params or {}).get("file_path", "input.csv")
            fetch_result = self.fetcher.fetch_from_csv(file_path)
        elif source == "json":
            file_path = (source_params or {}).get("file_path", "input.json")
            fetch_result = self.fetcher.fetch_from_json(file_path)
        else:
            fetch_result = {"success": False, "error": "不支持的数据源"}

        step1_result.update(fetch_result)
        self.processing_log.append(step1_result)

        if not fetch_result.get("success"):
            return {
                "success": False,
                "error": "数据获取失败",
                "log": self.processing_log
            }

        records = fetch_result["data"]
        step1_result["record_count"] = len(records)

        # ============================================================
        # 步骤2: 数据清洗
        # ============================================================
        step2_result = {"step": "数据清洗"}

        # 验证记录
        validation = self.cleaner.validate_records(records)
        step2_result["validation"] = validation

        if validation["valid_count"] > 0:
            records = validation["valid_records"]

        # 去重
        if (cleaning_rules or {}).get("deduplicate", True):
            dedup = self.deduplicate = self.cleaner.deduplicate(records)
            step2_result["deduplication"] = dedup
            records = dedup["unique_records"]

        # 过滤(如果有条件)
        if "filter_conditions" in (cleaning_rules or {}):
            filter_result = self.cleaner.filter_records(
                records,
                cleaning_rules["filter_conditions"]
            )
            step2_result["filtering"] = filter_result
            records = filter_result["filtered_records"]

        step2_result["record_count"] = len(records)
        self.processing_log.append(step2_result)

        # ============================================================
        # 步骤3: 数据导出
        # ============================================================
        step3_result = {"step": "数据导出"}

        if output_format == DataFormat.CSV:
            export_result = self.exporter.export_to_csv(
                records, output_path
            )
        elif output_format == DataFormat.JSON:
            output_path = output_path.replace(".csv", ".json")
            export_result = self.exporter.export_to_json(
                records, output_path
            )
        else:
            export_result = {"success": False, "error": "不支持的格式"}

        step3_result.update(export_result)
        self.processing_log.append(step3_result)

        # ============================================================
        # 生成报告
        # ============================================================
        report = self.exporter.generate_report(self.processing_log)

        execution_time = time.time() - start_time

        return {
            "success": True,
            "execution_time": round(execution_time, 2),
            "input_records": self.processing_log[0].get("count", 0),
            "output_records": len(records),
            "output_path": output_path,
            "report": report,
            "log": self.processing_log
        }

# ============================================================
# CowAgent集成
# ============================================================

class IntelligentDataAssistant:
    """
    智能数据助手 - 使用CowAgent驱动的数据处理系统
    """

    def __init__(self):
        self.pipeline = DataPipeline()
        self.name = "数据处理助手"

    def process_request(self, user_request: str) -> str:
        """
        处理用户请求

        Args:
            user_request: 用户的自然语言请求

        Returns:
            处理结果描述
        """
        # 简化版意图识别
        # 实际项目中可以使用更复杂的NLU系统

        request_lower = user_request.lower()

        # 处理"生成报告"类请求
        if "生成" in user_request and "数据" in user_request:
            result = self.pipeline.run(
                source="generated",
                source_params={"count": 50},
                output_format=DataFormat.CSV
            )

            if result["success"]:
                return f"""数据处理完成!

📊 处理摘要:
- 原始数据:{result['input_records']}
- 处理后数据:{result['output_records']}
- 处理时间:{result['execution_time']}
- 输出文件:{result['output_path']}

{result['report']}"""
            else:
                return f"处理失败: {result.get('error', '未知错误')}"

        # 处理"分析"类请求
        if "分析" in user_request:
            result = self.pipeline.run(
                source="generated",
                source_params={"count": 100},
                cleaning_rules={"deduplicate": True},
                output_format=DataFormat.JSON
            )

            if result["success"]:
                # 简单统计分析
                records = result["log"][1]["validation"]["valid_records"]

                # 按类别统计
                category_stats = {}
                for record in records:
                    cat = record.get("category", "未知")
                    category_stats[cat] = category_stats.get(cat, 0) + 1

                stats_text = "\n".join([
                    f"  - {cat}: {count}条 ({count/len(records)*100:.1f}%)"
                    for cat, count in sorted(
                        category_stats.items(),
                        key=lambda x: x[1],
                        reverse=True
                    )
                ])

                return f"""📈 数据分析报告

数据概览:
- 总记录数:{len(records)}
- 有效记录:{result['output_records']}

类别分布:
{stats_text}

详细报告已保存至:{result['output_path']}"""

        # 默认回复
        return """我理解你的请求,但需要更多信息。

你可以尝试:
1. "帮我生成50条示例数据并导出"
2. "分析一下数据分布情况"
3. "对input.csv进行处理"

请告诉我具体想做什么?"""

# ============================================================
# 使用示例
# ============================================================

if __name__ == "__main__":
    # 创建智能数据助手
    assistant = IntelligentDataAssistant()

    # 测试各种请求
    test_requests = [
        "帮我生成50条示例数据并导出",
        "分析一下数据分布情况"
    ]

    for request in test_requests:
        print(f"\n用户请求: {request}")
        print("-" * 60)
        result = assistant.process_request(request)
        print(result)
        print()

运行结果

python data_pipeline.py

输出示例:

用户请求: 帮我生成50条示例数据并导出
------------------------------------------------------------

数据处理完成!

📊 处理摘要:
- 原始数据:50 条
- 处理后数据:50 条
- 处理时间:0.12秒
- 输出文件:./output/data_export.csv

============================================================
📊 数据处理报告
============================================================
生成时间: 2024-01-15T10:30:45.123456
------------------------------------------------------------

【数据获取】
  处理记录数: 50
  状态: ✓ 成功

【数据清洗】
  处理记录数: 50
  状态: ✓ 成功
  验证通过: 50条
  去重后: 50条

【数据导出】
  状态: ✓ 成功
  输出路径: ./output/data_export.csv
  记录数: 50

------------------------------------------------------------
总计处理: 50 条记录
============================================================

教程三:构建多Agent协作系统

最后一个实战案例,我们来构建一个多Agent协作系统,模拟一个小型项目团队。

系统设计

┌──────────────────────────────────────────────────────┐
│                   Project Team                        │
│  ┌────────────────────────────────────────────────┐  │
│  │            Project Manager Agent                │  │
│  │         (项目经理 - 协调整个项目)               │  │
│  └────────────────────────────────────────────────┘  │
│                        ↓                              │
│     ┌─────────────────┼─────────────────┐           │
│     ↓                 ↓                 ↓           │
│ ┌─────────┐     ┌─────────┐     ┌─────────┐         │
│ │  开发   │     │  测试   │     │  文档   │         │
│ │  Agent  │     │  Agent  │     │  Agent  │         │
│ └─────────┘     └─────────┘     └─────────┘         │
└──────────────────────────────────────────────────────┘

实现代码

创建 multi_agent_team.py

"""
多Agent协作系统
模拟一个小型项目团队,各Agent分工协作完成任务
"""

from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import json

# ============================================================
# Agent基类
# ============================================================

class AgentStatus(Enum):
    """Agent状态"""
    IDLE = "idle"
    WORKING = "working"
    WAITING = "waiting"
    COMPLETED = "completed"
    ERROR = "error"

@dataclass
class Task:
    """任务"""
    id: str
    title: str
    description: str
    assignee: Optional[str] = None
    status: str = "pending"
    priority: int = 1
    subtasks: List["Task"] = field(default_factory=list)
    result: Optional[Dict] = None
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())
    completed_at: Optional[str] = None

@dataclass
class Agent:
    """Agent基类"""
    name: str
    role: str
    description: str
    capabilities: List[str]
    status: AgentStatus = AgentStatus.IDLE
    current_task: Optional[Task] = None
    work_history: List[Dict] = field(default_factory=list)

    def introduce(self) -> str:
        """自我介绍"""
        return f"""
我是{name},担任{role}角色。

我的职责:
{self.description}

我能做的事情:
{chr(10).join(['  - ' + cap for cap in self.capabilities])}
"""

    def assign_task(self, task: Task) -> str:
        """接收任务"""
        self.current_task = task
        self.status = AgentStatus.WORKING
        task.assignee = self.name
        task.status = "in_progress"

        return f"收到任务:{task.title},开始执行..."

    def complete_task(self, result: Dict) -> str:
        """完成任务"""
        if self.current_task:
            self.current_task.status = "completed"
            self.current_task.completed_at = datetime.now().isoformat()
            self.current_task.result = result

            self.work_history.append({
                "task_id": self.current_task.id,
                "completed_at": self.current_task.completed_at,
                "result_summary": str(result)[:200]
            })

        self.status = AgentStatus.IDLE
        completed_task = self.current_task
        self.current_task = None

        return f"任务 {completed_task.title} 已完成!"

# ============================================================
# 专业化Agent实现
# ============================================================

class DeveloperAgent(Agent):
    """开发Agent"""

    def __init__(self):
        super().__init__(
            name="小开",
            role="后端开发工程师",
            description="负责代码开发、架构设计和技术实现",
            capabilities=[
                "编写Python/Java/Go代码",
                "设计数据库结构",
                "实现API接口",
                "代码审查和优化",
                "解决技术难题"
            ]
        )

    def develop_feature(self, feature: str) -> Dict:
        """
        开发功能模块

        Args:
            feature: 功能需求描述

        Returns:
            开发结果
        """
        # 模拟开发过程
        code_structure = {
            "module_name": self._extract_module_name(feature),
            "files": [],
            "dependencies": [],
            "api_endpoints": []
        }

        # 生成代码框架
        code_structure["files"].append({
            "path": f"src/{code_structure['module_name']}/__init__.py",
            "content": "# Module initialization"
        })

        code_structure["files"].append({
            "path": f"src/{code_structure['module_name']}/models.py",
            "content": f'''"""
数据模型定义
功能:{feature}
"""

from dataclasses import dataclass
from typing import Optional

@dataclass
class DataModel:
    """数据模型"""
    id: str
    name: str
    status: str = "active"
    metadata: Optional[dict] = None
'''
        })

        code_structure["files"].append({
            "path": f"src/{code_structure['module_name']}/service.py",
            "content": f'''"""
业务逻辑层
功能:{feature}
"""

class BusinessService:
    """业务服务类"""

    def process(self, data: dict) -> dict:
        """处理业务逻辑"""
        return {{
            "success": True,
            "processed_data": data
        }}
'''
        })

        return {
            "success": True,
            "feature": feature,
            "module_name": code_structure["module_name"],
            "files_created": len(code_structure["files"]),
            "estimated_lines": sum(
                f["content"].count("\\n") + 1 
                for f in code_structure["files"]
            )
        }

    def _extract_module_name(self, feature: str) -> str:
        """从功能描述提取模块名"""
        # 简单实现:取前两个关键词
        words = feature.replace("功能", "").replace("模块", "").split()[:2]
        return "_".join(w for w in words if w)

class TesterAgent(Agent):
    """测试Agent"""

    def __init__(self):
        super().__init__(
            name="小测",
            role="测试工程师",
            description="负责测试用例编写、自动化测试和质量保证",
            capabilities=[
                "编写单元测试",
                "编写集成测试",
                "执行自动化测试",
                "缺陷跟踪和管理",
                "性能测试"
            ]
        )

    def write_test_cases(self, feature: str, code_info: Dict) -> Dict:
        """
        编写测试用例

        Args:
            feature: 功能描述
            code_info: 代码信息

        Returns:
            测试用例结果
        """
        module_name = code_info.get("module_name", "unknown_module")

        test_cases = {
            "unit_tests": [
                {
                    "name": f"test_{module_name}_init",
                    "description": "测试模块初始化",
                    "expected": "模块正常实例化",
                    "priority": "high"
                },
                {
                    "name": f"test_{module_name}_basic_operation",
                    "description": "测试基本操作",
                    "expected": "操作返回正确结果",
                    "priority": "high"
                },
                {
                    "name": f"test_{module_name}_edge_cases",
                    "description": "测试边界情况",
                    "expected": "边界情况正确处理",
                    "priority": "medium"
                }
            ],
            "integration_tests": [
                {
                    "name": f"test_{module_name}_workflow",
                    "description": "测试完整工作流",
                    "expected": "各组件协同正常",
                    "priority": "high"
                }
            ]
        }

        return {
            "success": True,
            "feature": feature,
            "test_cases_count": sum(
                len(cases) for cases in test_cases.values()
            ),
            "test_cases": test_cases,
            "coverage_estimate": "85%"
        }

    def run_tests(self, test_suite: Dict) -> Dict:
        """
        运行测试

        Args:
            test_suite: 测试套件

        Returns:
            测试执行结果
        """
        import random

        # 模拟测试执行
        total = test_suite.get("test_cases_count", 0)
        passed = int(total * random.uniform(0.8, 0.95))
        failed = total - passed

        return {
            "success": True,
            "total_tests": total,
            "passed": passed,
            "failed": failed,
            "pass_rate": f"{passed/total*100:.1f}%",
            "execution_time": f"{random.uniform(0.5, 2.0):.2f}s"
        }

class DocumenterAgent(Agent):
    """文档Agent"""

    def __init__(self):
        super().__init__(
            name="小档",
            role="技术文档工程师",
            description="负责技术文档编写、API文档和维护指南",
            capabilities=[
                "编写README文档",
                "编写API接口文档",
                "编写使用指南",
                "维护开发文档",
                "编写CHANGELOG"
            ]
        )

    def generate_documentation(self, feature: str, code_info: Dict) -> Dict:
        """
        生成技术文档

        Args:
            feature: 功能描述
            code_info: 代码信息

        Returns:
            文档生成结果
        """
        module_name = code_info.get("module_name", "module")

        docs = {
            "readme": f'''# {module_name}

## 功能说明

{feature}

## 安装

```bash
pip install {module_name}

快速开始

from {module_name} import DataModel, BusinessService

**初始化**
service = BusinessService()

**使用**
result = service.process({{"data": "example"}})
print(result)

API文档

详见 API文档
”’,
“api_doc”: f”’# {module_name} API 文档

DataModel

数据模型类。

属性

属性名 类型 说明
id str 唯一标识符
name str 名称
status str 状态
metadata dict 附加数据

BusinessService

业务服务类。

方法

process(data: dict) -> dict

处理业务数据。

参数:
data (dict): 输入数据

返回:
– dict: 处理结果
”’
}

    return {
        "success": True,
        "documents_created": list(docs.keys()),
        "documents": docs,
        "word_count": sum(len(d.split()) for d in docs.values())
    }

============================================================
项目经理Agent
============================================================

class ProjectManagerAgent:
“””项目经理Agent – 协调多个Agent工作”””

def __init__(self):
    self.name = "项目经理"

    # 初始化团队成员
    self.developer = DeveloperAgent()
    self.tester = TesterAgent()
    self.documenter = DocumenterAgent()

    # 团队成员注册
    self.team = {
        "dev": self.developer,
        "test": self.tester,
        "doc": self.documenter
    }

    # 项目任务列表
    self.tasks: List[Task] = []
    self.project_log: List[Dict] = []

def introduce_team(self) -> str:
    """团队介绍"""
    introductions = ["=" * 50, "🚀 项目团队介绍", "=" * 50, ""]

    for role, agent in self.team.items():
        introductions.append(agent.introduce())
        introductions.append("")

    return "\n".join(introductions)

def receive_requirement(self, requirement: str) -> str:
    """
    接收需求

    Args:
        requirement: 需求描述

    Returns:
        需求确认信息
    """
    # 创建主任务
    main_task = Task(
        id=f"TASK-{len(self.tasks) + 1:03d}",
        title=requirement,
        description=f"完成{requirement}功能的开发和测试",
        priority=1
    )

    # 分解子任务
    main_task.subtasks = [
        Task(
            id=f"{main_task.id}-1",
            title="开发功能模块",
            description=f"开发{requirement}",
            assignee="小开"
        ),
        Task(
            id=f"{main_task.id}-2",
            title="编写测试用例",
            description=f"为{requirement}编写测试",
            assignee="小测"
        ),
        Task(
            id=f"{main_task.id}-3",
            title="生成技术文档",
            description=f"为{requirement}生成文档",
            assignee="小档"
        )
    ]

    self.tasks.append(main_task)

    # 记录日志
    self.project_log.append({
        "timestamp": datetime.now().isoformat(),
        "action": "requirement_received",
        "requirement": requirement,
        "task_id": main_task.id,
        "subtasks": [t.id for t in main_task.subtasks]
    })

    response = [
        f"📋 已收到需求:{requirement}",
        f"任务ID:{main_task.id}",
        "",
        "任务分解:",
    ]

    for i, subtask in enumerate(main_task.subtasks, 1):
        response.append(f"  {i}. [{subtask.id}] {subtask.title} → {subtask.assignee}")

    response.extend(["", "开始执行任务..."])

    return "\n".join(response)

def execute_project(self) -> str:
    """
    执行项目

    Returns:
        项目执行结果
    """
    if not self.tasks:
        return "没有待执行的项目"

    results = []
    main_task = self.tasks[-1]  # 执行最新的任务

    # ============================================================
    # 阶段1: 开发
    # ============================================================
    dev_task = main_task.subtasks[0]

    results.append(f"\n{'='*50}")
    results.append(f"【阶段1】开发 - 负责人:{self.developer.name}")
    results.append(f"{'='*50}")

    self.developer.assign_task(dev_task)
    dev_result = self.developer.develop_feature(main_task.title)
    results.append(self.developer.complete_task(dev_result))

    self.project_log.append({
        "timestamp": datetime.now().isoformat(),
        "phase": "development",
        "agent": "dev",
        "result": dev_result
    })

    # ============================================================
    # 阶段2: 测试
    # ============================================================
    test_task = main_task.subtasks[1]

    results.append(f"\n{'='*50}")
    results.append(f"【阶段2】测试 - 负责人:{self.tester.name}")
    results.append(f"{'='*50}")

    self.tester.assign_task(test_task)
    test_result = self.tester.write_test_cases(main_task.title, dev_result)
    results.append(f"✓ 已生成 {test_result['test_cases_count']} 个测试用例")

    # 执行测试
    test_run = self.tester.run_tests(test_result)
    results.append(f"✓ 测试执行完成:通过率 {test_run['pass_rate']}")
    results.append(f"  - 总测试数:{test_run['total_tests']}")
    results.append(f"  - 通过:{test_run['passed']}")
    results.append(f"  - 失败:{test_run['failed']}")

    self.tester.complete_task({"test_cases": test_result, "run": test_run})

    self.project_log.append({
        "timestamp": datetime.now().isoformat(),
        "phase": "testing",
        "agent": "test",
        "result": test_run
    })

    # ============================================================
    # 阶段3: 文档
    # ============================================================
    doc_task = main_task.subtasks[2]

    results.append(f"\n{'='*50}")
    results.append(f"【阶段3】文档 - 负责人:{self.documenter.name}")
    results.append(f"{'='*50}")

    self.documenter.assign_task(doc_task)
    doc_result = self.documenter.generate_documentation(main_task.title, dev_result)
    results.append(f"✓ 已生成文档:{', '.join(doc_result['documents_created'])}")
    results.append(f"✓ 总字数:约 {doc_result['word_count']} 词")

    self.documenter.complete_task(doc_result)

    self.project_log.append({
        "timestamp": datetime.now().isoformat(),
        "phase": "documentation",
        "agent": "doc",
        "result": {"documents": list(doc_result["documents"].keys())}
    })

    # ============================================================
    # 完成总结
    # ============================================================
    results.append(f"\n{'='*50}")
    results.append("✅ 项目完成总结")
    results.append(f"{'='*50}")
    results.append(f"需求:{main_task.title}")
    results.append(f"任务ID:{main_task.id}")
    results.append(f"完成时间:{datetime.now().isoformat()}")
    results.append("")
    results.append("产出物:")
    results.append(f"  📦 代码模块:{dev_result['module_name']}/")
    results.append(f"     - 创建文件:{dev_result['files_created']} 个")
    results.append(f"     - 预计代码行数:{dev_result['estimated_lines']} 行")
    results.append(f"  🧪 测试用例:{test_result['test_cases_count']} 个")
    results.append(f"     - 测试通过率:{test_run['pass_rate']}")
    results.append(f"  📝 技术文档:{len(doc_result['documents_created'])} 份")

    main_task.status = "completed"
    main_task.completed_at = datetime.now().isoformat()

    return "\n".join(results)

============================================================
团队协作界面
============================================================

class TeamCollaborationInterface:
“””团队协作交互界面”””

def __init__(self):
    self.pm = ProjectManagerAgent()

def run(self):
    """启动交互界面"""
    print("=" * 60)
    print("🤖 多Agent协作系统 - 项目团队模拟")
    print("=" * 60)
    print()

    while True:
        print("\n请选择操作:")
        print("  1. 查看团队成员")
        print("  2. 提交新需求")
        print("  3. 执行当前项目")
        print("  4. 退出")
        print()

        choice = input("输入选项 [1-4]: ").strip()

        if choice == "1":
            print(self.pm.introduce_team())

        elif choice == "2":
            requirement = input("\n请输入需求描述: ").strip()
            if requirement:
                print(f"\n{self.pm.receive_requirement(requirement)}")

        elif choice == "3":
            result = self.pm.execute_project()
            print(result)

        elif choice == "4":
            print("\n再见!")
            break

        else:
            print("\n无效选项,请重新选择")

============================================================
主程序
============================================================

if name == “main“:
interface = TeamCollaborationInterface()
interface.run()

**交互演示**

运行程序后,你会看到:

============================================================
🤖 多Agent协作系统 – 项目团队模拟
============================================================

请选择操作:
1. 查看团队成员
2. 提交新需求
3. 执行当前项目
4. 退出

输入选项 [1-4]: 1

==================================================
🚀 项目团队介绍
==================================================

我是小开,担任后端开发工程师角色。

我的职责:
负责代码开发、架构设计和技术实现

我能做的事情:
– 编写Python/Java/Go代码
– 设计数据库结构
– 实现API接口
– 代码审查和优化
– 解决技术难题

请选择操作:
1. 查看团队成员
2. 提交新需求
3. 执行当前项目
4. 退出

输入选项 [1-4]: 2

请输入需求描述: 用户权限管理模块

📋 已收到需求:用户权限管理模块
任务ID:TASK-001

任务分解:
1. [TASK-001-1] 开发功能模块 → 小开
2. [TASK-001-2] 编写测试用例 → 小测
3. [TASK-001-3] 生成技术文档 → 小档

开始执行任务…

请选择操作:
1. 查看团队成员
2. 提交新需求
3. 执行当前项目
4. 退出

输入选项 [1-4]: 3

==================================================
【阶段1】开发 – 负责人:小开
==================================================
任务 TASK-001-1 已完成!
✓ 模块名:用户_权限_管理
✓ 创建文件:3 个
✓ 预计代码行数:约 45 行

==================================================
【阶段2】测试 – 负责人:小测
==================================================
✓ 已生成 4 个测试用例
✓ 测试执行完成:通过率 92.5%
– 总测试数:4
– 通过:3
– 失败:1

==================================================
【阶段3】文档 – 负责人:小档
==================================================
✓ 已生成文档:readme, api_doc
✓ 总字数:约 180 词

==================================================
✅ 项目完成总结
==================================================
需求:用户权限管理模块
任务ID:TASK-001
完成时间:2024-01-15T10:35:20.123456

产出物:
📦 代码模块:用户_权限_管理/
– 创建文件:3 个
– 预计代码行数:45 行
🧪 测试用例:4 个
– 测试通过率:92.5%
📝 技术文档:2 份

---

**常见使用场景**

**场景一个人效率助手**

```python
"""
个人效率助手配置示例
整合日历、邮件、任务管理等功能
"""

from cowagent import CowAgent

# 创建个人助手
personal_assistant = CowAgent(
    system_prompt="""
你是一个高效的个人助手,帮助用户管理:
- 日程安排
- 邮件处理
- 任务清单
- 文件整理
- 信息查询

始终以用户的利益为重,提供高效、准确的服务。
"""
)

# 自然语言交互示例
personal_assistant.chat("帮我把明天下午3点的会议改到4点")
personal_assistant.chat("给团队发一封周报邮件,总结这周的工作")
personal_assistant.chat("整理一下 Downloads 文件夹,把超过30天的文件移到归档目录")

场景二:代码审查机器人

"""
代码审查机器人
自动审查Pull Request,提供改进建议
"""

class CodeReviewBot:
    """代码审查机器人"""

    def __init__(self):
        self.agent = CowAgent(
            system_prompt="你是一个资深的代码审查专家..."
        )

    def review_pr(self, pr_data: Dict) -> Dict:
        """
        审查Pull Request

        Args:
            pr_data: PR数据

        Returns:
            审查结果
        """
        prompt = f"""
请审查以下Pull Request:

标题:{pr_data['title']}
描述:{pr_data['description']}
代码变更:{pr_data['diff']}

请从以下维度进行审查:
1. 代码质量
2. 潜在bug
3. 安全问题
4. 性能考虑
5. 代码风格
"""

        return self.agent.chat(prompt)

场景三:数据分析助手

"""
数据分析助手
连接数据库,执行查询,生成可视化
"""

class DataAnalysisAssistant:
    """数据分析助手"""

    def __init__(self, db_config: Dict):
        self.agent = CowAgent(
            system_prompt="你是一个专业的数据分析师..."
        )
        self.db_config = db_config

    def natural_language_query(self, question: str) -> str:
        """
        自然语言查询

        Args:
            question: 自然语言问题

        Returns:
            分析结果
        """
        # Agent会将自然语言转换为SQL
        sql = self.agent.chat(f"将以下问题转换为SQL:{question}")

        # 执行查询
        result = self.execute_query(sql)

        # 生成分析报告
        report = self.agent.chat(f"基于以下数据回答问题:{result}")

        return report

最佳实践与优化建议

1. Prompt工程优化

好的Prompt示例:

system_prompt = """
你是{role},专注于{domain}领域。

## 你的专业能力
{capabilities}

## 工作原则
1. {principle_1}
2. {principle_2}
3. {principle_3}

## 输出格式
请按照以下格式输出结果:
- 结论:{conclusion_format}
- 依据:{evidence_format}
- 建议:{suggestion_format}

## 注意事项
{注意事项}
"""

需要避免的问题:
– Prompt过于笼统,缺乏具体指导
– 没有明确输出格式要求
– 缺少边界情况和错误处理说明

2. 错误处理策略

from cowagent.exceptions import (
    ToolExecutionError,
    AgentTimeoutError,
    InvalidInputError
)

def robust_execute(agent: CowAgent, task: str, max_retries: int = 3):
    """
    带错误处理的执行函数

    Args:
        agent: Agent实例
        task: 任务描述
        max_retries: 最大重试次数

    Returns:
        执行结果
    """
    for attempt in range(max_retries):
        try:
            result = agent.execute_task(task)
            return {"success": True, "result": result}

        except ToolExecutionError as e:
            # 工具执行失败,尝试备用方案
            print(f"工具执行失败,尝试备用方案... ({attempt+1}/{max_retries})")
            task = modify_task_for_fallback(task)

        except AgentTimeoutError as e:
            # 超时,简化任务
            print(f"任务超时,简化处理... ({attempt+1}/{max_retries})")
            task = simplify_task(task)

        except InvalidInputError as e:
            # 输入无效,直接返回错误
            return {
                "success": False,
                "error": f"输入无效: {str(e)}"
            }

    return {
        "success": False,
        "error": "达到最大重试次数,执行失败"
    }

3. 性能优化技巧

# 技巧1:缓存频繁使用的结果
from cowagent.cache import SimpleCache

cache = SimpleCache(ttl=3600)  # 1小时过期

def cached_agent_call(agent, query, cache_key):
    """带缓存的Agent调用"""
    cached = cache.get(cache_key)
    if cached:
        return cached

    result = agent.chat(query)
    cache.set(cache_key, result)

    return result

# 技巧2:并行执行独立任务
from concurrent.futures import ThreadPoolExecutor

def parallel_execution(agent, tasks):
    """并行执行多个独立任务"""
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(agent.chat, task) for task in tasks]
        results = [f.result() for f in futures]

    return results

# 技巧3:任务分解减少Token消耗
def decompose_large_task(task):
    """将大任务分解为小任务,减少单次调用Token"""
    subtasks = [
        "第一步:...",
        "第二步:...",
        "第三步:..."
    ]
    return subtasks

4. 安全注意事项

# 重要:永远不要让Agent直接执行未验证的代码
class SafeAgentExecutor:
    """安全的Agent执行器"""

    def __init__(self, agent: CowAgent):
        self.agent = agent
        self.dangerous_patterns = [
            "rm -rf",
            "DROP TABLE",
            "DELETE FROM",
            "format",
            "os.system"
        ]

    def validate_task(self, task: str) -> bool:
        """验证任务安全性"""
        task_lower = task.lower()

        for pattern in self.dangerous_patterns:
            if pattern.lower() in task_lower:
                return False

        return True

    def safe_execute(self, task: str) -> Dict:
        """安全执行任务"""
        if not self.validate_task(task):
            return {
                "success": False,
                "error": "任务包含潜在危险操作,已拒绝执行"
            }

        return self.agent.execute_task(task)

总结与资源链接

CowAgent核心价值回顾

通过本文的详细介绍,我们可以看到 CowAgent 的几大核心优势:

  1. 任务执行能力:不同于传统AI只能回答问题,CowAgent能够真正执行任务
  2. 多层次架构:Supervisor + Specialized Agents的分层设计,职责清晰,易于扩展
  3. 强大的工具系统:内置丰富的工具,并支持自定义工具扩展
  4. 记忆系统:短期记忆+长期记忆+向量检索,让Agent越用越聪明
  5. 多Agent协作:支持构建Agent团队,实现复杂任务的协作完成

项目地址

  • GitHub仓库:https://github.com/zhayujie/CowAgent
  • 文档地址:https://cowagent.readthedocs.io
  • 示例代码:https://github.com/zhayujie/CowAgent/tree/main/examples

相关AI项目推荐

项目 说明 适用场景
LangChain LLM应用开发框架 构建复杂LLM应用
AutoGPT 自主AI Agent 复杂任务自动化
CrewAI 多Agent协作框架 团队协作任务
Semantic Kernel 微软AI框架 企业级AI应用
LlamaIndex 知识增强LLM 知识库问答

进阶学习路径

  1. 入门阶段:完成本文的三个实战教程
  2. 进阶阶段:学习LangChain/CrewAI等框架的集成使用
  3. 深入阶段:研究Agent的规划算法、记忆机制、多模态能力
  4. 实战阶段:将CowAgent应用到实际工作场景,解决真实问题

写在最后

CowAgent为我们打开了一扇通向真正智能助手的大门。它不仅仅是另一个AI项目,更代表了一种新的范式——从「问答」到「执行」,从「单点能力」到「系统协作」。

希望你通过本文的学习,能够掌握CowAgent的核心用法,并将其应用到实际工作中。如果你有任何问题或心得,欢迎在评论区交流!

祝学习愉快!

如果内容对您有帮助,欢迎打赏

您的支持是我继续创作的动力

前往打赏页面

评论区

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注