别再为爬虫头疼了!这款开源工具让 AI 网页抓取变得像喝水一样简单

别再为爬虫头疼了!这款开源工具让 AI 网页抓取变得像喝水一样简单

别再为爬虫头疼了!这款开源工具让 AI 网页抓取变得像喝水一样简单

在当今数据为王的时代,无论是构建 AI 训练数据集、市场调研分析,还是竞品监控和内容聚合,网页数据抓取都是开发者必备的核心技能。然而,传统爬虫方案总是让人头疼不已:复杂的配置、频繁的反爬机制、难以处理的动态内容、提取的数据格式混乱……这些问题消耗了开发者大量的时间和精力。

今天要介绍的这个开源项目——crawl4ai,由开发者 unclecode 创建,专门为解决这些痛点而生。它将人工智能与网页爬取完美结合,让抓取网页数据变成一件优雅而高效的事情。无论你是需要快速提取网页内容的研究者,还是希望构建自动化数据管道的工程师,crawl4ai 都值得你花时间深入了解。

这篇文章将带你从零开始,全面掌握 crawl4ai 的使用方法,包括环境配置、核心功能、实战技巧以及最佳实践建议。通过大量的代码示例和详细的步骤说明,即使是编程新手也能轻松上手。准备好了吗?让我们开始这段学习之旅。

为什么 crawl4ai 值得关注

在深入了解具体使用方法之前,我们先来探讨一下 crawl4ai 相比其他爬虫方案究竟有哪些独特优势,以及它能够解决哪些实际问题。

首先,crawl4ai 的核心理念是“智能化”。传统的网页爬虫往往依赖规则和选择器来定位元素,这意味着当你需要从不同网站提取数据时,往往需要针对每个网站编写专门的解析规则。一旦网站改版或调整结构,这些规则就会失效,爬虫也会随之崩溃。而 crawl4ai 内置了 AI 能力,能够自动理解网页结构和内容语义,即使面对复杂多变的页面,也能准确提取目标数据。这种能力对于需要从海量不同网站采集数据的场景来说,价值尤为显著。

其次,crawl4ai 在处理动态内容方面表现出色。现代网页大量使用 JavaScript 渲染技术,许多重要内容只有在浏览器执行脚本后才能显示。传统基于 HTTP 请求的爬虫无法处理这种情况,而 crawl4ai 内置了无头浏览器支持,能够完整执行页面脚本,确保获取到所有渲染后的内容。这意味着你可以用它来抓取那些依赖 JavaScript 加载的单页应用、无限滚动列表、延迟加载的图片等动态内容。

第三,crawl4ai 提供了开箱即用的数据提取能力。它不仅能获取页面的原始 HTML,还能直接输出结构化的数据格式,包括 Markdown、清洗后的纯文本、JSON 等。特别是将网页内容转换为 Markdown 的能力,对于需要为大型语言模型提供训练数据或上下文的开发者来说,简直是神器级别的功能。你不需要再为解析 HTML 标签、去除广告和导航元素而烦恼,crawl4ai 会帮你处理好一切。

第四,crawl4ai 支持自定义提取逻辑。通过编写提示词(Prompt),你可以告诉 AI 你需要什么类型的信息,它会自动识别并提取。这种基于自然语言的提取方式大大降低了使用门槛,即使你不是 HTML 或 CSS 专家,也能准确获取所需数据。项目中还提供了 CSS 选择器和 XPath 作为备选方案,满足高级用户的精细控制需求。

第五,crawl4ai 内置了多种反反爬策略。它会自动处理常见的反爬机制,如检测并绕过机器人验证、模拟人类访问行为、处理 Cookie 和 Session 等。对于需要大规模抓取的商业应用来说,这些内置能力可以节省大量的开发和测试时间。

最后,也是非常重要的一点,crawl4ai 是完全开源的。这意味着你可以自由地使用、修改和分发它,同时也能深入了解其内部实现,根据自己的需求进行定制。对于重视代码透明度和可控性的团队来说,开源属性提供了额外的信心保障。

总结来说,crawl4ai 将 AI 能力与网页爬取技术完美融合,提供了一种现代化、高效、易用的数据采集解决方案。它特别适合以下几类用户:需要构建 AI 训练数据集的研究人员、进行市场调研和数据采集的产品经理、搭建内容聚合平台的开发者,以及任何有网页数据抓取需求的工程师。

环境搭建与安装

现在让我们开始动手实践。首先需要完成开发环境的准备工作,包括安装 Python、配置虚拟环境以及安装 crawl4ai 包本身。

确保你的系统上已经安装了 Python。crawl4ai 需要 Python 3.8 或更高版本。你可以通过在终端中运行以下命令来检查 Python 版本:

python3 --version

如果显示的版本号低于 3.8,或者提示命令未找到,你需要先安装 Python。推荐使用 Anaconda 或直接从 python.org 下载安装包。对于 macOS 用户,也可以使用 Homebrew 包管理器安装:

brew install python

对于 Linux 用户,可以使用系统的包管理器,例如 Ubuntu 或 Debian 系统:

sudo apt update
sudo apt install python3 python3-pip python3-venv

安装好 Python 后,建议使用虚拟环境来管理项目依赖。虚拟环境可以避免不同项目之间的包版本冲突,让项目环境更加干净可控。创建虚拟环境的方法如下:

python3 -m venv crawl4ai-env

这条命令会在当前目录下创建一个名为 crawl4ai-env 的虚拟环境文件夹。接下来,我们需要激活这个虚拟环境:

在 macOS 和 Linux 系统上:

source crawl4ai-env/bin/activate

在 Windows 系统上:

crawl4ai-env\Scripts\activate

激活成功后,你的终端提示符前面会显示虚拟环境名称,表明你现在处于隔离的 Python 环境中。

现在是最关键的步骤:安装 crawl4ai 包本身。推荐使用 pip 包管理器来安装:

pip install crawl4ai

