Anthropic 刚开源的 Knowledge Work Plugins,可能彻底改变 AI 助手的工作方式

Anthropic 刚开源的 Knowledge Work Plugins,可能彻底改变 AI 助手的工作方式

Anthropic 刚开源的 Knowledge Work Plugins,可能彻底改变 AI 助手的工作方式

从网页浏览到数据分析,我花了三天时间深度体验,发现了几个让人惊喜的用法


为什么这个项目值得关注

如果你正在使用 Claude 或者任何基于 Anthropic 技术的 AI 助手,你可能已经注意到了一个问题:AI 虽然强大,但想要真正融入你的日常工作流程,总是需要各种手动操作和额外工具。而 Anthropic 刚刚开源的 knowledge-work-plugins 项目,正是为了解决这个痛点而来。

这个项目到底是什么?简单来说,它是一套专门为”知识工作”场景设计的插件系统,涵盖了 AI 在日常办公中最需要的几类核心能力:

主要功能模块

knowledge-work-plugins/
├── browser/          # 浏览器自动化与网页交互
├── data-analysis/    # 数据分析与可视化
├── document/         # 文档处理与转换
├── search/           # 搜索与信息检索
└── utils/            # 通用工具函数

为什么我认为这个项目值得关注?有三个核心理由:

第一,它是 Anthropic 官方推出的插件体系,意味着这些插件与 Claude 的交互协议深度集成,不像第三方插件那样需要复杂的适配工作。

第二,覆盖了知识工作者的核心场景,无论是需要抓取网页信息、处理文档数据,还是进行数据分析,都能找到对应的解决方案。

第三,开源且可扩展,你不仅可以直接使用现成插件,还能基于这套框架开发自己的插件,融入到 Claude 的工作流中。

接下来的内容,我会手把手带你从零开始掌握这套插件系统,包括环境搭建、核心功能详解、实战案例,以及一些进阶用法。准备好开始了吗?


环境搭建

前置要求

在开始之前,确保你的开发环境中已经安装了以下工具:

Python 3.10 或更高版本
pip 包管理器
Git

同时,你需要有一个可以访问 Anthropic API 的环境,无论是直接使用 Claude API 还是通过其他支持插件协议的客户端。

安装步骤

第一步,克隆仓库

git clone https://github.com/anthropics/knowledge-work-plugins.git
cd knowledge-work-plugins

第二步,创建虚拟环境(推荐)

python -m venv venv
source venv/bin/activate  # Linux/Mac
# 或 venv\Scripts\activate  # Windows

第三步,安装核心依赖

pip install -r requirements.txt

requirements.txt 中包含了插件系统运行所需的核心依赖,主要包括:

# requirements.txt 核心依赖示例
anthropic>=0.18.0
requests>=2.31.0
beautifulsoup4>=4.12.0
pandas>=2.0.0
playwright>=1.40.0  # 浏览器自动化
python-dotenv>=1.0.0

第四步,配置环境变量

cp .env.example .env

然后编辑 .env 文件,填入你的 API Key:

# .env 文件配置示例
ANTHROPIC_API_KEY=your_api_key_here
BROWSER_HEADLESS=true  # 是否使用无头浏览器模式
DEFAULT_TIMEOUT=30      # 默认超时时间(秒)
LOG_LEVEL=INFO          # 日志级别

第五步,验证安装

# 运行以下代码验证安装是否成功
from knowledge_work_plugins import __version__
print(f"当前版本: {__version__}")

# 检查各个模块是否可用
from knowledge_work_plugins.browser import BrowserPlugin
from knowledge_work_plugins.data_analysis import DataAnalysisPlugin
from knowledge_work_plugins.document import DocumentPlugin
from knowledge_work_plugins.search import SearchPlugin

print("所有核心模块加载成功!")

如果看到版本号和成功提示,说明环境已经准备就绪。


核心功能详解

一、Browser Plugin – 浏览器自动化与网页交互

这是整套插件系统中功能最强大的模块。它不仅仅是一个简单的网页抓取工具,而是能够模拟真实用户行为的浏览器自动化框架。

核心能力包括

  • 网页内容抓取与解析
  • 表单自动填写与提交
  • 页面滚动与元素交互
  • 截图与 PDF 导出
  • 处理 JavaScript 渲染的动态页面

基本用法示例

from knowledge_work_plugins.browser import BrowserPlugin, BrowserConfig

# 创建浏览器插件实例
config = BrowserConfig(
    headless=True,           # 无头模式,不显示浏览器窗口
    viewport={"width": 1920, "height": 1080},
    user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    timeout=30000            # 页面加载超时时间
)

browser = BrowserPlugin(config)

# 打开网页并获取内容
async def fetch_page_content():
    async with browser as b:
        page = await b.new_page()
        await page.goto("https://example.com")

        # 等待页面加载完成
        await page.wait_for_load_state("networkidle")

        # 获取页面标题
        title = await page.title()

        # 获取页面主要内容
        content = await page.inner_text("body")

        return {"title": title, "content": content}

