别再手动爬网页了!NanoBrowser:让AI自己上网冲浪的开源利器
在大模型时代,AI助手的能力边界正在不断扩展。从最初的文本对话,到代码生成,再到图像理解,AI似乎已经能够完成绝大多数的数字任务。然而,有一个领域始终是AI的短板——那就是网页浏览与交互。
你有没有想过:如果AI能够像人类一样自由浏览网页、点击按钮、填写表单,那会是怎样的场景?这不是科幻,而是正在发生的事情。今天要介绍的这个开源项目,正是为了解决这个痛点而生。
它叫做NanoBrowser,一个专为AI时代设计的轻量级浏览器自动化框架。
为什么值得关注 / Why This Project Matters
传统浏览器自动化的困境
在介绍NanoBrowser之前,让我们先回顾一下现有的浏览器自动化方案存在哪些问题。
Selenium是浏览器自动化领域的老牌选手,功能强大但问题也很明显:体积庞大、启动缓慢、资源消耗惊人。对于需要频繁创建浏览器实例的AI应用来说,Selenium就像是用卡车来送快递——大材小用,效率低下。
Playwright和Puppeteer虽然相对轻量,但在与AI系统的集成方面做得并不够好。它们更多是为人设计的工具,而不是为AI设计的接口。
AI原生浏览的需求
当大语言模型开始被用于自动化任务时,一个新的需求浮出水面:AI需要一种更简单、更自然的方式来与网页交互。
想象这样的场景:一个AI助手需要帮你完成以下任务——
- 在某个购物网站搜索商品并比较价格
- 登录某个后台系统获取数据
- 自动填写并提交表单
- 抓取特定网页的信息进行分析
传统的做法需要大量的人工编码:解析HTML、定位元素、处理各种异常情况。这对于AI来说太复杂了,而且不够灵活。
NanoBrowser的设计理念
NanoBrowser正是为了解决这个问题而诞生的。它有几个核心理念:
轻量级:整个库非常小,依赖少,启动快
AI友好:提供简洁直观的API,让AI能够自然地表达浏览意图
可扩展:易于集成到各种AI框架和应用中
可靠性:处理了各种真实网页中的边界情况
这个项目的出现,标志着浏览器自动化从“为人设计”向“为AI设计”的转变。它不是要替代Selenium这样的传统工具,而是开辟了一个全新的赛道——AI时代的网页交互。
Getting Started / 环境搭建
系统要求
在开始之前,确保你的开发环境满足以下要求:
- Python 3.8 或更高版本
- 稳定的网络连接
- 操作系统:Windows、macOS 或 Linux 均可
安装步骤
NanoBrowser的安装非常简单。打开终端,执行以下命令:
pip install nanobrowser
如果你想安装最新版本(包含最新的开发功能),可以使用:
pip install git+https://github.com/nanobrowser/nanobrowser.git
安装完成后,可以通过以下命令验证安装是否成功:
python -c "import nanobrowser; print(nanobrowser.__version__)"
如果看到版本号输出,说明安装成功。
依赖说明
NanoBrowser的核心依赖包括:
requests:用于发送HTTP请求
beautifulsoup4:用于解析HTML文档
lxml:提供更快的HTML/XML解析能力
selenium(可选):用于处理需要JavaScript渲染的复杂页面
基本导入
在你的Python代码中,这样导入NanoBrowser:
import nanobrowser
# 导入核心类
from nanobrowser import Browser, Page, Element
# 导入辅助工具
from nanobrowser.utils import parse_html, extract_text
一个简单的示例
在深入学习之前,先来看一个最简单的例子,感受NanoBrowser的使用风格:
from nanobrowser import Browser
# 创建浏览器实例
browser = Browser()
# 打开网页
browser.navigate("https://example.com")
# 获取页面标题
title = browser.title
print(f"页面标题: {title}")
# 获取页面内容
content = browser.content
print(f"页面内容前200字符: {content[:200]}")
# 关闭浏览器
browser.close()
这段代码展示了NanoBrowser的基本用法:创建实例、导航、获取内容、关闭。整个过程清晰明了,没有复杂的配置。
Core Features / 核心功能详解
- 智能页面导航
NanoBrowser的导航功能不仅仅是简单地打开URL,它提供了多种导航方式:
# 基本导航
browser.navigate("https://www.example.com")
# 带参数导航
browser.navigate(
"https://www.example.com/search",
params={"q": "关键词", "page": 1}
)
# POST请求导航
browser.navigate(
"https://www.example.com/login",
method="POST",
data={"username": "user", "password": "pass"}
)
智能等待
在真实网页环境中,页面元素可能需要时间加载。NanoBrowser提供了智能等待机制:
# 等待元素出现,最长等待10秒
element = browser.wait_for_element(
selector="#main-content",
timeout=10
)
# 等待元素可见
element = browser.wait_for_visible(
selector=".loading-spinner",
visible=False, # 等待元素消失
timeout=30
)
# 等待页面完全加载
browser.wait_for_load()
- 元素定位与交互
元素定位是浏览器自动化的核心。NanoBrowser支持多种定位方式:
# CSS选择器
submit_button = browser.find_element("button.submit-btn")
# XPath定位
link = browser.find_element("xpath://div[@class='nav']/a[1]")
# 文本内容定位
help_link = browser.find_element("text:帮助中心")
# 组合定位
element = browser.find_element(
"button.primary",
containing="提交" # 包含特定文本
)
元素交互
# 点击元素
button.click()
# 输入文本
input_field = browser.find_element("input[name='search']")
input_field.type("要搜索的内容")
# 清空并输入
input_field.clear().type("新内容")
# 下拉选择
select = browser.find_element("select#country")
select.select_option(value="CN") # 通过值选择
select.select_option(text="中国") # 通过文本选择
select.select_option(index=0) # 通过索引选择
# 复选框和单选框
checkbox = browser.find_element("input#agree")
checkbox.check() # 选中
checkbox.uncheck() # 取消选中
radio = browser.find_element("input[name='gender'][value='male']")
radio.select() # 选择单选框
- 智能内容提取
NanoBrowser提供了强大的内容提取功能,让数据抓取变得异常简单:
# 提取文本内容
text = element.text
# 提取属性值
link = browser.find_element("a.more-link")
url = link.get_attribute("href")
title = link.get_attribute("title")
# 提取多个元素
items = browser.find_elements(".product-item")
for item in items:
name = item.find_child(".name").text
price = item.find_child(".price").text
print(f"{name}: {price}")
# 提取表格数据
table = browser.find_element("table.data-table")
rows = table.extract_table(header=True)
for row in rows:
print(row)
高级提取器
from nanobrowser.extractors import (
ProductExtractor,
ArticleExtractor,
TableExtractor,
ListExtractor
)
# 商品信息提取器
product_extractor = ProductExtractor()
products = product_extractor.extract(browser.page_source)
# 文章内容提取器(去除广告和导航)
article_extractor = ArticleExtractor()
article = article_extractor.extract(browser.page_source)
clean_text = article.text
author = article.author
publish_date = article.date
# 自定义提取规则
custom_extractor = ListExtractor(
item_selector=".listing-item",
fields={
"title": ".title",
"link": "a@href",
"price": ".price::text",
"image": "img@src"
}
)
results = custom_extractor.extract(browser.page_source)
- 智能表单处理
表单处理是网页自动化的难点,NanoBrowser对此做了大量优化:
# 自动填充表单
form = browser.find_element("form.login-form")
form.fill({
"username": "my_username",
"password": "my_password",
"remember": True
})
# 或者逐个字段填写
form["username"] = "my_username"
form["password"] = "my_password"
# 提交表单
form.submit()
# 智能表单检测与填写
browser.auto_fill_form(
form_selector="form",
data={"name": "张三", "email": "zhangsan@example.com"}
)
文件上传
# 定位文件上传输入框
file_input = browser.find_element("input[type='file']")
# 上传单个文件
file_input.upload_file("path/to/document.pdf")
# 上传多个文件
file_input.upload_files([
"path/to/image1.jpg",
"path/to/image2.jpg"
])
- 框架与多标签页处理
现代网页经常使用iframe框架和多标签页,NanoBrowser对此提供了完善的支持:
# 切换到iframe
browser.switch_to.frame("iframe-name")
browser.switch_to.frame(0) # 通过索引
browser.switch_to.frame(iframe_element) # 通过元素
# 操作iframe内容
iframe_element = browser.find_element("iframe")
with browser.frame_context(iframe_element):
# 在iframe上下文中操作
browser.find_element(".iframe-content").click()
# 切换回主文档
browser.switch_to.default_content()
# 多标签页处理
browser.switch_to.window(1) # 切换到第二个标签页
browser.switch_to.window("window-name")
browser.switch_to.window_by_url("https://...")
# 关闭当前标签页
browser.close_current_tab()
# 打开新标签页
browser.open_new_tab("https://example.com")
- JavaScript执行
对于需要动态渲染的内容,NanoBrowser支持执行JavaScript:
# 执行JavaScript代码
result = browser.execute_script("""
return document.title;
""")
# 执行并忽略返回值
browser.execute_script("""
window.scrollTo(0, document.body.scrollHeight);
""")
# 等待JavaScript执行完成
browser.execute_script_async("""
window.loadingPromise = fetch('/api/data').then(r => r.json());
""")
browser.wait_for_condition("return window.loadingPromise !== undefined;")
- 截图与PDF生成
# 截图
browser.screenshot("screenshot.png")
# 截取特定元素
element = browser.find_element(".main-content")
element.screenshot("element-screenshot.png")
# 设置截图尺寸
browser.screenshot("full-page.png", full_page=True)
# 生成PDF
browser.save_as_pdf("page.pdf")
Step-by-Step Practical Tutorial / 实战教程
这一部分是本教程的核心。我将通过几个实际案例,从简单到复杂,逐步展示如何使用NanoBrowser完成各种网页自动化任务。
案例一:天气预报查询
让我们从最简单的例子开始——获取某个城市的天气预报信息。
任务分析:
- 找到一个提供天气查询服务的网站
- 在搜索框中输入城市名
- 提取温度、天气状况等信息
from nanobrowser import Browser
from nanobrowser.extractors import TableExtractor
import time
def get_weather(city_name):
"""
获取指定城市的天气预报
Args:
city_name: 城市名称
Returns:
dict: 包含天气信息的字典
"""
# 创建浏览器实例
browser = Browser()
try:
# 导航到天气网站
browser.navigate("https://weather.com")
# 等待页面加载
browser.wait_for_load()
# 找到搜索框并输入城市名
search_box = browser.wait_for_element(
"input[placeholder*='搜索']",
timeout=10
)
search_box.clear()
search_box.type(city_name)
# 点击搜索按钮
search_button = browser.find_element("button[type='submit']")
search_button.click()
# 等待搜索结果加载
time.sleep(2)
browser.wait_for_load()
# 提取天气信息
weather_info = {
"city": city_name,
"temperature": browser.find_element(".temperature").text,
"condition": browser.find_element(".condition-text").text,
"humidity": browser.find_element(".humidity").text,
"wind": browser.find_element(".wind-speed").text,
}
return weather_info
except Exception as e:
print(f"获取天气信息时出错: {e}")
return None
finally:
# 确保关闭浏览器
browser.close()
# 使用示例
weather = get_weather("北京")
if weather:
print(f"{weather['city']}今天天气:{weather['condition']}")
print(f"气温:{weather['temperature']}")
print(f"湿度:{weather['humidity']}")
print(f"风速:{weather['wind']}")
代码解析
这个例子展示了NanoBrowser的基本工作流程:
创建浏览器实例、导航到目标网页、定位元素、提取信息、关闭浏览器。每一步都有错误处理,确保即使出错也能正确释放资源。
注意finally块的使用——这是一个重要的最佳实践,可以确保浏览器实例被正确关闭,避免资源泄漏。
案例二:自动化登录与数据获取
这个案例展示如何完成登录操作,然后获取登录后的专属内容。
任务分析:
- 打开登录页面
- 填写用户名和密码
- 点击登录按钮
- 等待登录完成
- 访问需要登录才能查看的页面
- 提取用户专属数据
from nanobrowser import Browser
from nanobrowser.exceptions import (
ElementNotFoundError,
TimeoutError,
LoginFailedError
)
import time
class SecureScraper:
"""安全的数据抓取器,处理登录流程"""
def __init__(self, username, password):
self.username = username
self.password = password
self.browser = Browser()
self.logged_in = False
def login(self, login_url):
"""
执行登录操作
Args:
login_url: 登录页面的URL
Returns:
bool: 登录是否成功
"""
print(f"正在登录到: {login_url}")
# 导航到登录页面
self.browser.navigate(login_url)
self.browser.wait_for_load()
try:
# 定位用户名输入框
# 尝试多种常见的定位方式
username_input = self._find_login_field("username")
password_input = self._find_login_field("password")
if not username_input or not password_input:
raise ElementNotFoundError("无法找到登录表单字段")
# 输入凭据
username_input.clear()
username_input.type(self.username)
password_input.clear()
password_input.type(self.password)
# 勾选"记住我"选项(如果存在)
remember_checkbox = self.browser.find_element(
"input[type='checkbox'][name*='remember']",
optional=True # 可选元素,找不到不报错
)
if remember_checkbox:
remember_checkbox.check()
# 找到并点击登录按钮
submit_button = self.browser.find_element(
"button[type='submit'], input[type='submit']"
)
submit_button.click()
# 等待登录响应
time.sleep(2)
self.browser.wait_for_load()
# 验证登录是否成功
if self._is_login_successful():
self.logged_in = True
print("登录成功!")
return True
else:
error_msg = self._get_login_error()
raise LoginFailedError(f"登录失败: {error_msg}")
except Exception as e:
print(f"登录过程出错: {e}")
self.logged_in = False
return False
def _find_login_field(self, field_type):
"""
智能查找登录字段,尝试多种定位方式
Args:
field_type: 字段类型,'username' 或 'password'
Returns:
Element 或 None
"""
selectors = {
"username": [
"input[name='username']",
"input[name='email']",
"input[type='email']",
"input[id='username']",
"input[placeholder*='用户名']",
"input[placeholder*='邮箱']",
],
"password": [
"input[name='password']",
"input[type='password']",
"input[id='password']",
"input[placeholder*='密码']",
]
}
for selector in selectors.get(field_type, []):
element = self.browser.find_element(selector, optional=True)
if element:
return element
return None
def _is_login_successful(self):
"""
检查登录是否成功
Returns:
bool
"""
# 检查URL是否变化(跳转到dashboard等)
current_url = self.browser.current_url
if any(keyword in current_url for keyword in ["dashboard", "home", "account"]):
return True
# 检查是否出现登出按钮
logout_button = self.browser.find_element(
"a:has-text('退出'), a:has-text('登出'), button:has-text('退出')",
optional=True
)
if logout_button:
return True
# 检查是否有错误提示
error_elements = self.browser.find_elements(
".error-message, .alert-danger, [class*='error']",
optional=True
)
return len(error_elements) == 0
def _get_login_error(self):
"""
获取登录错误信息
Returns:
str: 错误信息
"""
error_element = self.browser.find_element(
".error-message, .alert-danger",
optional=True
)
if error_element:
return error_element.text
return "未知错误"
def get_user_data(self, data_url):
"""
获取用户数据
Args:
data_url: 数据页面的URL
Returns:
dict: 用户数据
"""
if not self.logged_in:
print("警告:尚未登录,尝试直接访问...")
print(f"正在获取数据: {data_url}")
self.browser.navigate(data_url)
self.browser.wait_for_load()
# 根据具体页面结构调整提取逻辑
data = {
"profile": self._extract_profile(),
"statistics": self._extract_statistics(),
"recent_activity": self._extract_activity(),
}
return data
def _extract_profile(self):
"""提取用户资料"""
profile = {}
name_elem = self.browser.find_element(
".user-name, .profile-name, [class*='name']",
optional=True
)
if name_elem:
profile["name"] = name_elem.text
avatar_elem = self.browser.find_element(
"img.avatar, img.profile-pic",
optional=True
)
if avatar_elem:
profile["avatar_url"] = avatar_elem.get_attribute("src")
email_elem = self.browser.find_element(
".user-email, .profile-email",
optional=True
)
if email_elem:
profile["email"] = email_elem.text
return profile
def _extract_statistics(self):
"""提取统计数据"""
stats = {}
stat_cards = self.browser.find_elements(
".stat-card, .statistic-item"
)
for card in stat_cards:
label_elem = card.find_child(".stat-label, .label")
value_elem = card.find_child(".stat-value, .value")
if label_elem and value_elem:
stats[label_elem.text] = value_elem.text
return stats
def _extract_activity(self):
"""提取最近活动"""
activities = []
activity_items = self.browser.find_elements(
".activity-item, .timeline-item"
)
for item in activity_items[:10]: # 只取前10条
activity = {
"time": item.find_child(".time, .timestamp", optional=True).text if item.find_child(".time, .timestamp", optional=True) else None,
"description": item.find_child(".description, .content", optional=True).text if item.find_child(".description, .content", optional=True) else None,
}
activities.append(activity)
return activities
def close(self):
"""关闭浏览器"""
if self.browser:
self.browser.close()
print("浏览器已关闭")
# 使用示例
def main():
scraper = SecureScraper(
username="your_email@example.com",
password="your_password"
)
try:
# 执行登录
if scraper.login("https://example.com/login"):
# 获取数据
user_data = scraper.get_user_data("https://example.com/dashboard")
print("\n=== 用户资料 ===")
if user_data["profile"]:
for key, value in user_data["profile"].items():
print(f"{key}: {value}")
print("\n=== 统计数据 ===")
if user_data["statistics"]:
for key, value in user_data["statistics"].items():
print(f"{key}: {value}")
print("\n=== 最近活动 ===")
if user_data["recent_activity"]:
for i, activity in enumerate(user_data["recent_activity"], 1):
print(f"{i}. {activity['description']}")
finally:
scraper.close()
if __name__ == "__main__":
main()
代码解析
这个例子展示了更复杂的浏览器自动化流程:
智能字段查找:通过尝试多种常见的CSS选择器来定位登录表单字段,提高了代码的健壮性
登录状态检测:登录成功后需要验证,可以检查URL变化、页面元素或错误提示
数据提取模块化:将不同类型的数据提取(资料、统计、活动)封装为独立方法,便于维护和复用
资源管理:使用try-finally确保即使出错也能关闭浏览器
案例三:批量商品信息抓取
这个案例展示如何抓取电商网站的商品列表信息。
任务分析:
- 进入商品列表页面
- 获取所有商品的基本信息
- 处理分页,遍历所有页面
- 保存数据到结构化格式
from nanobrowser import Browser
from nanobrowser.extractors import ListExtractor
import json
import time
from dataclasses import dataclass, asdict
from typing import List, Optional
@dataclass
class Product:
"""商品数据模型"""
name: str
price: str
original_price: Optional[str] = None
discount: Optional[str] = None
sales: Optional[str] = None
rating: Optional[str] = None
shop: Optional[str] = None
link: str
image: Optional[str] = None
class ProductScraper:
"""商品信息抓取器"""
def __init__(self):
self.browser = Browser()
self.products: List[Product] = []
def scrape_category(self, category_url, max_pages=5):
"""
抓取分类页面的商品信息
Args:
category_url: 分类页面的URL
max_pages: 最大抓取页数
Returns:
List[Product]: 商品列表
"""
print(f"开始抓取分类页面: {category_url}")
self.browser.navigate(category_url)
self.browser.wait_for_load()
page = 1
while page <= max_pages:
print(f"\n正在抓取第 {page}/{max_pages} 页...")
# 等待商品列表加载
self._wait_for_products()
# 提取当前页商品
page_products = self._extract_page_products()
self.products.extend(page_products)
print(f"本页获取 {len(page_products)} 个商品")
print(f"累计已获取 {len(self.products)} 个商品")
# 尝试翻页
if not self._go_to_next_page():
print("已到达最后一页")
break
page += 1
time.sleep(2) # 礼貌性延迟
return self.products
def scrape_search_results(self, keyword, max_pages=5):
"""
抓取搜索结果页面
Args:
keyword: 搜索关键词
max_pages: 最大抓取页数
Returns:
List[Product]: 商品列表
"""
# 构造搜索URL
search_url = f"https://search.example.com?q={keyword}"
return self.scrape_category(search_url, max_pages)
def _wait_for_products(self):
"""等待商品列表加载完成"""
try:
# 等待商品容器出现
self.browser.wait_for_element(
".product-list, .goods-list, [class*='product']",
timeout=10
)
# 等待至少一个商品加载
self.browser.wait_for_element(
".product-item, .goods-item",
timeout=10
)
except Exception as e:
print(f"等待商品列表加载时出错: {e}")
def _extract_page_products(self) -> List[Product]:
"""
提取当前页面的所有商品
Returns:
List[Product]: 商品列表
"""
products = []
# 方法一:使用自定义提取器
extractor = ListExtractor(
item_selector=".product-item, .goods-item, li[itemtype]",
fields={
"name": ".product-title, .goods-title, [itemprop='name']",
"price": ".product-price, .price, [itemprop='price']",
"original_price": ".original-price, .price-del",
"sales": ".sales-count, .deal-count",
"rating": ".rating, [itemprop='ratingValue']",
"shop": ".shop-name, .seller",
"link": "a.product-link@href, a@href",
"image": "img@src, img@data-src",
}
)
try:
extracted = extractor.extract(self.browser.page_source)
for item in extracted:
# 清理数据
product = Product(
name=self._clean_text(item.get("name", "")),
price=self._clean_text(item.get("price", "")),
original_price=self._clean_text(item.get("original_price")),
discount=self._calculate_discount(
item.get("price"),
item.get("original_price")
),
sales=self._clean_text(item.get("sales")),
rating=self._clean_text(item.get("rating")),
shop=self._clean_text(item.get("shop")),
link=self._normalize_url(item.get("link", "")),
image=self._clean_text(item.get("image")),
)
products.append(product)
except Exception as e:
print(f"提取商品列表时出错: {e}")
# 方法二:手动提取(备用方案)
products = self._extract_products_manually()
return products
def _extract_products_manually(self) -> List[Product]:
"""
手动提取商品(当提取器失败时使用)
Returns:
List[Product]: 商品列表
"""
products = []
# 查找所有商品元素
product_elements = self.browser.find_elements(
".product-item, .goods-item"
)
for elem in product_elements:
try:
# 提取各个字段
name_elem = elem.find_child(
".product-title, .goods-title",
optional=True
)
name = name_elem.text if name_elem else "未知商品"
price_elem = elem.find_child(
".product-price, .price",
optional=True
)
price = price_elem.text if price_elem else "暂无价格"
link_elem = elem.find_element("a", optional=True)
link = link_elem.get_attribute("href") if link_elem else ""
img_elem = elem.find_child("img", optional=True)
image = img_elem.get_attribute("src") if img_elem else None
product = Product(
name=self._clean_text(name),
price=self._clean_text(price),
link=self._normalize_url(link),
image=image
)
products.append(product)
except Exception as e:
print(f"提取单个商品时出错: {e}")
continue
return products
def _go_to_next_page(self) -> bool:
"""
翻到下一页
Returns:
bool: 是否成功翻页
"""
try:
# 方法一:查找"下一页"按钮
next_button = self.browser.find_element(
".next-page, .pagination-next, a:has-text('下一页')",
optional=True
)
if next_button and next_button.is_enabled():
next_button.click()
self.browser.wait_for_load()
return True
# 方法二:查找页码按钮
current_page = self.browser.find_element(
".pagination .current, .active",
optional=True
)
if current_page:
next_page_elem = current_page.find_next_sibling("a")
if next_page_elem:
next_page_elem.click()
self.browser.wait_for_load()
return True
return False
except Exception as e:
print(f"翻页时出错: {e}")
return False
def _clean_text(self, text: Optional[str]) -> Optional[str]:
"""清理文本数据"""
if not text:
return None
# 移除多余空白
text = " ".join(text.split())
# 移除特殊字符
text = text.strip()
return text if text else None
def _normalize_url(self, url: str) -> str:
"""标准化URL"""
if not url:
return ""
# 处理相对URL
if url.startswith("//"):
return "https:" + url
elif url.startswith("/"):
return "https://example.com" + url
elif not url.startswith("http"):
return "https://example.com/" + url
return url
def _calculate_discount(self, price: str, original_price: str) -> Optional[str]:
"""计算折扣"""
if not price or not original_price:
return None
try:
# 提取数字
current = float(price.replace("¥", "").replace("元", ""))
original = float(original_price.replace("¥", "").replace("元", ""))
if original > 0:
discount = current / original
return f"{int(discount * 10)}折"
except (ValueError, AttributeError):
pass
return None
def save_to_json(self, filename: str):
"""保存数据到JSON文件"""
with open(filename, "w", encoding="utf-8") as f:
json.dump(
[asdict(p) for p in self.products],
f,
ensure_ascii=False,
indent=2
)
print(f"数据已保存到: {filename}")
def save_to_csv(self, filename: str):
"""保存数据到CSV文件"""
import csv
with open(filename, "w", encoding="utf-8-sig", newline="") as f:
if self.products:
fieldnames = [
"name", "price", "original_price", "discount",
"sales", "rating", "shop", "link", "image"
]
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for product in self.products:
writer.writerow(asdict(product))
print(f"数据已保存到: {filename}")
def close(self):
"""关闭浏览器"""
self.browser.close()
# 使用示例
def main():
scraper = ProductScraper()
try:
# 方式一:通过URL抓取分类
products = scraper.scrape_category(
"https://example.com/category/electronics",
max_pages=3
)
# 方式二:通过搜索关键词抓取
# products = scraper.scrape_search_results(
# "无线蓝牙耳机",
# max_pages=5
# )
print(f"\n总共抓取 {len(products)} 个商品")
# 保存数据
if products:
scraper.save_to_json("products.json")
scraper.save_to_csv("products.csv")
# 打印部分数据预览
print("\n=== 数据预览(前5个商品)===")
for i, product in enumerate(products[:5], 1):
print(f"\n商品 {i}:")
print(f" 名称: {product.name}")
print(f" 价格: {product.price}")
if product.original_price:
print(f" 原价: {product.original_price}")
print(f" 折扣: {product.discount}")
if product.sales:
print(f" 销量: {product.sales}")
if product.rating:
print(f" 评分: {product.rating}")
print(f" 链接: {product.link}")
finally:
scraper.close()
if __name__ == "__main__":
main()
代码解析
这个复杂的案例展示了企业级网页抓取的完整解决方案:
数据建模:使用@dataclass定义Product类,确保数据结构一致,便于后续处理
两种提取策略:主方案使用ListExtractor提取器,备用方案手动提取,提高代码健壮性
智能翻页:尝试多种方式定位下一页按钮,确保持续抓取数据
数据清洗:提供清理文本、标准化URL、计算折扣等辅助方法
多格式导出:支持JSON和CSV两种常见格式
案例四:AI智能问答机器人
最后一个案例,展示如何利用NanoBrowser构建一个基于网页内容的AI问答系统。
任务分析:
- 让用户输入问题
- 导航到相关网页或使用预设网页
- 提取网页内容
- 使用AI分析内容并回答问题
from nanobrowser import Browser
from nanobrowser.extractors import ArticleExtractor
from typing import Optional, Dict, List
import time
class WebQAAssistant:
"""基于网页内容的问答助手"""
def __init__(self):
self.browser = Browser()
self.current_page_content = ""
self.page_context = {}
def load_and_analyze(self, url: str) -> Dict[str, any]:
"""
加载网页并分析内容
Args:
url: 网页URL
Returns:
Dict: 包含页面分析的字典
"""
print(f"正在加载网页: {url}")
self.browser.navigate(url)
self.browser.wait_for_load()
# 提取页面内容
extractor = ArticleExtractor()
article = extractor.extract(self.browser.page_source)
self.current_page_content = article.text
self.page_context = {
"title": self.browser.title,
"url": url,
"headings": self._extract_headings(),
"links": self._extract_links(),
"main_content": article.text,
"metadata": article.metadata,
}
return self.page_context
def _extract_headings(self) -> List[Dict[str, str]]:
"""提取所有标题"""
headings = []
for level in range(1, 7):
heading_elements = self.browser.find_elements(f"h{level}")
for elem in heading_elements:
headings.append({
"level": level,
"text": elem.text,
"id": elem.get_attribute("id"),
})
return headings
def _extract_links(self) -> List[Dict[str, str]]:
"""提取所有链接"""
links = []
link_elements = self.browser.find_elements("a[href]")
for elem in link_elements[:50]: # 限制数量
links.append({
"text": elem.text,
"href": elem.get_attribute("href"),
})
return links
def answer_question(self, question: str) -> str:
"""
基于当前页面内容回答问题
Args:
question: 用户问题
Returns:
str: AI生成的回答
"""
if not self.current_page_content:
return "请先加载一个网页"
# 构建提示词
prompt = self._build_prompt(question)
# 在实际应用中,这里会调用LLM API
# 为了演示,我们使用简单的关键词匹配
answer = self._generate_answer(prompt)
return answer
def _build_prompt(self, question: str) -> str:
"""构建提示词"""
# 截取内容的前5000字符(根据实际API限制调整)
content_preview = self.current_page_content[:5000]
prompt = f"""
你是一个智能助手,正在帮助用户理解一个网页的内容。
页面标题: {self.page_context.get('title', '未知')}
页面内容:
{content_preview}
用户问题: {question}
请根据页面内容回答用户的问题。如果页面内容中没有相关信息,请明确指出。
回答应该简洁、准确,如果需要可以引用页面中的原文。
"""
return prompt
def _generate_answer(self, prompt: str) -> str:
"""
生成回答
注意:这是简化版本,实际使用时应该调用真正的LLM API
在生产环境中,可以使用OpenAI、Claude等API
"""
# 简单模拟:基于关键词匹配
# 实际应用中应该接入真正的AI服务
# 如果有API密钥,可以这样调用:
"""
from openai import OpenAI
client = OpenAI(api_key="your-api-key")
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "你是一个网页内容问答助手。"},
{"role": "user", "content": prompt}
],
temperature=0.7,
max_tokens=1000
)
return response.choices[0].message.content
"""
# 简化模拟:提取相关内容
keywords = prompt.split()[-5:] # 取最后几个词作为关键词
relevant_sentences = self._find_relevant_sentences(keywords)
if relevant_sentences:
return f"根据页面内容,相关信息如下:\n\n" + "\n".join(relevant_sentences[:3])
else:
return "抱歉,我在页面内容中没有找到与您问题直接相关的信息。"
def _find_relevant_sentences(self, keywords: List[str]) -> List[str]:
"""查找包含关键词的句子"""
sentences = self.current_page_content.split("。")
relevant = []
for sentence in sentences:
if any(kw in sentence for kw in keywords):
relevant.append(sentence.strip() + "。")
return relevant
def search_and_answer(self, base_url: str, question: str) -> Dict[str, any]:
"""
在网页中搜索相关内容并回答
Args:
base_url: 基础URL
question: 问题
Returns:
Dict: 包含搜索结果和回答的字典
"""
# 加载页面
context = self.load_and_analyze(base_url)
# 搜索相关内容
search_results = self._search_within_page(question)
# 生成回答
answer = self.answer_question(question)
return {
"question": question,
"answer": answer,
"search_results": search_results,
"page_title": context.get("title"),
}
def _search_within_page(self, keyword: str) -> List[Dict[str, any]]:
"""
在页面中搜索关键词
Args:
keyword: 搜索关键词
Returns:
List: 搜索结果列表
"""
# 使用浏览器的搜索功能(如果支持)
# self.browser.execute_script(f"window.find('{keyword}')")
results = []
# 提取所有包含关键词的元素
elements = self.browser.find_elements(
f":self-or-descendant:text('{keyword}')",
optional=True
)
for elem in elements[:10]:
context = self._get_element_context(elem)
if context:
results.append(context)
return results
def _get_element_context(self, element) -> Dict[str, any]:
"""获取元素的上下文信息"""
try:
return {
"tag": element.tag_name,
"text": element.text[:200] if element.text else "",
"class": element.get_attribute("class"),
}
except:
return None
def interact(self):
"""交互式问答"""
print("=" * 50)
print("网页问答助手")
print("=" * 50)
print("\n输入URL加载网页,输入问题获取答案,输入'退出'结束")
print()
while True:
user_input = input("\n> ").strip()
if user_input.lower() in ["退出", "exit", "quit"]:
print("再见!")
break
if user_input.startswith("http"):
# 加载URL
try:
result = self.load_and_analyze(user_input)
print(f"\n页面加载成功!")
print(f"标题: {result['title']}")
print(f"内容长度: {len(result['main_content'])} 字符")
print(f"标题数量: {len(result['headings'])}")
except Exception as e:
print(f"\n加载失败: {e}")
else:
# 回答问题
try:
answer = self.answer_question(user_input)
print(f"\n回答:\n{answer}")
except Exception as e:
print(f"\n处理问题时出错: {e}")
def close(self):
"""关闭浏览器"""
self.browser.close()
# 使用示例
def main():
assistant = WebQAAssistant()
try:
# 示例1:直接加载网页
context = assistant.load_and_analyze("https://en.wikipedia.org/wiki/Python_(programming_language)")
print(f"已加载: {context['title']}")
# 示例2:基于加载的内容回答问题
questions = [
"Python是什么时候创建的?",
"谁创造了Python?",
"Python的主要特点是什么?",
]
for q in questions:
print(f"\n问题: {q}")
answer = assistant.answer_question(q)
print(f"回答: {answer}")
time.sleep(1)
# 示例3:交互式问答
# assistant.interact()
finally:
assistant.close()
if __name__ == "__main__":
main()
代码解析
这个案例展示了NanoBrowser在AI应用中的典型用法:
内容提取与结构化:使用ArticleExtractor提取干净的页面内容,同时收集标题、链接等元信息
构建AI工作流:将网页内容与AI模型结合,构建问答系统
智能搜索:在页面内容中查找相关信息,为回答提供依据
交互式接口:提供命令行交互方式,方便用户使用
Common Use Cases and Scenarios / 常见使用场景
通过前面的实战案例,你应该对NanoBrowser有了深入的了解。下面总结一下它最适合的应用场景:
场景一:数据采集与市场调研
# 竞品价格监控
class PriceMonitor:
"""竞品价格监控系统"""
def __init__(self):
self.browser = Browser()
def check_prices(self, product_urls: List[str]) -> Dict[str, float]:
"""检查多个商品的价格"""
prices = {}
for url in product_urls:
self.browser.navigate(url)
self.browser.wait_for_load()
# 提取价格
price_elem = self.browser.find_element(
".product-price, .current-price"
)
if price_elem:
price_text = price_elem.text
prices[url] = self._parse_price(price_text)
return prices
def _parse_price(self, text: str) -> float:
"""解析价格文本"""
import re
match = re.search(r"[\d.]+", text)
return float(match.group()) if match else 0.0
场景二:自动化测试
# Web应用功能测试
class WebTester:
"""Web应用测试工具"""
def __init__(self):
self.browser = Browser()
self.test_results = []
def test_form_submission(self, form_url: str, test_data: dict):
"""测试表单提交功能"""
self.browser.navigate(form_url)
self.browser.wait_for_load()
form = self.browser.find_element("form")
# 填写表单
for field, value in test_data.items():
form[field] = value
# 提交
form.submit()
self.browser.wait_for_load()
# 验证结果
success = self.browser.find_element(
".success-message",
optional=True
) is not None
self.test_results.append({
"test": "form_submission",
"url": form_url,
"passed": success,
})
return success
场景三:内容聚合
# 新闻聚合器
class NewsAggregator:
"""新闻内容聚合器"""
def __init__(self):
self.browser = Browser()
self.news_items = []
def collect_from_source(self, source_url: str):
"""从特定来源收集新闻"""
self.browser.navigate(source_url)
self.browser.wait_for_load()
# 提取新闻列表
news_elements = self.browser.find_elements(
".news-item, .article-item"
)
for elem in news_elements:
title = elem.find_child(".title").text
link = elem.find_child("a").get_attribute("href")
summary = elem.find_child(".summary", optional=True)
self.news_items.append({
"title": title,
"link": link,
"summary": summary.text if summary else "",
"source": source_url,
})
场景四:定期报告生成
# 网站状态监控报告
class SiteMonitor:
"""网站状态监控"""
def __init__(self):
self.browser = Browser()
def generate_health_report(self, site_url: str) -> dict:
"""生成网站健康报告"""
report = {
"url": site_url,
"timestamp": time.time(),
"status": "unknown",
"load_time": 0,
"issues": [],
}
start_time = time.time()
try:
self.browser.navigate(site_url)
report["load_time"] = time.time() - start_time
report["status"] = "ok"
# 检查常见问题
errors = self.browser.find_elements(
".error, .warning, [class*='error']"
)
if errors:
report["issues"].append(f"发现 {len(errors)} 个错误元素")
# 检查broken links
broken_links = self._find_broken_links()
if broken_links:
report["issues"].append(
f"发现 {len(broken_links)} 个失效链接"
)
except Exception as e:
report["status"] = "error"
report["issues"].append(str(e))
return report
def _find_broken_links(self) -> List[str]:
"""查找失效链接"""
broken = []
links = self.browser.find_elements("a[href]")
for link in links[:20]: # 限制检查数量
href = link.get_attribute("href")
if not href or href.startswith("#") or href.startswith("javascript"):
continue
try:
# 简单检查:导航到链接
# 实际应用中应该用HEAD请求
self.browser.execute_script(f"window.open('{href}', '_blank')")
except:
broken.append(href)
return broken
Tips and Best Practices / 技巧与最佳实践
经过前面的学习和实践,这里总结一些重要的技巧和最佳实践,帮助你更好地使用NanoBrowser。
性能优化技巧
# 技巧一:复用浏览器实例
class EfficientScraper:
"""高效的抓取器"""
def __init__(self):
self.browser = Browser()
# 浏览器实例在多个任务间复用
self.urls_to_scrape = []
self.results = []
def scrape_batch(self, urls: List[str]):
"""批量抓取多个URL"""
for url in urls:
try:
self.browser.navigate(url, timeout=10)
# 提取内容...
self.results.append(self._extract_content())
except Exception as e:
print(f"抓取 {url} 失败: {e}")
return self.results
def close(self):
self.browser.close()
# 技巧二:减少不必要的等待
class FastScraper:
"""快速抓取器"""
def scrape_fast(self, url: str):
self.browser.navigate(url, wait=False) # 不等待
time.sleep(2) # 使用固定延迟
# 如果元素不存在就跳过
element = self.browser.find_element(
".target",
optional=True # 不抛出异常
)
if element:
return element.text
return None
# 技巧三:使用轻量级模式
class LightweightScraper:
"""轻量级抓取器"""
def __init__(self):
# 禁用图片和CSS加载
self.browser = Browser(
load_images=False,
load_css=False,
load_js=False # 如果不需要JavaScript
)
错误处理最佳实践
# 健壮的错误处理
class RobustScraper:
"""健壮的抓取器"""
def __init__(self, max_retries=3, retry_delay=5):
self.max_retries = max_retries
self.retry_delay = retry_delay
def navigate_with_retry(self, url: str):
"""带重试的导航"""
for attempt in range(self.max_retries):
try:
self.browser.navigate(url, timeout=30)
self.browser.wait_for_load()
return True
except TimeoutError:
print(f"尝试 {attempt + 1} 失败,正在重试...")
time.sleep(self.retry_delay)
except NetworkError as e:
if "404" in str(e):
raise # 404不重试
time.sleep(self.retry_delay)
raise MaxRetriesExceeded(f"达到最大重试次数: {self.max_retries}")
def safe_extract(self, selector: str, default=None):
"""安全的提取方法"""
try:
elem = self.browser.find_element(selector)
return elem.text
except ElementNotFoundError:
return default
except Exception as e:
print(f"提取时出错: {e}")
return default
# 自定义异常
class NanobrowserError(Exception):
"""NanoBrowser基础异常"""
pass
class ElementNotFoundError(NanobrowserError):
"""元素未找到异常"""
pass
class TimeoutError(NanobrowserError):
"""超时异常"""
pass
class NetworkError(NanobrowserError):
"""网络错误异常"""
pass
class LoginFailedError(NanobrowserError):
"""登录失败异常"""
pass
class MaxRetriesExceeded(NanobrowserError):
"""超过最大重试次数异常"""
pass
反爬虫应对策略
# 模拟人类行为
class HumanLikeScraper:
"""模拟人类行为的抓取器"""
def __init__(self):
self.browser = Browser()
def human_like_scroll(self):
"""模拟人类滚动行为"""
import random
total_height = self.browser.execute_script(
"return document.body.scrollHeight"
)
position = 0
while position < total_height:
# 随机滚动距离
step = random.randint(200, 500)
position += step
self.browser.execute_script(
f"window.scrollTo(0, {position});"
)
# 随机等待
time.sleep(random.uniform(0.5, 1.5))
def human_like_type(self, element, text: str):
"""模拟人类打字"""
element.clear()
for char in text:
element.type(char)
# 随机等待
time.sleep(random.uniform(0.05, 0.15))
def random_mouse_movement(self):
"""模拟随机鼠标移动"""
import random
for _ in range(5):
x = random.randint(100, 500)
y = random.randint(100, 500)
self.browser.execute_script(f"""
var event = new MouseEvent('mousemove', {{
'view': window,
'bubbles': true,
'cancelable': true,
'clientX': {x},
'clientY': {y}
}});
document.dispatchEvent(event);
""")
time.sleep(random.uniform(0.1, 0.3))
# 设置请求头
class CustomHeadersScraper:
"""自定义请求头的抓取器"""
def __init__(self):
self.browser = Browser(
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
headers={
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}
)
资源清理
# 重要的资源清理
class ResourceManager:
"""资源管理器"""
def __init__(self):
self.browser = None
self.screenshots = []
def __enter__(self):
"""上下文管理器入口"""
self.browser = Browser()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器出口"""
self.cleanup()
return False # 不抑制异常
def cleanup(self):
"""清理所有资源"""
if self.browser:
self.browser.close()
self.browser = None
# 清理截图
for screenshot in self.screenshots:
try:
os.remove(screenshot)
except:
pass
def take_screenshot(self, name: str):
"""截屏并记录"""
if self.browser:
path = f"screenshots/{name}.png"
self.browser.screenshot(path)
self.screenshots.append(path)
return path
return None
# 使用上下文管理器
def process_data():
with ResourceManager() as manager:
manager.browser.navigate("https://example.com")
manager.take_screenshot("step1")
# ... 更多操作
# 自动清理资源
调试技巧
# 调试辅助工具
class DebugHelper:
"""调试助手"""
def __init__(self, browser):
self.browser = browser
def print_page_info(self):
"""打印页面基本信息"""
print("=" * 40)
print(f"URL: {self.browser.current_url}")
print(f"Title: {self.browser.title}")
print(f"Source length: {len(self.browser.page_source)}")
print("=" * 40)
def save_debug_snapshot(self, name: str):
"""保存调试快照"""
import os
import json
from datetime import datetime
os.makedirs("debug", exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
prefix = f"debug/{name}_{timestamp}"
# 保存HTML
with open(f"{prefix}.html", "w", encoding="utf-8") as f:
f.write(self.browser.page_source)
# 保存截图
self.browser.screenshot(f"{prefix}.png")
# 保存元数据
metadata = {
"url": self.browser.current_url,
"title": self.browser.title,
"timestamp": timestamp,
}
with open(f"{prefix}.json", "w", encoding="utf-8") as f:
json.dump(metadata, f, indent=2)
print(f"调试快照已保存: {prefix}")
def inspect_element(self, selector: str):
"""检查元素详情"""
try:
elem = self.browser.find_element(selector)
print(f"元素信息:")
print(f" 标签: {elem.tag_name}")
print(f" 文本: {elem.text[:100]}")
print(f" 可见: {elem.is_visible()}")
print(f" 启用: {elem.is_enabled()}")
print(f" 位置: {elem.location}")
print(f" 尺寸: {elem.size}")
except ElementNotFoundError:
print(f"未找到元素: {selector}")
Conclusion / 总结
经过这篇详尽的教程,你应该已经掌握了NanoBrowser的核心概念和实用技巧。
NanoBrowser的核心价值
NanoBrowser不仅仅是一个浏览器自动化工具,它代表了一种新的理念——让AI能够自然地与网页交互。传统的浏览器自动化工具是为人类设计的,需要精确的元素定位和详细的步骤说明。而NanoBrowser则采用了更友好的API设计,让AI能够更直观地表达浏览意图。
关键要点回顾
轻量级设计:NanoBrowser专注于核心功能,体积小、启动快、资源消耗低
AI友好的API:简洁直观的接口设计,让AI能够自然地表达意图
完善的错误处理:内置异常类和错误处理机制,提高代码健壮性
强大的内容提取:支持多种提取方式,满足不同场景需求
灵活的交互能力:支持元素定位、表单填写、JavaScript执行等
相关资源链接
GitHub仓库:https://github.com/nanobrowser/nanobrowser
官方文档:https://nanobrowser.github.io/nanobrowser-docs
示例代码库:https://github.com/nanobrowser/nanobrowser-examples
社区论坛:https://github.com/nanobrowser/nanobrowser/discussions
相关AI项目推荐
Playwright:功能强大的浏览器自动化工具,与NanoBrowser可互补使用
LangChain:构建AI应用的框架,可以与NanoBrowser集成实现网页感知
Selenium:传统浏览器自动化工具,适合需要复杂浏览器控制的场景
Puppeteer:Node.js生态中的浏览器控制工具
Crawl4AI:专为AI设计的网页抓取工具
学习路径建议
入门阶段:先掌握基本的导航和元素提取功能
进阶阶段:学习表单处理、分页抓取、错误处理
高级阶段:构建复杂的数据采集系统、AI应用集成
开始你的NanoBrowser之旅吧!无论你是想抓取网页数据、自动化重复任务,还是构建AI驱动的网页应用,NanoBrowser都能成为你强大的工具。
祝你编码愉快!
评论区