如果你遇到安装问题,可能需要先升级 pip:

pip install --upgrade pip

安装过程会自动处理 crawl4ai 的所有依赖项,包括 Playwright——这是一个用于控制无头浏览器的关键库。安装完成后,你可以使用以下命令验证是否安装成功:

python -c "import crawl4ai; print(crawl4ai.__version__)"

如果命令输出了版本号,说明安装完全成功。

对于某些高级功能,你可能还需要安装额外的浏览器驱动。crawl4ai 会自动处理大部分工作,但如果遇到浏览器相关的问题,可以尝试手动安装 Playwright:

playwright install

这条命令会下载并安装必要的浏览器二进制文件。

另外,如果你计划使用异步编程特性(这是推荐的使用方式),请确保你的 Python 版本支持 asyncio。Python 3.7 及以上版本都完整支持 asyncio。

安装过程中可能遇到的问题及解决方案

在实际安装过程中,你可能会遇到一些常见问题。提前了解这些问题及其解决方案,可以帮助你更顺利地完成环境搭建。

第一个常见问题是依赖库安装失败。这通常发生在系统缺少某些开发工具或库文件的情况下。Linux 用户可能需要安装编译工具:

sudo apt install build-essential python3-dev

macOS 用户如果使用 Homebrew,可能需要安装 headers:

xcode-select --install

第二个问题是 Playwright 安装缓慢或失败。由于 Playwright 需要下载大型浏览器二进制文件,在网络不佳的环境下可能会超时或中断。一个有效的解决方法是设置国内镜像源,或者预先下载浏览器驱动。你也可以单独安装 Playwright:

pip install playwright
python -m playwright install chromium

这里以 Chromium 浏览器为例,如果你需要支持其他浏览器,可以运行相应的安装命令。

第三个问题是权限错误。在某些系统配置下,直接安装可能导致权限问题。确保你使用了虚拟环境,或者在命令前加上 sudo(不推荐),或者修改 pip 的安装目标目录。

第四个问题是 Python 版本不兼容。确保你的 Python 版本至少是 3.8。某些新功能可能需要更新的 Python 版本才能使用。

成功解决安装问题后,你现在应该拥有了一个可以正常工作的 crawl4ai 环境。接下来,我们将深入了解它的核心功能。

核心功能详解

在开始使用 crawl4ai 之前,我们需要花一些时间来理解它的核心功能和设计理念。这一部分将详细介绍 crawl4ai 的主要特性,帮助你建立完整的认知框架。

基础爬取能力

crawl4ai 的最基本功能是获取网页的完整内容。只需要提供目标 URL,它就能返回页面的 HTML 代码、渲染后的内容、提取的链接、图片等多种信息。基础用法非常简单:

from crawl4ai import AsyncWebCrawler

# 创建爬虫实例
async def main():
    async with AsyncWebCrawler() as crawler:
        result = await crawler.arun(url="https://example.com")

        # 访问各种提取结果
        print("原始HTML:", result.html[:500])  # 显示前500个字符
        print("清洗后的内容:", result.markdown[:500])
        print("提取的链接:", result.links)
        print("提取的图片:", result.media.get("images", []))

# 运行异步函数
import asyncio
asyncio.run(main())

这段代码展示了 crawl4ai 最核心的使用模式:创建 AsyncWebCrawler 实例,然后使用 arun 方法执行爬取任务。返回的 result 对象包含了丰富的信息,可以根据需要选择使用。

智能内容提取

crawl4ai 的核心亮点在于其智能内容提取能力。它不仅能获取原始 HTML,还能理解页面语义,自动识别并提取有意义的内容。

markdown 属性是最常用的输出格式之一。它将网页内容转换为干净的 Markdown 格式,自动去除了导航栏、广告、脚本等无关元素,只保留文章正文、标题、列表、代码块等核心内容。这对于后续的文本分析、NLP 处理或直接展示都非常方便。

async def extract_as_markdown():
    async with AsyncWebCrawler() as crawler:
        result = await crawler.arun(
            url="https://en.wikipedia.org/wiki/Python_(programming_language)"
        )

        # 获取清洗后的 Markdown 内容
        markdown_content = result.markdown

        # 可以直接用于 LLM 上下文
        print(f"内容长度: {len(markdown_content)} 字符")
        print(f"标题: {result.metadata.get('title', 'N/A')}")

对于需要更精细控制的情况,crawl4ai 提供了自定义提取功能。通过编写提示词,你可以让 AI 准确理解你需要什么信息:

async def custom_extraction():
    async with AsyncWebCrawler() as crawler:
        result = await crawler.arun(
            url="https://news.ycombinator.com/",
            extraction_strategy=CustomExtractionStrategy(
                prompt="从新闻列表中提取每个帖子的标题、分数和评论数"
            )
        )

        # 获取结构化提取结果
        extracted_data = result.extracted_content
        print("提取的新闻数据:", extracted_data)

这种基于自然语言的提取方式非常强大,因为它不需要你了解网页的具体结构,只需要描述你想要的输出格式即可。

多页面爬取与批量处理

在实际应用中,单页面爬取往往不够用,我们需要处理大量的 URL。crawl4ai 提供了便捷的多页面爬取支持:

async def batch_crawl():
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3",
        # ... 更多 URL
    ]

    async with AsyncWebCrawler() as crawler:
        # 批量爬取
        results = await crawler.arun_many(urls=urls)

        # 处理每个结果
        for i, result in enumerate(results):
            if result.success:
                print(f"页面 {i+1} 爬取成功: {len(result.markdown)} 字符")
                # 进一步处理 result.markdown
            else:
                print(f"页面 {i+1} 爬取失败: {result.error}")

arun_many 方法会自动处理并发控制,不会同时发起过多请求,既保证了效率,又避免了对目标服务器造成过大压力。

高级配置选项

crawl4ai 提供了丰富的配置选项,让你可以根据具体需求调整爬取行为:

from crawl4ai import CrawlerRunConfig