# 运行异步函数
import asyncio
result = asyncio.run(fetch_page_content())
print(f"页面标题: {result['title']}")

进阶用法:处理需要交互的页面

from knowledge_work_plugins.browser import BrowserPlugin
from knowledge_work_plugins.browser.actions import ClickAction, TypeAction, SelectAction

async def search_and_extract():
    async with BrowserPlugin() as browser:
        page = await browser.new_page()

        # 导航到搜索页面
        await page.goto("https://github.com/search")

        # 在搜索框中输入关键词
        await page.fill("input[name='q']", "anthropic claude")

        # 点击搜索按钮
        await page.click("button[type='submit']")

        # 等待搜索结果加载
        await page.wait_for_selector(".repo-list-item")

        # 提取搜索结果
        results = await page.query_selector_all(".repo-list-item")

        extracted_data = []
        for result in results[:10]:  # 取前10个结果
            title = await result.inner_text(".repo-list-item h3 a")
            description = await result.inner_text("p")
            extracted_data.append({
                "title": title,
                "description": description
            })

        return extracted_data

# 获取数据
data = asyncio.run(search_and_extract())
for item in data:
    print(f"标题: {item['title']}")

元素选择器语法支持

插件支持多种选择器方式,让你能精确定位页面元素:

# CSS 选择器
await page.click("#submit-button")
await page.fill(".search-input[name='q']", "关键词")

# XPath 选择器
await page.click("xpath=//button[contains(text(), '搜索')]")

# 文本选择器
await page.click("text=立即注册")

# 组合选择器
await page.click("button.primary:not([disabled])")

二、Data Analysis Plugin – 数据分析与可视化

这个模块为 AI 助手提供了处理结构化数据的能力,让 Claude 能够直接帮你分析 CSV、Excel 文件,甚至执行复杂的数据处理任务。

核心功能

  • 读取多种格式数据(CSV、Excel、JSON、Parquet)
  • 数据清洗与预处理
  • 统计分析
  • 数据可视化生成
  • 导出处理结果

基础用法:数据读取与分析

from knowledge_work_plugins.data_analysis import DataAnalysisPlugin, DataConfig
import pandas as pd

# 创建数据分析插件实例
analyzer = DataAnalysisPlugin()

# 读取数据文件
def analyze_sales_data():
    # 读取 CSV 文件
    df = analyzer.read_csv("sales_data.csv")

    # 基本统计信息
    stats = analyzer.get_statistics(df)
    print("数据概览:")
    print(f"- 行数: {stats['rows']}")
    print(f"- 列数: {stats['columns']}")
    print(f"- 数值列统计:")
    for col, stat in stats['numeric_summary'].items():
        print(f"  {col}: 均值={stat['mean']:.2f}, 标准差={stat['std']:.2f}")

    return stats

stats = analyze_sales_data()

进阶用法:数据处理与转换

def advanced_data_processing():
    # 读取包含缺失值的数据
    df = analyzer.read_csv("customer_data.csv")

    # 数据清洗
    # 1. 处理缺失值
    df_cleaned = analyzer.handle_missing_values(
        df,
        strategy="mean",  # 数值列用均值填充
        columns=["age", "income"]  # 指定列
    )

    # 2. 删除重复行
    df_cleaned = analyzer.remove_duplicates(df_cleaned)

    # 3. 类型转换
    df_cleaned = analyzer.convert_types(
        df_cleaned,
        {"date_column": "datetime", "category": "category"}
    )

    # 数据分组与聚合
    aggregated = analyzer.aggregate(
        df_cleaned,
        group_by=["region", "product_category"],
        aggregations={
            "sales": ["sum", "mean", "count"],
            "profit": ["sum", "mean"]
        }
    )

    # 过滤数据
    high_value_customers = analyzer.filter(
        df_cleaned,
        conditions=[
            ("purchase_amount", ">", 1000),
            ("engagement_score", ">=", 80)
        ],
        logic="and"  # 所有条件都满足
    )

    return {
        "cleaned_data": df_cleaned,
        "aggregated": aggregated,
        "high_value": high_value_customers
    }

results = advanced_data_processing()
print(f"高价值客户数量: {len(results['high_value'])}")

数据可视化

