别再手动操作GUI了!KeygraphHQ开源Shannon,让AI像人一样控制你的电脑
从截图到智能决策,一文搞懂下一代人机协作神器
为什么这个项目值得关注
在AI飞速发展的今天,大语言模型已经能够进行复杂的推理和生成,但如何让AI真正”动手”完成实际工作,一直是业界面临的重大挑战。传统的RPA(机器人流程自动化)需要繁琐的规则配置,而单纯的API调用又无法应对复杂多变的真实场景。
Shannon的出现,正是为了解决这个痛点。
Shannon是KeygraphHQ开源的一个人机协作工具,它的核心目标是让AI模型能够像人类一样”看懂”屏幕并与之交互。通过结合计算机视觉、自然语言处理和浏览器自动化技术,Shannon能够让AI根据屏幕截图理解当前状态,然后做出合理的操作决策。
这个项目的意义远不止于技术本身。它代表着AI从”只会思考”向”能够行动”迈出的关键一步。无论是自动化测试、文档处理、数据采集还是智能助手开发,Shannon都提供了前所未有的可能性。
项目概述与核心定位
在深入学习之前,我们先来了解Shannon的整体架构和设计理念。
Shannon是什么
Shannon是一个开源的AI计算机控制框架,它通过截取屏幕内容、结合视觉理解,使AI模型能够自主决策并执行相应的操作。整个系统设计遵循”观察-思考-行动”的闭环模式,模拟人类使用计算机的基本流程。
核心设计理念
Shannon的设计理念可以概括为三个关键点:
第一,端到端的视觉理解。 不同于传统自动化工具依赖的DOM解析或坐标定位,Shannon直接从屏幕截图中提取信息。这种方式的优势在于它能够适应任何界面布局,无论是Web应用、桌面软件还是移动端模拟器。
第二,自然语言驱动的操作指令。 用户不需要编写复杂的脚本,只需用自然语言描述任务目标,Shannon就能将其转化为可执行的计算机操作。
第三,可观测性与可控性并重。 系统在执行每一步操作前都会进行确认,确保AI的行为在预期范围内,同时保留完整的操作日志便于追踪和调试。
环境搭建
系统要求
在开始之前,确保你的开发环境满足以下基本要求:
- Python 3.9或更高版本
- 支持Playwright的浏览器环境(主要用于Web自动化场景)
- 至少8GB RAM的计算机
- 稳定的网络连接(用于API调用)
安装步骤
创建虚拟环境(推荐做法)
为了避免依赖冲突,建议在独立的环境中安装Shannon。使用conda或venv创建新的虚拟环境:
# 使用conda创建虚拟环境
conda create -n shannon-env python=3.10
conda activate shannon-env
# 或者使用venv
python -m venv shannon-env
source shannon-env/bin/activate # Linux/Mac
shannon-env\Scripts\activate # Windows
安装Shannon核心包
通过pip安装Shannon及其依赖:
# 安装稳定版本
pip install shannon-ai
# 或者安装最新开发版本
pip install git+https://github.com/KeygraphHQ/shannon.git
# 如果需要完整的依赖(包括Playwright)
pip install shannon-ai[full]
验证安装
安装完成后,运行以下代码验证环境是否正确配置:
# 验证Shannon安装
import shannon
print(f"Shannon版本: {shannon.__version__}")
# 检查核心组件
from shannon import Agent, BrowserController, ScreenCapture
print("所有核心组件加载成功!")
配置浏览器环境
Shannon的许多功能依赖于浏览器自动化,因此需要正确配置Playwright:
# 安装Playwright浏览器
from shannon.utils import setup_browsers
setup_browsers() # 这将自动下载Chromium浏览器
# 如果在中国大陆地区,可能需要配置镜像源
import subprocess
subprocess.run([
"playwright", "install", "chromium",
"--with-deps"
])
配置文件说明
Shannon的行为可以通过配置文件进行定制。创建配置文件shannon_config.yaml:
# shannon_config.yaml
general:
verbose: true # 输出详细日志
log_dir: "./logs" # 日志目录
screenshot_dir: "./screenshots" # 截图保存目录
agent:
model_provider: "anthropic" # 模型提供商
model_name: "claude-sonnet-4" # 使用的模型
max_iterations: 50 # 最大迭代次数
confirmation_mode: true # 操作前确认
browser:
headless: false # 是否无头模式运行
viewport:
width: 1920
height: 1080
user_agent: "Mozilla/5.0 ..." # 自定义User-Agent
slow_mo: 100 # 操作延迟(毫秒)
vision:
ocr_enabled: true # 启用OCR识别
element_detection: true # 启用元素检测
confidence_threshold: 0.8 # 识别置信度阈值
核心功能详解
1. ScreenCapture:屏幕捕获模块
ScreenCapture是Shannon的视觉输入核心,负责将屏幕内容转换为AI可以理解的图像数据。
基本用法
from shannon.vision import ScreenCapture
# 创建捕获器实例
capture = ScreenCapture()
# 捕获整个屏幕
full_screenshot = capture.capture_full_screen()
# 捕获特定区域(坐标: x, y, width, height)
region_screenshot = capture.capture_region(x=100, y=100, width=800, height=600)
# 捕获特定窗口
window_screenshot = capture.capture_window(window_name="Chrome")
# 保存截图供后续分析
capture.save_screenshot(full_screenshot, "output/screen_001.png")
高级配置
from shannon.vision import ScreenCapture, CaptureConfig
# 自定义捕获配置
config = CaptureConfig(
include_cursor=True, # 在截图中包含鼠标光标
include_timestamp=True, # 添加时间戳水印
compression=0.9, # JPEG压缩质量
grayscale=False, # 是否转换为灰度图
scale_factor=2.0 # 高DPI屏幕缩放
)
capture = ScreenCapture(config=config)
# 连续捕获(用于视频录制或动态分析)
for i, frame in capture.capture_continuous(interval=0.5):
# 每0.5秒捕获一帧
print(f"捕获帧 {i}, 时间戳: {frame.timestamp}")
if i >= 20: # 捕获20帧后停止
break
区域检测辅助
from shannon.vision import RegionDetector
detector = RegionDetector()
# 找出页面上的可交互元素
regions = detector.find_interactive_regions(screenshot)
for region in regions:
print(f"元素类型: {region.type}")
print(f"位置: ({region.x}, {region.y})")
print(f"尺寸: {region.width}x{region.height}")
print(f"文本内容: {region.text}")
print("---")
2. BrowserController:浏览器控制模块
BrowserController提供了对浏览器的完整控制能力,是Web自动化场景的核心组件。
初始化与基本操作
from shannon.browser import BrowserController
# 启动浏览器
browser = BrowserController(
headless=False, # True=无头模式,False=可见浏览器
context_dir="./browser_context" # 浏览器上下文存储目录
)
# 打开网页
browser.navigate_to("https://www.example.com")
# 获取当前页面截图
current_state = browser.take_screenshot()
# 返回上一页
browser.go_back()
# 前进到下一页
browser.go_forward()
# 刷新当前页面
browser.refresh()
# 关闭浏览器
browser.close()
元素定位与交互
Shannon支持多种元素定位策略,包括视觉定位和语义定位:
from shannon.browser import LocatorType
# 通过文本内容定位(最常用的方式)
search_input = browser.find_element(
"搜索框", # 元素的描述性名称
locator_type=LocatorType.VISUAL_TEXT
)
# 通过ARIA标签定位
button = browser.find_element(
"提交按钮",
locator_type=LocatorType.ARIA_LABEL
)
# 通过CSS选择器定位
element = browser.find_element(
"#main-content > div.form",
locator_type=LocatorType.CSS
)
# 通过 XPath 定位
element = browser.find_element(
"//button[contains(@class, 'submit')]",
locator_type=LocatorType.XPATH
)
执行用户操作
# 点击元素
browser.click(element)
# 双击元素
browser.double_click(element)
# 右键点击(上下文菜单)
browser.right_click(element)
# 输入文本
browser.type_text(element, "要输入的文字")
# 清空输入框并输入新内容
browser.clear_and_type(element, "新的内容")
# 悬停到元素上(hover)
browser.hover(element)
# 滚动页面
browser.scroll(direction="down", amount=3) # 向下滚动3页
browser.scroll(direction="up", amount=1) # 向上滚动1页
browser.scroll(direction="left", amount=2) # 向左滚动2页
# 按键盘快捷键
browser.press_key("Control", "c") # Ctrl+C 复制
browser.press_key("Control", "v") # Ctrl+V 粘贴
browser.press_key("Escape") # 按Escape键
browser.press_key("Enter") # 按回车键
处理弹窗和iframe
# 处理JavaScript alert弹窗
browser.handle_alert(accept=True, prompt_text="可选的输入内容")
# 处理确认对话框
browser.handle_confirm(accept=False) # 取消确认
# 切换到iframe
browser.switch_to_frame("iframe_name")
browser.switch_to_frame(frame_index=0)
# 切换回主文档
browser.switch_to_main_content()
# 处理新打开的标签页
browser.switch_to_new_tab()
current_tabs = browser.get_all_tabs()
print(f"当前共有 {len(current_tabs)} 个标签页")
browser.close_current_tab()
browser.switch_to_tab(0) # 切换到第一个标签页
3. Agent:智能决策代理
Agent是Shannon的大脑,它将视觉理解、自然语言推理和操作执行整合在一起,形成完整的自动化工作流。
创建代理实例
from shannon.agent import Agent, AgentConfig
from anthropic import Anthropic # 或使用其他模型提供商
# 配置API密钥(以Anthropic为例)
import os
os.environ["ANTHROPIC_API_KEY"] = "your-api-key-here"
# 创建代理配置
config = AgentConfig(
model_provider="anthropic",
model_name="claude-sonnet-4-20250514",
max_tokens=4096,
temperature=0.7,
confirmation_mode=True, # 开启操作确认,确保安全
screenshot_on_decision=True, # 决策前自动截图
verbose=True
)
# 创建代理实例
agent = Agent(
config=config,
browser=browser, # 关联浏览器控制器
capture=capture # 关联屏幕捕获器
)
定义任务
# 使用自然语言描述任务
task_description = """
请帮我完成以下工作:
1. 打开Google搜索
2. 搜索"人工智能最新发展"
3. 点击第一个搜索结果
4. 阅读页面内容,总结文章要点
5. 将总结保存到本地文件 summary.txt
"""
# 执行任务
result = agent.execute(task_description)
print(f"任务执行状态: {result.status}")
print(f"执行步数: {result.steps_taken}")
print(f"执行时间: {result.duration:.2f}秒")
print(f"最终结果:\n{result.final_output}")
任务执行结果结构
# 查看详细的执行日志
print("=== 详细执行日志 ===")
for step in result.execution_trace:
print(f"\n步骤 {step.number}: {step.action}")
print(f"思考过程: {step.thinking}")
print(f"执行结果: {step.outcome}")
print(f"截图: {step.screenshot_path}")
4. VisionProcessor:视觉理解模块
VisionProcessor负责分析和理解屏幕内容,提取关键信息。
图像分析
from shannon.vision import VisionProcessor
processor = VisionProcessor()
# 分析截图内容
analysis = processor.analyze_image(screenshot)
print("=== 页面分析结果 ===")
print(f"页面标题: {analysis.page_title}")
print(f"主要语言: {analysis.language}")
print(f"主要内容类型: {analysis.content_type}")
# 列出检测到的UI元素
print("\n=== 检测到的UI元素 ===")
for element in analysis.detected_elements:
print(f"[{element.role}] {element.text}")
print(f" 位置: ({element.bbox.x1}, {element.bbox.y1}) - ({element.bbox.x2}, {element.bbox.y2})")
OCR文本提取
# 提取图片中的文字(OCR功能)
extracted_text = processor.extract_text(screenshot, languages=["zh", "en"])
print("提取的文本内容:")
print(extracted_text)
# 提取特定区域的文字
region_text = processor.extract_text_from_region(
screenshot,
region={"x": 100, "y": 200, "width": 500, "height": 100},
languages=["zh", "en"]
)
print(f"指定区域文本: {region_text}")
表格识别
# 识别截图中的表格
tables = processor.extract_tables(screenshot)
for i, table in enumerate(tables):
print(f"\n=== 表格 {i+1} ===")
print(f"行数: {len(table.rows)}")
print(f"列数: {len(table.columns)}")
print("\n表格内容:")
for row in table.rows:
print(" | ".join(str(cell) for cell in row))
5. Memory:上下文记忆模块
Memory模块帮助Agent维护跨步骤的上下文信息,特别适合处理需要多轮交互的复杂任务。
基本用法
from shannon.memory import Memory, MemoryConfig
# 创建记忆模块
memory_config = MemoryConfig(
max_history=100, # 保留最近100步的记忆
summary_interval=10, # 每10步生成一次记忆摘要
include_screenshots=True, # 是否保存截图
include_thinking=True # 是否保存思考过程
)
memory = Memory(config=memory_config)
# 在Agent中使用记忆
agent = Agent(
config=config,
browser=browser,
capture=capture,
memory=memory
)
# 手动添加记忆
memory.add("用户偏好:喜欢简洁的界面设计")
memory.add("已完成:登录流程", screenshot="path/to/screenshot.png")
# 查询记忆
relevant_memories = memory.search("用户偏好")
print(f"相关记忆: {relevant_memories}")
# 查看记忆摘要
print(f"当前记忆总数: {memory.count()}")
print(f"记忆摘要:\n{memory.get_summary()}")
实战教程:构建自动化新闻助手
现在让我们通过一个完整的实战项目,将上述所有模块串联起来。
项目目标
我们要构建一个自动化新闻助手,它能够:
- 自动打开新闻网站
- 搜索用户指定的主题
- 抓取相关文章标题和摘要
- 生成阅读报告保存到本地
第一步:项目结构设计
# news_assistant.py
# ========================================
# 新闻助手主程序
# ========================================
import os
import json
from datetime import datetime
from shannon.browser import BrowserController
from shannon.vision import ScreenCapture, VisionProcessor
from shannon.agent import Agent, AgentConfig
from shannon.memory import Memory, MemoryConfig
class NewsAssistant:
"""自动化新闻助手"""
def __init__(self, config_path=None):
# 初始化组件
self.browser = BrowserController(headless=False)
self.capture = ScreenCapture()
self.processor = VisionProcessor()
# 配置记忆模块
memory_config = MemoryConfig(
max_history=50,
summary_interval=5,
include_screenshots=False
)
self.memory = Memory(config=memory_config)
# 配置Agent
agent_config = AgentConfig(
model_provider="anthropic",
model_name="claude-sonnet-4-20250514",
max_iterations=30,
confirmation_mode=False, # 自动化任务不需要确认
verbose=True
)
self.agent = Agent(
config=agent_config,
browser=self.browser,
capture=self.capture,
memory=self.memory
)
# 输出目录
self.output_dir = "./news_reports"
os.makedirs(self.output_dir, exist_ok=True)
def search_and_summarize(self, topic, max_articles=5):
"""
搜索主题新闻并生成摘要报告
Args:
topic: 搜索主题
max_articles: 最大文章数量
"""
# 构建任务描述
task = f"""
请帮我完成以下新闻搜索和摘要任务:
1. 打开百度新闻(news.baidu.com)
2. 在搜索框中输入「{topic}」并执行搜索
3. 等待搜索结果加载完成
4. 识别并点击前{max_articles}个新闻链接
5. 对每篇文章,记录:
- 文章标题
- 来源网站
- 发布日期(如果可见)
- 前两段内容摘要
6. 返回完整的新闻信息列表
注意:
- 如果遇到需要登录的页面,尝试寻找"登录后阅读全文"的提示
- 如果某个链接无法访问,记录错误并继续处理下一个
- 确保每个页面都有足够时间加载
"""
# 执行搜索任务
result = self.agent.execute(task)
# 处理结果
articles = self._parse_search_results(result)
# 生成报告
report = self._generate_report(topic, articles)
return report
def _parse_search_results(self, result):
"""解析搜索结果"""
articles = []
# 从Agent输出中提取文章信息
for step in result.execution_trace:
if "article" in step.action.lower():
try:
article_info = self._extract_article_info(step.outcome)
articles.append(article_info)
except Exception as e:
print(f"解析文章信息时出错: {e}")
return articles
def _extract_article_info(self, text):
"""从文本中提取文章信息"""
# 这里简化处理,实际项目中可能需要更复杂的解析逻辑
info = {
"title": "",
"source": "",
"date": "",
"summary": ""
}
lines = text.split("\n")
for line in lines:
if "标题" in line or "title" in line.lower():
info["title"] = line.split(":")[-1].strip()
elif "来源" in line or "source" in line.lower():
info["source"] = line.split(":")[-1].strip()
elif "日期" in line or "date" in line.lower():
info["date"] = line.split(":")[-1].strip()
elif "摘要" in line or "summary" in line.lower():
info["summary"] = line.split(":")[-1].strip()
return info
def _generate_report(self, topic, articles):
"""生成新闻报告"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{self.output_dir}/{topic}_{timestamp}.json"
report = {
"topic": topic,
"generated_at": datetime.now().isoformat(),
"total_articles": len(articles),
"articles": articles
}
# 保存JSON报告
with open(filename, "w", encoding="utf-8") as f:
json.dump(report, f, ensure_ascii=False, indent=2)
# 同时生成Markdown格式报告
md_filename = f"{self.output_dir}/{topic}_{timestamp}.md"
with open(md_filename, "w", encoding="utf-8") as f:
f.write(f"# {topic} 新闻报告\n\n")
f.write(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write(f"共抓取 {len(articles)} 篇文章\n\n")
f.write("---\n\n")
for i, article in enumerate(articles, 1):
f.write(f"## {i}. {article.get('title', '未知标题')}\n\n")
f.write(f"- **来源**: {article.get('source', '未知')}\n")
f.write(f"- **日期**: {article.get('date', '未知')}\n\n")
f.write(f"### 摘要\n\n")
f.write(f"{article.get('summary', '无')}\n\n")
f.write("---\n\n")
print(f"报告已生成:")
print(f" JSON格式: {filename}")
print(f" Markdown格式: {md_filename}")
return report
def close(self):
"""清理资源"""
self.browser.close()
print("资源已清理")
# ========================================
# 程序入口
# ========================================
def main():
"""主函数"""
assistant = NewsAssistant()
try:
# 获取用户输入
topic = input("请输入要搜索的新闻主题: ").strip()
if not topic:
print("未输入有效主题,程序退出")
return
# 执行搜索
print(f"\n开始搜索「{topic}」相关新闻...\n")
report = assistant.search_and_summarize(topic, max_articles=5)
print(f"\n任务完成!共处理 {report['total_articles']} 篇文章")
except KeyboardInterrupt:
print("\n用户中断程序")
except Exception as e:
print(f"程序执行出错: {e}")
raise
finally:
assistant.close()
if __name__ == "__main__":
main()
第二步:扩展功能 – 增量更新
# incremental_updater.py
from datetime import datetime, timedelta
from shannon.memory import Memory
class IncrementalUpdater:
"""增量更新器 - 只获取最新新闻"""
def __init__(self, memory: Memory):
self.memory = memory
self.last_update_key = "last_news_update"
def should_update(self, check_interval_hours=1):
"""
判断是否需要更新
Args:
check_interval_hours: 检查间隔(小时)
"""
last_update = self.memory.search(self.last_update_key)
if not last_update:
return True
try:
last_time = datetime.fromisoformat(last_update[0])
elapsed = datetime.now() - last_time
return elapsed > timedelta(hours=check_interval_hours)
except:
return True
def mark_updated(self):
"""标记为已更新"""
self.memory.add(
f"{self.last_update_key}: {datetime.now().isoformat()}"
)
第三步:添加错误处理和重试机制
# retry_utils.py
import time
from functools import wraps
from typing import Callable, Any, Optional
class RetryConfig:
"""重试配置"""
def __init__(
self,
max_attempts: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
exponential_base: float = 2.0
):
self.max_attempts = max_attempts
self.base_delay = base_delay
self.max_delay = max_delay
self.exponential_base = exponential_base
def retry_on_exception(config: Optional[RetryConfig] = None):
"""
装饰器:在发生异常时自动重试
Example:
@retry_on_exception(RetryConfig(max_attempts=5))
def fetch_page(url):
# 可能失败的代码
pass
"""
if config is None:
config = RetryConfig()
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
last_exception = None
for attempt in range(1, config.max_attempts + 1):
try:
return func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt < config.max_attempts:
delay = min(
config.base_delay * (config.exponential_base ** (attempt - 1)),
config.max_delay
)
print(f"尝试 {attempt} 失败,{delay:.1f}秒后重试...")
time.sleep(delay)
else:
print(f"已达到最大重试次数 {config.max_attempts}")
raise last_exception
return wrapper
return decorator
# 使用示例
class RobustBrowserController:
"""带重试机制的浏览器控制器"""
def __init__(self, browser):
self.browser = browser
self.retry_config = RetryConfig(
max_attempts=3,
base_delay=2.0,
max_delay=30.0
)
@retry_on_exception()
def safe_navigate(self, url):
"""安全导航(带重试)"""
self.browser.navigate_to(url)
# 验证页面是否加载成功
screenshot = self.browser.take_screenshot()
if self._is_page_loaded(screenshot):
return True
raise Exception("页面未正确加载")
@retry_on_exception()
def safe_click(self, element):
"""安全点击(带重试)"""
self.browser.click(element)
# 等待操作完成
time.sleep(1)
return True
def _is_page_loaded(self, screenshot):
"""检查页面是否加载完成"""
# 实现页面加载检测逻辑
# 可以检查是否存在loading动画、错误提示等
return True
常见使用场景
场景一:自动化数据采集
Shannon在数据采集场景中有着天然的优势,特别是对于那些没有开放API的网站。
# data_collector.py
from shannon.browser import BrowserController
from shannon.vision import VisionProcessor
from shannon.agent import Agent, AgentConfig
class ProductDataCollector:
"""电商产品数据采集器"""
def __init__(self):
self.browser = BrowserController(headless=True)
self.vision = VisionProcessor()
self.agent = Agent(
config=AgentConfig(confirmation_mode=False),
browser=self.browser
)
def collect_product_info(self, product_url):
"""
从商品页面采集信息
采集字段:
- 商品名称
- 价格
- 销量
- 评论数
- 商品评分
- 店铺名称
"""
task = f"""
请从以下商品页面采集完整的产品信息:
URL: {product_url}
需要采集的字段:
1. 商品标题/名称
2. 当前价格(注意区分原价和促销价)
3. 累计销量
4. 商品评价数量
5. 评分星级(如果有)
6. 店铺/商家名称
7. 商品规格参数(如颜色、尺寸等)
返回格式化为结构化的JSON数据。
"""
result = self.agent.execute(task)
return self._parse_to_json(result.final_output)
def collect_search_results(self, keyword, pages=5):
"""
采集搜索结果页面
Args:
keyword: 搜索关键词
pages: 采集页数
"""
self.browser.navigate_to(f"https://search.example.com?q={keyword}")
all_products = []
for page in range(1, pages + 1):
print(f"正在采集第 {page}/{pages} 页...")
# 采集当前页
task = f"""
请识别当前页面上的所有商品列表,
提取每个商品的:标题、价格、链接。
返回JSON数组格式。
"""
result = self.agent.execute(task)
products = self._parse_product_list(result.final_output)
all_products.extend(products)
# 翻到下一页
if page < pages:
next_button = self.browser.find_element("下一页按钮")
self.browser.click(next_button)
self.browser.wait_for_page_load()
return all_products
def _parse_to_json(self, text):
"""解析文本为JSON"""
import json
import re
# 尝试提取JSON内容
json_match = re.search(r'\{.*\}', text, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group())
except:
pass
return {"raw_text": text}
def _parse_product_list(self, text):
"""解析商品列表"""
import json
import re
try:
# 尝试直接解析JSON
return json.loads(text)
except:
# 如果不是JSON,尝试提取其中的列表部分
list_match = re.search(r'\[.*\]', text, re.DOTALL)
if list_match:
try:
return json.loads(list_match.group())
except:
pass
return []
场景二:自动化表单填写
表单填写是日常工作中最耗时的任务之一,Shannon可以帮你自动化这类操作。
# form_automation.py
from shannon.browser import BrowserController, LocatorType
class FormAutomator:
"""表单自动化填写器"""
def __init__(self, browser: BrowserController):
self.browser = browser
def fill_contact_form(self, form_url, data):
"""
自动填写联系表单
Args:
form_url: 表单页面URL
data: 表单数据字典
"""
self.browser.navigate_to(form_url)
# 姓名
if "name" in data:
field = self.browser.find_element("姓名输入框")
self.browser.type_text(field, data["name"])
# 邮箱
if "email" in data:
field = self.browser.find_element("邮箱输入框")
self.browser.type_text(field, data["email"])
# 电话
if "phone" in data:
field = self.browser.find_element("电话输入框")
self.browser.type_text(field, data["phone"])
# 下拉选择框
if "country" in data:
dropdown = self.browser.find_element("国家选择框")
self.browser.click(dropdown)
option = self.browser.find_element(data["country"])
self.browser.click(option)
# 多行文本
if "message" in data:
field = self.browser.find_element("留言内容框")
self.browser.type_text(field, data["message"])
# 单选按钮
if "contact_method" in data:
radio = self.browser.find_element(data["contact_method"])
self.browser.click(radio)
# 复选框
if data.get("subscribe"):
checkbox = self.browser.find_element("订阅新闻通讯复选框")
if not self._is_checked(checkbox):
self.browser.click(checkbox)
# 截图保存填写状态
self.browser.take_screenshot("form_filled.png")
# 找到并点击提交按钮
submit_button = self.browser.find_element("提交按钮")
self.browser.click(submit_button)
# 等待处理并检查结果
return self._check_submission_result()
def _is_checked(self, checkbox):
"""检查复选框是否已选中"""
# 实际实现需要检查元素的checked属性
return False
def _check_submission_result(self):
"""检查提交结果"""
# 等待可能出现的结果页面
import time
time.sleep(2)
screenshot = self.browser.take_screenshot()
# 简单判断是否成功(实际需要更复杂的逻辑)
if "成功" in self.browser.get_page_text():
return {"success": True, "message": "表单提交成功"}
elif "错误" in self.browser.get_page_text():
return {"success": False, "message": "表单提交失败"}
else:
return {"success": None, "message": "无法确定提交结果"}
场景三:智能测试助手
在软件开发过程中,Shannon可以作为自动化测试的辅助工具。
# test_assistant.py
from shannon.browser import BrowserController
from shannon.agent import Agent, AgentConfig
import json
from datetime import datetime
class UITestAssistant:
"""UI自动化测试助手"""
def __init__(self):
self.browser = BrowserController(headless=False)
self.results = []
def run_smoke_test(self, app_url):
"""
运行冒烟测试
测试项目:
1. 页面是否正常加载
2. 导航菜单是否可点击
3. 关键功能入口是否存在
4. 登录流程是否正常
"""
test_cases = [
{
"name": "页面加载测试",
"task": "打开页面,检查是否有错误提示"
},
{
"name": "导航测试",
"task": "检查页面导航菜单是否存在且可点击"
},
{
"name": "搜索功能测试",
"task": "找到搜索框,输入'test'并提交,检查是否有搜索结果"
},
{
"name": "页面滚动测试",
"task": "滚动页面到最底部,检查内容是否正常显示"
}
]
results = []
for case in test_cases:
print(f"\n正在执行: {case['name']}")
result = self._run_single_test(app_url, case["task"])
results.append({
"name": case["name"],
"passed": result["passed"],
"details": result["details"],
"screenshot": result.get("screenshot")
})
return self._generate_test_report(results)
def _run_single_test(self, url, task):
"""运行单个测试用例"""
agent = Agent(
config=AgentConfig(confirmation_mode=False, verbose=False),
browser=self.browser
)
full_task = f"""
访问 {url} 并执行以下测试:
{task}
记录:
1. 测试是否通过
2. 具体观察到的现象
3. 发现的问题(如果有)
"""
try:
result = agent.execute(full_task)
screenshot = self.browser.take_screenshot()
# 简单判断是否通过
passed = "错误" not in result.final_output and "失败" not in result.final_output
return {
"passed": passed,
"details": result.final_output,
"screenshot": screenshot
}
except Exception as e:
return {
"passed": False,
"details": f"测试执行出错: {str(e)}",
"screenshot": None
}
def _generate_test_report(self, results):
"""生成测试报告"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
report_file = f"test_report_{timestamp}.json"
passed_count = sum(1 for r in results if r["passed"])
total_count = len(results)
report = {
"timestamp": timestamp,
"summary": {
"total": total_count,
"passed": passed_count,
"failed": total_count - passed_count,
"pass_rate": f"{passed_count/total_count*100:.1f}%"
},
"details": results
}
with open(report_file, "w", encoding="utf-8") as f:
json.dump(report, f, ensure_ascii=False, indent=2)
print(f"\n测试完成!")
print(f"总计: {total_count} 个测试")
print(f"通过: {passed_count}")
print(f"失败: {total_count - passed_count}")
print(f"通过率: {report['summary']['pass_rate']}")
print(f"详细报告: {report_file}")
return report
场景四:文档处理工作流
# document_workflow.py
from shannon.browser import BrowserController
from shannon.agent import Agent, AgentConfig
from shannon.vision import VisionProcessor
import os
class DocumentWorkflow:
"""文档处理工作流"""
def __init__(self):
self.browser = BrowserController(headless=True)
self.vision = VisionProcessor()
self.output_dir = "./processed_documents"
os.makedirs(self.output_dir, exist_ok=True)
def process_invoice(self, invoice_image_path):
"""
从发票图片中提取信息
提取字段:
- 发票代码
- 发票号码
- 开票日期
- 购买方信息
- 销售方信息
- 商品明细
- 金额合计
- 税额
"""
# 读取发票图片
from PIL import Image
image = Image.open(invoice_image_path)
# 预处理(可选)
# image = self._preprocess_image(image)
# 使用OCR提取文本
text = self.vision.extract_text(image, languages=["zh", "en"])
# 使用AI解析结构化信息
agent = Agent(
config=AgentConfig(confirmation_mode=False)
)
task = f"""
请从以下发票文本中提取结构化信息:
{text}
返回JSON格式的发票信息,包含:
- invoice_code (发票代码)
- invoice_number (发票号码)
- invoice_date (开票日期)
- buyer (购买方信息)
- seller (销售方信息)
- items (商品明细列表)
- total_amount (金额合计)
- tax_amount (税额)
"""
result = agent.execute(task)
return self._parse_invoice_data(result.final_output)
def batch_process_documents(self, input_dir, output_format="json"):
"""
批量处理文件夹中的文档
Args:
input_dir: 输入文件夹路径
output_format: 输出格式(json/csv/excel)
"""
supported_extensions = [".png", ".jpg", ".jpeg", ".pdf"]
files = [
f for f in os.listdir(input_dir)
if os.path.splitext(f)[1].lower() in supported_extensions
]
results = []
for i, file in enumerate(files, 1):
print(f"处理中 [{i}/{len(files)}]: {file}")
file_path = os.path.join(input_dir, file)
try:
# 根据文件类型选择处理方法
if file.lower().endswith(".pdf"):
# PDF处理
result = self._process_pdf(file_path)
else:
# 图片处理
result = self.process_invoice(file_path)
results.append({
"file": file,
"success": True,
"data": result
})
except Exception as e:
results.append({
"file": file,
"success": False,
"error": str(e)
})
# 保存批量处理结果
output_file = os.path.join(
self.output_dir,
f"batch_result_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
)
with open(output_file, "w", encoding="utf-8") as f:
json.dump(results, f, ensure_ascii=False, indent=2)
success_count = sum(1 for r in results if r["success"])
print(f"\n批量处理完成!成功: {success_count}/{len(files)}")
return results
def _process_pdf(self, pdf_path):
"""处理PDF文件"""
# PDF处理逻辑
# 可以使用PyMuPDF或pdfplumber等库
pass
技巧与最佳实践
1. 性能优化技巧
在实际项目中,性能往往是我们需要重点关注的问题。以下是一些优化建议:
减少不必要的截图
# 不推荐:频繁截图
for i in range(100):
screenshot = capture.capture_full_screen() # 每次迭代都截图
agent.process(screenshot)
# 推荐:按需截图
agent = Agent(
config=AgentConfig(
screenshot_on_decision=True, # 仅在决策时截图
screenshot_on_error=True # 仅在出错时截图
)
)
# 或者手动控制截图频率
capture = ScreenCapture()
screenshot_count = 0
for i in range(100):
# 只在必要时截图
if should_capture():
screenshot = capture.capture_full_screen()
screenshot_count += 1
agent.process(state)
使用缓存
from functools import lru_cache
class OptimizedAgent:
"""优化后的Agent"""
def __init__(self):
self._page_cache = {}
self._analysis_cache = {}
@lru_cache(maxsize=100)
def _get_cached_analysis(self, screenshot_hash):
"""缓存页面分析结果"""
return self._analyze_page(screenshot_hash)
def process_step(self, step_data):
"""处理步骤(带缓存)"""
cache_key = hash(step_data)
if cache_key in self._page_cache:
return self._page_cache[cache_key]
# 执行实际处理
result = self._do_processing(step_data)
# 缓存结果
self._page_cache[cache_key] = result
return result
2. 可靠性提升技巧
添加状态检查
def safe_operation(operation_func, max_retries=3):
"""
安全操作包装器
在执行关键操作前检查前置条件,
操作后验证结果
"""
def wrapper(*args, **kwargs):
# 检查前置条件
if not check_prerequisites():
raise PrerequisitesNotMetError()
last_error = None
for attempt in range(max_retries):
try:
result = operation_func(*args, **kwargs)
# 验证结果
if validate_result(result):
return result
else:
raise ResultValidationError("结果验证失败")
except Exception as e:
last_error = e
if attempt < max_retries - 1:
time.sleep(1 * (attempt + 1)) # 递增等待
continue
raise last_error
return wrapper
class RobustBrowserController:
"""健壮的浏览器控制器"""
def __init__(self):
self.browser = BrowserController()
self.state_verifier = StateVerifier()
def click_with_verification(self, element, expected_state):
"""
带状态验证的点击操作
Args:
element: 要点击的元素
expected_state: 期望点击后的页面状态
"""
# 获取点击前状态
before_screenshot = self.browser.take_screenshot()
# 执行点击
self.browser.click(element)
# 等待状态变化
time.sleep(0.5)
# 获取点击后状态
after_screenshot = self.browser.take_screenshot()
# 验证状态变化
if self.state_verifier.verify_change(before_screenshot, after_screenshot, expected_state):
return True
# 状态未按预期变化,尝试恢复
return self._handle_state_mismatch(element)
def _handle_state_mismatch(self, element):
"""处理状态不匹配的情况"""
# 重试点击
self.browser.click(element)
time.sleep(1)
# 或者尝试JavaScript点击
self.browser.execute_javascript("arguments[0].click();", element)
return True
优雅降级处理
class GracefulDegradationHandler:
"""优雅降级处理器"""
def __init__(self):
self.fallback_strategies = {}
def register_fallback(self, operation, fallback_func):
"""注册降级策略"""
self.fallback_strategies[operation] = fallback_func
def execute_with_fallback(self, primary_func, operation_name):
"""执行带降级的操作"""
try:
# 尝试主要方法
return primary_func()
except PrimaryMethodError as e:
print(f"主要方法失败,尝试降级策略: {e}")
if operation_name in self.fallback_strategies:
return self.fallback_strategies[operation_name]()
else:
raise NoFallbackAvailableError(f"没有可用的降级策略: {operation_name}")
# 使用示例
handler = GracefulDegradationHandler()
# 注册降级策略
handler.register_fallback("element_location", lambda: None)
handler.register_fallback("ocr_extraction", lambda: "OCR降级: 使用默认文本")
handler.register_fallback("page_load", lambda: None)
3. 调试技巧
开启详细日志
import logging
# 配置详细日志
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('shannon_debug.log'),
logging.StreamHandler()
]
)
# 在代码中使用
logger = logging.getLogger(__name__)
def debug_operation(step_name):
"""调试装饰器"""
def decorator(func):
def wrapper(*args, **kwargs):
logger.debug(f"开始执行: {step_name}")
screenshot = capture.capture_full_screen()
logger.debug(f"当前截图已保存: {screenshot}")
result = func(*args, **kwargs)
logger.debug(f"完成执行: {step_name}, 结果: {result}")
return result
return wrapper
return decorator
@debug_operation("表单填写")
def fill_form(data):
# 填写表单逻辑
pass
保存调试快照
class DebugSnapshotManager:
"""调试快照管理器"""
def __init__(self, output_dir="./debug_snapshots"):
self.output_dir = output_dir
os.makedirs(output_dir, exist_ok=True)
self.snapshots = []
def save_snapshot(self, name, screenshot, state_info=None):
"""
保存调试快照
快照包含:
- 截图
- 状态信息
- 时间戳
"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
snapshot_name = f"{name}_{timestamp}"
# 保存截图
screenshot_path = os.path.join(
self.output_dir,
f"{snapshot_name}.png"
)
self._save_image(screenshot, screenshot_path)
# 保存状态信息
state_path = os.path.join(
self.output_dir,
f"{snapshot_name}_state.json"
)
state_data = {
"name": name,
"timestamp": timestamp,
"state": state_info or {}
}
with open(state_path, "w", encoding="utf-8") as f:
json.dump(state_data, f, ensure_ascii=False, indent=2)
self.snapshots.append({
"name": snapshot_name,
"screenshot": screenshot_path,
"state": state_path
})
return snapshot_name
def generate_debug_report(self):
"""生成调试报告"""
report_path = os.path.join(self.output_dir, "debug_report.html")
html_content = """
<!DOCTYPE html>
<html>
<head>
<title>Shannon调试报告</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.snapshot { margin: 20px 0; border: 1px solid #ccc; padding: 10px; }
.snapshot img { max-width: 100%; }
</style>
</head>
<body>
<h1>Shannon调试报告</h1>
<p>生成时间: {timestamp}</p>
""".format(timestamp=datetime.now().isoformat())
for snap in self.snapshots:
html_content += f"""
<div class="snapshot">
<h2>{snap['name']}</h2>
<img src="{os.path.basename(snap['screenshot'])}" alt="截图">
<p>状态文件: {os.path.basename(snap['state'])}</p>
</div>
"""
html_content += "</body></html>"
with open(report_path, "w", encoding="utf-8") as f:
f.write(html_content)
return report_path
4. 安全使用建议
敏感信息处理
from shannon.security import SecretsManager
class SecureConfig:
"""安全配置管理"""
@staticmethod
def load_secrets(secrets_path=".secrets"):
"""
加载敏感配置
建议:
1. 使用环境变量或密钥管理服务
2. 不要将secrets文件提交到版本控制
3. 定期轮换密钥
"""
secrets = {}
# 从环境变量读取
api_key = os.environ.get("ANTHROPIC_API_KEY")
if api_key:
secrets["api_key"] = api_key
# 或者从加密文件读取
secrets_file = os.path.join(secrets_path, "config.enc")
if os.path.exists(secrets_file):
# 解密并读取
# 这里需要使用实际的加密库
pass
return secrets
@staticmethod
def mask_sensitive_data(data):
"""掩码敏感数据(用于日志输出)"""
import re
masked = str(data)
# 掩码API密钥
masked = re.sub(
r'(api[_-]?key["\']?\s*[:=]\s*["\']?)([a-zA-Z0-9_-]{8})([a-zA-Z0-9_-]{4})',
r'\1***\3',
masked
)
# 掩码邮箱
masked = re.sub(
r'([a-zA-Z0-9._%+-]+)@([a-zA-Z0-9.-]+\.[a-zA-Z]{2,})',
lambda m: f"{m.group(1)[:2]}***@{m.group(2)}",
masked
)
return masked
项目集成与扩展
与现有系统集成
REST API封装
# api_server.py
from flask import Flask, request, jsonify
from shannon.browser import BrowserController
from shannon.agent import Agent, AgentConfig
app = Flask(__name__)
# 全局资源管理
browser = None
agent = None
def init_resources():
"""初始化全局资源"""
global browser, agent
browser = BrowserController(headless=True)
agent = Agent(
config=AgentConfig(confirmation_mode=False),
browser=browser
)
@app.route("/api/execute", methods=["POST"])
def execute_task():
"""
执行任务API
请求体:
{
"task": "任务描述",
"options": {
"timeout": 300,
"screenshot": true
}
}
"""
data = request.get_json()
if not data or "task" not in data:
return jsonify({"error": "缺少task参数"}), 400
task = data["task"]
options = data.get("options", {})
try:
result = agent.execute(task)
response = {
"success": True,
"result": result.final_output,
"steps": len(result.execution_trace)
}
if options.get("screenshot"):
screenshot_path = browser.take_screenshot()
response["screenshot"] = screenshot_path
return jsonify(response)
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route("/api/browser/screenshot", methods=["GET"])
def get_screenshot():
"""获取当前屏幕截图"""
try:
screenshot = browser.take_screenshot()
return jsonify({
"success": True,
"path": screenshot
})
except Exception as e:
return jsonify({
"success": False,
"error": str(e)
}), 500
@app.route("/api/cleanup", methods=["POST"])
def cleanup():
"""清理资源"""
global browser
if browser:
browser.close()
return jsonify({"success": True})
if __name__ == "__main__":
init_resources()
app.run(host="0.0.0.0", port=5000)
与LangChain集成
# langchain_integration.py
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from shannon.agent import ShannonTool
# 创建Shannon工具
shannon_tool = ShannonTool(
name="browser_automation",
description="用于浏览器自动化操作,当需要打开网页、点击按钮、填写表单或抓取网页内容时使用",
browser=browser
)
# 创建LangChain代理
template = """
你是一个智能助手,可以使用工具来完成复杂任务。
可用工具:
{tools}
用户问题:{question}
请思考问题并决定是否需要使用工具。
"""
prompt = PromptTemplate(
template=template,
input_variables=["question", "tools"]
)
# 使用LangChain的代理
from langchain.agents import initialize_agent, AgentType
agent = initialize_agent(
tools=[shannon_tool],
llm=llm,
agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
verbose=True
)
# 执行自然语言命令
result = agent.run("帮我搜索ChatGPT最新资讯,然后总结前三条内容")
自定义扩展
创建自定义工具
from shannon.tools import BaseTool, ToolResult
class CustomScreenshotTool(BaseTool):
"""自定义截图工具"""
name = "custom_screenshot"
description = "执行自定义截图操作"
def __init__(self, capture: ScreenCapture):
self.capture = capture
def execute(self, **kwargs) -> ToolResult:
"""
执行截图
参数:
- region: 截图区域,格式为 {"x": 0, "y": 0, "width": 100, "height": 100}
- filename: 保存文件名
- format: 图片格式(png/jpg)
"""
region = kwargs.get("region")
filename = kwargs.get("filename", "screenshot")
img_format = kwargs.get("format", "png")
try:
if region:
screenshot = self.capture.capture_region(**region)
else:
screenshot = self.capture.capture_full_screen()
# 保存图片
output_path = f"{filename}.{img_format}"
self.capture.save_screenshot(screenshot, output_path)
return ToolResult(
success=True,
output=output_path,
message=f"截图已保存到: {output_path}"
)
except Exception as e:
return ToolResult(
success=False,
error=str(e)
)
def validate_params(self, params) -> bool:
"""参数验证"""
return True
总结与展望
项目核心价值回顾
通过本文的详细介绍,相信你已经对Shannon有了全面的认识。让我们简要回顾它的核心价值:
第一,降低AI应用开发门槛。 Shannon提供了直观的API和丰富的预置功能,让开发者无需深入了解底层自动化技术,就能构建强大的AI应用。无论是简单的网页操作还是复杂的多步骤工作流,都能通过简洁的代码实现。
第二,强大的视觉理解能力。 与传统的基于DOM或坐标的自动化工具不同,Shannon采用端到端的视觉理解方案。这使其能够完美适应各种界面变化,无论是响应式网页、动态内容还是全新的应用界面。
第三,可观测性与可控性。 Shannon在每个关键步骤都保留了完整的执行记录,包括截图、决策过程和操作结果。这不仅便于调试和追踪,也为AI行为的审计和优化提供了坚实基础。
第四,灵活的可扩展性。 从本文的示例可以看出,Shannon的设计充分考虑了扩展需求。无论是添加新的工具、集成不同的模型提供商,还是与现有系统对接,都有清晰的扩展路径。
未来发展方向
根据项目的发展动态和社区反馈,Shannon未来可能在以下方向持续演进:
多模态交互增强 – 进一步提升对复杂界面的理解能力,支持更多的交互模式,如手势操作、语音指令等。
模型无关架构 – 提供更抽象的接口层,使用户能够轻松切换不同的AI模型,获得更好的性价比或性能表现。
协作功能强化 – 支持多Agent协作处理复杂任务,通过分工配合提升处理效率。
企业级功能 – 完善权限管理、审计日志、高可用部署等企业级特性,满足生产环境需求。
相关资源链接
项目资源
- GitHub仓库:https://github.com/KeygraphHQ/shannon
- 官方文档:https://shannon.readthedocs.io
- 示例代码库:https://github.com/KeygraphHQ/shannon-examples
- 问题反馈:https://github.com/KeygraphHQ/shannon/issues
社区资源
- Discord社区:加入讨论,寻求帮助
- 技术博客:深入了解项目背后的设计理念
- 视频教程:直观的学习资源
相关AI项目推荐
如果Shannon引起了你的兴趣,以下几个相关项目也值得关注:
Playwright MCP – 微软开源的Model Context Protocol实现,为AI提供浏览器控制能力
AutoGPT – 自主AI Agent的先驱项目,展示了AI自主执行任务的潜力
LangChain – 大语言模型应用开发框架,Shannon可以与之良好集成
Browser Use – 另一个专注于AI浏览器自动化的开源项目
Puppeteer-Sharp – 如果你更倾向于传统的C#/。NET技术栈,这是.NET平台的浏览器自动化首选
结语
AI正在从”能说会道”向”能谋善断”进化,而像Shannon这样的工具正在推动这一进程。想象一下,当你需要完成重复性的电脑操作时,不再需要编写繁琐的脚本或录制宏,只需要用自然语言描述你的需求,AI就能帮你精准执行——这正是Shannon正在实现的目标。
无论你是AI应用开发者、自动化工程师,还是对新技术充满好奇的探索者,我都强烈建议你尝试一下Shannon。它不仅是一个强大的工具,更是一扇通往未来的窗户,让你亲身体验AI与计算机交互的无限可能。
现在就开始你的Shannon之旅吧!
评论区