# 创建自定义配置
config = CrawlerRunConfig(
    # 页面等待策略
    wait_for="css:.main-content",  # 等待特定元素加载
    delay_before_return_html=2,    # 等待额外秒数

    # 浏览器设置
    headless=True,                 # 是否无头模式
    viewport_width=1920,           # 视口宽度
    viewport_height=1080,          # 视口高度

    # 提取设置
    extraction_strategy=SomeStrategy(),  # 自定义提取策略
    word_count_threshold=10,      # 最小词数阈值

    # 反爬设置
    simulate_user=True,           # 模拟真实用户行为
    ignore_robots_txt=False,      # 是否遵守 robots.txt

    # 内容过滤
    remove_forms=True,            # 移除表单元素
    remove_popups=True,           # 移除弹窗
)

async with AsyncWebCrawler(config=config) as crawler:
    result = await crawler.arun(url="https://example.com")

这些配置选项让你能够精细控制爬取过程的各个环节,无论是提高提取质量还是处理反爬机制,都能找到合适的参数进行调整。

逐步实战教程

理论学习固然重要,但实践才是真正掌握技能的关键。在这一部分,我们将通过一系列实战案例,从简单到复杂,逐步掌握 crawl4ai 的各种使用技巧。

基础案例:爬取单个网页

让我们从最简单的案例开始:爬取一个普通网页的全部内容。这个案例适合初学者,用于熟悉基本 API 的使用方式。

"""
crawl4ai 基础使用示例
本例展示了爬取单个网页的基本流程
"""

import asyncio
from crawl4ai import AsyncWebCrawler

async def basic_crawl_example():
    """
    演示最基本的网页爬取功能
    """
    print("=" * 50)
    print("开始基础爬取示例")
    print("=" * 50)

    # 目标 URL - 使用一个公开的测试页面
    target_url = "https://example.com"

    # 创建爬虫实例并执行爬取
    async with AsyncWebCrawler() as crawler:
        result = await crawler.arun(url=target_url)

        # 检查爬取是否成功
        if result.success:
            print("\n✓ 爬取成功!")
            print(f"URL: {target_url}")
            print(f"状态: {result.status_code}")
            print(f"内容长度: {len(result.html)} 字符")
            print(f"Markdown 长度: {len(result.markdown)} 字符")

            # 显示元数据
            if result.metadata:
                print("\n页面元数据:")
                for key, value in result.metadata.items():
                    print(f"  {key}: {value}")

            # 显示部分内容预览
            print("\n内容预览(前1000字符):")
            print("-" * 40)
            print(result.markdown[:1000])
            print("-" * 40)
        else:
            print(f"\n✗ 爬取失败: {result.error}")

# 运行示例
if __name__ == "__main__":
    asyncio.run(basic_crawl_example())

保存这段代码为 basic_crawl.py,然后在终端中运行:

python basic_crawl.py

你应该能看到类似如下的输出:

==================================================
开始基础爬取示例
================================================== 爬取成功!
URL: https://example.com
状态: 200
内容长度: 648 字符
Markdown 长度: 312 字符

页面元数据:
  title: Example Domain
  description: 

内容预览(前1000字符):
----------------------------------------
# Example Domain

This domain is for use in illustrative examples in documents. You may use this
domain in literature without prior coordination or asking for permission.