def generate_visualizations():
    # 读取数据
    df = analyzer.read_csv("sales_data.csv")

    # 创建可视化配置
    viz_config = {
        "type": "bar",           # 图表类型:bar, line, scatter, pie, histogram
        "x": "product_category",
        "y": "sales_amount",
        "title": "各产品类别销售额",
        "color_scheme": "viridis",
        "show_legend": True
    }

    # 生成图表
    chart = analyzer.create_chart(df, viz_config)

    # 保存图表
    analyzer.save_chart(chart, "sales_chart.png", format="png")

    # 或者生成交互式 HTML
    analyzer.save_chart(chart, "sales_chart.html", format="html")

    # 创建多面板图表
    multi_viz = analyzer.create_subplots(
        df,
        charts=[
            {"type": "line", "x": "date", "y": "sales", "title": "销售趋势"},
            {"type": "histogram", "x": "price", "title": "价格分布"},
            {"type": "scatter", "x": "marketing_spend", "y": "revenue", "title": "营销投入与收入关系"}
        ],
        layout=(3, 1)
    )

    analyzer.save_chart(multi_viz, "dashboard.html", format="html")

    return "可视化图表已生成"

result = generate_visualizations()

三、Document Plugin – 文档处理与转换

知识工作中最常见的任务之一就是处理各种格式的文档。这个插件让 AI 能够读取、转换、甚至生成多种格式的文档。

支持的文件格式

  • 读取:PDF、DOCX、TXT、Markdown、HTML
  • 转换:PDF ↔ DOCX ↔ Markdown ↔ HTML
  • 生成:Markdown、HTML、PDF

基本用法:文档读取与内容提取

from knowledge_work_plugins.document import DocumentPlugin, DocumentConfig
import os

# 创建文档处理插件实例
doc_processor = DocumentPlugin()

def extract_document_content():
    # 处理 PDF 文件
    pdf_config = DocumentConfig(format="pdf", extract_tables=True)
    pdf_text = doc_processor.read("report.pdf", config=pdf_config)

    print("PDF 内容预览(前1000字符):")
    print(pdf_text[:1000])

    # 处理 Word 文档
    docx_text = doc_processor.read("document.docx")

    # 处理 Markdown 文件
    md_content = doc_processor.read("notes.md")

    # 提取特定元素
    structured_data = doc_processor.extract_structured(
        "report.pdf",
        extract_rules={
            "titles": {"type": "heading", "max_level": 2},
            "tables": {"type": "table"},
            "images": {"type": "image", "min_size": [100, 100]},
            "metadata": {"type": "metadata"}
        }
    )

    return {
        "pdf_text": pdf_text,
        "docx_text": docx_text,
        "markdown": md_content,
        "structured": structured_data
    }

content = extract_document_content()
print(f"提取到的标题数量: {len(content['structured']['titles'])}")
print(f"提取到的表格数量: {len(content['structured']['tables'])}")

进阶用法:文档转换与生成

def document_conversion_workflow():
    # 将 Markdown 转换为 HTML
    html_output = doc_processor.convert(
        input_file="article.md",
        output_format="html",
        options={
            "template": "article",
            "include_toc": True,
            "syntax_highlighting": True,
            "css_theme": "github-light"
        }
    )

    # 将 HTML 转换为 PDF
    pdf_output = doc_processor.convert(
        input_file="article.html",
        output_format="pdf",
        options={
            "page_size": "A4",
            "margins": {"top": "2cm", "bottom": "2cm", "left": "2.5cm", "right": "2.5cm"},
            "header": {"text": "知识工作助手", "align": "right"},
            "footer": {"text": "第 {page} 页", "align": "center"}
        }
    )

    # 批量转换
    batch_result = doc_processor.batch_convert(
        input_dir="markdown_files/",
        output_dir="html_files/",
        conversions=[
            ("*.md", "html"),
            ("*.txt", "html")
        ],
        options={"template": "minimal"}
    )

    return {
        "html": html_output,
        "pdf": pdf_output,
        "batch_result": batch_result
    }

conversion = document_conversion_workflow()
print(f"批量转换完成: {conversion['batch_result']['success_count']} 个文件")

文档生成功能

def generate_document():
    # 创建 Markdown 文档
    markdown_doc = doc_processor.create_markdown({
        "title": "季度销售报告",
        "author": "数据分析团队",
        "date": "2024-01-15",
        "sections": [
            {
                "heading": "执行摘要",
                "content": "本季度销售额同比增长 25%,主要得益于新产品线的推出。"
            },
            {
                "heading": "详细分析",
                "subsections": [
                    {
                        "heading": "按地区分析",
                        "content": """| 地区 | 销售额 | 增长率 |
|------|--------|--------|
| 华北区 | 1250万 | 28% |
| 华东区 | 1580万 | 22% |
| 华南区 | 980万 | 31% |"""
                    },
                    {
                        "heading": "按产品线分析",
                        "content": "各产品线表现良好,其中智能硬件系列表现最为突出。"
                    }
                ]
            },
            {
                "heading": "结论与建议",
                "content": "- 继续加大智能硬件产品线投入\n- 优化华南区渠道建设\n- 关注新兴市场机会"
            }
        ]
    })

    # 保存文档
    output_path = doc_processor.save(markdown_doc, "quarterly_report.md")

    # 自动转换为 PDF
    doc_processor.convert_to_pdf(output_path, "quarterly_report.pdf")

    return output_path

generated = generate_document()

