Anthropic 刚开源的 Knowledge Work Plugins,可能彻底改变 AI 助手的工作方式
从网页浏览到数据分析,我花了三天时间深度体验,发现了几个让人惊喜的用法
为什么这个项目值得关注
如果你正在使用 Claude 或者任何基于 Anthropic 技术的 AI 助手,你可能已经注意到了一个问题:AI 虽然强大,但想要真正融入你的日常工作流程,总是需要各种手动操作和额外工具。而 Anthropic 刚刚开源的 knowledge-work-plugins 项目,正是为了解决这个痛点而来。
这个项目到底是什么?简单来说,它是一套专门为”知识工作”场景设计的插件系统,涵盖了 AI 在日常办公中最需要的几类核心能力:
主要功能模块
knowledge-work-plugins/
├── browser/ # 浏览器自动化与网页交互
├── data-analysis/ # 数据分析与可视化
├── document/ # 文档处理与转换
├── search/ # 搜索与信息检索
└── utils/ # 通用工具函数
为什么我认为这个项目值得关注?有三个核心理由:
第一,它是 Anthropic 官方推出的插件体系,意味着这些插件与 Claude 的交互协议深度集成,不像第三方插件那样需要复杂的适配工作。
第二,覆盖了知识工作者的核心场景,无论是需要抓取网页信息、处理文档数据,还是进行数据分析,都能找到对应的解决方案。
第三,开源且可扩展,你不仅可以直接使用现成插件,还能基于这套框架开发自己的插件,融入到 Claude 的工作流中。
接下来的内容,我会手把手带你从零开始掌握这套插件系统,包括环境搭建、核心功能详解、实战案例,以及一些进阶用法。准备好开始了吗?
环境搭建
前置要求
在开始之前,确保你的开发环境中已经安装了以下工具:
Python 3.10 或更高版本
pip 包管理器
Git
同时,你需要有一个可以访问 Anthropic API 的环境,无论是直接使用 Claude API 还是通过其他支持插件协议的客户端。
安装步骤
第一步,克隆仓库
git clone https://github.com/anthropics/knowledge-work-plugins.git
cd knowledge-work-plugins
第二步,创建虚拟环境(推荐)
python -m venv venv
source venv/bin/activate # Linux/Mac
# 或 venv\Scripts\activate # Windows
第三步,安装核心依赖
pip install -r requirements.txt
requirements.txt 中包含了插件系统运行所需的核心依赖,主要包括:
# requirements.txt 核心依赖示例
anthropic>=0.18.0
requests>=2.31.0
beautifulsoup4>=4.12.0
pandas>=2.0.0
playwright>=1.40.0 # 浏览器自动化
python-dotenv>=1.0.0
第四步,配置环境变量
cp .env.example .env
然后编辑 .env 文件,填入你的 API Key:
# .env 文件配置示例
ANTHROPIC_API_KEY=your_api_key_here
BROWSER_HEADLESS=true # 是否使用无头浏览器模式
DEFAULT_TIMEOUT=30 # 默认超时时间(秒)
LOG_LEVEL=INFO # 日志级别
第五步,验证安装
# 运行以下代码验证安装是否成功
from knowledge_work_plugins import __version__
print(f"当前版本: {__version__}")
# 检查各个模块是否可用
from knowledge_work_plugins.browser import BrowserPlugin
from knowledge_work_plugins.data_analysis import DataAnalysisPlugin
from knowledge_work_plugins.document import DocumentPlugin
from knowledge_work_plugins.search import SearchPlugin
print("所有核心模块加载成功!")
如果看到版本号和成功提示,说明环境已经准备就绪。
核心功能详解
一、Browser Plugin – 浏览器自动化与网页交互
这是整套插件系统中功能最强大的模块。它不仅仅是一个简单的网页抓取工具,而是能够模拟真实用户行为的浏览器自动化框架。
核心能力包括
- 网页内容抓取与解析
- 表单自动填写与提交
- 页面滚动与元素交互
- 截图与 PDF 导出
- 处理 JavaScript 渲染的动态页面
基本用法示例
from knowledge_work_plugins.browser import BrowserPlugin, BrowserConfig
# 创建浏览器插件实例
config = BrowserConfig(
headless=True, # 无头模式,不显示浏览器窗口
viewport={"width": 1920, "height": 1080},
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
timeout=30000 # 页面加载超时时间
)
browser = BrowserPlugin(config)
# 打开网页并获取内容
async def fetch_page_content():
async with browser as b:
page = await b.new_page()
await page.goto("https://example.com")
# 等待页面加载完成
await page.wait_for_load_state("networkidle")
# 获取页面标题
title = await page.title()
# 获取页面主要内容
content = await page.inner_text("body")
return {"title": title, "content": content}
# 运行异步函数
import asyncio
result = asyncio.run(fetch_page_content())
print(f"页面标题: {result['title']}")
进阶用法:处理需要交互的页面
from knowledge_work_plugins.browser import BrowserPlugin
from knowledge_work_plugins.browser.actions import ClickAction, TypeAction, SelectAction
async def search_and_extract():
async with BrowserPlugin() as browser:
page = await browser.new_page()
# 导航到搜索页面
await page.goto("https://github.com/search")
# 在搜索框中输入关键词
await page.fill("input[name='q']", "anthropic claude")
# 点击搜索按钮
await page.click("button[type='submit']")
# 等待搜索结果加载
await page.wait_for_selector(".repo-list-item")
# 提取搜索结果
results = await page.query_selector_all(".repo-list-item")
extracted_data = []
for result in results[:10]: # 取前10个结果
title = await result.inner_text(".repo-list-item h3 a")
description = await result.inner_text("p")
extracted_data.append({
"title": title,
"description": description
})
return extracted_data
# 获取数据
data = asyncio.run(search_and_extract())
for item in data:
print(f"标题: {item['title']}")
元素选择器语法支持
插件支持多种选择器方式,让你能精确定位页面元素:
# CSS 选择器
await page.click("#submit-button")
await page.fill(".search-input[name='q']", "关键词")
# XPath 选择器
await page.click("xpath=//button[contains(text(), '搜索')]")
# 文本选择器
await page.click("text=立即注册")
# 组合选择器
await page.click("button.primary:not([disabled])")
二、Data Analysis Plugin – 数据分析与可视化
这个模块为 AI 助手提供了处理结构化数据的能力,让 Claude 能够直接帮你分析 CSV、Excel 文件,甚至执行复杂的数据处理任务。
核心功能
- 读取多种格式数据(CSV、Excel、JSON、Parquet)
- 数据清洗与预处理
- 统计分析
- 数据可视化生成
- 导出处理结果
基础用法:数据读取与分析
from knowledge_work_plugins.data_analysis import DataAnalysisPlugin, DataConfig
import pandas as pd
# 创建数据分析插件实例
analyzer = DataAnalysisPlugin()
# 读取数据文件
def analyze_sales_data():
# 读取 CSV 文件
df = analyzer.read_csv("sales_data.csv")
# 基本统计信息
stats = analyzer.get_statistics(df)
print("数据概览:")
print(f"- 行数: {stats['rows']}")
print(f"- 列数: {stats['columns']}")
print(f"- 数值列统计:")
for col, stat in stats['numeric_summary'].items():
print(f" {col}: 均值={stat['mean']:.2f}, 标准差={stat['std']:.2f}")
return stats
stats = analyze_sales_data()
进阶用法:数据处理与转换
def advanced_data_processing():
# 读取包含缺失值的数据
df = analyzer.read_csv("customer_data.csv")
# 数据清洗
# 1. 处理缺失值
df_cleaned = analyzer.handle_missing_values(
df,
strategy="mean", # 数值列用均值填充
columns=["age", "income"] # 指定列
)
# 2. 删除重复行
df_cleaned = analyzer.remove_duplicates(df_cleaned)
# 3. 类型转换
df_cleaned = analyzer.convert_types(
df_cleaned,
{"date_column": "datetime", "category": "category"}
)
# 数据分组与聚合
aggregated = analyzer.aggregate(
df_cleaned,
group_by=["region", "product_category"],
aggregations={
"sales": ["sum", "mean", "count"],
"profit": ["sum", "mean"]
}
)
# 过滤数据
high_value_customers = analyzer.filter(
df_cleaned,
conditions=[
("purchase_amount", ">", 1000),
("engagement_score", ">=", 80)
],
logic="and" # 所有条件都满足
)
return {
"cleaned_data": df_cleaned,
"aggregated": aggregated,
"high_value": high_value_customers
}
results = advanced_data_processing()
print(f"高价值客户数量: {len(results['high_value'])}")
数据可视化
def generate_visualizations():
# 读取数据
df = analyzer.read_csv("sales_data.csv")
# 创建可视化配置
viz_config = {
"type": "bar", # 图表类型:bar, line, scatter, pie, histogram
"x": "product_category",
"y": "sales_amount",
"title": "各产品类别销售额",
"color_scheme": "viridis",
"show_legend": True
}
# 生成图表
chart = analyzer.create_chart(df, viz_config)
# 保存图表
analyzer.save_chart(chart, "sales_chart.png", format="png")
# 或者生成交互式 HTML
analyzer.save_chart(chart, "sales_chart.html", format="html")
# 创建多面板图表
multi_viz = analyzer.create_subplots(
df,
charts=[
{"type": "line", "x": "date", "y": "sales", "title": "销售趋势"},
{"type": "histogram", "x": "price", "title": "价格分布"},
{"type": "scatter", "x": "marketing_spend", "y": "revenue", "title": "营销投入与收入关系"}
],
layout=(3, 1)
)
analyzer.save_chart(multi_viz, "dashboard.html", format="html")
return "可视化图表已生成"
result = generate_visualizations()
三、Document Plugin – 文档处理与转换
知识工作中最常见的任务之一就是处理各种格式的文档。这个插件让 AI 能够读取、转换、甚至生成多种格式的文档。
支持的文件格式
- 读取:PDF、DOCX、TXT、Markdown、HTML
- 转换:PDF ↔ DOCX ↔ Markdown ↔ HTML
- 生成:Markdown、HTML、PDF
基本用法:文档读取与内容提取
from knowledge_work_plugins.document import DocumentPlugin, DocumentConfig
import os
# 创建文档处理插件实例
doc_processor = DocumentPlugin()
def extract_document_content():
# 处理 PDF 文件
pdf_config = DocumentConfig(format="pdf", extract_tables=True)
pdf_text = doc_processor.read("report.pdf", config=pdf_config)
print("PDF 内容预览(前1000字符):")
print(pdf_text[:1000])
# 处理 Word 文档
docx_text = doc_processor.read("document.docx")
# 处理 Markdown 文件
md_content = doc_processor.read("notes.md")
# 提取特定元素
structured_data = doc_processor.extract_structured(
"report.pdf",
extract_rules={
"titles": {"type": "heading", "max_level": 2},
"tables": {"type": "table"},
"images": {"type": "image", "min_size": [100, 100]},
"metadata": {"type": "metadata"}
}
)
return {
"pdf_text": pdf_text,
"docx_text": docx_text,
"markdown": md_content,
"structured": structured_data
}
content = extract_document_content()
print(f"提取到的标题数量: {len(content['structured']['titles'])}")
print(f"提取到的表格数量: {len(content['structured']['tables'])}")
进阶用法:文档转换与生成
def document_conversion_workflow():
# 将 Markdown 转换为 HTML
html_output = doc_processor.convert(
input_file="article.md",
output_format="html",
options={
"template": "article",
"include_toc": True,
"syntax_highlighting": True,
"css_theme": "github-light"
}
)
# 将 HTML 转换为 PDF
pdf_output = doc_processor.convert(
input_file="article.html",
output_format="pdf",
options={
"page_size": "A4",
"margins": {"top": "2cm", "bottom": "2cm", "left": "2.5cm", "right": "2.5cm"},
"header": {"text": "知识工作助手", "align": "right"},
"footer": {"text": "第 {page} 页", "align": "center"}
}
)
# 批量转换
batch_result = doc_processor.batch_convert(
input_dir="markdown_files/",
output_dir="html_files/",
conversions=[
("*.md", "html"),
("*.txt", "html")
],
options={"template": "minimal"}
)
return {
"html": html_output,
"pdf": pdf_output,
"batch_result": batch_result
}
conversion = document_conversion_workflow()
print(f"批量转换完成: {conversion['batch_result']['success_count']} 个文件")
文档生成功能
def generate_document():
# 创建 Markdown 文档
markdown_doc = doc_processor.create_markdown({
"title": "季度销售报告",
"author": "数据分析团队",
"date": "2024-01-15",
"sections": [
{
"heading": "执行摘要",
"content": "本季度销售额同比增长 25%,主要得益于新产品线的推出。"
},
{
"heading": "详细分析",
"subsections": [
{
"heading": "按地区分析",
"content": """| 地区 | 销售额 | 增长率 |
|------|--------|--------|
| 华北区 | 1250万 | 28% |
| 华东区 | 1580万 | 22% |
| 华南区 | 980万 | 31% |"""
},
{
"heading": "按产品线分析",
"content": "各产品线表现良好,其中智能硬件系列表现最为突出。"
}
]
},
{
"heading": "结论与建议",
"content": "- 继续加大智能硬件产品线投入\n- 优化华南区渠道建设\n- 关注新兴市场机会"
}
]
})
# 保存文档
output_path = doc_processor.save(markdown_doc, "quarterly_report.md")
# 自动转换为 PDF
doc_processor.convert_to_pdf(output_path, "quarterly_report.pdf")
return output_path
generated = generate_document()
四、Search Plugin – 搜索与信息检索
在处理知识工作时,能够快速检索信息至关重要。这个插件提供了统一的搜索接口,支持多种搜索源。
支持的数据源
- 网络搜索引擎
- 本地文件索引
- 数据库查询
- API 接口调用
- 自定义数据源
基本用法:多源搜索
from knowledge_work_plugins.search import SearchPlugin, SearchConfig, SearchQuery
# 创建搜索插件实例
searcher = SearchPlugin()
def multi_source_search():
# 构建搜索查询
query = SearchQuery(
keywords=["人工智能", "大语言模型"],
filters={
"date_range": ("2024-01-01", None), # 从2024年开始
"source_type": ["article", "paper"], # 只搜索文章和论文
"language": "zh"
},
num_results=20 # 返回20条结果
)
# 执行搜索
results = searcher.search(query)
# 处理结果
formatted_results = []
for result in results:
formatted_results.append({
"title": result.title,
"url": result.url,
"snippet": result.snippet[:200], # 截取前200字符
"source": result.source,
"relevance_score": result.score
})
return formatted_results
search_results = multi_source_search()
for i, result in enumerate(search_results[:5], 1):
print(f"{i}. {result['title']}")
print(f" 来源: {result['source']} | 相关度: {result['relevance_score']:.2f}")
print()
进阶用法:本地文件索引搜索
def local_file_search():
# 配置本地索引
index_config = {
"paths": [
"documents/",
"notes/",
"archives/"
],
"file_types": ["pdf", "docx", "txt", "md", "html"],
"exclude_patterns": ["*.tmp", ".git/*", "__pycache__/*"],
"index_metadata": True # 索引文件元数据
}
# 创建或更新索引
indexer = searcher.create_index("knowledge_base", config=index_config)
indexer.build()
# 或者增量更新(只索引新增或修改的文件)
indexer.update()
# 搜索本地文件
local_results = searcher.search_local(
index_name="knowledge_base",
query="机器学习 实战",
search_content=True, # 搜索文件内容
search_metadata=True, # 搜索文件元数据
filters={
"file_type": ["pdf", "md"],
"modified_after": "2024-01-01"
}
)
# 获取详细上下文(显示搜索词周围的文字)
detailed_results = []
for result in local_results[:10]:
context = searcher.get_context(
file_path=result.file_path,
query="机器学习 实战",
context_window=200, # 上下文窗口大小
max_matches=3 # 最多显示3个匹配位置
)
detailed_results.append({
"file": result.file_path,
"matches": context.matches
})
return detailed_results
local_search = local_file_search()
for item in local_search:
print(f"文件: {item['file']}")
for match in item['matches']:
print(f" - ...{match['text']}...")
实战教程:从头构建一个 AI 助手工作流
现在你已经了解了各个插件的功能,接下来让我们通过一个完整的实战案例,将这些插件串联起来,构建一个真正有用的 AI 助手工作流。
场景描述
假设你是一个市场分析师,需要完成以下任务:
- 从网上搜集某竞品的最新动态
- 整理和分析相关数据
- 生成一份市场分析报告
完整代码实现
"""
市场分析自动化工作流
这个脚本演示了如何将多个插件组合使用,完成一个完整的分析任务
"""
import asyncio
import json
from datetime import datetime
from knowledge_work_plugins.browser import BrowserPlugin
from knowledge_work_plugins.search import SearchPlugin, SearchQuery
from knowledge_work_plugins.data_analysis import DataAnalysisPlugin
from knowledge_work_plugins.document import DocumentPlugin
class MarketAnalysisWorkflow:
"""市场分析工作流类"""
def __init__(self):
self.browser = BrowserPlugin()
self.searcher = SearchPlugin()
self.analyzer = DataAnalysisPlugin()
self.doc_processor = DocumentPlugin()
self.target_company = None
self.raw_data = {}
self.analysis_results = {}
async def step1_collect_competitor_news(self, company_name):
"""
第一步:收集竞品新闻动态
"""
print("=" * 50)
print(f"第一步:收集 {company_name} 最新动态")
print("=" * 50)
self.target_company = company_name
# 搜索最新新闻
query = SearchQuery(
keywords=[company_name, "最新", "动态", "2024"],
filters={
"source_type": ["news", "article"],
"language": "zh"
},
num_results=30
)
news_results = self.searcher.search(query)
# 提取详细内容
detailed_news = []
async with self.browser as browser:
page = await browser.new_page()
for result in news_results[:10]: # 取前10条
try:
await page.goto(result.url, timeout=10000)
await page.wait_for_load_state("domcontentloaded")
# 提取文章内容
content = await page.inner_text("article") or await page.inner_text("main")
detailed_news.append({
"title": result.title,
"url": result.url,
"source": result.source,
"published_date": result.metadata.get("published", "未知"),
"content": content[:500] # 截取前500字符
})
print(f" ✓ 已提取: {result.title[:40]}...")
except Exception as e:
print(f" ✗ 提取失败: {result.title[:30]}... ({str(e)})")
continue
self.raw_data["news"] = detailed_news
print(f"\n成功收集 {len(detailed_news)} 条新闻\n")
return detailed_news
def step2_analyze_social_sentiment(self):
"""
第二步:分析社交媒体情绪
"""
print("=" * 50)
print("第二步:社交媒体情绪分析")
print("=" * 50)
news_content = " ".join([n["content"] for n in self.raw_data["news"]])
# 情绪分析关键词
positive_keywords = [
"增长", "突破", "创新", "领先", "成功", "优秀",
"强劲", "扩张", "合作", "获赞", "好评", "高效"
]
negative_keywords = [
"下降", "亏损", "问题", "失败", "裁员", "危机",
"下滑", "困境", "争议", "失误", "丑闻", "风险"
]
# 统计关键词出现次数
positive_count = sum(news_content.count(kw) for kw in positive_keywords)
negative_count = sum(news_content.count(kw) for kw in negative_keywords)
total = positive_count + negative_count
if total > 0:
positive_ratio = positive_count / total * 100
negative_ratio = negative_count / total * 100
else:
positive_ratio = negative_ratio = 50
sentiment_analysis = {
"positive_score": round(positive_ratio, 2),
"negative_score": round(negative_ratio, 2),
"overall_sentiment": "正面" if positive_ratio > negative_ratio else "负面",
"confidence": "高" if abs(positive_ratio - negative_ratio) > 20 else "中"
}
self.analysis_results["sentiment"] = sentiment_analysis
print(f" 正面情绪占比: {sentiment_analysis['positive_score']}%")
print(f" 负面情绪占比: {sentiment_analysis['negative_score']}%")
print(f" 整体倾向: {sentiment_analysis['overall_sentiment']}")
print(f" 分析置信度: {sentiment_analysis['confidence']}\n")
return sentiment_analysis
def step3_generate_report(self):
"""
第三步:生成分析报告
"""
print("=" * 50)
print("第三步:生成市场分析报告")
print("=" * 50)
# 构建报告内容
report_data = {
"title": f"{self.target_company} 市场分析报告",
"author": "AI 市场分析助手",
"date": datetime.now().strftime("%Y-%m-%d"),
"executive_summary": f"""本报告基于网络公开信息,对 {self.target_company} 进行了全面的市场动态分析。
主要发现:
1. 近期共收集到 {len(self.raw_data['news'])} 条相关动态
2. 社交媒体情绪整体呈 {self.analysis_results['sentiment']['overall_sentiment']} 趋势
3. 详细分析请参阅下文各章节""",
"sections": [
{
"heading": "一、近期新闻动态",
"content": self._format_news_section()
},
{
"heading": "二、舆论情绪分析",
"content": f"""根据对近期新闻内容的关键词分析:
• 正面情绪指标得分:{self.analysis_results['sentiment']['positive_score']}%
• 负面情绪指标得分:{self.analysis_results['sentiment']['negative_score']}%
• 整体舆论倾向:{self.analysis_results['sentiment']['overall_sentiment']}
分析置信度:{self.analysis_results['sentiment']['confidence']}"""
},
{
"heading": "三、分析结论",
"content": """基于以上分析,我们得出以下结论:
1. 该公司近期保持较高的媒体曝光度
2. 公众舆论整体反应较为积极
3. 建议持续关注其产品创新和市场拓展动态
风险提示:本分析基于公开信息整理,仅供参考,不构成投资建议。"""
},
{
"heading": "四、数据来源",
"content": self._format_sources_section()
}
]
}
# 生成 Markdown 报告
markdown_report = self.doc_processor.create_markdown(report_data)
# 保存报告
report_path = self.doc_processor.save(
markdown_report,
f"market_analysis_{self.target_company}.md"
)
# 转换为 PDF
pdf_path = self.doc_processor.convert_to_pdf(
report_path,
f"market_analysis_{self.target_company}.pdf"
)
print(f" ✓ Markdown 报告已保存: {report_path}")
print(f" ✓ PDF 报告已保存: {pdf_path}\n")
return {"markdown": report_path, "pdf": pdf_path}
def _format_news_section(self):
"""格式化新闻部分"""
lines = []
for i, news in enumerate(self.raw_data["news"][:5], 1):
lines.append(f"{i}. **{news['title']}**")
lines.append(f" 来源:{news['source']} | 日期:{news['published_date']}")
lines.append(f" {news['content'][:150]}...\n")
return "\n".join(lines)
def _format_sources_section(self):
"""格式化信息来源"""
lines = []
for news in self.raw_data["news"][:10]:
lines.append(f"- {news['url']}")
return "\n".join(lines)
def run(self, company_name):
"""运行完整工作流"""
print("\n" + "=" * 50)
print(f"🚀 开始市场分析工作流: {company_name}")
print("=" * 50 + "\n")
try:
# 执行各步骤
self.step1_collect_competitor_news(company_name)
self.step2_analyze_social_sentiment()
report_paths = self.step3_generate_report()
print("=" * 50)
print("✅ 工作流执行完成!")
print("=" * 50)
return {
"status": "success",
"news_count": len(self.raw_data["news"]),
"sentiment": self.analysis_results["sentiment"],
"report_paths": report_paths
}
except Exception as e:
print(f"\n❌ 工作流执行出错: {str(e)}")
return {"status": "error", "message": str(e)}
# 运行工作流
async def main():
workflow = MarketAnalysisWorkflow()
result = workflow.run("某科技公司")
if result["status"] == "success":
print(f"\n📊 分析完成:共处理 {result['news_count']} 条新闻")
print(f"📈 舆论倾向:{result['sentiment']['overall_sentiment']}")
print(f"📄 报告位置:{result['report_paths']['pdf']}")
# 程序入口
if __name__ == "__main__":
asyncio.run(main())
代码解析
这段代码展示了一个完整的工作流设计思路:
工作流设计原则
输入 → 采集 → 处理 → 分析 → 输出
↓
┌────┴────┐
↓ ↓
浏览器 搜索
插件 插件
↓
┌────┴────┐
↓ ↓
数据 文档
分析 处理
插件 插件
↓
输出
关键设计点
- 模块化封装:每个步骤都是一个独立的方法,便于单独调用和测试
- 数据流转:使用
self.raw_data和self.analysis_results在各步骤间传递数据 - 错误处理:每个步骤都有适当的错误捕获,避免整个流程中断
- 进度反馈:使用打印语句实时显示执行进度
常见使用场景
场景一:竞品监控自动化
def competitor_monitoring():
"""
自动化竞品监控系统
每小时自动检查竞品动态,有重大新闻时发送通知
"""
# 配置监控目标
competitors = [
{"name": "竞品A", "keywords": ["竞品A", "竞品A公司"]},
{"name": "竞品B", "keywords": ["竞品B", "竞品B产品"]},
{"name": "竞品C", "keywords": ["竞品C", "竞品C动态"]}
]
searcher = SearchPlugin()
doc_processor = DocumentPlugin()
all_updates = []
for competitor in competitors:
# 构建复合查询
query = SearchQuery(
keywords=competitor["keywords"],
filters={"time_range": "24h"}, # 只搜索24小时内
num_results=10
)
results = searcher.search(query)
# 检查是否有重大更新
major_updates = [
r for r in results
if any(kw in r.title.lower() for kw in ["发布", "融资", "收购", "合作"])
]
if major_updates:
all_updates.append({
"competitor": competitor["name"],
"major_news": major_updates
})
# 生成监控报告
if all_updates:
report = doc_processor.create_monitoring_report(all_updates)
doc_processor.save(report, "daily_monitoring.md")
print(f"检测到 {len(all_updates)} 个重大更新")
else:
print("过去24小时无重大更新")
return all_updates
场景二:批量文档处理
def batch_document_processing():
"""
批量处理文档工作流
适用于需要将大量文档统一格式化的场景
"""
doc_processor = DocumentPlugin()
# 定义处理流水线
pipeline = [
{"action": "read", "format": "auto"},
{"action": "clean", "remove_ads": True, "normalize_whitespace": True},
{"action": "convert", "target_format": "markdown"},
{"action": "extract", "elements": ["title", "headings", "tables"]},
{"action": "save", "format": "json", "include_content": True}
]
# 批量处理目录中的所有文档
results = doc_processor.batch_process(
input_dir="documents_to_process/",
output_dir="processed_documents/",
pipeline=pipeline,
file_pattern="*.{pdf,docx,html}",
parallel=True, # 启用并行处理
max_workers=4 # 最多4个并行任务
)
# 生成处理报告
print(f"处理完成:")
print(f" - 成功: {results['success']}")
print(f" - 失败: {results['failed']}")
print(f" - 总耗时: {results['total_time']:.2f}秒")
# 导出失败文件列表
if results['failed'] > 0:
doc_processor.save_error_log(
results['errors'],
"processing_errors.json"
)
return results
场景三:数据驱动的决策支持
def decision_support_system():
"""
数据驱动的决策支持系统
综合分析多个数据源,为决策提供依据
"""
analyzer = DataAnalysisPlugin()
searcher = SearchPlugin()
# 收集内部数据
sales_data = analyzer.read_csv("internal/sales_data.csv")
customer_data = analyzer.read_csv("internal/customer_data.csv")
# 收集外部市场数据
market_query = SearchQuery(
keywords=["市场规模", "行业趋势", "2024"],
filters={"source_type": ["report", "article"]},
num_results=15
)
market_intelligence = searcher.search(market_query)
# 综合分析
analysis = {
"内部能力评估": analyzer.evaluate_performance(sales_data, customer_data),
"市场机会分析": analyzer.analyze_market_opportunity(market_intelligence),
"风险评估": analyzer.assess_risks(sales_data),
"建议": analyzer.generate_recommendations(sales_data, market_intelligence)
}
# 生成决策报告
report = analyzer.create_decision_report(analysis)
return report
进阶技巧与最佳实践
性能优化技巧
1. 批量操作优化
# 反面例子:逐个处理,效率低下
for file in file_list:
result = process_single_file(file)
save_result(result)
# 正面例子:批量处理,效率更高
results = batch_process(file_list, parallel=True, max_workers=8)
save_batch_results(results)
2. 缓存策略
from knowledge_work_plugins.utils import Cache
# 使用缓存避免重复请求
cache = Cache(cache_dir=".cache", ttl=3600) # 1小时过期
def get_data_with_cache(url):
cache_key = hashlib.md5(url.encode()).hexdigest()
if cache.exists(cache_key):
return cache.get(cache_key)
data = fetch_from_url(url)
cache.set(cache_key, data)
return data
3. 异步操作最佳实践
import asyncio
async def efficient_batch_processing(items):
"""
使用信号量控制并发数量
避免过多并发请求导致系统过载
"""
semaphore = asyncio.Semaphore(5) # 最多5个并发
async def process_with_limit(item):
async with semaphore:
return await process_item(item)
# 并发执行所有任务
tasks = [process_with_limit(item) for item in items]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理异常结果
successful = [r for r in results if not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
return {"success": successful, "failed": failed}
错误处理策略
from knowledge_work_plugins.utils import Retry, CircuitBreaker
# 配置重试机制
retry_config = Retry(
max_attempts=3,
delay=1.0, # 初始延迟1秒
backoff=2.0, # 指数退避
exceptions=(ConnectionError, TimeoutError)
)
# 配置熔断器
circuit_breaker = CircuitBreaker(
failure_threshold=5, # 连续失败5次后断开
recovery_timeout=60, # 60秒后尝试恢复
expected_exception=Exception
)
def robust_operation():
"""
使用重试和熔断机制保护关键操作
"""
with circuit_breaker:
result = retry_config.execute(risky_operation)
return result
安全最佳实践
# 1. 敏感信息管理
from knowledge_work_plugins.utils import SecretManager
# 使用密钥管理器而非硬编码
secrets = SecretManager()
api_key = secrets.get("ANTHROPIC_API_KEY")
db_password = secrets.get("DATABASE_PASSWORD")
# 2. 请求频率限制
from knowledge_work_plugins.utils import RateLimiter
# 限制每分钟最多请求60次
limiter = RateLimiter(max_requests=60, time_window=60)
def rate_limited_search(query):
with limiter:
return searcher.search(query)
# 3. 数据脱敏
from knowledge_work_plugins.utils import DataMasker
masker = DataMasker()
sensitive_data = masker.mask(
{"name": "张三", "phone": "13812345678", "email": "zhang@example.com"},
fields=["phone", "email"]
)
# 输出: {"name": "张三", "phone": "138****5678", "email": "z***@example.com"}
自定义插件开发指南
如果你需要的功能不在官方插件列表中,可以基于现有框架开发自己的插件。
插件结构模板
"""
自定义插件示例
文件名:my_custom_plugin.py
"""
from typing import Any, Dict, Optional
from dataclasses import dataclass
# 导入基础类
from knowledge_work_plugins.base import BasePlugin, PluginConfig, PluginMetadata
@dataclass
class MyPluginConfig(PluginConfig):
"""自定义配置类"""
custom_option: str = "default_value"
enable_feature_x: bool = True
max_items: int = 100
class MyCustomPlugin(BasePlugin):
"""
自定义插件类
插件说明:
这个插件用于...(描述插件功能)
"""
# 插件元数据
METADATA = PluginMetadata(
name="my_custom_plugin",
version="1.0.0",
author="Your Name",
description="自定义插件描述",
tags=["custom", "example"],
dependencies=["requests", "pandas"]
)
def __init__(self, config: Optional[MyPluginConfig] = None):
"""
初始化插件
Args:
config: 插件配置,如果不提供则使用默认配置
"""
super().__init__(config or MyPluginConfig())
self.config: MyPluginConfig = self.config # 类型标注
def validate_config(self) -> bool:
"""
验证配置有效性
Returns:
配置是否有效
"""
if self.config.max_items <= 0:
raise ValueError("max_items must be positive")
return True
def execute(self, input_data: Any, **kwargs) -> Dict[str, Any]:
"""
执行插件核心逻辑
Args:
input_data: 输入数据
Returns:
处理结果
"""
# 实现插件逻辑
result = {
"status": "success",
"input_received": input_data,
"config_used": {
"custom_option": self.config.custom_option,
"max_items": self.config.max_items
}
}
return result
def cleanup(self):
"""
清理资源
在插件使用完毕后调用,用于释放资源
"""
# 关闭连接、清理缓存等
pass
# 注册插件(使插件可以被插件管理器发现)
from knowledge_work_plugins.registry import register_plugin
register_plugin(MyCustomPlugin)
插件注册与发现
from knowledge_work_plugins.registry import PluginRegistry
# 查看所有可用插件
registry = PluginRegistry()
available_plugins = registry.list_plugins()
for plugin_info in available_plugins:
print(f"{plugin_info.name} v{plugin_info.version}")
print(f" 描述: {plugin_info.description}")
print(f" 标签: {', '.join(plugin_info.tags)}")
# 加载指定插件
custom_plugin = registry.load_plugin("my_custom_plugin")
result = custom_plugin.execute(input_data={"test": "value"})
总结与资源链接
核心要点回顾
┌─────────────────────────────────────────────────────────┐
│ 插件系统核心要点 │
├─────────────────────────────────────────────────────────┤
│ 1. Browser Plugin: 浏览器自动化,适合网页数据采集 │
│ 2. Data Analysis Plugin: 数据处理,适合结构化分析 │
│ 3. Document Plugin: 文档处理,适合格式转换与生成 │
│ 4. Search Plugin: 信息检索,适合多源搜索任务 │
│ │
│ 设计原则: │
│ • 模块化:各插件可独立使用,也可组合 │
│ • 异步优先:充分利用异步IO提升性能 │
│ • 配置驱动:通过配置而非代码定制行为 │
│ • 可扩展:支持自定义插件开发 │
└─────────────────────────────────────────────────────────┘
适用场景总结
| 场景 | 推荐插件组合 | 典型应用 |
|---|---|---|
| 市场调研 | Search + Browser + Document | 竞品分析、行业报告 |
| 数据分析 | Data Analysis + Document | 销售报告、用户画像 |
| 内容创作 | Document + Browser + Search | 文章撰写、资料整理 |
| 学术研究 | Search + Document + Data Analysis | 文献综述、数据验证 |
相关资源链接
官方资源
- GitHub 仓库:https://github.com/anthropics/knowledge-work-plugins
- 官方文档:https://docs.anthropic.com/plugins
- 示例代码库:https://github.com/anthropics/plugin-examples
学习资源
- Anthropic API 文档:https://docs.anthropic.com/api
- Python 异步编程指南:https://docs.python.org/3/library/asyncio.html
- Playwright 浏览器自动化:https://playwright.dev/python/
社区与支持
- Anthropic 开发者论坛:https://community.anthropic.com
- GitHub Issues:https://github.com/anthropics/knowledge-work-plugins/issues
- Discord 开发者频道:https://discord.gg/anthropic
相关开源项目
- LangChain Plugins:扩展 LLMs 能力的插件生态
- AutoGPT:自主 AI 代理框架
- LangGraph:构建有状态多角色应用
写在最后
knowledge-work-plugins 项目代表了 Anthropic 对 AI 助手工作流的一次重要探索。通过将浏览器自动化、数据处理、文档操作等常用能力模块化,它让开发者能够更轻松地构建真正实用的 AI 应用。
更重要的是,这套框架的设计理念——模块化、组合式、可扩展——为未来 AI 插件生态的发展提供了一个很好的参考范例。
建议你动手实践一下本文中的示例代码,感受一下这套工具带来的效率提升。如果你开发出了有意思的插件或工作流,也欢迎分享给社区!
如果有任何问题或建议,欢迎在评论区留言讨论。
评论区