[More information...](https://www.iana.org/domains/example)
----------------------------------------

这个简单的例子展示了 crawl4ai 的核心使用模式:创建爬虫实例,调用 arun 方法执行爬取,然后处理返回结果。接下来,让我们尝试一些更有趣的任务。

进阶案例:提取结构化数据

在这个案例中,我们尝试从网页中提取结构化的数据。这在需要采集特定信息(如产品价格、产品规格、新闻详情等)时非常有用。

"""
使用 crawl4ai 提取结构化数据
本例展示了如何从网页中提取特定的信息
"""

import asyncio
import json
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai.extraction_strategy import LLMExtractionStrategy

# 定义提取提示词
PRODUCT_EXTRACTION_PROMPT = """
从网页中提取所有产品信息,包括:
1. 产品名称
2. 产品价格(注意提取货币单位)
3. 产品描述或简介
4. 产品评分(如果有)
5. 可用库存状态

以 JSON 格式返回数据,格式如下:
{
    "products": [
        {
            "name": "产品名称",
            "price": "价格",
            "description": "描述",
            "rating": "评分",
            "availability": "库存状态"
        }
    ]
}

如果某个字段不存在,在 JSON 中使用 null。
"""

async def extract_structured_data():
    """
    从模拟电商页面提取产品数据
    """
    print("=" * 50)
    print("开始结构化数据提取示例")
    print("=" * 50)

    # 创建 LLM 提取策略
    extraction_strategy = LLMExtractionStrategy(
        prompt=PRODUCT_EXTRACTION_PROMPT,
        extraction_type="schema",
        model="openai/gpt-4o",
        api_token=None  # 如果需要使用 OpenAI API,在此填入密钥
    )

    # 创建爬取配置
    config = CrawlerRunConfig(
        extraction_strategy=extraction_strategy,
        headless=True,
        verbose=True
    )

    # 示例 URL - 你可以替换为真实的电商页面
    # 这里使用 example.com 作为演示
    target_url = "https://example.com"

    async with AsyncWebCrawler(config=config) as crawler:
        result = await crawler.arun(url=target_url)

        if result.success:
            print("\n✓ 提取完成!")

            # 尝试解析提取的内容
            try:
                if result.extracted_content:
                    data = json.loads(result.extracted_content)
                    print("\n提取的数据:")
                    print(json.dumps(data, indent=2, ensure_ascii=False))
                else:
                    print("\n未提取到结构化数据")
                    print("原始内容:", result.markdown[:500])
            except json.JSONDecodeError:
                print("\n提取的内容不是有效的 JSON 格式")
                print("原始内容:", result.extracted_content[:500] if result.extracted_content else "None")
        else:
            print(f"\n✗ 操作失败: {result.error}")

# 运行示例
if __name__ == "__main__":
    asyncio.run(extract_structured_data())

这个例子展示了使用 LLM 提取策略来获取结构化数据的能力。你需要提供清晰的提取提示词,描述清楚需要提取的字段以及期望的输出格式。crawl4ai 会调用 AI 模型来理解页面内容并按照你的要求提取信息。

进阶案例:处理 JavaScript 渲染的页面

现代网站大量使用 JavaScript 来动态加载内容。对于这类网站,传统的 HTTP 请求方式无法获取到完整数据。crawl4ai 内置了浏览器自动化能力,可以完美处理这类情况。

"""
使用 crawl4ai 处理 JavaScript 渲染页面
本例展示了如何等待动态内容加载完成
"""

import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig

async def crawl_dynamic_page():
    """
    爬取包含 JavaScript 动态内容的页面
    """
    print("=" * 50)
    print("开始动态页面爬取示例")
    print("=" * 50)

    # 创建配置:等待动态内容加载
    config = CrawlerRunConfig(
        # 等待特定元素出现后再提取
        # 这对于 SPA(单页应用)特别有用
        wait_for="""css:#content .article,
                     css:.main-content,
                     xpath://div[@class="content"]""",

        # 或者等待一定时间(秒)
        # 建议同时设置延迟,以防元素选择器未匹配
        delay_before_return_html=3,

        # 页面滚动设置 - 对于无限滚动的页面很有用
        scroll_delay=0.5,        # 滚动间隔(秒)
        max_scrolls=5,           # 最大滚动次数

        # JavaScript 执行
        js_code=[
            # 可以注入自定义 JavaScript
            # 例如:模拟用户行为
            "window.scrollTo(0, document.body.scrollHeight);",
            "await new Promise(r => setTimeout(r, 1000));"  # 等待加载
        ],

        # 禁用 JavaScript(根据需要)
        # 如果页面不需要 JS,可以禁用以提高速度
        # page_timeout=30000  # 页面超时时间(毫秒)
    )

    # 目标 URL - 替换为你想测试的动态页面
    # 例如:新闻网站的无限滚动页面、社交媒体等
    target_url = "https://example.com"

    async with AsyncWebCrawler(config=config) as crawler:
        result = await crawler.arun(url=target_url)

        if result.success:
            print("\n✓ 动态内容爬取成功!")
            print(f"提取的内容长度: {len(result.markdown)} 字符")
            print("\n内容预览:")
            print("-" * 40)
            print(result.markdown[:1500])
            print("-" * 40)

            # 分析提取的媒体资源
            if result.media:
                print("\n提取的媒体资源:")
                for media_type, items in result.media.items():
                    print(f"  {media_type}: {len(items)} 个")
        else:
            print(f"\n✗ 爬取失败: {result.error}")

if __name__ == "__main__":
    asyncio.run(crawl_dynamic_page())

这个配置展示了几个处理动态页面的关键技巧:wait_for 参数用于等待特定元素加载完成,scroll 相关参数用于处理无限滚动,js_code 则允许你注入自定义 JavaScript 来完成更复杂的操作,如点击按钮、填写表单等。

进阶案例:批量爬取与数据保存

在实际项目中,我们通常需要从多个页面采集数据,并将其保存到本地或数据库中。这个案例展示了如何实现批量爬取和基本的数据持久化。

"""
crawl4ai 批量爬取示例
本例展示了如何批量爬取多个 URL 并保存数据
"""

import asyncio
import json
import os
from datetime import datetime
from pathlib import Path
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig

class WebCrawlerPipeline:
    """
    网页爬取管道类
    封装了批量爬取和数据保存的逻辑
    """

    def __init__(self, output_dir="crawl_output"):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.results = []

    def save_to_json(self, data, filename):
        """保存数据到 JSON 文件"""
        filepath = self.output_dir / filename
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f"✓ 数据已保存到: {filepath}")

    def save_to_markdown(self, content, filename):
        """保存内容到 Markdown 文件"""
        filepath = self.output_dir / filename
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(content)
        print(f"✓ 内容已保存到: {filepath}")

async def batch_crawl_example():
    """
    演示批量爬取多个 URL
    """
    print("=" * 50)
    print("开始批量爬取示例")
    print("=" * 50)

    # 初始化爬取管道
    pipeline = WebCrawlerPipeline(output_dir="batch_output")

    # 定义要爬取的 URL 列表
    urls_to_crawl = [
        "https://example.com",
        "https://example.org",
        "https://example.net",
        # 在实际使用中,你可以添加更多 URL
        # 可以从文件、数据库或 API 获取 URL 列表
    ]

    print(f"\n计划爬取 {len(urls_to_crawl)} 个页面...")

    # 创建爬虫配置
    config = CrawlerRunConfig(
        headless=True,
        verbose=False,
        # 提取所有链接
        extract_links=True,
        # 页面加载超时
        page_timeout=30000,
    )

    # 执行批量爬取
    async with AsyncWebCrawler(config=config) as crawler:
        # arun_many 会自动处理并发和速率限制
        results = await crawler.arun_many(urls=urls_to_crawl)

    # 处理爬取结果
    successful = 0
    failed = 0
    all_data = []

    for i, result in enumerate(results):
        if result.success:
            successful += 1

            # 构建结果数据
            page_data = {
                "url": urls_to_crawl[i],
                "status_code": result.status_code,
                "title": result.metadata.get("title", "N/A") if result.metadata else "N/A",
                "content_length": len(result.markdown),
                "markdown": result.markdown,
                "crawled_at": datetime.now().isoformat(),
            }

            all_data.append(page_data)

            # 为每个成功的页面保存单独的 Markdown 文件
            filename = f"page_{i+1}_{result.metadata.get('title', 'untitled')[:20].replace(' ', '_')}.md"
            filename = "".join(c for c in filename if c.isalnum() or c in ('_', '-', '.'))
            pipeline.save_to_markdown(result.markdown, filename)

            print(f"✓ [{i+1}/{len(urls_to_crawl)}] {urls_to_crawl[i]}")
        else:
            failed += 1
            print(f"✗ [{i+1}/{len(urls_to_crawl)}] {urls_to_crawl[i]} - {result.error}")

    # 保存汇总数据
    pipeline.save_to_json(all_data, "crawl_summary.json")

    # 打印统计信息
    print("\n" + "=" * 50)
    print("爬取完成!统计信息:")
    print(f"  成功: {successful}/{len(urls_to_crawl)}")
    print(f"  失败: {failed}/{len(urls_to_crawl)}")
    print(f"  输出目录: {pipeline.output_dir}")
    print("=" * 50)