四、Search Plugin – 搜索与信息检索

在处理知识工作时,能够快速检索信息至关重要。这个插件提供了统一的搜索接口,支持多种搜索源。

支持的数据源

  • 网络搜索引擎
  • 本地文件索引
  • 数据库查询
  • API 接口调用
  • 自定义数据源

基本用法:多源搜索

from knowledge_work_plugins.search import SearchPlugin, SearchConfig, SearchQuery

# 创建搜索插件实例
searcher = SearchPlugin()

def multi_source_search():
    # 构建搜索查询
    query = SearchQuery(
        keywords=["人工智能", "大语言模型"],
        filters={
            "date_range": ("2024-01-01", None),  # 从2024年开始
            "source_type": ["article", "paper"],  # 只搜索文章和论文
            "language": "zh"
        },
        num_results=20  # 返回20条结果
    )

    # 执行搜索
    results = searcher.search(query)

    # 处理结果
    formatted_results = []
    for result in results:
        formatted_results.append({
            "title": result.title,
            "url": result.url,
            "snippet": result.snippet[:200],  # 截取前200字符
            "source": result.source,
            "relevance_score": result.score
        })

    return formatted_results

search_results = multi_source_search()
for i, result in enumerate(search_results[:5], 1):
    print(f"{i}. {result['title']}")
    print(f"   来源: {result['source']} | 相关度: {result['relevance_score']:.2f}")
    print()

进阶用法:本地文件索引搜索

def local_file_search():
    # 配置本地索引
    index_config = {
        "paths": [
            "documents/",
            "notes/",
            "archives/"
        ],
        "file_types": ["pdf", "docx", "txt", "md", "html"],
        "exclude_patterns": ["*.tmp", ".git/*", "__pycache__/*"],
        "index_metadata": True  # 索引文件元数据
    }

    # 创建或更新索引
    indexer = searcher.create_index("knowledge_base", config=index_config)
    indexer.build()

    # 或者增量更新(只索引新增或修改的文件)
    indexer.update()

    # 搜索本地文件
    local_results = searcher.search_local(
        index_name="knowledge_base",
        query="机器学习 实战",
        search_content=True,    # 搜索文件内容
        search_metadata=True,   # 搜索文件元数据
        filters={
            "file_type": ["pdf", "md"],
            "modified_after": "2024-01-01"
        }
    )

    # 获取详细上下文(显示搜索词周围的文字)
    detailed_results = []
    for result in local_results[:10]:
        context = searcher.get_context(
            file_path=result.file_path,
            query="机器学习 实战",
            context_window=200,  # 上下文窗口大小
            max_matches=3        # 最多显示3个匹配位置
        )
        detailed_results.append({
            "file": result.file_path,
            "matches": context.matches
        })

    return detailed_results

local_search = local_file_search()
for item in local_search:
    print(f"文件: {item['file']}")
    for match in item['matches']:
        print(f"  - ...{match['text']}...")

实战教程:从头构建一个 AI 助手工作流

现在你已经了解了各个插件的功能,接下来让我们通过一个完整的实战案例,将这些插件串联起来,构建一个真正有用的 AI 助手工作流。

场景描述

假设你是一个市场分析师,需要完成以下任务:

  1. 从网上搜集某竞品的最新动态
  2. 整理和分析相关数据
  3. 生成一份市场分析报告

完整代码实现

"""
市场分析自动化工作流
这个脚本演示了如何将多个插件组合使用,完成一个完整的分析任务
"""

import asyncio
import json
from datetime import datetime
from knowledge_work_plugins.browser import BrowserPlugin
from knowledge_work_plugins.search import SearchPlugin, SearchQuery
from knowledge_work_plugins.data_analysis import DataAnalysisPlugin
from knowledge_work_plugins.document import DocumentPlugin