if __name__ == "__main__":
    asyncio.run(batch_crawl_example())

这个案例展示了一个完整的爬取管道,包括 URL 管理、批量爬取、错误处理、数据保存等功能。在实际应用中,你可以根据需要扩展这个管道,例如添加数据库存储、进度断点续传、分布式爬取等功能。

进阶案例:高级提取策略

crawl4ai 提供了多种提取策略,适用于不同的场景。这一案例将详细介绍几种常用策略的使用方法。

"""
crawl4ai 高级提取策略示例
本例展示了不同的提取策略及其适用场景
"""

import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai.extraction_strategy import (
    LLMExtractionStrategy,      # 基于大语言模型的提取
    CosineStrategy,             # 基于语义相似度的提取
    NoExtractionStrategy,       # 不进行额外提取
    RegexExtractionStrategy,    # 基于正则表达式的提取
    JsonCssExtractionStrategy   # 基于 CSS 选择器的提取
)

async def advanced_extraction_demo():
    """
    演示各种提取策略
    """
    print("=" * 50)
    print("高级提取策略示例")
    print("=" * 50)

    # 测试 URL
    test_url = "https://en.wikipedia.org/wiki/Web_scraping"

    # 策略 1: CSS 选择器提取
    # 适用于目标元素位置明确的情况
    async def css_extraction():
        print("\n--- CSS 选择器提取 ---")

        strategy = JsonCssExtractionStrategy(
            css_selector="p",  # 提取所有段落
            query="text",      # 获取文本内容
        )

        config = CrawlerRunConfig(extraction_strategy=strategy)

        async with AsyncWebCrawler(config=config) as crawler:
            result = await crawler.arun(url=test_url)

            if result.success and result.extracted_content:
                print("提取到 {} 个段落".format(
                    len(result.extracted_content) if isinstance(result.extracted_content, list) else 0
                ))
                print("内容预览:", str(result.extracted_content)[:200])

    # 策略 2: 正则表达式提取
    # 适用于模式固定的文本内容
    async def regex_extraction():
        print("\n--- 正则表达式提取 ---")

        # 提取所有 URL
        strategy = RegexExtractionStrategy(
            pattern=r'https?://[^\s<>"{}|\\^`\[\]]+',
            is_json=False
        )

        config = CrawlerRunConfig(extraction_strategy=strategy)

        async with AsyncWebCrawler(config=config) as crawler:
            result = await crawler.arun(url=test_url)

            if result.success and result.extracted_content:
                urls = result.extracted_content if isinstance(result.extracted_content, list) else []
                print(f"提取到 {len(urls)} 个 URL")
                if urls:
                    print("示例 URL:", urls[0])

    # 策略 3: LLM 智能提取
    # 适用于需要理解语义的内容提取
    async def llm_extraction():
        print("\n--- LLM 智能提取 ---")

        strategy = LLMExtractionStrategy(
            prompt="""分析这篇 Wikipedia 文章,提取以下信息:
            1. 文章主题
            2. 主要内容摘要(3-5句话)
            3. 关键概念(列出5-10个)

            以结构化格式返回结果。""",
            extraction_type="schema",
            model="openai/gpt-4o-mini"
        )

        config = CrawlerRunConfig(
            extraction_strategy=strategy,
            verbose=False
        )

        async with AsyncWebCrawler(config=config) as crawler:
            result = await crawler.arun(url=test_url)

            if result.success and result.extracted_content:
                print("LLM 提取结果:")
                print(result.extracted_content)

    # 执行所有策略演示
    try:
        await css_extraction()
    except Exception as e:
        print(f"CSS 提取出错: {e}")

    try:
        await regex_extraction()
    except Exception as e:
        print(f"正则提取出错: {e}")

    try:
        await llm_extraction()
    except Exception as e:
        print(f"LLM 提取出错: {e}")

if __name__ == "__main__":
    asyncio.run(advanced_extraction_demo())

这个案例展示了 crawl4ai 支持的多种提取策略:CSS 选择器适合结构化的网页内容,正则表达式适合提取特定格式的数据,LLM 智能提取则能够理解语义并提取更复杂的信息。根据不同的需求,你可以选择最合适的策略或组合使用。

常见使用场景

了解了 crawl4ai 的核心功能和高级特性后,让我们看看它在实际工作中有哪些典型的应用场景。

场景一:AI 训练数据采集

训练高质量的大语言模型需要海量、多样、优质的文本数据。crawl4ai 可以帮助你从互联网上高效采集各类文本内容:

"""
使用 crawl4ai 采集 AI 训练数据
"""

import asyncio
import json
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig

class TrainingDataCollector:
    """
    训练数据采集器
    用于从多个来源收集高质量文本内容
    """

    def __init__(self):
        self.data_sources = {
            "技术博客": [
                "https://example-tech-blog.com/python-best-practices",
                "https://example-tech-blog.com/machine-learning-intro",
                # 添加更多博客 URL
            ],
            "新闻资讯": [
                "https://example-news.com/tech/2024",
                # 添加更多新闻 URL
            ],
            "技术文档": [
                "https://docs.example-framework.com/getting-started",
                # 添加更多文档 URL
            ],
        }
        self.collected_data = []

    async def collect(self):
        """执行数据采集"""
        config = CrawlerRunConfig(
            # 只提取主要内容,过滤噪音
            word_count_threshold=50,  # 忽略短内容
            remove_ads=True,
            remove_popups=True,
            remove_forms=True,
        )

        async with AsyncWebCrawler(config=config) as crawler:
            for category, urls in self.data_sources.items():
                print(f"\n采集 {category}...")

                results = await crawler.arun_many(urls=urls)

                for result in results:
                    if result.success:
                        # 清理和格式化数据
                        cleaned_content = {
                            "source": result.url,
                            "category": category,
                            "content": result.markdown,
                            "length": len(result.markdown),
                        }
                        self.collected_data.append(cleaned_content)

        return self.collected_data

    def save(self, filepath="training_data.jsonl"):
        """保存为 JSONL 格式(每行一个 JSON 对象)"""
        with open(filepath, 'w', encoding='utf-8') as f:
            for item in self.collected_data:
                f.write(json.dumps(item, ensure_ascii=False) + '\n')
        print(f"已保存 {len(self.collected_data)} 条数据到 {filepath}")

async def main():
    collector = TrainingDataCollector()
    await collector.collect()
    collector.save()

if __name__ == "__main__":
    asyncio.run(main())

场景二:竞品监控与价格追踪

电商领域经常需要监控竞品价格和商品信息。crawl4ai 可以自动化这个过程:

"""
竞品价格监控示例
"""

import asyncio
import json
from datetime import datetime
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai.extraction_strategy import LLMExtractionStrategy

async def monitor_competitor_prices():
    """
    监控竞品价格变化
    """

    EXTRACTION_PROMPT = """
    从产品列表页面提取所有产品信息:
    - 产品名称
    - 当前价格(原价和促销价)
    - 折扣比例(如果有)
    - 库存状态
    - 用户评分

    以 JSON 数组格式返回。
    """

    competitor_urls = [
        "https://competitor1.com/products/electronics",
        "https://competitor2.com/category/smartphones",
        # 添加更多竞品页面
    ]

    strategy = LLMExtractionStrategy(
        prompt=EXTRACTION_PROMPT,
        extraction_type="schema"
    )

    config = CrawlerRunConfig(
        extraction_strategy=strategy,
        delay_before_return_html=2,
        user_agent="Price Monitor Bot"  # 自定义 User-Agent
    )

    all_products = []

    async with AsyncWebCrawler(config=config) as crawler:
        results = await crawler.arun_many(urls=competitor_urls)

        for result in results:
            if result.success and result.extracted_content:
                try:
                    products = json.loads(result.extracted_content)
                    for product in products:
                        product["source_url"] = result.url
                        product["crawled_at"] = datetime.now().isoformat()
                    all_products.extend(products)
                except json.JSONDecodeError:
                    print(f"解析失败: {result.url}")

    # 生成监控报告
    report = {
        "generated_at": datetime.now().isoformat(),
        "total_products": len(all_products),
        "products": all_products
    }

    print(f"\n监控完成!共采集 {len(all_products)} 个产品信息")
    print(json.dumps(report, indent=2, ensure_ascii=False))

if __name__ == "__main__":
    asyncio.run(monitor_competitor_prices())

场景三:内容聚合平台

构建内容聚合平台需要从多个来源采集并整合内容。crawl4ai 提供了高效的方式来完成这项任务:

"""
内容聚合平台数据采集模块
"""

import asyncio
import json
from pathlib import Path
from datetime import datetime
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig
from crawl4ai.content_filter import BM25ContentFilter

class ContentAggregator:
    """
    内容聚合器
    从多个来源采集文章并整合
    """

    def __init__(self, keywords=None):
        """
        初始化聚合器

        参数:
            keywords: 关注的关键字列表,用于内容过滤
        """
        self.keywords = keywords or []
        self.articles = []

    async def fetch_article(self, url, source_name):
        """获取单篇文章"""
        config = CrawlerRunConfig(
            word_count_threshold=100,  # 过滤短内容
            content_filter=BM25ContentFilter(
                user_query=" ".join(self.keywords) if self.keywords else None
            ),
            verbose=False
        )

        async with AsyncWebCrawler(config=config) as crawler:
            result = await crawler.arun(url=url)

            if result.success:
                return {
                    "title": result.metadata.get("title", "Untitled") if result.metadata else "Untitled",
                    "source": source_name,
                    "url": url,
                    "content": result.markdown,
                    "published": result.metadata.get("datePublished") if result.metadata else None,
                    "fetched_at": datetime.now().isoformat()
                }
        return None

    async def aggregate_from_source(self, source_url, source_name, article_links):
        """从单个来源聚合多篇文章"""
        print(f"从 {source_name} 聚合内容...")

        tasks = []
        for link in article_links[:10]:  # 限制每个来源采集数量
            tasks.append(self.fetch_article(link, source_name))

        results = await asyncio.gather(*tasks, return_exceptions=True)

        for result in results:
            if isinstance(result, dict):
                self.articles.append(result)
                print(f"  ✓ {result['title'][:50]}")
            else:
                print(f"  ✗ 错误: {result}")

    async def run(self, sources):
        """
        执行聚合任务

        参数:
            sources: 字典,键为来源名称,值为 (首页URL, 文章链接列表)
        """
        for source_name, (homepage, article_links) in sources.items():
            await self.aggregate_from_source(homepage, source_name, article_links)

    def save_to_files(self, output_dir="aggregated_content"):
        """保存聚合的内容"""
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        # 保存索引文件
        index = []

        for i, article in enumerate(self.articles):
            filename = f"article_{i+1}_{article['title'][:30].replace(' ', '_').replace('/', '-')}.md"
            filename = "".join(c for c in filename if c.isalnum() or c in ('_', '-', '.'))

            filepath = output_path / filename

            # 保存文章内容
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(f"# {article['title']}\n\n")
                f.write(f"**来源**: [{article['source']}]({article['url']})\n\n")
                f.write(f"**采集时间**: {article['fetched_at']}\n\n")
                f.write("---\n\n")
                f.write(article['content'])

            index.append({
                "file": filename,
                "title": article['title'],
                "source": article['source'],
                "url": article['url']
            })

        # 保存索引
        with open(output_path / "index.json", 'w', encoding='utf-8') as f:
            json.dump(index, f, indent=2, ensure_ascii=False)

        print(f"\n聚合完成!共 {len(self.articles)} 篇文章")
        print(f"内容保存在: {output_path}")