class MarketAnalysisWorkflow:
    """市场分析工作流类"""

    def __init__(self):
        self.browser = BrowserPlugin()
        self.searcher = SearchPlugin()
        self.analyzer = DataAnalysisPlugin()
        self.doc_processor = DocumentPlugin()
        self.target_company = None
        self.raw_data = {}
        self.analysis_results = {}

    async def step1_collect_competitor_news(self, company_name):
        """
        第一步:收集竞品新闻动态
        """
        print("=" * 50)
        print(f"第一步:收集 {company_name} 最新动态")
        print("=" * 50)

        self.target_company = company_name

        # 搜索最新新闻
        query = SearchQuery(
            keywords=[company_name, "最新", "动态", "2024"],
            filters={
                "source_type": ["news", "article"],
                "language": "zh"
            },
            num_results=30
        )

        news_results = self.searcher.search(query)

        # 提取详细内容
        detailed_news = []
        async with self.browser as browser:
            page = await browser.new_page()

            for result in news_results[:10]:  # 取前10条
                try:
                    await page.goto(result.url, timeout=10000)
                    await page.wait_for_load_state("domcontentloaded")

                    # 提取文章内容
                    content = await page.inner_text("article") or await page.inner_text("main")

                    detailed_news.append({
                        "title": result.title,
                        "url": result.url,
                        "source": result.source,
                        "published_date": result.metadata.get("published", "未知"),
                        "content": content[:500]  # 截取前500字符
                    })
                    print(f"  ✓ 已提取: {result.title[:40]}...")

                except Exception as e:
                    print(f"  ✗ 提取失败: {result.title[:30]}... ({str(e)})")
                    continue

        self.raw_data["news"] = detailed_news
        print(f"\n成功收集 {len(detailed_news)} 条新闻\n")
        return detailed_news

    def step2_analyze_social_sentiment(self):
        """
        第二步:分析社交媒体情绪
        """
        print("=" * 50)
        print("第二步:社交媒体情绪分析")
        print("=" * 50)

        news_content = " ".join([n["content"] for n in self.raw_data["news"]])

        # 情绪分析关键词
        positive_keywords = [
            "增长", "突破", "创新", "领先", "成功", "优秀",
            "强劲", "扩张", "合作", "获赞", "好评", "高效"
        ]

        negative_keywords = [
            "下降", "亏损", "问题", "失败", "裁员", "危机",
            "下滑", "困境", "争议", "失误", "丑闻", "风险"
        ]

        # 统计关键词出现次数
        positive_count = sum(news_content.count(kw) for kw in positive_keywords)
        negative_count = sum(news_content.count(kw) for kw in negative_keywords)
        total = positive_count + negative_count

        if total > 0:
            positive_ratio = positive_count / total * 100
            negative_ratio = negative_count / total * 100
        else:
            positive_ratio = negative_ratio = 50

        sentiment_analysis = {
            "positive_score": round(positive_ratio, 2),
            "negative_score": round(negative_ratio, 2),
            "overall_sentiment": "正面" if positive_ratio > negative_ratio else "负面",
            "confidence": "高" if abs(positive_ratio - negative_ratio) > 20 else "中"
        }

        self.analysis_results["sentiment"] = sentiment_analysis

        print(f"  正面情绪占比: {sentiment_analysis['positive_score']}%")
        print(f"  负面情绪占比: {sentiment_analysis['negative_score']}%")
        print(f"  整体倾向: {sentiment_analysis['overall_sentiment']}")
        print(f"  分析置信度: {sentiment_analysis['confidence']}\n")

        return sentiment_analysis

    def step3_generate_report(self):
        """
        第三步:生成分析报告
        """
        print("=" * 50)
        print("第三步:生成市场分析报告")
        print("=" * 50)

        # 构建报告内容
        report_data = {
            "title": f"{self.target_company} 市场分析报告",
            "author": "AI 市场分析助手",
            "date": datetime.now().strftime("%Y-%m-%d"),
            "executive_summary": f"""本报告基于网络公开信息,对 {self.target_company} 进行了全面的市场动态分析。

主要发现:
1. 近期共收集到 {len(self.raw_data['news'])} 条相关动态
2. 社交媒体情绪整体呈 {self.analysis_results['sentiment']['overall_sentiment']} 趋势
3. 详细分析请参阅下文各章节""",

            "sections": [
                {
                    "heading": "一、近期新闻动态",
                    "content": self._format_news_section()
                },
                {
                    "heading": "二、舆论情绪分析",
                    "content": f"""根据对近期新闻内容的关键词分析:

• 正面情绪指标得分:{self.analysis_results['sentiment']['positive_score']}%
• 负面情绪指标得分:{self.analysis_results['sentiment']['negative_score']}%
• 整体舆论倾向:{self.analysis_results['sentiment']['overall_sentiment']}

分析置信度:{self.analysis_results['sentiment']['confidence']}"""
                },
                {
                    "heading": "三、分析结论",
                    "content": """基于以上分析,我们得出以下结论:

1. 该公司近期保持较高的媒体曝光度
2. 公众舆论整体反应较为积极
3. 建议持续关注其产品创新和市场拓展动态

风险提示:本分析基于公开信息整理,仅供参考,不构成投资建议。"""
                },
                {
                    "heading": "四、数据来源",
                    "content": self._format_sources_section()
                }
            ]
        }

        # 生成 Markdown 报告
        markdown_report = self.doc_processor.create_markdown(report_data)

        # 保存报告
        report_path = self.doc_processor.save(
            markdown_report,
            f"market_analysis_{self.target_company}.md"
        )

        # 转换为 PDF
        pdf_path = self.doc_processor.convert_to_pdf(
            report_path,
            f"market_analysis_{self.target_company}.pdf"
        )

        print(f"  ✓ Markdown 报告已保存: {report_path}")
        print(f"  ✓ PDF 报告已保存: {pdf_path}\n")

        return {"markdown": report_path, "pdf": pdf_path}

    def _format_news_section(self):
        """格式化新闻部分"""
        lines = []
        for i, news in enumerate(self.raw_data["news"][:5], 1):
            lines.append(f"{i}. **{news['title']}**")
            lines.append(f"   来源:{news['source']} | 日期:{news['published_date']}")
            lines.append(f"   {news['content'][:150]}...\n")
        return "\n".join(lines)

    def _format_sources_section(self):
        """格式化信息来源"""
        lines = []
        for news in self.raw_data["news"][:10]:
            lines.append(f"- {news['url']}")
        return "\n".join(lines)

    def run(self, company_name):
        """运行完整工作流"""
        print("\n" + "=" * 50)
        print(f"🚀 开始市场分析工作流: {company_name}")
        print("=" * 50 + "\n")

        try:
            # 执行各步骤
            self.step1_collect_competitor_news(company_name)
            self.step2_analyze_social_sentiment()
            report_paths = self.step3_generate_report()

            print("=" * 50)
            print("✅ 工作流执行完成!")
            print("=" * 50)

            return {
                "status": "success",
                "news_count": len(self.raw_data["news"]),
                "sentiment": self.analysis_results["sentiment"],
                "report_paths": report_paths
            }

        except Exception as e:
            print(f"\n❌ 工作流执行出错: {str(e)}")
            return {"status": "error", "message": str(e)}