async def main():
    # 定义数据源
    sources = {
        "技术博客": (
            "https://example-blog.com",
            [
                "https://example-blog.com/post/python-tips",
                "https://example-blog.com/post/docker-guide",
                # 添加更多文章链接
            ]
        ),
        # 添加更多来源...
    }

    aggregator = ContentAggregator(keywords=["python", "教程", "技术"])
    await aggregator.run(sources)
    aggregator.save_to_files()

if __name__ == "__main__":
    asyncio.run(main())

这些场景展示了 crawl4ai 在不同领域的应用方式。你可以根据自己的实际需求,参考这些模板来构建适合你的解决方案。

技巧与最佳实践

在实际使用 crawl4ai 的过程中,掌握一些技巧和最佳实践可以大大提高效率和成功率。

速率控制与礼貌爬取

大规模爬取时,合理的速率控制非常重要,既能保护目标服务器,也能避免你的 IP 被封禁:

"""
速率控制最佳实践
"""

import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig

class RateLimitedCrawler:
    """
    带速率控制的爬虫
    """

    def __init__(self, requests_per_minute=30):
        self.delay = 60.0 / requests_per_minute  # 请求间隔
        self.semaphore = asyncio.Semaphore(5)    # 最大并发数
        self.last_request_time = 0

    async def crawl_with_delay(self, url):
        """带延迟的爬取"""
        async with self.semaphore:  # 控制并发
            # 计算需要等待的时间
            elapsed = asyncio.get_event_loop().time() - self.last_request_time
            if elapsed < self.delay:
                await asyncio.sleep(self.delay - elapsed)

            self.last_request_time = asyncio.get_event_loop().time()

            async with AsyncWebCrawler() as crawler:
                return await crawler.arun(url=url)

    async def crawl_many(self, urls):
        """批量爬取(带速率控制)"""
        tasks = [self.crawl_with_delay(url) for url in urls]
        return await asyncio.gather(*tasks)

async def main():
    crawler = RateLimitedCrawler(requests_per_minute=30)  # 每分钟30个请求

    urls = [f"https://example.com/page{i}" for i in range(100)]

    # 这将以每分钟30个请求的速率爬取所有URL
    results = await crawler.crawl_many(urls)

错误处理与重试机制

网络请求不可避免会遇到各种错误。健壮的错误处理和重试机制可以大大提高爬取的可靠性:

"""
带重试机制的爬虫实现
"""

import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig

class RobustCrawler:
    """
    带重试机制的爬虫
    """

    def __init__(self, max_retries=3, base_delay=1):
        self.max_retries = max_retries
        self.base_delay = base_delay

    async def crawl_with_retry(self, url):
        """带指数退避重试的爬取"""
        last_error = None

        for attempt in range(self.max_retries):
            try:
                async with AsyncWebCrawler() as crawler:
                    result = await crawler.arun(url=url)

                    if result.success:
                        return result
                    else:
                        # 记录非成功响应
                        last_error = result.error
                        print(f"尝试 {attempt + 1} 失败: {result.status_code}")

            except Exception as e:
                last_error = str(e)
                print(f"尝试 {attempt + 1} 异常: {e}")

            # 指数退避等待
            if attempt < self.max_retries - 1:
                delay = self.base_delay * (2 ** attempt)
                print(f"等待 {delay} 秒后重试...")
                await asyncio.sleep(delay)

        # 所有重试都失败
        print(f"URL {url} 爬取失败,已重试 {self.max_retries} 次")
        return None

    async def crawl_many_robust(self, urls):
        """批量爬取(带重试)"""
        tasks = [self.crawl_with_retry(url) for url in urls]
        return await asyncio.gather(*tasks)

async def main():
    crawler = RobustCrawler(max_retries=3, base_delay=2)

    urls = [
        "https://example.com/reliable",
        "https://example.com/slow",
        "https://example.com/may-fail",
    ]

    results = await crawler.crawl_many_robust(urls)

    success_count = sum(1 for r in results if r is not None)
    print(f"\n成功: {success_count}/{len(urls)}")

配置优化建议

根据不同的使用场景,合理配置可以显著提升效果:

"""
crawl4ai 配置优化建议
"""

# 场景 1: 快速内容预览
# 适用于只需要页面主要内容,不需要等待所有资源加载
fast_config = CrawlerRunConfig(
    page_timeout=10000,              # 较短超时
    delay_before_return_html=0.5,    # 短暂等待
    headless=True,
    verbose=False
)

# 场景 2: 深度内容提取
# 适用于需要完整渲染和所有动态内容
deep_config = CrawlerRunConfig(
    page_timeout=60000,             # 较长超时
    delay_before_return_html=3,      # 充分等待
    scroll_delay=1,                  # 滚动间隔
    max_scrolls=10,                  # 多次滚动
    wait_for="css:.content-loaded", # 等待特定元素
    headless=True
)

# 场景 3: 高质量数据提取
# 适用于需要 AI 辅助的语义提取
quality_config = CrawlerRunConfig(
    word_count_threshold=50,        # 过滤短内容
    remove_ads=True,
    remove_popups=True,
    remove_forms=True,
    content_filter="cosine",        # 启用内容过滤
    verbose=True
)