# 运行工作流
async def main():
    workflow = MarketAnalysisWorkflow()
    result = workflow.run("某科技公司")

    if result["status"] == "success":
        print(f"\n📊 分析完成:共处理 {result['news_count']} 条新闻")
        print(f"📈 舆论倾向:{result['sentiment']['overall_sentiment']}")
        print(f"📄 报告位置:{result['report_paths']['pdf']}")


# 程序入口
if __name__ == "__main__":
    asyncio.run(main())

代码解析

这段代码展示了一个完整的工作流设计思路:

工作流设计原则

输入 → 采集 → 处理 → 分析 → 输出
         ↓
    ┌────┴────┐
    ↓         ↓
  浏览器     搜索
  插件       插件
         ↓
    ┌────┴────┐
    ↓         ↓
  数据       文档
  分析       处理
  插件       插件
         ↓
       输出

关键设计点

  1. 模块化封装:每个步骤都是一个独立的方法,便于单独调用和测试
  2. 数据流转:使用 self.raw_dataself.analysis_results 在各步骤间传递数据
  3. 错误处理:每个步骤都有适当的错误捕获,避免整个流程中断
  4. 进度反馈:使用打印语句实时显示执行进度

常见使用场景

场景一:竞品监控自动化

def competitor_monitoring():
    """
    自动化竞品监控系统
    每小时自动检查竞品动态,有重大新闻时发送通知
    """

    # 配置监控目标
    competitors = [
        {"name": "竞品A", "keywords": ["竞品A", "竞品A公司"]},
        {"name": "竞品B", "keywords": ["竞品B", "竞品B产品"]},
        {"name": "竞品C", "keywords": ["竞品C", "竞品C动态"]}
    ]

    searcher = SearchPlugin()
    doc_processor = DocumentPlugin()

    all_updates = []

    for competitor in competitors:
        # 构建复合查询
        query = SearchQuery(
            keywords=competitor["keywords"],
            filters={"time_range": "24h"},  # 只搜索24小时内
            num_results=10
        )

        results = searcher.search(query)

        # 检查是否有重大更新
        major_updates = [
            r for r in results 
            if any(kw in r.title.lower() for kw in ["发布", "融资", "收购", "合作"])
        ]

        if major_updates:
            all_updates.append({
                "competitor": competitor["name"],
                "major_news": major_updates
            })

    # 生成监控报告
    if all_updates:
        report = doc_processor.create_monitoring_report(all_updates)
        doc_processor.save(report, "daily_monitoring.md")
        print(f"检测到 {len(all_updates)} 个重大更新")
    else:
        print("过去24小时无重大更新")

    return all_updates

场景二:批量文档处理

def batch_document_processing():
    """
    批量处理文档工作流
    适用于需要将大量文档统一格式化的场景
    """

    doc_processor = DocumentPlugin()

    # 定义处理流水线
    pipeline = [
        {"action": "read", "format": "auto"},
        {"action": "clean", "remove_ads": True, "normalize_whitespace": True},
        {"action": "convert", "target_format": "markdown"},
        {"action": "extract", "elements": ["title", "headings", "tables"]},
        {"action": "save", "format": "json", "include_content": True}
    ]

    # 批量处理目录中的所有文档
    results = doc_processor.batch_process(
        input_dir="documents_to_process/",
        output_dir="processed_documents/",
        pipeline=pipeline,
        file_pattern="*.{pdf,docx,html}",
        parallel=True,  # 启用并行处理
        max_workers=4   # 最多4个并行任务
    )

    # 生成处理报告
    print(f"处理完成:")
    print(f"  - 成功: {results['success']}")
    print(f"  - 失败: {results['failed']}")
    print(f"  - 总耗时: {results['total_time']:.2f}秒")

    # 导出失败文件列表
    if results['failed'] > 0:
        doc_processor.save_error_log(
            results['errors'],
            "processing_errors.json"
        )

    return results

场景三:数据驱动的决策支持

def decision_support_system():
    """
    数据驱动的决策支持系统
    综合分析多个数据源,为决策提供依据
    """

    analyzer = DataAnalysisPlugin()
    searcher = SearchPlugin()

    # 收集内部数据
    sales_data = analyzer.read_csv("internal/sales_data.csv")
    customer_data = analyzer.read_csv("internal/customer_data.csv")

    # 收集外部市场数据
    market_query = SearchQuery(
        keywords=["市场规模", "行业趋势", "2024"],
        filters={"source_type": ["report", "article"]},
        num_results=15
    )
    market_intelligence = searcher.search(market_query)

    # 综合分析
    analysis = {
        "内部能力评估": analyzer.evaluate_performance(sales_data, customer_data),
        "市场机会分析": analyzer.analyze_market_opportunity(market_intelligence),
        "风险评估": analyzer.assess_risks(sales_data),
        "建议": analyzer.generate_recommendations(sales_data, market_intelligence)
    }

    # 生成决策报告
    report = analyzer.create_decision_report(analysis)

    return report

进阶技巧与最佳实践

性能优化技巧

1. 批量操作优化

# 反面例子:逐个处理,效率低下
for file in file_list:
    result = process_single_file(file)
    save_result(result)

# 正面例子:批量处理,效率更高
results = batch_process(file_list, parallel=True, max_workers=8)
save_batch_results(results)

2. 缓存策略

from knowledge_work_plugins.utils import Cache

# 使用缓存避免重复请求
cache = Cache(cache_dir=".cache", ttl=3600)  # 1小时过期

def get_data_with_cache(url):
    cache_key = hashlib.md5(url.encode()).hexdigest()

    if cache.exists(cache_key):
        return cache.get(cache_key)

    data = fetch_from_url(url)
    cache.set(cache_key, data)

    return data

3. 异步操作最佳实践

import asyncio

async def efficient_batch_processing(items):
    """
    使用信号量控制并发数量
    避免过多并发请求导致系统过载
    """
    semaphore = asyncio.Semaphore(5)  # 最多5个并发

    async def process_with_limit(item):
        async with semaphore:
            return await process_item(item)

    # 并发执行所有任务
    tasks = [process_with_limit(item) for item in items]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # 处理异常结果
    successful = [r for r in results if not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]

    return {"success": successful, "failed": failed}

错误处理策略

from knowledge_work_plugins.utils import Retry, CircuitBreaker

# 配置重试机制
retry_config = Retry(
    max_attempts=3,
    delay=1.0,           # 初始延迟1秒
    backoff=2.0,         # 指数退避
    exceptions=(ConnectionError, TimeoutError)
)

# 配置熔断器
circuit_breaker = CircuitBreaker(
    failure_threshold=5,     # 连续失败5次后断开
    recovery_timeout=60,     # 60秒后尝试恢复
    expected_exception=Exception
)

def robust_operation():
    """
    使用重试和熔断机制保护关键操作
    """
    with circuit_breaker:
        result = retry_config.execute(risky_operation)
        return result

安全最佳实践

# 1. 敏感信息管理
from knowledge_work_plugins.utils import SecretManager

# 使用密钥管理器而非硬编码
secrets = SecretManager()
api_key = secrets.get("ANTHROPIC_API_KEY")
db_password = secrets.get("DATABASE_PASSWORD")

# 2. 请求频率限制
from knowledge_work_plugins.utils import RateLimiter

# 限制每分钟最多请求60次
limiter = RateLimiter(max_requests=60, time_window=60)

def rate_limited_search(query):
    with limiter:
        return searcher.search(query)

# 3. 数据脱敏
from knowledge_work_plugins.utils import DataMasker

masker = DataMasker()
sensitive_data = masker.mask(
    {"name": "张三", "phone": "13812345678", "email": "zhang@example.com"},
    fields=["phone", "email"]
)
# 输出: {"name": "张三", "phone": "138****5678", "email": "z***@example.com"}

自定义插件开发指南

如果你需要的功能不在官方插件列表中,可以基于现有框架开发自己的插件。

插件结构模板

"""
自定义插件示例
文件名:my_custom_plugin.py
"""

from typing import Any, Dict, Optional
from dataclasses import dataclass

# 导入基础类
from knowledge_work_plugins.base import BasePlugin, PluginConfig, PluginMetadata


@dataclass
class MyPluginConfig(PluginConfig):
    """自定义配置类"""
    custom_option: str = "default_value"
    enable_feature_x: bool = True
    max_items: int = 100