# 场景 4: 大规模爬取
# 适用于需要高效批量处理的场景
scale_config = CrawlerRunConfig(
    headless=True,
    page_timeout=15000,
    delay_before_return_html=1,
    max_scrolls=3,
    verbose=False,
    # 可以添加代理配置
    # proxy={"http": "http://proxy:8080"}
)

使用代理和轮换 User-Agent

大规模爬取时,使用代理池和轮换 User-Agent 可以有效避免被封禁:

"""
使用代理和轮换 User-Agent
"""

import random
import asyncio
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig

# 代理池(示例)
PROXY_POOL = [
    "http://proxy1:8080",
    "http://proxy2:8080",
    "http://proxy3:8080",
    # 添加更多代理
]

# User-Agent 池
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 Safari/605.1.15",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
]

class RotatingCrawler:
    """
    支持代理轮换和 User-Agent 轮换的爬虫
    """

    def __init__(self, use_proxy=True):
        self.use_proxy = use_proxy

    def get_random_config(self):
        """获取随机配置的爬取参数"""
        config_dict = {
            "headless": True,
            "user_agent": random.choice(USER_AGENTS),
        }

        if self.use_proxy and PROXY_POOL:
            config_dict["proxy"] = {
                "http": random.choice(PROXY_POOL),
                "https": random.choice(PROXY_POOL),
            }

        return CrawlerRunConfig(**config_dict)

    async def crawl(self, url):
        """使用随机配置爬取"""
        config = self.get_random_config()

        async with AsyncWebCrawler(config=config) as crawler:
            return await crawler.arun(url=url)

async def main():
    crawler = RotatingCrawler(use_proxy=True)

    urls = [f"https://example.com/page{i}" for i in range(50)]

    # 批量爬取,每个请求使用不同的代理和 User-Agent
    tasks = [crawler.crawl(url) for url in urls]
    results = await asyncio.gather(*tasks)

    success = sum(1 for r in results if r.success)
    print(f"成功率: {success}/{len(urls)}")

总结与相关资源

通过这篇教程,我们详细介绍了 crawl4ai 这个强大的 AI 网页爬取工具。让我来总结一下关键知识点,并提供进一步学习的资源。

核心要点回顾

首先,crawl4ai 将人工智能与网页爬取技术完美融合,提供了一种现代化、高效、易用的数据采集解决方案。它的核心优势包括:智能内容理解和提取、内置浏览器自动化处理 JavaScript 渲染、支持多种提取策略(CSS 选择器、正则表达式、LLM 语义提取等)、丰富的配置选项以及内置的反反爬机制。

在使用方面,最基本的模式是创建 AsyncWebCrawler 实例,然后调用 arun 方法执行爬取。返回的 result 对象包含了原始 HTML、Markdown 转换后的内容、元数据、提取的链接和媒体资源等丰富信息。

对于进阶使用,可以利用不同的提取策略来满足特定需求:Crawl4aiExtractionStrategy 用于智能提取、CSS 选择器适合结构化的页面、正则表达式适合模式固定的内容、LLM 提取则能够理解语义并提取复杂信息。

批量爬取时,arun_many 方法提供了便捷的多 URL 处理能力,配合信号量(Semaphore)可以实现并发控制,避免对目标服务器造成过大压力。

最佳实践方面,速率控制保护了目标服务器和你的 IP 安全,重试机制提高了爬取的可靠性,合理的配置优化可以平衡速度和效果,代理和 User-Agent 轮换则是大规模爬取的必备技巧。

crawl4ai 的应用场景非常广泛,包括但不限于 AI 训练数据采集、竞品监控、价格追踪、内容聚合、舆情分析、市场调研等。无论你是研究人员、工程师还是产品经理,都能从中受益。

相关 AI 项目推荐

如果你对网页数据采集和 AI 技术的结合感兴趣,以下项目也值得关注:

Playwright 是微软开发的无头浏览器自动化工具,crawl4ai 的浏览器功能就建立在 Playwright 基础之上。深入学习 Playwright 可以帮助你更好地理解和使用 crawl4ai 的高级特性。

Scrapy 是 Python 生态中最成熟的传统爬虫框架,擅长大规模、结构化的网页爬取任务。如果你需要处理非常大量的 URL 或者对爬取效率有极高要求,Scrapy 是很好的选择。

LangChain 提供了一系列用于构建 LLM 应用的工具,包括文档加载器(Document Loaders),其中就包含了对 crawl4ai 的集成。将 crawl4ai 与 LangChain 结合,可以方便地为 LLM 提供网页上下文。

Hugging Face Transformers 是目前最流行的开源 ML 库之一,提供了大量的预训练模型。你可以用 crawl4ai 采集的数据来微调这些模型,构建更专业的 AI 应用。

Beautiful Soup 专注于 HTML 解析,虽然功能不如 crawl4ai 强大,但对于简单的静态页面抓取任务,它仍然是一个轻量级的选择。

Selenium 是另一个流行的浏览器自动化工具,适合需要模拟复杂用户行为的场景。如果你需要执行登录、填写表单等操作,Selenium 可以作为补充工具。

学习资源

关于进一步学习,你可以通过以下方式获取帮助和最新信息:

GitHub 仓库(unclecode/crawl4ai)是项目的核心资源,包含完整的源代码、文档和示例。建议仔细阅读 README 文件和示例代码。

官方文档提供了 API 的详细说明和更多的使用案例,是深入学习的重要参考资料。

项目的问题反馈区(Issues)不仅可以帮助你解决遇到的问题,还能了解其他用户的最佳实践和踩坑经历。

社区讨论区(如 Discord、GitHub Discussions)可以与其他用户交流心得,获取帮助。

最后,建议在学习和使用过程中,多动手实践。尝试爬取不同的网站、尝试不同的配置选项、尝试解决实际问题——这些都是加深理解、提升技能的最佳方式。

希望这篇教程对你学习 crawl4ai 有所帮助。祝你爬取愉快!

如果内容对您有帮助,欢迎打赏

您的支持是我继续创作的动力

前往打赏页面

评论区

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注