class MyCustomPlugin(BasePlugin):
    """
    自定义插件类

    插件说明:
    这个插件用于...(描述插件功能)
    """

    # 插件元数据
    METADATA = PluginMetadata(
        name="my_custom_plugin",
        version="1.0.0",
        author="Your Name",
        description="自定义插件描述",
        tags=["custom", "example"],
        dependencies=["requests", "pandas"]
    )

    def __init__(self, config: Optional[MyPluginConfig] = None):
        """
        初始化插件

        Args:
            config: 插件配置,如果不提供则使用默认配置
        """
        super().__init__(config or MyPluginConfig())
        self.config: MyPluginConfig = self.config  # 类型标注

    def validate_config(self) -> bool:
        """
        验证配置有效性

        Returns:
            配置是否有效
        """
        if self.config.max_items <= 0:
            raise ValueError("max_items must be positive")
        return True

    def execute(self, input_data: Any, **kwargs) -> Dict[str, Any]:
        """
        执行插件核心逻辑

        Args:
            input_data: 输入数据

        Returns:
            处理结果
        """
        # 实现插件逻辑
        result = {
            "status": "success",
            "input_received": input_data,
            "config_used": {
                "custom_option": self.config.custom_option,
                "max_items": self.config.max_items
            }
        }

        return result

    def cleanup(self):
        """
        清理资源
        在插件使用完毕后调用,用于释放资源
        """
        # 关闭连接、清理缓存等
        pass


# 注册插件(使插件可以被插件管理器发现)
from knowledge_work_plugins.registry import register_plugin

register_plugin(MyCustomPlugin)

插件注册与发现

from knowledge_work_plugins.registry import PluginRegistry

# 查看所有可用插件
registry = PluginRegistry()
available_plugins = registry.list_plugins()

for plugin_info in available_plugins:
    print(f"{plugin_info.name} v{plugin_info.version}")
    print(f"  描述: {plugin_info.description}")
    print(f"  标签: {', '.join(plugin_info.tags)}")

# 加载指定插件
custom_plugin = registry.load_plugin("my_custom_plugin")
result = custom_plugin.execute(input_data={"test": "value"})

总结与资源链接

核心要点回顾

┌─────────────────────────────────────────────────────────┐
│                  插件系统核心要点                         │
├─────────────────────────────────────────────────────────┤
│  1. Browser Plugin: 浏览器自动化,适合网页数据采集         │
│  2. Data Analysis Plugin: 数据处理,适合结构化分析         │
│  3. Document Plugin: 文档处理,适合格式转换与生成          │
│  4. Search Plugin: 信息检索,适合多源搜索任务              │
│                                                         │
│  设计原则:                                               │
│  • 模块化:各插件可独立使用,也可组合                      │
│  • 异步优先:充分利用异步IO提升性能                        │
│  • 配置驱动:通过配置而非代码定制行为                       │
│  • 可扩展:支持自定义插件开发                              │
└─────────────────────────────────────────────────────────┘

适用场景总结

场景 推荐插件组合 典型应用
市场调研 Search + Browser + Document 竞品分析、行业报告
数据分析 Data Analysis + Document 销售报告、用户画像
内容创作 Document + Browser + Search 文章撰写、资料整理
学术研究 Search + Document + Data Analysis 文献综述、数据验证

相关资源链接

官方资源

  • GitHub 仓库:https://github.com/anthropics/knowledge-work-plugins
  • 官方文档:https://docs.anthropic.com/plugins
  • 示例代码库:https://github.com/anthropics/plugin-examples

学习资源

  • Anthropic API 文档:https://docs.anthropic.com/api
  • Python 异步编程指南:https://docs.python.org/3/library/asyncio.html
  • Playwright 浏览器自动化:https://playwright.dev/python/

社区与支持

  • Anthropic 开发者论坛:https://community.anthropic.com
  • GitHub Issues:https://github.com/anthropics/knowledge-work-plugins/issues
  • Discord 开发者频道:https://discord.gg/anthropic

相关开源项目

  • LangChain Plugins:扩展 LLMs 能力的插件生态
  • AutoGPT:自主 AI 代理框架
  • LangGraph:构建有状态多角色应用

写在最后

knowledge-work-plugins 项目代表了 Anthropic 对 AI 助手工作流的一次重要探索。通过将浏览器自动化、数据处理、文档操作等常用能力模块化,它让开发者能够更轻松地构建真正实用的 AI 应用。

更重要的是,这套框架的设计理念——模块化、组合式、可扩展——为未来 AI 插件生态的发展提供了一个很好的参考范例。

建议你动手实践一下本文中的示例代码,感受一下这套工具带来的效率提升。如果你开发出了有意思的插件或工作流,也欢迎分享给社区!

如果有任何问题或建议,欢迎在评论区留言讨论。

如果内容对您有帮助,欢迎打赏

您的支持是我继续创作的动力

前往打赏页面

评论区

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注