Stop Hand-Writing Crawlers! Crawlee Is the Best Choice for Modern Web Scraping and Automation


Introduction

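If you have ever tried to write a web crawler, you have almost certainly hit these headaches. Anti-bot defenses keep getting stricter, so your requests are blocked again and again. Pages are rendered with JavaScript, so the classic requests plus BeautifulSoup combination stops working entirely. Once you finally have the data, the code is a tangled mess that is painful to maintain. And a script that runs fine locally breaks with environment problems as soon as it is deployed to a server.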

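Crawlee, the subject of this article, may well be the most complete answer to these pain points. It is an open-source project maintained by Apify, a well-known web data platform, and it is built specifically for reliable web crawlers and browser automation. It is more than a crawling library: it is a complete toolkit that covers the whole flow, from request scheduling and data extraction to storage and export.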

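In this article I will take you through Crawlee from scratch, using plenty of hands-on code to show how it handles a range of scraping scenarios. Whether you want to collect product information from e-commerce sites, gather public data from social media, or automate actions on web pages, Crawlee can be a capable assistant.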


Why It Deserves Attention: Crawlee's Core Value

Understanding Crawlee's Design Philosophy Through Its Pain Points

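Traditional crawler development involves a trade-off: lightweight tools (such as requests plus BeautifulSoup) are easy to pick up but limited, while more powerful options (a full framework such as Scrapy, or a browser automation library such as Playwright) demand a fair amount of setup and boilerplate. Crawlee aims for a sweet spot on that spectrum, offering a concise API together with production-grade reliability.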

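The traditional approach has several fundamental problems. First, request management is incomplete: most HTTP libraries only send requests, with no built-in smart retries, request deduplication, or priority scheduling. Second, support for dealing with anti-scraping measures is weak: when a site requires login or verification, or relies heavily on JavaScript rendering, developers usually have to bring in extra tools and configuration. Third, the data flow is messy: crawling, parsing, cleaning, and storage are often mixed together, which makes the code hard to test and maintain.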

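Crawlee's design targets exactly these problems. It splits a crawler into decoupled parts: a request queue that schedules the work, request handlers (optionally organized with a router) that process each page, and storages (datasets and key-value stores) that persist the results, so each part can be configured and extended independently. It also ships with many practical built-in features, such as automatic retries, proxy rotation, request deduplication, and browser fingerprint management. Features that take a lot of custom code in a traditional setup need only a few lines of configuration in Crawlee.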

Tech Stack and Ecosystem Advantages

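Crawlee currently supports two mainstream languages, Python and Node.js, which makes it approachable for teams with different backgrounds. The Python version is built on asyncio and handles large numbers of concurrent requests efficiently. The Node.js version, written in TypeScript, is the older of the two and sits in the same JavaScript ecosystem that Playwright and Puppeteer come from.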

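As an actively developed open-source project, Crawlee has thorough documentation and community support. Apify itself is an established web data provider, and Crawlee, as the open-source core of its tooling, distills years of the team's hands-on experience. In practice, a problem you run into has a good chance of already having a known solution or being covered in the official documentation.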

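Another point worth noting is Crawlee's tight integration with the Apify ecosystem. If your crawler eventually needs to be deployed and scaled in the cloud, you can move it to the Apify platform with few changes and use its serverless infrastructure, task scheduling, result storage, and other value-added services. This incremental path is useful both for quickly validating an idea and for scaling up after a small-scale trial.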

Balancing Performance and Reliability

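In crawling, performance and reliability often pull in opposite directions. Chasing maximum speed can raise the request failure rate and the risk of being banned by the target site, while overly conservative settings reduce collection throughput. Crawlee balances the two extremes with its request scheduling and automatic retry mechanisms.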

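Crawlee ships with conservative defaults: sensible concurrency limits (managed by an autoscaled pool that adjusts to available CPU and memory), automatic retries of failed requests, and similar safeguards. These choices let a crawler run stably on most sites without being banned easily. All of these parameters are configurable, so once you understand how the target site behaves, you can tune them to maximize throughput while keeping the crawler stable.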
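Crawlee handles retries internally (the max_request_retries option on a crawler sets the limit). To make the speed-versus-stability trade-off concrete, here is a generic standard-library sketch of retrying with exponential backoff and jitter. It illustrates the technique only and is not Crawlee's implementation; the function name retry_with_backoff and its parameters are hypothetical.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 10.0,
) -> T:
    """Run `operation`, retrying on any exception with exponential backoff plus jitter.

    The first call is not a retry, so the operation runs at most max_retries + 1 times.
    (Hypothetical helper for illustration, not part of Crawlee.)
    """
    attempt = 0
    while True:
        try:
            return await operation()
        except Exception:
            if attempt >= max_retries:
                raise  # give up and surface the last error
            # The delay ceiling doubles each attempt (base, 2*base, 4*base, ...), capped at max_delay
            delay = min(max_delay, base_delay * (2 ** attempt))
            # "Full jitter": sleep a random time in [0, delay] so concurrent
            # workers do not retry in lockstep against the same site
            await asyncio.sleep(random.uniform(0, delay))
            attempt += 1


if __name__ == "__main__":
    async def main() -> None:
        calls = {"n": 0}

        async def flaky() -> str:
            # Fails twice, then succeeds
            calls["n"] += 1
            if calls["n"] < 3:
                raise ConnectionError("temporary failure")
            return "ok"

        result = await retry_with_backoff(flaky, max_retries=3, base_delay=0.01)
        print(result, calls["n"])  # ok 3

    asyncio.run(main())
```

A larger base_delay and max_retries favor stability; smaller values favor speed. The jitter matters most when many requests fail at the same moment, for example during a brief outage on the target site.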


Environment Setup: Getting Started with Crawlee

Installation and Dependencies

Before using Crawlee, we need to set up a development environment. This article uses the Python version, since Python is widely used for data processing and crawling.

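# Install the Crawlee core library with pip
# pip install crawlee

# For HTML parsing with BeautifulSoupCrawler, install the beautifulsoup extra
# pip install 'crawlee[beautifulsoup]'

# For JavaScript-rendered pages, install the Playwright extra, then the browser binaries
# pip install 'crawlee[playwright]'
# playwright install

# Or install all optional extras at once
# (Puppeteer support exists only in the Node.js version of Crawlee)
# pip install 'crawlee[all]'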

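Installation may download extra binaries, especially if you choose browser support. In mainland China, a pip mirror such as the Tsinghua or Alibaba Cloud mirror can speed up package downloads.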

# Configure a pip mirror (if needed)
# pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple

Once installation finishes, a short script can confirm that the environment is ready:

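import importlib.metadata
import importlib.util

# Print the installed Crawlee version
print(f"Crawlee version: {importlib.metadata.version('crawlee')}")

# Check whether the optional dependencies can be imported
# (checking the underlying packages avoids depending on Crawlee's internal module layout)
optional_modules = {
    "playwright": "Playwright browser support",
    "bs4": "BeautifulSoup HTML parser",
    "httpx": "HTTPX HTTP client",
}
for module_name, label in optional_modules.items():
    installed = importlib.util.find_spec(module_name) is not None
    print(f"{label}: {'installed' if installed else 'not installed'}")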

Creating Your First Crawler Project

To keep code clean and maintainable, give each crawler project its own directory structure. Here is a recommended layout:

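# project_structure.py

# A typical directory layout for a Crawlee project

project_structure = """
my_crawler_project/
├── src/
│   ├── __init__.py
│   ├── main.py              # crawler entry point
│   ├── spider.py            # main crawler logic
│   ├── pipelines.py         # data processing pipelines
│   └── models.py            # data model definitions
├── tests/
│   ├── __init__.py
│   ├── test_spider.py       # unit tests
│   └── fixtures/            # test data
├── data/
│   ├── input/               # input data (e.g. seed URL lists)
│   └── output/              # crawl results
├── logs/                    # log files
├── requirements.txt         # project dependencies
├── .env                     # environment variables
└── README.md                # project documentation
"""
# Note: by default Crawlee also writes its datasets, key-value stores,
# and request queues to a ./storage directory
print(project_structure)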

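# =====================================
# A minimal crawler example
# =====================================
# Written against recent Crawlee releases (0.5+), which export crawler classes
# from crawlee.crawlers; older releases used modules such as crawlee.beautifulsoup_crawler.
# Requires: pip install 'crawlee[beautifulsoup]'

import asyncio

from crawlee import ConcurrencySettings
from crawlee.crawlers import BeautifulSoupCrawler, BeautifulSoupCrawlingContext


async def main() -> None:
    """Crawler entry point"""
    # Create the crawler; handlers are registered on its router rather than by subclassing
    crawler = BeautifulSoupCrawler(
        max_request_retries=3,                                        # maximum retries per request
        concurrency_settings=ConcurrencySettings(max_concurrency=2),  # at most 2 parallel requests
    )

    @crawler.router.default_handler
    async def request_handler(context: BeautifulSoupCrawlingContext) -> None:
        """Handle every fetched page"""
        url = context.request.url
        context.log.info(f"Processing: {url}")

        # context.soup is the parsed BeautifulSoup document
        title = context.soup.title.get_text(strip=True) if context.soup.title else None
        context.log.info(f"Page title: {title}")

        # Extract some paragraph text as an example
        paragraphs = context.soup.find_all("p")
        for i, p in enumerate(paragraphs[:3]):
            context.log.info(f"Paragraph {i + 1}: {p.get_text()[:100]}...")

        # Append the result to the default dataset
        await context.push_data({
            "url": url,
            "title": title,
            "paragraphs": [p.get_text() for p in paragraphs[:5]],
        })

    # Start the crawl with the seed URLs
    await crawler.run([
        "https://example.com",
        "https://example.org",
    ])


if __name__ == "__main__":
    asyncio.run(main())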

Configuration Management and Environment Variables

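In production, crawler settings usually differ between environments. Crawlee's own settings can be supplied through its Configuration class or through CRAWLEE_-prefixed environment variables (for example CRAWLEE_LOG_LEVEL or CRAWLEE_STORAGE_DIR). Project-specific settings are easiest to manage with a small config class of your own, fed by environment variables or a config file. The variable names below are chosen for this example and are not read by Crawlee itself: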

# config_example.py

# =====================================
# Example project-level settings (a sample .env file)
# =====================================

# Basic settings
BASE_CONFIG = """
# Logging
LOG_LEVEL=INFO                    # log level: DEBUG, INFO, WARNING, ERROR
LOG_FILE=./logs/crawler.log       # log file path

# Requests
MAX_REQUEST_RETRIES=3             # maximum retries after a failed request
REQUEST_TIMEOUT_SECONDS=30        # timeout for a single request (seconds)
MAX_CONCURRENCY=5                 # maximum number of concurrent requests

# Proxies (comma- or whitespace-separated list of proxy URLs)
# PROXY_URLS=http://user:pass@proxy1.com:8080,http://user:pass@proxy2.com:8080

# Browser (when running headless)
HEADLESS=True                     # run the browser in headless mode
BROWSER_TIMEOUT=60000             # browser operation timeout (milliseconds)
"""

# Define a config class with Python dataclasses
from dataclasses import dataclass, field
from typing import List, Optional
import os

@dataclass
class CrawlerConfig:
    """Crawler configuration"""

    # Request settings
    max_retries: int = 3
    request_timeout: int = 30
    max_concurrency: int = 5

    # Proxy settings
    proxy_urls: List[str] = field(default_factory=list)
    rotate_proxies: bool = True

    # Browser settings
    headless: bool = True
    browser_timeout: int = 60000
    user_agent: Optional[str] = None

    # Storage settings
    output_directory: str = "./output"
    persist_storage: bool = True

    @classmethod
    def from_env(cls) -> "CrawlerConfig":
        """Load settings from environment variables"""
        # Accept proxies separated by commas, spaces, or newlines
        proxy_env = os.getenv("PROXY_URLS", "")
        return cls(
            max_retries=int(os.getenv("MAX_REQUEST_RETRIES", "3")),
            request_timeout=int(os.getenv("REQUEST_TIMEOUT_SECONDS", "30")),
            max_concurrency=int(os.getenv("MAX_CONCURRENCY", "5")),
            proxy_urls=proxy_env.replace(",", " ").split(),
            headless=os.getenv("HEADLESS", "true").lower() == "true",
            browser_timeout=int(os.getenv("BROWSER_TIMEOUT", "60000")),
        )

    def validate(self) -> bool:
        """Check that the settings are valid"""
        if self.max_retries < 0:
            print("Error: max_retries must be >= 0")
            return False
        if self.request_timeout <= 0:
            print("Error: request_timeout must be > 0")
            return False
        if self.max_concurrency <= 0:
            print("Error: max_concurrency must be > 0")
            return False
        return True

# =====================================
# Config usage example
# =====================================

def demo_config_usage():
    """Show how the config class is used"""

    # Option 1: instantiate directly
    config1 = CrawlerConfig(
        max_retries=5,
        max_concurrency=10,
    )

    # Option 2: load from environment variables
    config2 = CrawlerConfig.from_env()

    # Validate both
    print(f"config1 valid: {config1.validate()}")
    print(f"config2 valid: {config2.validate()}")

    # Print the loaded settings
    print("Current crawler settings:")
    print(f"  - max retries: {config2.max_retries}")
    print(f"  - request timeout: {config2.request_timeout} s")
    print(f"  - max concurrency: {config2.max_concurrency}")
    print(f"  - headless: {config2.headless}")
    print(f"  - proxies configured: {len(config2.proxy_urls)}")

demo_config_usage()
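The prose mentions config files as well as environment variables, but from_env() only reads the environment. One way to bridge the two is to load a .env file into the environment before calling from_env(). The library usually used for this is python-dotenv (load_dotenv); the dependency-free sketch below shows the idea, under the assumption of simple KEY=VALUE lines with no quoting. parse_env_text and load_env_text are hypothetical helper names.

```python
import os
from typing import Dict, MutableMapping, Optional


def parse_env_text(text: str) -> Dict[str, str]:
    """Parse simple KEY=VALUE lines, skipping blank lines and # comments.

    Assumption: values are unquoted, and " #" starts an inline comment.
    """
    values: Dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # blank line, full-line comment, or malformed line
        key, _, value = line.partition("=")
        # Drop an inline comment such as "INFO   # log level"
        value = value.split(" #", 1)[0].strip()
        values[key.strip()] = value
    return values


def load_env_text(text: str, environ: Optional[MutableMapping[str, str]] = None) -> None:
    """Copy parsed values into the environment without overwriting existing variables."""
    target = os.environ if environ is None else environ
    for key, value in parse_env_text(text).items():
        target.setdefault(key, value)


if __name__ == "__main__":
    sample = "LOG_LEVEL=INFO   # log level\nMAX_CONCURRENCY=5\n# PROXY_URLS=\n"
    print(parse_env_text(sample))  # {'LOG_LEVEL': 'INFO', 'MAX_CONCURRENCY': '5'}
```

Not overwriting existing variables means values set by the deployment environment take precedence over the file, which is also python-dotenv's default behavior. After load_env_text(BASE_CONFIG), CrawlerConfig.from_env() would see the file's values.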

Core Features in Depth: Understanding Crawlee's Architecture

The Request Scheduling System

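The request scheduling system is one of Crawlee's core features. It manages all pending requests, tracks their state, and handles deduplication and prioritization. Understanding how it works is essential for writing efficient, stable crawlers. The example below uses a simplified request model to show what each request carries and how retries are recorded.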

# request_scheduler.py

import asyncio
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime

# =====================================
# The request object
# =====================================

@dataclass
class Request:
    """A simplified model of a Crawlee request, for illustration.

    Crawlee's real class is crawlee.Request, which has more fields
    (for example user_data and label).
    """

    url: str                           # target URL (required)
    unique_key: Optional[str] = None   # key used for deduplication
    method: str = "GET"                # HTTP method
    headers: Dict[str, str] = field(default_factory=dict)  # request headers
    payload: Optional[bytes] = None    # request body (for POST and similar methods)
    proxy_url: Optional[str] = None    # proxy to use for this request
    retry_count: int = 0               # retries so far
    error_messages: List[str] = field(default_factory=list)  # error log

    def __post_init__(self):
        """Derive the unique key after initialization"""
        if self.unique_key is None:
            # Default here: combine the method and URL. Crawlee's real default
            # key is the normalized URL, so repeated GETs of a page are deduplicated.
            self.unique_key = f"{self.method}:{self.url}"

    def increment_retry(self) -> None:
        """Increase the retry counter"""
        self.retry_count += 1

    def add_error(self, error: str) -> None:
        """Record an error message with a timestamp"""
        self.error_messages.append(f"[{datetime.now().isoformat()}] {error}")
# =====================================
# Request factory
# =====================================

class RequestFactory:
    """Factory for common kinds of requests"""

    @staticmethod
    def create_basic_request(url: str) -> Request:
        """Create a plain GET request"""
        return Request(url=url)

    @staticmethod
    def create_post_request(url: str, data: Dict[str, Any]) -> Request:
        """Create a POST request with a JSON body"""
        import json
        return Request(
            url=url,
            method="POST",
            headers={"Content-Type": "application/json"},
            payload=json.dumps(data).encode("utf-8")
        )

    @staticmethod
    def create_form_request(url: str, form_data: Dict[str, str]) -> Request:
        """Create a URL-encoded form submission request"""
        from urllib.parse import urlencode
        return Request(
            url=url,
            method="POST",
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            payload=urlencode(form_data).encode("utf-8")
        )

# =====================================
# Request handling demo
# =====================================

async def demo_request_scheduling():
    """Show how requests are built and how retries are tracked"""

    print("=== Request scheduling demo ===\n")

    # Create several requests
    requests = [
        RequestFactory.create_basic_request("https://example.com/page1"),
        RequestFactory.create_basic_request("https://example.com/page2"),
        RequestFactory.create_post_request(
            "https://example.com/api/data",
            {"query": "search term"}
        ),
    ]

    # Attach metadata
    for i, req in enumerate(requests):
        req.headers["X-Request-ID"] = f"req-{i+1}"
        req.headers["User-Agent"] = (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/120.0.0.0 Safari/537.36"
        )

    # Show the pending requests
    print("Pending requests:")
    for req in requests:
        print(f"  [{req.unique_key}] {req.method} {req.url}")
        print(f"    - headers: {list(req.headers.keys())}")
        print(f"    - retries so far: {req.retry_count}")

    # Simulate a failure followed by a retry
    print("\nSimulated processing:")
    for req in requests:
        print(f"\nProcessing request: {req.unique_key}")

        # Simulate a failed first attempt
        req.add_error("504 Gateway Timeout")
        req.increment_retry()
        print("  Recorded error: 504 Gateway Timeout")
        print(f"  Current retry count: {req.retry_count}")

        # Simulate the retry outcome: succeed while under the limit
        if req.retry_count <= 2:
            print("  Retry succeeded!")
        else:
            print("  Retry limit exceeded, marking as failed")

        # Show the error history
        print(f"  Error history: {req.error_messages}")

asyncio.run(demo_request_scheduling())
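The demo above tracks retries but does not show deduplication or ordering. Below is a minimal, hypothetical sketch of a deduplicating queue with a "forefront" flag, similar in spirit to the forefront option Crawlee's request queue offers for urgent requests. QueuedRequest and SimpleRequestQueue are illustrative names, and the URL normalization (ignoring the #fragment and a trailing slash) is an assumption, not Crawlee's exact rule.

```python
import heapq
import itertools
from dataclasses import dataclass
from typing import List, Optional, Set, Tuple
from urllib.parse import urldefrag


@dataclass
class QueuedRequest:
    """Minimal request record (hypothetical, not crawlee.Request)."""
    url: str
    unique_key: str = ""

    def __post_init__(self) -> None:
        if not self.unique_key:
            # Assumed normalization: ignore the #fragment and a trailing slash
            self.unique_key = urldefrag(self.url).url.rstrip("/")


class SimpleRequestQueue:
    """Deduplicating queue; forefront requests are served before normal ones.

    Within each group, requests come out in the order they were added.
    """

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, QueuedRequest]] = []
        self._seen: Set[str] = set()        # every unique key ever added, including handled ones
        self._counter = itertools.count()   # insertion counter, used as a tie-breaker

    def add(self, request: QueuedRequest, forefront: bool = False) -> bool:
        """Enqueue a request; return False if its unique key was already seen."""
        if request.unique_key in self._seen:
            return False
        self._seen.add(request.unique_key)
        priority = 0 if forefront else 1    # lower number is served first
        heapq.heappush(self._heap, (priority, next(self._counter), request))
        return True

    def fetch_next(self) -> Optional[QueuedRequest]:
        """Pop the next request to process, or None when the queue is empty."""
        if not self._heap:
            return None
        return heapq.heappop(self._heap)[2]


if __name__ == "__main__":
    queue = SimpleRequestQueue()
    queue.add(QueuedRequest("https://example.com/a"))
    queue.add(QueuedRequest("https://example.com/b"))
    print(queue.add(QueuedRequest("https://example.com/a#top")))  # False: duplicate key
    queue.add(QueuedRequest("https://example.com/login"), forefront=True)
    while (req := queue.fetch_next()) is not None:
        print(req.url)  # login first, then a, then b
```

Keeping seen keys after a request is popped is deliberate: a link that appears on many pages is crawled only once per run.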

Data Processing Pipelines

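Crawlee itself has no Scrapy-style item pipeline: a request handler calls context.push_data() and the records land in a Dataset. When you need validation, cleaning, and export steps, you can still organize them as a pipeline of your own, as in the example below. This design decouples extraction, cleaning, transformation, and storage so that each step can be written, tested, and reused independently.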

# data_pipeline.py

import asyncio
import csv
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List, Optional

# =====================================
# Data model
# =====================================

@dataclass
class ProductItem:
    """Product data model"""
    product_id: str
    title: str
    price: float
    currency: str = "USD"
    category: Optional[str] = None
    rating: Optional[float] = None
    review_count: Optional[int] = None
    in_stock: bool = True
    scraped_at: str = ""
    source_url: str = ""

    def __post_init__(self):
        """Fill in the scrape timestamp if it was not provided"""
        if not self.scraped_at:
            self.scraped_at = datetime.now().isoformat()

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a plain dict"""
        return {
            "product_id": self.product_id,
            "title": self.title,
            "price": self.price,
            "currency": self.currency,
            "category": self.category,
            "rating": self.rating,
            "review_count": self.review_count,
            "in_stock": self.in_stock,
            "scraped_at": self.scraped_at,
            "source_url": self.source_url,
        }

    def validate(self) -> List[str]:
        """Check data integrity; return a list of error messages"""
        errors = []
        if not self.product_id:
            errors.append("product_id must not be empty")
        if not self.title:
            errors.append("title must not be empty")
        if self.price < 0:
            errors.append(f"price must not be negative: {self.price}")
        if self.rating is not None and not (0 <= self.rating <= 5):
            errors.append(f"rating must be between 0 and 5: {self.rating}")
        if self.review_count is not None and self.review_count < 0:
            errors.append(f"review_count must not be negative: {self.review_count}")
        return errors

# =====================================
# Pipeline base class
# =====================================

class BasePipeline(ABC):
    """Base class for data processing pipelines"""

    @abstractmethod
    async def process_item(self, item: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Process one item; return None to drop it"""
        pass

    async def open(self) -> None:
        """Setup when the pipeline opens"""
        pass

    async def close(self) -> None:
        """Cleanup when the pipeline closes"""
        pass

# =====================================
# Concrete pipelines
# =====================================

class ValidationPipeline(BasePipeline):
    """Data validation pipeline"""

    def __init__(self, required_fields: List[str]):
        self.required_fields = required_fields
        self.errors: List[Dict[str, Any]] = []

    async def process_item(self, item: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Validate one item"""
        # Check required fields (missing or empty values both count)
        missing_fields = [f for f in self.required_fields if f not in item or not item[f]]
        if missing_fields:
            self.errors.append({
                "item": item,
                "error": f"missing required fields: {missing_fields}"
            })
            # Returning None means the item is skipped
            return None

        return item

    async def close(self) -> None:
        """Print a summary of validation errors"""
        if self.errors:
            print(f"\nValidation pipeline found {len(self.errors)} error(s):")
            for i, error in enumerate(self.errors[:5], 1):
                print(f"  {i}. {error['error']}")
                print(f"     URL: {error['item'].get('source_url', 'N/A')}")

import html  # used by DataCleaningPipeline to decode HTML entities

class DataCleaningPipeline(BasePipeline):
    """Data cleaning pipeline"""

    async def process_item(self, item: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Clean and normalize one item"""
        cleaned = item.copy()

        # Clean text fields
        text_fields = ["title", "category", "description"]
        for field_name in text_fields:
            if field_name in cleaned and isinstance(cleaned[field_name], str):
                # Decode HTML entities (&amp;, &lt;, ...) in one pass;
                # html.unescape avoids double-decoding input such as "&amp;lt;"
                text = html.unescape(cleaned[field_name])
                # Collapse runs of whitespace into single spaces
                cleaned[field_name] = " ".join(text.split())

        # Normalize the price
        if "price" in cleaned and isinstance(cleaned["price"], str):
            # Strip the currency symbol and thousands separators
            price_str = cleaned["price"].replace("$", "").replace(",", "").strip()
            try:
                cleaned["price"] = float(price_str)
            except ValueError:
                # Mark unparseable prices as missing instead of inventing 0.0
                cleaned["price"] = None

        # Normalize the rating
        if "rating" in cleaned and isinstance(cleaned["rating"], str):
            try:
                # Take the leading number (e.g. "4.5 out of 5" -> 4.5)
                rating_str = cleaned["rating"].split()[0]
                cleaned["rating"] = float(rating_str)
            except (ValueError, IndexError):
                cleaned["rating"] = None

        return cleaned

class StoragePipeline(BasePipeline):
    """Storage pipeline: JSON Lines written incrementally, CSV written on close"""

    def __init__(self, output_dir: str = "./output"):
        self.output_dir = Path(output_dir)
        self.json_file: Optional[Path] = None
        self.csv_file: Optional[Path] = None
        self.items: List[Dict[str, Any]] = []

    async def open(self) -> None:
        """Prepare the output files"""
        self.output_dir.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # JSON Lines: one JSON object per line, so appending keeps the file valid
        self.json_file = self.output_dir / f"data_{timestamp}.jsonl"
        self.csv_file = self.output_dir / f"data_{timestamp}.csv"

        print("Storage pipeline initialized:")
        print(f"  - JSON Lines output: {self.json_file}")
        print(f"  - CSV output: {self.csv_file}")

    async def process_item(self, item: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Store one item"""
        self.items.append(item)

        # Append one compact JSON object per line (indenting would break the format)
        with open(self.json_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(item, ensure_ascii=False) + "\n")

        # CSV is written in one go on close
        return item

    async def close(self) -> None:
        """Finish storage"""
        if self.items and self.csv_file:
            # Items can have different keys, so the header is the union of all keys
            # in first-seen order; missing values become empty cells
            fieldnames: List[str] = []
            for item in self.items:
                for key in item:
                    if key not in fieldnames:
                        fieldnames.append(key)

            with open(self.csv_file, "w", newline="", encoding="utf-8") as f:
                writer = csv.DictWriter(f, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(self.items)

            print("\nStorage complete:")
            print(f"  - {len(self.items)} item(s) stored")
            print(f"  - JSON Lines file: {self.json_file}")
            print(f"  - CSV file: {self.csv_file}")

# =====================================
# Pipeline manager
# =====================================

class PipelineManager:
    """Coordinates several pipelines in order"""

    def __init__(self):
        self.pipelines: List[BasePipeline] = []

    def add_pipeline(self, pipeline: BasePipeline) -> "PipelineManager":
        """Append a pipeline (returns self so calls can be chained)"""
        self.pipelines.append(pipeline)
        return self

    async def open(self) -> None:
        """Open all pipelines"""
        for pipeline in self.pipelines:
            await pipeline.open()

    async def process(self, item: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Pass an item through every pipeline; stop early if one drops it"""
        current_item = item
        for pipeline in self.pipelines:
            if current_item is None:
                return None
            current_item = await pipeline.process_item(current_item)
        return current_item

    async def close(self) -> None:
        """Close all pipelines"""
        for pipeline in self.pipelines:
            await pipeline.close()

# =====================================
# Pipeline demo
# =====================================

async def demo_pipeline():
    """Demonstrate the data processing pipeline"""

    print("=== Data pipeline demo ===\n")

    # Test data
    test_items = [
        {
            "product_id": "SKU001",
            "title": "  Wireless Bluetooth Earbuds   Noise Cancelling  ",
            "price": "$129.99",
            "rating": "4.5 out of 5",
            "source_url": "https://shop.example.com/product/001"
        },
        {
            "product_id": "SKU002",
            "title": "Portable Power Bank 20000mAh",
            "price": "$49.99",
            "category": "Electronics",
            "rating": "4.2 out of 5",
            "review_count": 1250,
            "source_url": "https://shop.example.com/product/002"
        },
        {
            "product_id": "",
            "title": "Incomplete record",
            "price": "invalid",
            "source_url": "https://shop.example.com/product/003"
        },
    ]

    # Build the pipeline chain: validate -> clean -> store
    manager = PipelineManager()
    manager.add_pipeline(ValidationPipeline(required_fields=["product_id", "title"]))
    manager.add_pipeline(DataCleaningPipeline())
    manager.add_pipeline(StoragePipeline(output_dir="./demo_output"))

    # Open the pipelines
    await manager.open()

    # Process the test data
    print("Processing items:")
    processed_count = 0
    for item in test_items:
        print(f"\nRaw item: {item}")
        result = await manager.process(item)
        if result:
            processed_count += 1
            print(f"Processed: {result}")
        else:
            print("Item skipped (validation failed)")

    # Close the pipelines
    await manager.close()

    print(f"\nDone: {processed_count}/{len(test_items)} items processed")

asyncio.run(demo_pipeline())
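Because every stage shares the same async open()/process_item()/close() interface, adding a stage is cheap. As one example, here is a hypothetical deduplication stage that drops items whose product_id has already been seen in the current run. It is written standalone so it runs on its own; in the module above it would subclass BasePipeline.

```python
import asyncio
from typing import Any, Dict, Optional, Set


class DeduplicationPipeline:
    """Drop items whose key field was already seen during this run (hypothetical stage)."""

    def __init__(self, key_field: str = "product_id") -> None:
        self.key_field = key_field
        self.seen: Set[Any] = set()
        self.dropped = 0

    async def open(self) -> None:
        # Start each run with an empty memory
        self.seen.clear()
        self.dropped = 0

    async def process_item(self, item: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        key = item.get(self.key_field)
        if key in self.seen:
            self.dropped += 1
            return None  # None means "skip the rest of the chain"
        self.seen.add(key)
        return item

    async def close(self) -> None:
        print(f"Deduplication dropped {self.dropped} duplicate item(s)")


if __name__ == "__main__":
    async def main() -> None:
        stage = DeduplicationPipeline()
        await stage.open()
        for item in [{"product_id": "SKU001"}, {"product_id": "SKU002"}, {"product_id": "SKU001"}]:
            print(await stage.process_item(item))
        await stage.close()

    asyncio.run(main())
```

Register it right after validation, for example manager.add_pipeline(DeduplicationPipeline()) following the ValidationPipeline line, so that items without a product_id are already filtered out before they reach it.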

Browser Automation and JavaScript Rendering

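More and more modern pages load their content with JavaScript, and crawlers that only send plain HTTP requests cannot see that content. Crawlee provides full JavaScript rendering support by integrating browser automation tools: Playwright in both the Python and Node.js versions, plus Puppeteer in the Node.js version.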

# browser_automation.py

import asyncio
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, field
from datetime import datetime

# =====================================
# Browser configuration
# =====================================

@dataclass
class BrowserConfig:
    """Browser configuration"""
    headless: bool = True                    # run without a visible window
    viewport_width: int = 1920              # viewport width
    viewport_height: int = 1080             # viewport height
    user_agent: Optional[str] = None        # custom User-Agent
    download_path: Optional[str] = None     # download directory
    navigation_timeout: int = 30000         # page navigation timeout (milliseconds)
    java_script_enabled: bool = True        # enable JavaScript

    @property
    def viewport(self) -> Dict[str, int]:
        """Viewport dict in the shape Playwright expects"""
        return {
            "width": self.viewport_width,
            "height": self.viewport_height
        }

# =====================================
# Page interactions
# =====================================

class PageInteraction:
    """Common page interactions (`page` is a Playwright Page)"""

    @staticmethod
    async def wait_for_selector(page, selector: str, timeout: int = 10000):
        """Wait for an element to appear"""
        await page.wait_for_selector(selector, timeout=timeout)

    @staticmethod
    async def click_and_wait(page, selector: str, wait_after_ms: int = 500):
        """Click an element, then pause briefly"""
        await page.click(selector)
        await asyncio.sleep(wait_after_ms / 1000)

    @staticmethod
    async def fill_and_submit(page, input_selector: str, value: str,
                              submit_selector: Optional[str] = None):
        """Fill in a form field and submit it"""
        await page.fill(input_selector, value)
        if submit_selector:
            await page.click(submit_selector)
        else:
            # No submit button given: press Enter in the input instead
            await page.press(input_selector, "Enter")

    @staticmethod
    async def scroll_to_bottom(page, step: int = 500, delay: float = 0.3):
        """Scroll down in fixed steps to the bottom (commonly used to trigger lazy loading)"""
        current_position = 0
        total_height = await page.evaluate("document.body.scrollHeight")

        while current_position < total_height:
            await page.evaluate(f"window.scrollTo(0, {current_position})")
            await asyncio.sleep(delay)
            current_position += step
            # Re-read the height: lazy loading may have made the page taller
            total_height = await page.evaluate("document.body.scrollHeight")

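# =====================================
# Example crawler for JavaScript-rendered pages
# =====================================

class DynamicPageCrawler:
    """Outline of a crawler for dynamically rendered pages.

    The methods below only print the key steps and return mock data;
    a real implementation would drive Playwright (for example through
    Crawlee's PlaywrightCrawler).
    """

    def __init__(self, config: Optional[BrowserConfig] = None):
        self.config = config or BrowserConfig()
        self.results: List[Dict[str, Any]] = []

    async def scrape_infinite_scroll_page(self, url: str) -> List[Dict[str, Any]]:
        """Scrape a page that loads more content as you scroll"""
        print(f"Scraping infinite-scroll page: {url}")

        # Key steps only; no real browser is started here
        steps = [
            "1. Launch the browser and open the page",
            "2. Wait for the initial content to load",
            "3. Record how many items are loaded",
            "4. Scroll to the bottom of the page",
            "5. Wait for new content to load",
            "6. Repeat steps 3-5 until no new content appears",
            "7. Extract all visible items"
        ]

        for step in steps:
            print(f"  {step}")
            await asyncio.sleep(0.1)

        # Mock results
        self.results = [
            {"index": i, "title": f"Item {i}", "timestamp": datetime.now().isoformat()}
            for i in range(1, 51)
        ]

        return self.results

    async def scrape_spa_page(self, url: str) -> Dict[str, Any]:
        """Scrape a single-page application (SPA)"""
        print(f"Scraping SPA page: {url}")

        # The key with SPAs is waiting for route changes and content updates
        steps = [
            "1. Launch the browser and navigate to the initial URL",
            "2. Wait for the Angular/Vue/React app to initialize",
            "3. Trigger navigation or a route change",
            "4. Wait for the route change to finish and content to render",
            "5. Extract the page data",
            "6. Handle any pending asynchronous requests"
        ]

        for step in steps:
            print(f"  {step}")
            await asyncio.sleep(0.1)

        # Mock result
        return {
            "url": url,
            "title": "SPA Page Title",
            "content": "Extracted content...",
            "scraped_at": datetime.now().isoformat()
        }

    async def scrape_login_required_page(self, url: str,
                                         credentials: Dict[str, str]) -> Dict[str, Any]:
        """Scrape a page that requires login"""
        print(f"Handling login-protected page: {url}")

        steps = [
            "1. Navigate to the login page",
            "2. Fill in the username and password",
            "3. Submit the login form",
            "4. Wait for login to succeed and redirect",
            "5. Navigate to the target page",
            "6. Wait for the content to load",
            "7. Extract the required data"
        ]

        for step in steps:
            print(f"  {step}")
            await asyncio.sleep(0.1)

        # Mock result; the credentials are not actually used here
        return {
            "url": url,
            "login_successful": True,
            "data": "Protected content...",
            "scraped_at": datetime.now().isoformat()
        }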

# =====================================
# Advanced browser settings
# =====================================

class AdvancedBrowserSetup:
    """Advanced browser configuration tips"""

    @staticmethod
    def get_stealth_config() -> Dict[str, Any]:
        """Settings that reduce obvious automation signals"""
        return {
            "headless": True,
            # Chromium command-line flags, passed to launch()
            "args": [
                "--disable-blink-features=AutomationControlled",
                "--disable-dev-shm-usage",
                "--no-sandbox",
                "--disable-setuid-sandbox",
                "--disable-infobars",
                "--window-position=0,0",
                "--ignore-certificate-errors",  # disables TLS checks; use only on trusted networks
                "--enable-features=NetworkService,NetworkServiceInProcess",
            ],
            # A browser-context option: pass it to new_context(), not launch()
            "ignore_https_errors": True,
        }

    @staticmethod
    def get_mobile_config() -> Dict[str, Any]:
        """Mobile device emulation (keys match Playwright's new_context() arguments)"""
        return {
            "viewport": {"width": 375, "height": 812},  # iPhone X screen size
            "user_agent": (
                "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) "
                "AppleWebKit/605.1.15 (KHTML, like Gecko) "
                "Version/14.0 Mobile/15E148 Safari/604.1"
            ),
            "has_touch": True,
            "is_mobile": True,
        }

    @staticmethod
    def get_custom_headers() -> Dict[str, str]:
        """Custom request headers"""
        return {
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Encoding": "gzip, deflate, br",
            "DNT": "1",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
        }

# =====================================
# Demo entry point
# =====================================

async def demo_browser_automation():
    """Browser automation demo"""

    print("=== Browser automation demo ===\n")

    # Create the crawler
    crawler = DynamicPageCrawler()

    # Infinite-scroll page
    print("\n--- Infinite-scroll page ---")
    results = await crawler.scrape_infinite_scroll_page("https://example.com/feed")
    print(f"Results: {len(results)} items")

    # SPA page
    print("\n--- SPA page ---")
    spa_result = await crawler.scrape_spa_page("https://example.com/app")
    print(f"Page title: {spa_result['title']}")

    # Login-protected page
    print("\n--- Login-protected page ---")
    protected_result = await crawler.scrape_login_required_page(
        "https://example.com/dashboard",
        credentials={"username": "user@example.com", "password": "password123"}
    )
    print(f"Login status: {'succeeded' if protected_result['login_successful'] else 'failed'}")

    # Configuration options
    print("\n--- Browser configuration options ---")
    print("Stealth flags:", AdvancedBrowserSetup.get_stealth_config()["args"][:3])
    print("Mobile config:", AdvancedBrowserSetup.get_mobile_config()["viewport"])
    print("Custom headers:", list(AdvancedBrowserSetup.get_custom_headers().keys()))

asyncio.run(demo_browser_automation())
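The infinite-scroll outline above ends with a loop: scroll, wait, and stop once no new content appears. Here is a minimal sketch of that loop, assuming a Playwright-style page object with an async evaluate(expression) method. Unlike scroll_to_bottom, which advances in fixed steps, it jumps to the bottom each round and stops when the document height stops changing. The function name scroll_until_stable and the FakePage test double are hypothetical.

```python
import asyncio
from typing import Any, Optional


async def scroll_until_stable(page: Any, max_rounds: int = 20, settle_delay: float = 0.5) -> int:
    """Scroll to the bottom repeatedly until the page height stops growing.

    `page` is anything with an async evaluate(expression) method, such as a
    Playwright Page. Returns the number of scroll rounds performed.
    """
    last_height = await page.evaluate("document.body.scrollHeight")
    for round_no in range(1, max_rounds + 1):
        await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
        await asyncio.sleep(settle_delay)  # give lazy-loaded content time to arrive
        new_height = await page.evaluate("document.body.scrollHeight")
        if new_height == last_height:
            return round_no  # nothing new appeared: stop
        last_height = new_height
    return max_rounds  # safety cap for feeds that never end


class FakePage:
    """Stand-in for a Playwright Page: the feed grows three times, then stops."""

    def __init__(self) -> None:
        self.height = 1000
        self.loads_left = 3

    async def evaluate(self, expression: str) -> Optional[int]:
        if expression.startswith("window.scrollTo"):
            if self.loads_left > 0:  # each scroll "loads" one more batch
                self.height += 1000
                self.loads_left -= 1
            return None
        return self.height  # answers document.body.scrollHeight


if __name__ == "__main__":
    page = FakePage()
    rounds = asyncio.run(scroll_until_stable(page, settle_delay=0))
    print(rounds, page.height)  # 4 4000
```

The max_rounds cap matters on genuinely endless feeds; settle_delay trades speed against the risk of stopping before slow content arrives.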

Proxy Management and Rotation

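Using proxies sensibly is one of the main ways to avoid bans. Crawlee's ProxyConfiguration rotates through a list of proxy URLs and also supports tiered proxies, escalating to a higher tier when requests keep getting blocked. The example below builds a simplified proxy pool by hand to show the ideas behind rotation, health tracking, and failover.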
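At its simplest, failover means moving on to the next proxy when one fails. A minimal sketch, assuming you supply an async fetch(url, proxy) callable (a hypothetical wrapper around whatever HTTP client you use); fetch_with_failover is an illustrative name, not a Crawlee API:

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


async def fetch_with_failover(
    url: str,
    proxies: List[str],
    fetch: Callable[[str, str], Awaitable[str]],
) -> Tuple[str, str]:
    """Try each proxy in order; return (proxy, body) from the first that succeeds."""
    errors: List[str] = []
    for proxy in proxies:
        try:
            body = await fetch(url, proxy)
            return proxy, body
        except Exception as exc:  # any failure: record it and move on to the next proxy
            errors.append(f"{proxy}: {exc}")
    raise RuntimeError("all proxies failed: " + "; ".join(errors))


if __name__ == "__main__":
    async def fake_fetch(url: str, proxy: str) -> str:
        # Simulated client: the first proxy refuses connections
        if "bad" in proxy:
            raise ConnectionError("connection refused")
        return f"<html>{url} via {proxy}</html>"

    used, body = asyncio.run(fetch_with_failover(
        "https://example.com",
        ["http://bad-proxy:8080", "http://good-proxy:8080"],
        fake_fetch,
    ))
    print(used)  # http://good-proxy:8080
```

In practice you would order the proxy list by health, for example using a pool like the one below, and record each success or failure so that unreliable proxies drift to the end of the list.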

# proxy_management.py

import asyncio
from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime
import random

# =====================================
# Proxy data structures
# =====================================

class ProxyStatus(Enum):
    """Proxy status"""
    ACTIVE = "active"           # usable
    TESTING = "testing"         # being tested
    FAILED = "failed"           # failed
    DISABLED = "disabled"       # manually disabled

@dataclass
class ProxyInfo:
    """Proxy information"""
    url: str                            # proxy address
    username: Optional[str] = None     # auth username
    password: Optional[str] = None     # auth password
    status: ProxyStatus = ProxyStatus.ACTIVE
    success_count: int = 0              # successful requests
    failure_count: int = 0              # failed requests
    last_used: Optional[datetime] = None
    last_error: Optional[str] = None
    avg_response_time: float = 0.0      # mean response time of successful requests (ms)

    @property
    def is_healthy(self) -> bool:
        """Usable if active and with fewer than 5 failures in total"""
        return (
            self.status == ProxyStatus.ACTIVE
            and self.failure_count < 5
        )

    @property
    def success_rate(self) -> float:
        """Success rate (1.0 for a proxy that has not been used yet)"""
        total = self.success_count + self.failure_count
        if total == 0:
            return 1.0
        return self.success_count / total

# =====================================
# Proxy pool manager
# =====================================

class ProxyPool:
    """Proxy pool manager"""

    def __init__(self):
        self.proxies: Dict[str, ProxyInfo] = {}
        self.current_index: int = 0

    def add_proxy(self, proxy_url: str, username: Optional[str] = None,
                  password: Optional[str] = None) -> None:
        """Add a proxy to the pool"""
        self.proxies[proxy_url] = ProxyInfo(
            url=proxy_url,
            username=username,
            password=password
        )
        print(f"Added proxy: {proxy_url}")

    def add_proxies_from_config(self, config: List[Dict[str, str]]) -> None:
        """Add proxies in bulk from a config list"""
        for item in config:
            self.add_proxy(
                proxy_url=item["url"],
                username=item.get("username"),
                password=item.get("password")
            )

    def get_next_proxy(self) -> Optional[ProxyInfo]:
        """Return the next healthy proxy (round-robin)"""
        available = [p for p in self.proxies.values() if p.is_healthy]
        if not available:
            print("Warning: no healthy proxies available")
            return None

        # Round-robin over the currently healthy proxies
        proxy = available[self.current_index % len(available)]
        self.current_index = (self.current_index + 1) % len(available)
        proxy.last_used = datetime.now()

        return proxy

    def get_random_proxy(self) -> Optional[ProxyInfo]:
        """Return a random healthy proxy"""
        available = [p for p in self.proxies.values() if p.is_healthy]
        if not available:
            return None
        return random.choice(available)

    def record_success(self, proxy_url: str, response_time: float) -> None:
        """Record a successful request through a proxy"""
        if proxy_url in self.proxies:
            proxy = self.proxies[proxy_url]
            proxy.success_count += 1
            # Incremental mean over successes: avg_n = avg_(n-1) + (x - avg_(n-1)) / n
            proxy.avg_response_time += (
                (response_time - proxy.avg_response_time) / proxy.success_count
            )
            proxy.last_error = None
            print(f"Proxy OK [{proxy_url}]: response time {response_time:.0f}ms")

    def record_failure(self, proxy_url: str, error: str) -> None:
        """Record a failed request through a proxy"""
        if proxy_url in self.proxies:
            proxy = self.proxies[proxy_url]
            proxy.failure_count += 1
            proxy.last_error = error

            # Disable the proxy once its total failure count reaches 5
            # (the count is never reset, so the failures need not be consecutive)
            if proxy.failure_count >= 5:
                proxy.status = ProxyStatus.FAILED
                print(f"Proxy disabled [{proxy_url}]: too many failures")
            else:
                print(f"Proxy failed [{proxy_url}]: {error}")

    def get_stats(self) -> Dict[str, Any]:
        """Return aggregate statistics for the pool."""
        total = len(self.proxies)
        healthy = sum(1 for p in self.proxies.values() if p.is_healthy)
        total_success = sum(p.success_count for p in self.proxies.values())
        total_failure = sum(p.failure_count for p in self.proxies.values())

        return {
            "total_proxies": total,
            "healthy_proxies": healthy,
            "total_requests": total_success + total_failure,
            "overall_success_rate": (
                total_success / (total_success + total_failure)
                if (total_success + total_failure) > 0 else 0
            ),
            # Mean response time over all successful requests
            # (each proxy's average weighted by its success count)
            "avg_response_time": (
                sum(p.avg_response_time * p.success_count
                    for p in self.proxies.values()) / total_success
                if total_success > 0 else 0
            )
        }

    def display_status(self) -> None:
        """Print the status of every proxy."""
        print("\nProxy pool status:")
        print("-" * 80)
        print(f"{'Proxy URL':<40} {'Status':<10} {'OK':<8} {'Failed':<8} {'Success rate':<10}")
        print("-" * 80)

        for proxy in self.proxies.values():
            print(
                f"{proxy.url:<40} "
                f"{proxy.status.value:<10} "
                f"{proxy.success_count:<8} "
                f"{proxy.failure_count:<8} "
                f"{proxy.success_rate:.1%}"
            )

        stats = self.get_stats()
        print("-" * 80)
        print(f"Total: {stats['healthy_proxies']}/{stats['total_proxies']} proxies healthy")
        print(f"Overall success rate: {stats['overall_success_rate']:.1%}")
        print(f"Average response time: {stats['avg_response_time']:.0f}ms")

# =====================================
# Proxy usage example
# =====================================

async def demo_proxy_management():
    """Demonstrate the proxy pool."""

    print("=== Proxy management demo ===\n")

    # Create the proxy pool
    pool = ProxyPool()

    # Add proxies
    proxies_config = [
        {"url": "http://proxy1.example.com:8080", "username": "user1", "password": "pass1"},
        {"url": "http://proxy2.example.com:8080", "username": "user2", "password": "pass2"},
        {"url": "http://proxy3.example.com:8080"},
        {"url": "http://proxy4.example.com:8080"},
    ]
    pool.add_proxies_from_config(proxies_config)

    # Show the initial state
    print("\nInitial state:")
    pool.display_status()

    # Simulate requests through the proxies
    print("\nSimulating requests:")
    for _ in range(10):
        proxy = pool.get_random_proxy()
        if proxy:
            # Simulated outcome: roughly 70% of requests succeed
            success = random.random() > 0.3
            if success:
                response_time = random.uniform(100, 500)
                pool.record_success(proxy.url, response_time)
            else:
                pool.record_failure(proxy.url, "Connection timeout")
        await asyncio.sleep(0.05)

    # Show the final state
    print("\nFinal state:")
    pool.display_status()

    # Summary statistics
    stats = pool.get_stats()
    print("\nSummary:")
    print(f"  Total requests: {stats['total_requests']}")
    print(f"  Overall success rate: {stats['overall_success_rate']:.1%}")
    print(f"  Average response time: {stats['avg_response_time']:.0f}ms")

asyncio.run(demo_proxy_management())
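The pool above disables a proxy once its failure count reaches five, no matter how many successes came in between. A common alternative is to count only consecutive failures and let a tripped proxy return after a cooldown, in the style of a circuit breaker. Below is a minimal standalone sketch of that idea. `ProxyHealth`, the threshold, and the cooldown length are illustrative choices. They are not part of the code above or of Crawlee's API, and the injectable `clock` exists only to make the sketch testable.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ProxyHealth:
    """Circuit-breaker style health tracking for a single proxy (sketch)."""
    max_consecutive_failures: int = 5            # illustrative threshold
    cooldown_seconds: float = 300.0              # how long a tripped proxy rests
    clock: Callable[[], float] = time.monotonic  # injectable for tests
    consecutive_failures: int = 0
    disabled_until: Optional[float] = None

    def is_available(self) -> bool:
        """True unless the proxy is tripped and still cooling down."""
        if self.disabled_until is None:
            return True
        if self.clock() >= self.disabled_until:
            # Cooldown elapsed: give the proxy a fresh start
            self.disabled_until = None
            self.consecutive_failures = 0
            return True
        return False

    def record_success(self) -> None:
        # Any success breaks the failure streak
        self.consecutive_failures = 0

    def record_failure(self) -> None:
        self.consecutive_failures += 1
        if self.consecutive_failures >= self.max_consecutive_failures:
            # Trip the breaker: take the proxy out of rotation for a while
            self.disabled_until = self.clock() + self.cooldown_seconds
```

With this design, a proxy that fails intermittently is never disabled. A proxy that fails several times in a row gets a rest and then another chance, which often suits residential or rotating proxy endpoints better than a permanent ban.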

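Hands-on Tutorial: Building Complete Crawler Projects

Project 1: E-commerce Product Data Collection

In this part we build a complete product-data collection system for an e-commerce site. It draws on ideas from the earlier sections (request scheduling, a data pipeline, and proxy rotation) and is organized the way a production crawler would be. The listing is a runnable skeleton, though. Page fetching and HTML parsing are simulated with mock data, and requests run sequentially. The config declares max_concurrency, max_retries, and use_proxy, but the code does not use them yet. Replace the simulated parts with real fetching and parsing before pointing the crawler at a live site.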
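In its simplest form, request scheduling means capping how many requests are in flight at once and retrying transient failures with backoff. The sketch below shows one way to do that with plain asyncio. `fetch_all` and its defaults are illustrative, and `fetch` is a hypothetical coroutine you supply, for example an HTTP client call. This is not how Crawlee implements scheduling internally.

```python
import asyncio
import random
from typing import Awaitable, Callable, Dict, List


async def fetch_all(
    urls: List[str],
    fetch: Callable[[str], Awaitable[str]],  # hypothetical: your HTTP call
    max_concurrency: int = 3,
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> Dict[str, str]:
    """Fetch every URL with bounded concurrency and exponential-backoff retries.

    URLs that still fail after max_retries retries are left out of the result.
    """
    semaphore = asyncio.Semaphore(max_concurrency)
    results: Dict[str, str] = {}

    async def worker(url: str) -> None:
        for attempt in range(max_retries + 1):
            async with semaphore:  # at most max_concurrency fetches in flight
                try:
                    results[url] = await fetch(url)
                    return
                except Exception:
                    if attempt == max_retries:
                        return  # out of retries; give up on this URL
            # Back off outside the semaphore so waiting doesn't hold a slot;
            # jitter spreads retries out instead of firing them together
            await asyncio.sleep(base_delay * 2 ** attempt * random.uniform(0.5, 1.5))

    await asyncio.gather(*(worker(url) for url in urls))
    return results
```

The backoff sleep happens outside the semaphore. A URL that is waiting to retry therefore does not occupy a concurrency slot, and other pages can proceed meanwhile.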

# ecommerce_crawler.py

import asyncio
import json
import re
import csv
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field, asdict
from datetime import datetime
from pathlib import Path
from urllib.parse import urljoin, urlparse

# =====================================
# Project configuration
# =====================================

@dataclass
class CrawlerConfig:
    """Crawler configuration."""
    # Basic settings
    start_url: str = "https://demo.ecommerce-example.com"
    max_concurrency: int = 3
    max_retries: int = 3

    # Crawl settings
    max_pages_per_category: int = 10
    request_delay_min: float = 1.0    # minimum delay between requests (seconds)
    request_delay_max: float = 3.0    # maximum delay between requests (seconds)

    # Proxy settings
    use_proxy: bool = True
    proxy_urls: List[str] = field(default_factory=list)

    # Storage settings
    output_dir: str = "./ecommerce_data"

    @classmethod
    def from_file(cls, config_path: str) -> "CrawlerConfig":
        """Load the configuration from a JSON file."""
        with open(config_path, "r", encoding="utf-8") as f:
            config_dict = json.load(f)
        return cls(**config_dict)

    def to_file(self, config_path: str) -> None:
        """Save the configuration to a JSON file."""
        with open(config_path, "w", encoding="utf-8") as f:
            json.dump(asdict(self), f, indent=2, ensure_ascii=False)

# =====================================
# Data model
# =====================================

@dataclass
class Product:
    """Product data model."""
    product_id: str = ""
    title: str = ""
    price: float = 0.0
    original_price: Optional[float] = None
    currency: str = "USD"
    category: str = ""
    subcategory: str = ""
    brand: str = ""
    rating: float = 0.0
    review_count: int = 0
    seller: str = ""
    in_stock: bool = True
    description: str = ""
    specifications: Dict[str, str] = field(default_factory=dict)
    images: List[str] = field(default_factory=list)
    source_url: str = ""
    scraped_at: str = ""
    error_message: Optional[str] = None

    def __post_init__(self):
        """Set the scrape timestamp on creation."""
        if not self.scraped_at:
            self.scraped_at = datetime.now().isoformat()

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dict."""
        return asdict(self)

    def validate(self) -> bool:
        """Check required fields; sets error_message on failure."""
        if not self.product_id:
            self.error_message = "Missing product ID"
            return False
        if not self.title:
            self.error_message = "Missing product title"
            return False
        if self.price < 0:
            self.error_message = "Price cannot be negative"
            return False
        return True

# =====================================
# HTML parsing helpers
# =====================================

class HTMLParser:
    """HTML parsing helpers (expect BeautifulSoup elements)."""

    @staticmethod
    def extract_text(element, selector: str, default: str = "") -> str:
        """Return the stripped text of the first match, or default."""
        try:
            found = element.select_one(selector)
            if found:
                return found.get_text(strip=True)
        except Exception:
            pass
        return default

    @staticmethod
    def extract_attribute(element, selector: str, 
                         attribute: str, default: str = "") -> str:
        """Return an attribute of the first match, or default."""
        try:
            found = element.select_one(selector)
            if found and found.has_attr(attribute):
                return found[attribute]
        except Exception:
            pass
        return default

    @staticmethod
    def extract_all(element, selector: str) -> List:
        """Return all matching elements."""
        try:
            return element.select(selector)
        except Exception:
            return []

    @staticmethod
    def parse_price(price_str: str) -> float:
        """Parse a price string such as "$1,299.00" into a float."""
        if not price_str:
            return 0.0
        # Drop everything except digits and the decimal point
        cleaned = re.sub(r"[^\d.]", "", price_str)
        try:
            return float(cleaned)
        except ValueError:
            return 0.0

    @staticmethod
    def parse_rating(rating_str: str) -> float:
        """Parse the first number in a rating string, e.g. "4.5 out of 5"."""
        if not rating_str:
            return 0.0
        match = re.search(r"(\d+\.?\d*)", rating_str)
        if match:
            return float(match.group(1))
        return 0.0

    @staticmethod
    def parse_review_count(count_str: str) -> int:
        """Parse a review count."""
        if not count_str:
            return 0
        # Handle formats such as "1,234 reviews"
        cleaned = re.sub(r"[,\s]", "", count_str)
        match = re.search(r"(\d+)", cleaned)
        if match:
            return int(match.group(1))
        return 0

# =====================================
# Crawler core
# =====================================

class EcommerceCrawler:
    """Main e-commerce crawler class."""

    def __init__(self, config: CrawlerConfig):
        self.config = config
        self.products: List[Product] = []
        self.failed_urls: List[Dict[str, str]] = []
        self.stats = {
            "pages_visited": 0,
            "products_found": 0,
            "products_saved": 0,
            "requests_failed": 0,
            "start_time": None,
            "end_time": None,
        }

        # Make sure the output directory exists
        Path(config.output_dir).mkdir(parents=True, exist_ok=True)

    async def start(self) -> None:
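        """Start the crawler."""
        print("=" * 60)
        print("E-commerce product collector starting")
        print("=" * 60)
        print(f"Start URL: {self.config.start_url}")
        print(f"Max concurrency: {self.config.max_concurrency}")
        print(f"Output directory: {self.config.output_dir}")
        print("=" * 60)

        self.stats["start_time"] = datetime.now()

        try:
            await self.crawl()
        except (KeyboardInterrupt, asyncio.CancelledError):
            # Under asyncio.run on Python 3.11+, Ctrl+C cancels the main task
            # (CancelledError) instead of raising KeyboardInterrupt here
            print("\nInterrupted by user, stopping crawler...")
        finally:
            await self.finish()

    async def crawl(self) -> None:
        """Run the crawl in three stages."""
        # Stage 1: category pages
        print("\nStage 1: collecting category pages")
        category_urls = await self.fetch_category_pages()

        # Stage 2: product listing pages
        print("\nStage 2: collecting product listing pages")
        product_urls = await self.fetch_product_listings(category_urls)

        # Stage 3: product detail pages
        print("\nStage 3: collecting product detail pages")
        await self.fetch_product_details(product_urls)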

    async def fetch_category_pages(self) -> List[str]:
        """Collect category page URLs."""
        category_urls = []

        # Simulates reading category links from the home page;
        # a real crawler would parse the home page HTML here
        mock_categories = [
            "/category/electronics",
            "/category/clothing",
            "/category/home-garden",
            "/category/sports",
        ]

        for cat in mock_categories:
            url = urljoin(self.config.start_url, cat)
            category_urls.append(url)
            print(f"  Found category: {url}")
            await self.random_delay()

        return category_urls

    async def fetch_product_listings(self, category_urls: List[str]) -> List[str]:
        """Collect product URLs from the listing pages of each category."""
        all_product_urls = []
        page_count = 0

        for category_url in category_urls:
            print(f"\n  Crawling category: {category_url}")

            for page in range(1, self.config.max_pages_per_category + 1):
                if page > 1:
                    page_url = f"{category_url}?page={page}"
                else:
                    page_url = category_url

                print(f"    Listing page {page}: {page_url}")
                page_count += 1
                self.stats["pages_visited"] += 1

                # Simulates extracting product links from the page;
                # a real crawler would parse the listing HTML here
                mock_product_urls = [
                    f"/product/item-{page_count}-{i}"
                    for i in range(1, 4)
                ]

                for product_path in mock_product_urls:
                    full_url = urljoin(self.config.start_url, product_path)
                    all_product_urls.append(full_url)
                    print(f"      Found product: {full_url}")

                await self.random_delay()

        return all_product_urls

    async def fetch_product_details(self, product_urls: List[str]) -> None:
        """Collect product detail pages."""
        for i, url in enumerate(product_urls, 1):
            print(f"\n  [{i}/{len(product_urls)}] Crawling product: {url}")

            try:
                # Simulates extracting product data from the page;
                # a real crawler would parse the detail HTML here
                product = self.simulate_product_extraction(url)

                if product.validate():
                    self.products.append(product)
                    self.stats["products_found"] += 1
                    self.stats["products_saved"] += 1
                    print(f"    OK: {product.title[:50]}")
                else:
                    print(f"    Validation failed: {product.error_message}")
                    self.failed_urls.append({
                        "url": url,
                        "error": product.error_message or "Unknown"
                    })

            except Exception as e:
                print(f"    Error: {e}")
                self.failed_urls.append({
                    "url": url,
                    "error": str(e)
                })
                self.stats["requests_failed"] += 1

            self.stats["pages_visited"] += 1
            await self.random_delay()

    def simulate_product_extraction(self, url: str) -> Product:
        """Simulate product extraction (replace with real parsing in a real project)."""
        # Use the last path segment of the URL as the product ID
        # ("".split("/") returns [""], so fall back explicitly)
        parsed = urlparse(url)
        path_parts = parsed.path.strip("/").split("/")
        product_id = path_parts[-1] or "unknown"

        # Mock extracted data
        return Product(
            product_id=product_id,
            title=f"Product title - {product_id}",
            price=99.99,
            original_price=129.99,
            currency="USD",
            category="Electronics",
            subcategory="Accessories",
            brand="DemoBrand",
            rating=4.5,
            review_count=1234,
            seller="Official Store",
            in_stock=True,
            description="Sample product description; a real crawler would extract this from the page.",
            specifications={
                "Material": "Plastic",
                "Weight": "200g",
                "Dimensions": "10x5x2 cm"
            },
            images=["https://example.com/image1.jpg"],
            source_url=url,
        )

    async def random_delay(self) -> None:
        """Sleep for a random interval between the configured min and max delay."""
        import random
        delay = random.uniform(
            self.config.request_delay_min,
            self.config.request_delay_max
        )
        await asyncio.sleep(delay)

    async def save_results(self) -> None:
        """Save the collected results."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

        # Save as JSON
        json_path = Path(self.config.output_dir) / f"products_{timestamp}.json"
        with open(json_path, "w", encoding="utf-8") as f:
            data = {
                "metadata": {
                    "scraped_at": datetime.now().isoformat(),
                    "total_products": len(self.products),
                    "config": asdict(self.config),
                },
                "products": [p.to_dict() for p in self.products]
            }
            json.dump(data, f, indent=2, ensure_ascii=False)
        print(f"\nJSON data saved: {json_path}")

        # Save as CSV
        csv_path = Path(self.config.output_dir) / f"products_{timestamp}.csv"
        if self.products:
            with open(csv_path, "w", newline="", encoding="utf-8") as f:
                # Keep only scalar columns for CSV (nested fields are omitted)
                flattened_products = []
                for p in self.products:
                    flat = {
                        "product_id": p.product_id,
                        "title": p.title,
                        "price": p.price,
                        "original_price": p.original_price,
                        "currency": p.currency,
                        "category": p.category,
                        "brand": p.brand,
                        "rating": p.rating,
                        "review_count": p.review_count,
                        "in_stock": p.in_stock,
                        "source_url": p.source_url,
                    }
                    flattened_products.append(flat)

                writer = csv.DictWriter(f, fieldnames=list(flattened_products[0].keys()))
                writer.writeheader()
                writer.writerows(flattened_products)
            print(f"CSV data saved: {csv_path}")

        # Save the failure log
        if self.failed_urls:
            error_path = Path(self.config.output_dir) / f"failed_{timestamp}.json"
            with open(error_path, "w", encoding="utf-8") as f:
                json.dump(self.failed_urls, f, indent=2, ensure_ascii=False)
            print(f"Failure log saved: {error_path}")

    async def finish(self) -> None:
        """Finish the crawl and print a report."""
        self.stats["end_time"] = datetime.now()

        print("\n" + "=" * 60)
        print("Crawl finished")
        print("=" * 60)

        # Elapsed time
        if self.stats["start_time"] and self.stats["end_time"]:
            duration = (self.stats["end_time"] - self.stats["start_time"]).total_seconds()
            hours = int(duration // 3600)
            minutes = int((duration % 3600) // 60)
            seconds = int(duration % 60)
            print(f"Elapsed: {hours:02d}:{minutes:02d}:{seconds:02d}")

        print(f"Pages visited: {self.stats['pages_visited']}")
        print(f"Products found: {self.stats['products_found']}")
        print(f"Products saved: {self.stats['products_saved']}")
        print(f"Failed requests: {self.stats['requests_failed']}")

        # Success rate over every detail page attempted (saved + failed)
        attempted = self.stats["products_saved"] + len(self.failed_urls)
        if attempted:
            print(f"Product success rate: {self.stats['products_saved'] / attempted:.1%}")

        # Save results
        await self.save_results()

        print("=" * 60)

# =====================================
# Entry point
# =====================================

async def main():
    """Entry point."""
    # Build the configuration
    config = CrawlerConfig(
        start_url="https://demo.ecommerce-example.com",
        max_concurrency=2,
        max_retries=3,
        max_pages_per_category=5,
        request_delay_min=1.0,
        request_delay_max=2.0,
        output_dir="./ecommerce_data"
    )

    # Create and start the crawler
    crawler = EcommerceCrawler(config)
    await crawler.start()

# Run
if __name__ == "__main__":
    asyncio.run(main())
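The listing stage above generates product paths instead of reading them from HTML. One way to fill that gap without third-party dependencies is to collect same-host links from a listing page with the standard library's `html.parser`, as the sketch below does. It assumes, hypothetically, that product pages live under `/product/`, as in the mock URLs. For a real site, adjust the prefix, or switch to CSS selectors with BeautifulSoup as the `HTMLParser` helper class does. The standard-library parser is imported under an alias so it does not clash with that helper class.

```python
from html.parser import HTMLParser as StdlibHTMLParser
from typing import List, Optional, Set, Tuple
from urllib.parse import urldefrag, urljoin, urlparse


class ProductLinkCollector(StdlibHTMLParser):
    """Collects unique same-host product links from a listing page (sketch)."""

    def __init__(self, base_url: str, path_prefix: str = "/product/") -> None:
        super().__init__()
        self.base_url = base_url
        self.host = urlparse(base_url).netloc
        self.path_prefix = path_prefix  # assumed URL pattern for product pages
        self.links: List[str] = []
        self._seen: Set[str] = set()

    def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
        if tag != "a":
            return
        href = dict(attrs).get("href")
        if not href:
            return
        # Resolve relative links and drop #fragments so duplicates collapse
        url = urldefrag(urljoin(self.base_url, href)).url
        parsed = urlparse(url)
        if parsed.netloc == self.host and parsed.path.startswith(self.path_prefix):
            if url not in self._seen:
                self._seen.add(url)
                self.links.append(url)


def extract_product_links(html: str, base_url: str) -> List[str]:
    """Return product URLs in the order they first appear in the page."""
    collector = ProductLinkCollector(base_url)
    collector.feed(html)
    collector.close()
    return collector.links
```

Query strings are kept because some sites encode the product ID there. Only the fragment is stripped before deduplication.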

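Project 2: Social Media Data Collection

Social media is another common collection target. Unlike e-commerce sites, social platforms are usually single-page applications (SPAs). They render most content with JavaScript and tightly restrict automated access. The skeleton below focuses on the data model, session and rate-limit handling, and the collection flow. Every network call is replaced with generated mock data, so the script runs offline. In a real deployment, those mocked sections would call the platform's official API or use a browser-based crawler such as Crawlee's PlaywrightCrawler.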
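Some SPAs ship their initial page state as JSON inside a `<script>` tag. Next.js, for example, uses `id="__NEXT_DATA__"`. When a site does this, the data can sometimes be read from the raw HTML without rendering the page in a browser. Whether any particular platform does this is an assumption you have to verify per site. The sketch below (function names are illustrative) extracts and parses such a block using only the standard library.

```python
import json
from html.parser import HTMLParser as StdlibHTMLParser
from typing import Any, List, Optional


class _ScriptFinder(StdlibHTMLParser):
    """Captures the text of the first <script> whose id matches (sketch)."""

    def __init__(self, script_id: str) -> None:
        super().__init__()
        self.script_id = script_id
        self._capturing = False
        self._chunks: List[str] = []
        self.payload: Optional[str] = None

    def handle_starttag(self, tag, attrs):
        if tag == "script" and self.payload is None and dict(attrs).get("id") == self.script_id:
            self._capturing = True

    def handle_data(self, data):
        # Script bodies arrive as raw text; they may come in several chunks
        if self._capturing:
            self._chunks.append(data)

    def handle_endtag(self, tag):
        if tag == "script" and self._capturing:
            self._capturing = False
            self.payload = "".join(self._chunks)


def extract_embedded_state(html: str, script_id: str = "__NEXT_DATA__") -> Optional[Any]:
    """Return the parsed JSON inside <script id=script_id>, or None."""
    finder = _ScriptFinder(script_id)
    finder.feed(html)
    finder.close()
    if finder.payload is None:
        return None
    try:
        return json.loads(finder.payload)
    except json.JSONDecodeError:
        return None
```

If the function returns None, the content is probably loaded by later API calls. In that case, a browser-based crawler or the platform's API is the more reliable route.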

# social_media_crawler.py

import asyncio
import json
import re
from typing import List, Dict, Any, Optional, Set
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from pathlib import Path
from urllib.parse import urljoin, parse_qs, urlencode

# =====================================
# Configuration and constants
# =====================================

@dataclass
class SocialMediaConfig:
    """Social media crawler configuration."""
    platform: str = "twitter"              # platform identifier
    base_url: str = "https://twitter.com"
    api_base_url: str = "https://api.twitter.com/2"

    # Authentication
    use_session: bool = True
    auth_cookies: Dict[str, str] = field(default_factory=dict)

    # Collection limits
    max_tweets: int = 1000
    max_depth: int = 3                    # follower traversal depth
    include_retweets: bool = True
    include_replies: bool = False

    # Rate limiting
    rate_limit_delay: float = 2.0         # delay between requests (seconds)
    max_retries: int = 5
    retry_delay: float = 60.0             # delay before retrying after being rate-limited (seconds)

    # Output
    output_dir: str = "./social_data"

# =====================================
# Data models
# =====================================

@dataclass
class Tweet:
    """Tweet data model."""
    tweet_id: str = ""
    user_id: str = ""
    username: str = ""
    display_name: str = ""
    content: str = ""
    created_at: str = ""

    # Engagement metrics
    likes: int = 0
    retweets: int = 0
    replies: int = 0
    views: int = 0

    # Retweets and replies
    is_retweet: bool = False
    original_tweet_id: Optional[str] = None
    is_reply: bool = False
    reply_to_tweet_id: Optional[str] = None
    reply_to_user: Optional[str] = None

    # Media
    has_media: bool = False
    media_urls: List[str] = field(default_factory=list)
    has_video: bool = False
    video_url: Optional[str] = None

    # Geolocation
    has_location: bool = False
    location: str = ""
    coordinates: Optional[Dict[str, float]] = None

    # Source information
    source_app: str = ""
    source_url: str = ""
    scraped_at: str = ""

    def __post_init__(self):
        if not self.scraped_at:
            self.scraped_at = datetime.now().isoformat()

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)

@dataclass
class User:
    """User data model."""
    user_id: str = ""
    username: str = ""
    display_name: str = ""
    bio: str = ""
    location: str = ""
    website: str = ""

    # Account info
    verified: bool = False
    protected: bool = False
    created_at: str = ""

    # Counts
    followers_count: int = 0
    following_count: int = 0
    tweet_count: int = 0
    listed_count: int = 0

    # Avatar and banner images
    avatar_url: str = ""
    banner_url: str = ""

    # Related data
    pinned_tweet_id: Optional[str] = None

    # Metadata
    source_url: str = ""
    scraped_at: str = ""

    def __post_init__(self):
        if not self.scraped_at:
            self.scraped_at = datetime.now().isoformat()

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)

# =====================================
# Session management
# =====================================

class SessionManager:
    """Manages request headers, cookies, and rate-limit state."""

    def __init__(self, config: SocialMediaConfig):
        self.config = config
        self.session_headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/120.0.0.0 Safari/537.36"
            ),
            "Accept": "application/json, text/html",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept-Encoding": "gzip, deflate, br",
            "DNT": "1",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
        }
        self.request_count = 0
        self.last_request_time = None
        self.rate_limit_remaining = 100
        self.rate_limit_reset = None

    def update_headers(self, additional_headers: Dict[str, str]) -> None:
        """Merge extra request headers."""
        self.session_headers.update(additional_headers)

    def add_cookies(self, cookies: Dict[str, str]) -> None:
        """Add cookies as a Cookie header."""
        if cookies:
            cookie_str = "; ".join(f"{k}={v}" for k, v in cookies.items())
            self.session_headers["Cookie"] = cookie_str

    async def rate_limit_wait(self) -> None:
        """Sleep as needed to stay within the rate limit."""
        wait_time = self.config.rate_limit_delay
        if self.rate_limit_remaining <= 5 and self.rate_limit_reset:
            # Near the limit: wait until the window resets (never less than the base delay)
            wait_time = max(
                (self.rate_limit_reset - datetime.now()).total_seconds(),
                self.config.rate_limit_delay
            )
            print(f"Approaching rate limit, waiting {wait_time:.1f}s...")
        await asyncio.sleep(wait_time)

        self.request_count += 1

    def update_rate_limit_info(self, headers: Dict[str, str]) -> None:
        """Update rate-limit state from response headers."""
        if "x-rate-limit-remaining" in headers:
            self.rate_limit_remaining = int(headers["x-rate-limit-remaining"])
        if "x-rate-limit-reset" in headers:
            reset_timestamp = int(headers["x-rate-limit-reset"])
            self.rate_limit_reset = datetime.fromtimestamp(reset_timestamp)

    def is_rate_limited(self) -> bool:
        """Return True once the current rate-limit window is exhausted."""
        return self.rate_limit_remaining <= 0

# =====================================
# Crawler core
# =====================================

class SocialMediaCrawler:
    """Social media crawler."""

    def __init__(self, config: SocialMediaConfig):
        self.config = config
        self.session = SessionManager(config)
        self.tweets: List[Tweet] = []
        self.users: List[User] = []
        self.visited_urls: Set[str] = set()
        self.stats = {
            "tweets_collected": 0,
            "users_collected": 0,
            "requests_made": 0,
            "rate_limited": 0,
            "errors": 0,
        }

        # Apply initial cookies
        if config.auth_cookies:
            self.session.add_cookies(config.auth_cookies)

    async def search_tweets(self, query: str, max_results: int = 100) -> List[Tweet]:
        """Search tweets (simulated)."""
        print(f"\nSearching tweets: {query}")
        tweets = []

        # Build search parameters
        params = {
            "query": query,
            "max_results": min(max_results, 100),
            "tweet.fields": "created_at,public_metrics,entities,geo",
            "expansions": "author_id,attachments.media_keys",
            "user.fields": "username,name,verified",
        }

        # Simulated pagination
        remaining = max_results
        next_token = None

        while remaining > 0 and len(tweets) < max_results:
            if next_token:
                params["next_token"] = next_token

            # A real implementation would send the API request here
            # await self.session.rate_limit_wait()

            # Mock data (str hash() is randomized per process, so numbers vary between runs)
            batch_size = min(remaining, 100)
            for i in range(batch_size):
                tweet = Tweet(
                    tweet_id=f"tweet_{len(tweets)}",
                    user_id=f"user_{hash(query) % 1000}",
                    username="example_user",
                    display_name="Example User",
                    content=f"A tweet about '{query}' #{query.lstrip('#').replace(' ', '')}",
                    created_at=(datetime.now() - timedelta(days=i)).isoformat(),
                    likes=hash(f"{query}_{i}") % 10000,
                    retweets=hash(f"{query}_{i}") % 1000,
                    views=hash(f"{query}_{i}") % 100000,
                    source_app="Twitter Web App",
                    source_url=f"{self.config.base_url}/i/status/{len(tweets)}",
                )
                tweets.append(tweet)
                self.stats["tweets_collected"] += 1

            self.stats["requests_made"] += 1
            remaining -= batch_size

            print(f"  Fetched {len(tweets)} tweets...")
            await asyncio.sleep(0.5)

            # Simulated pagination token
            if remaining > 0:
                next_token = f"mock_token_{len(tweets)}"
            else:
                break

        self.tweets.extend(tweets)
        return tweets

    async def get_user_timeline(self, username: str, 
                                 max_tweets: int = 100) -> List[Tweet]:
        """Fetch a user's timeline (simulated)."""
        print(f"\nFetching timeline: @{username}")
        tweets = []

        # Mock request
        for i in range(min(max_tweets, 20)):
            tweet = Tweet(
                tweet_id=f"tweet_{username}_{i}",
                user_id=f"user_{hash(username) % 1000}",
                username=username,
                display_name=username.title(),
                content=f"Tweet #{i+1} from @{username}",
                created_at=(datetime.now() - timedelta(hours=i*2)).isoformat(),
                likes=i * 10,
                retweets=i * 5,
                views=i * 100,
                source_app="Twitter for iPhone",
                source_url=f"{self.config.base_url}/{username}/status/{i}",
            )
            tweets.append(tweet)

        self.tweets.extend(tweets)
        self.stats["tweets_collected"] += len(tweets)
        return tweets

    async def get_user_profile(self, username: str) -> Optional[User]:
        """Fetch a user profile (simulated)."""
        print(f"\nFetching profile: @{username}")

        # Mock request
        user = User(
            user_id=f"user_{hash(username) % 1000}",
            username=username,
            display_name=username.title(),
            bio=f"Bio of @{username}",
            location="San Francisco, CA",
            website="https://example.com",
            verified=True,
            followers_count=hash(username) % 100000,
            following_count=hash(username) % 1000,
            tweet_count=hash(username) % 10000,
            listed_count=hash(username) % 100,
            avatar_url=f"{self.config.base_url}/profile_images/{username}.jpg",
            source_url=f"{self.config.base_url}/{username}",
        )

        self.users.append(user)
        self.stats["users_collected"] += 1
        return user

    async def get_followers(self, username: str, 
                           max_followers: int = 100) -> List[User]:
        """Fetch a user's followers (simulated)."""
        print(f"\nFetching followers of @{username}")
        followers = []

        for i in range(min(max_followers, 20)):
            follower = User(
                user_id=f"follower_{hash(username)}_{i}",
                username=f"{username}_follower_{i}",
                display_name=f"Follower {i}",
                followers_count=hash(f"{username}_{i}") % 10000,
                verified=False,
            )
            followers.append(follower)

        self.users.extend(followers)
        # Keep the counter in step with self.users
        self.stats["users_collected"] += len(followers)
        return followers

    async def collect_user_network(self, seed_username: str, 
                                   depth: int = 2) -> None:
        """Collect a user's social network breadth-first."""
        print(f"\nCollecting network: @{seed_username} (depth: {depth})")

        visited_users = set()
        current_level = {seed_username}

        for level in range(depth):
            print(f"\nLevel {level + 1}/{depth}")
            next_level = set()

            for username in current_level:
                if username in visited_users:
                    continue
                visited_users.add(username)

                # Fetch the profile
                await self.get_user_profile(username)

                # Fetch recent tweets
                await self.get_user_timeline(username, max_tweets=10)

                # Fetch followers and queue them for the next level
                followers = await self.get_followers(username, max_followers=5)
                for follower in followers:
                    next_level.add(follower.username)

                await asyncio.sleep(self.config.rate_limit_delay)

            current_level = next_level
            print(f"  Collected {len(visited_users)} users so far...")

    async def search_by_hashtag(self, hashtag: str, 
                               max_tweets: int = 100) -> List[Tweet]:
        """Search by hashtag."""
        query = f"#{hashtag.replace('#', '')}"
        return await self.search_tweets(query, max_tweets)

    async def search_by_location(self, location: str, 
                                 radius: str = "10km",
                                 max_tweets: int = 100) -> List[Tweet]:
        """Search by geographic location."""
        query = f"geocode:{location},{radius}"
        return await self.search_tweets(query, max_tweets)

    async def save_results(self) -> None:
        """Save the collected results."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_dir = Path(self.config.output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        # Save tweets
        tweets_path = output_dir / f"tweets_{timestamp}.json"
        with open(tweets_path, "w", encoding="utf-8") as f:
            json.dump({
                "metadata": {
                    "collected_at": datetime.now().isoformat(),
                    "total_tweets": len(self.tweets),
                    "platform": self.config.platform,
                },
                "tweets": [t.to_dict() for t in self.tweets]
            }, f, indent=2, ensure_ascii=False)
        print(f"\nTweets saved: {tweets_path}")

        # Save users
        users_path = output_dir / f"users_{timestamp}.json"
        with open(users_path, "w", encoding="utf-8") as f:
            json.dump({
                "metadata": {
                    "collected_at": datetime.now().isoformat(),
                    "total_users": len(self.users),
                    "platform": self.config.platform,
                },
                "users": [u.to_dict() for u in self.users]
            }, f, indent=2, ensure_ascii=False)
        print(f"Users saved: {users_path}")

    async def run(self) -> None:
        """Run the crawler."""
        print("=" * 60)
        print("Social media data collector")
        print("=" * 60)
        print(f"Platform: {self.config.platform}")
        print(f"Base URL: {self.config.base_url}")
        print("=" * 60)

        # Example tasks
        # Task 1: search a hashtag
        print("\nTask 1: search by hashtag")
        await self.search_by_hashtag("python", max_tweets=50)

        # Task 2: collect a user's network
        print("\nTask 2: collect user network")
        await self.collect_user_network("example_user", depth=2)

        # Task 3: collect a user's timeline
        print("\nTask 3: collect user timeline")
        await self.get_user_timeline("example_user", max_tweets=20)

        # Save results
        await self.save_results()

        # Print statistics
        print("\n" + "=" * 60)
        print("Collection statistics")
        print("=" * 60)
        print(f"Tweets: {self.stats['tweets_collected']}")
        print(f"Users: {self.stats['users_collected']}")
        print(f"Requests: {self.stats['requests_made']}")
        print(f"Rate-limited: {self.stats['rate_limited']}")
        print(f"Errors: {self.stats['errors']}")
        print("=" * 60)

# =====================================
# Main program
# =====================================

async def main():
    """Entry point."""
    config = SocialMediaConfig(
        platform="twitter",
        base_url="https://twitter.com",
        max_tweets=1000,
        rate_limit_delay=2.0,
        output_dir="./social_data"
    )

    crawler = SocialMediaCrawler(config)
    await crawler.run()

if __name__ == "__main__":
    asyncio.run(main())
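The crawler above counts how often it was rate-limited (`rate_limited`) and is configured with a fixed `rate_limit_delay=2.0`. A common refinement is exponential backoff with full jitter: after the n-th HTTP 429 response, wait a random time between 0 and min(cap, base * 2^n). This is a minimal sketch, assuming the fetch call reports an HTTP status code; `backoff_delay` and `fetch_with_backoff` are hypothetical helpers, not part of Crawlee or the code above.

```python
import asyncio
import random
from typing import Awaitable, Callable, Optional


def backoff_delay(attempt: int, base: float = 2.0, cap: float = 60.0,
                  rng: Optional[random.Random] = None) -> float:
    """Full-jitter exponential backoff: uniform in [0, min(cap, base * 2**attempt)]."""
    rng = rng or random.Random()
    ceiling = min(cap, base * (2 ** attempt))
    return rng.uniform(0, ceiling)


async def fetch_with_backoff(fetch: Callable[[], Awaitable[int]],
                             max_attempts: int = 5,
                             base: float = 2.0) -> int:
    """Call `fetch` (assumed to return an HTTP status) until it is not 429."""
    for attempt in range(max_attempts):
        status = await fetch()
        if status != 429:  # 429 Too Many Requests means "slow down"
            return status
        if attempt < max_attempts - 1:  # no point sleeping after the last try
            await asyncio.sleep(backoff_delay(attempt, base))
    raise RuntimeError(f"still rate-limited after {max_attempts} attempts")


if __name__ == "__main__":
    responses = iter([429, 429, 200])  # fake server: two rate limits, then success

    async def fake_fetch() -> int:
        return next(responses)

    # A tiny base keeps the demo fast; in the crawler it might be rate_limit_delay
    print(asyncio.run(fetch_with_backoff(fake_fetch, base=0.01)))  # 200
```

Randomizing the whole interval, rather than adding a small jitter to a fixed delay, spreads retries from many concurrent requests so they do not hit the server in synchronized waves.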

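Project 3: News Site Aggregation

Collecting from news sites involves extracting article content, sorting by publication time, and grouping by category. These sites are usually fairly regular in structure, but they bring their own challenges, such as filtering out ads, deduplicating content, and handling multiple languages.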
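Content deduplication, one of the challenges just mentioned, can be approached by fingerprinting normalized article text, so the same story reposted with different spacing or full-width characters still collides. A minimal sketch, assuming exact duplicates after normalization are what you want to catch (near-duplicates need techniques such as SimHash or MinHash); `content_fingerprint` and `ContentDeduplicator` are hypothetical names:

```python
import hashlib
import re
import unicodedata
from typing import Set


def content_fingerprint(text: str) -> str:
    """Return a SHA-256 fingerprint of normalized article text."""
    # NFKC folds full-width/half-width variants (e.g. "ＡＩ" -> "AI")
    normalized = unicodedata.normalize("NFKC", text).lower()
    # Collapse all whitespace runs so reflowed copies hash identically
    normalized = re.sub(r"\s+", " ", normalized).strip()
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


class ContentDeduplicator:
    """Remembers fingerprints of articles already accepted."""

    def __init__(self) -> None:
        self._seen: Set[str] = set()

    def is_duplicate(self, text: str) -> bool:
        fp = content_fingerprint(text)
        if fp in self._seen:
            return True
        self._seen.add(fp)
        return False


if __name__ == "__main__":
    dedup = ContentDeduplicator()
    print(dedup.is_duplicate("Python  3.13 Released\n"))  # False
    print(dedup.is_duplicate("python 3.13 released"))     # True
```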

# news_aggregator.py

import asyncio
import json
import re
import hashlib
from typing import List, Dict, Any, Optional, Set
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from pathlib import Path
from abc import ABC, abstractmethod
import csv

# =====================================
# Configuration and data models
# =====================================

@dataclass
class NewsConfig:
    """News collection settings."""
    sources: List[Dict[str, str]] = field(default_factory=list)
    categories: List[str] = field(default_factory=list)
    keywords_filter: List[str] = field(default_factory=list)  # keep only titles containing one of these
    exclude_keywords: List[str] = field(default_factory=list)  # drop articles containing any of these
    min_article_length: int = 200       # minimum article length (characters)
    max_age_days: int = 7               # only keep articles published within this many days
    max_articles_per_source: int = 100
    request_timeout: int = 30
    output_dir: str = "./news_data"

@dataclass
class Article:
    """Article data model."""
    article_id: str = ""
    title: str = ""
    content: str = ""
    summary: str = ""

    # Source information
    source_name: str = ""
    source_url: str = ""
    author: str = ""
    published_at: str = ""
    scraped_at: str = ""

    # Category and tags
    category: str = ""
    tags: List[str] = field(default_factory=list)

    # Media
    image_url: str = ""
    has_video: bool = False

    # Statistics
    word_count: int = 0
    reading_time_minutes: float = 0.0

    # Metadata
    language: str = "zh"
    is_opinion: bool = False
    is_sponsored: bool = False

    def __post_init__(self):
        if not self.scraped_at:
            self.scraped_at = datetime.now().isoformat()
        if self.content and not self.word_count:
            # Character count, which approximates word count for Chinese text
            self.word_count = len(self.content)
            self.reading_time_minutes = self.word_count / 500  # assumes 500 characters per minute

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)

    def generate_id(self) -> str:
        """Generate a stable ID from source name, title and publication time."""
        unique_str = f"{self.source_name}:{self.title}:{self.published_at}"
        return hashlib.md5(unique_str.encode()).hexdigest()[:16]

# =====================================
# Content extractors
# =====================================

class ContentExtractor(ABC):
    """Base class for content extractors."""

    @abstractmethod
    async def extract(self, html: str, url: str) -> Optional[Article]:
        """Extract an article from raw HTML."""
        pass

class GenericContentExtractor(ContentExtractor):
    """Generic content extractor."""

    # Common selector patterns, tried in order
    TITLE_SELECTORS = [
        "article h1",
        ".article-title",
        ".post-title",
        "h1.title",
        "[itemprop=headline]",
    ]

    CONTENT_SELECTORS = [
        "article .content",
        ".article-body",
        ".post-content",
        "[itemprop=articleBody]",
        ".entry-content",
        "#article-content",
    ]

    AUTHOR_SELECTORS = [
        ".author",
        ".byline",
        "[rel=author]",
        "[itemprop=author]",
    ]

    DATE_SELECTORS = [
        "time",
        ".date",
        ".published",
        "[itemprop=datePublished]",
    ]

    def __init__(self, language: str = "zh"):
        self.language = language

    async def extract(self, html: str, url: str) -> Optional[Article]:
        """Extract the article content."""
        # A real implementation would use BeautifulSoup or another HTML parser;
        # for demonstration purposes the extraction is simulated here

        article = Article(
            source_url=url,
            title=self.extract_field(html, self.TITLE_SELECTORS, "title"),
            content=self.extract_field(html, self.CONTENT_SELECTORS, "body"),
            author=self.extract_field(html, self.AUTHOR_SELECTORS, "author"),
            published_at=self.extract_field(html, self.DATE_SELECTORS, "publish date"),
            source_name=self.extract_source_name(url),
            language=self.language,
        )

        # Generate a unique ID
        article.article_id = article.generate_id()

        # Generate a summary
        article.summary = self.generate_summary(article.content)

        # Extract tags
        article.tags = self.extract_tags(article.content)

        return article

    def extract_field(self, html: str, selectors: List[str],
                      default: str) -> str:
        """Extract a field by trying each selector in turn."""
        # Simulated; a real implementation would apply the CSS selectors
        return f"simulated {default}"

    def extract_source_name(self, url: str) -> str:
        """Derive a source name from the URL's host."""
        from urllib.parse import urlparse
        domain = urlparse(url).netloc
        return domain.replace("www.", "").split(".")[0]

    def generate_summary(self, content: str, max_length: int = 200) -> str:
        """Build a summary by truncating the content."""
        if len(content) <= max_length:
            return content
        return content[:max_length] + "..."

    def extract_tags(self, content: str) -> List[str]:
        """Extract article tags."""
        # Naive keyword matching (a real system would use NLP techniques)
        common_tags = ["Technology", "Finance", "Sports", "Entertainment", "Society"]
        found_tags = [tag for tag in common_tags if tag in content]
        return found_tags[:5]

# =====================================
# News aggregator
# =====================================

class NewsAggregator:
    """News aggregator."""

    def __init__(self, config: NewsConfig):
        self.config = config
        self.articles: List[Article] = []
        self.seen_articles: Set[str] = set()  # IDs of accepted articles, for deduplication
        self.extractor = GenericContentExtractor()
        self.stats = {
            "sources_processed": 0,
            "articles_found": 0,
            "articles_saved": 0,
            "duplicates_skipped": 0,
            "filtered_out": 0,
            "errors": 0,
        }

        Path(config.output_dir).mkdir(parents=True, exist_ok=True)

    async def add_source(self, name: str, url: str,
                         category: str = "general") -> None:
        """Register a news source."""
        self.config.sources.append({
            "name": name,
            "url": url,
            "category": category,
        })

    async def fetch_source_listings(self, source: Dict[str, str]) -> List[str]:
        """Fetch the list of article URLs for a source."""
        article_urls = []

        print(f"\nCollecting source: {source['name']} ({source['url']})")

        # Simulated article list (capped at 5 for the demo);
        # a real implementation would parse the homepage or an RSS feed
        for i in range(1, min(self.config.max_articles_per_source + 1, 6)):
            article_url = f"{source['url']}/article-{i}"
            article_urls.append(article_url)
            print(f"  Found article: {article_url}")

        self.stats["sources_processed"] += 1
        return article_urls

    async def fetch_article(self, url: str, source_info: Dict[str, str]) -> Optional[Article]:
        """Fetch a single article."""
        try:
            # Simulated HTML; a real implementation would issue an HTTP request
            html = "<html><body><article><h1>Simulated article title</h1>...</article></body></html>"

            # Parse with the extractor
            article = await self.extractor.extract(html, url)

            if article:
                article.source_name = source_info["name"]
                article.category = source_info["category"]
                # Recompute the ID now that the configured source name is set
                article.article_id = article.generate_id()
                return article

        except Exception as e:
            print(f"  Error: could not fetch {url}: {e}")
            self.stats["errors"] += 1

        return None

    def should_include_article(self, article: Article) -> bool:
        """Decide whether an article should be kept."""
        # Skip articles already accepted (deduplication)
        if article.article_id in self.seen_articles:
            self.stats["duplicates_skipped"] += 1
            return False

        # Check article length
        if article.word_count < self.config.min_article_length:
            self.stats["filtered_out"] += 1
            return False

        # Keyword allow-list (matched against the title)
        if self.config.keywords_filter:
            title_lower = article.title.lower()
            if not any(kw.lower() in title_lower for kw in self.config.keywords_filter):
                self.stats["filtered_out"] += 1
                return False

        # Keyword block-list (matched against title and body)
        if self.config.exclude_keywords:
            content_lower = (article.title + " " + article.content).lower()
            if any(kw.lower() in content_lower for kw in self.config.exclude_keywords):
                self.stats["filtered_out"] += 1
                return False

        # Check article age
        if article.published_at:
            try:
                pub_date = datetime.fromisoformat(article.published_at)
                # Compare aware with aware and naive with naive; mixing them raises TypeError
                now = datetime.now(pub_date.tzinfo) if pub_date.tzinfo else datetime.now()
                age_days = (now - pub_date).days
                if age_days > self.config.max_age_days:
                    self.stats["filtered_out"] += 1
                    return False
            except (ValueError, TypeError):
                # Unparseable date: keep the article rather than drop it
                pass

        return True

    async def crawl(self) -> None:
        """Run the collection."""
        print("=" * 60)
        print("News aggregation system")
        print("=" * 60)
        print(f"Configured sources: {len(self.config.sources)}")
        print(f"Keyword filter: {self.config.keywords_filter or 'none'}")
        print(f"Excluded keywords: {self.config.exclude_keywords or 'none'}")
        print("=" * 60)

        for source in self.config.sources:
            # Get the article list
            article_urls = await self.fetch_source_listings(source)

            # Collect each article
            for url in article_urls:
                article = await self.fetch_article(url, source)

                if article:
                    self.stats["articles_found"] += 1

                    # Apply the filter rules
                    if self.should_include_article(article):
                        self.articles.append(article)
                        self.seen_articles.add(article.article_id)
                        self.stats["articles_saved"] += 1
                        print(f"  Saved: {article.title[:50]}...")
                    else:
                        print(f"  Filtered: {article.title[:50]}...")

                # Throttle requests
                await asyncio.sleep(0.5)

    async def save_results(self) -> None:
        """Save the results as JSON (combined and per source) and as CSV."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_dir = Path(self.config.output_dir)

        # Group articles by source
        sources_data = {}
        for article in self.articles:
            if article.source_name not in sources_data:
                sources_data[article.source_name] = []
            sources_data[article.source_name].append(article)

        # Save the combined data
        summary_path = output_dir / f"news_summary_{timestamp}.json"
        with open(summary_path, "w", encoding="utf-8") as f:
            json.dump({
                "metadata": {
                    "collected_at": datetime.now().isoformat(),
                    "total_articles": len(self.articles),
                    "sources": list(sources_data.keys()),
                },
                "articles": [a.to_dict() for a in self.articles]
            }, f, indent=2, ensure_ascii=False)
        print(f"\nSummary saved: {summary_path}")

        # Save one file per source
        for source_name, articles in sources_data.items():
            source_filename = source_name.replace(".", "_")
            source_path = output_dir / f"news_{source_filename}_{timestamp}.json"
            with open(source_path, "w", encoding="utf-8") as f:
                json.dump({
                    "source": source_name,
                    "article_count": len(articles),
                    "articles": [a.to_dict() for a in articles]
                }, f, indent=2, ensure_ascii=False)
            print(f"Source data saved: {source_path}")

        # Save as CSV
        csv_path = output_dir / f"news_{timestamp}.csv"
        if self.articles:
            with open(csv_path, "w", newline="", encoding="utf-8") as f:
                writer = csv.DictWriter(f, fieldnames=[
                    "article_id", "title", "source_name", "author",
                    "category", "published_at", "word_count", "image_url"
                ])
                writer.writeheader()
                for article in self.articles:
                    writer.writerow({
                        "article_id": article.article_id,
                        "title": article.title,
                        "source_name": article.source_name,
                        "author": article.author,
                        "category": article.category,
                        "published_at": article.published_at,
                        "word_count": article.word_count,
                        "image_url": article.image_url,
                    })
            print(f"CSV saved: {csv_path}")

    async def generate_report(self) -> None:
        """Print a collection report."""
        print("\n" + "=" * 60)
        print("Collection report")
        print("=" * 60)
        print(f"Sources processed: {self.stats['sources_processed']}")
        print(f"Articles found: {self.stats['articles_found']}")
        print(f"Articles saved: {self.stats['articles_saved']}")
        print(f"Duplicates skipped: {self.stats['duplicates_skipped']}")
        print(f"Filtered out: {self.stats['filtered_out']}")
        print(f"Failed requests: {self.stats['errors']}")

        if self.articles:
            print("\nArticles per source:")
            sources_count = {}
            for article in self.articles:
                sources_count[article.source_name] = sources_count.get(
                    article.source_name, 0) + 1
            for source, count in sorted(sources_count.items(),
                                        key=lambda x: -x[1]):
                print(f"  {source}: {count} articles")

        print("=" * 60)

# =====================================
# Main program
# =====================================

async def main():
    """Entry point."""
    config = NewsConfig(
        keywords_filter=["technology", "artificial intelligence", "AI"],
        exclude_keywords=["advertisement", "sponsored"],
        max_age_days=7,
        min_article_length=200,
        output_dir="./news_data"
    )

    aggregator = NewsAggregator(config)

    # Register news sources
    await aggregator.add_source(
        name="TechNews",
        url="https://tech.example.com",
        category="Technology"
    )
    await aggregator.add_source(
        name="FinanceDaily",
        url="https://finance.example.com",
        category="Finance"
    )
    await aggregator.add_source(
        name="SportsWorld",
        url="https://sports.example.com",
        category="Sports"
    )

    # Run the collection
    await aggregator.crawl()

    # Print the report
    await aggregator.generate_report()

    # Save the results
    await aggregator.save_results()

if __name__ == "__main__":
    asyncio.run(main())
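In the aggregator above, `extract_field` only returns a placeholder and the comment defers to BeautifulSoup. As a dependency-free stand-in, Python's standard `html.parser.HTMLParser` can already pull out a headline and common metadata. This sketch assumes the target pages expose the Open Graph `og:title`, `article:published_time` and `author` meta tags, or at least an `<h1>`; real sites vary, so treat the key set and the hypothetical names `ArticleMetaParser` / `extract_article_fields` as assumptions to adapt per source.

```python
from html.parser import HTMLParser
from typing import Dict, List, Optional, Tuple


class ArticleMetaParser(HTMLParser):
    """Collects the text of the first <h1> and selected <meta> values."""

    # Assumed meta keys; Open Graph / article:* names are common conventions
    META_KEYS = {"og:title", "article:published_time", "author"}

    def __init__(self) -> None:
        super().__init__()
        self.meta: Dict[str, str] = {}
        self.h1_parts: List[str] = []
        self._in_h1 = False
        self._h1_done = False

    def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
        attr = dict(attrs)
        if tag == "meta":
            # Open Graph uses property=..., classic meta tags use name=...
            key = attr.get("property") or attr.get("name")
            if key in self.META_KEYS and attr.get("content"):
                self.meta.setdefault(key, attr["content"])  # keep the first value
        elif tag == "h1" and not self._h1_done:
            self._in_h1 = True

    def handle_endtag(self, tag: str) -> None:
        if tag == "h1" and self._in_h1:
            self._in_h1 = False
            self._h1_done = True  # ignore any later <h1> elements

    def handle_data(self, data: str) -> None:
        if self._in_h1:
            self.h1_parts.append(data)  # includes text of nested tags like <b>


def extract_article_fields(html: str) -> Dict[str, str]:
    parser = ArticleMetaParser()
    parser.feed(html)
    parser.close()
    h1 = " ".join("".join(parser.h1_parts).split())  # collapse whitespace
    return {
        "title": parser.meta.get("og:title") or h1,
        "published_at": parser.meta.get("article:published_time", ""),
        "author": parser.meta.get("author", ""),
    }
```

The `published_at` value from `article:published_time` is usually an ISO 8601 string with a UTC offset, so it parses into a timezone-aware `datetime` and must be compared against an aware "now" when filtering by age.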

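Common Use Cases and Advanced Techniques

Distributed Crawler Architecture

When a single machine can no longer keep up with large-scale data collection, it is time to consider a distributed architecture. Although Crawlee itself runs on a single machine, it can be combined with message queues and distributed task schedulers to build an efficient distributed crawler.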

# distributed_crawler_concept.py

import asyncio
import json
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import hashlib

# =====================================
# Distributed architecture: conceptual demo
# =====================================

class QueueType(Enum):
    """Supported message queue types."""
    REDIS = "redis"
    RABBITMQ = "rabbitmq"
    SQS = "sqs"

@dataclass
class DistributedTask:
    """A unit of work passed through the queue."""
    task_type: str
    payload: Dict[str, Any]
    task_id: str = ""          # derived from the payload when left empty
    priority: int = 0
    created_at: str = ""
    scheduled_at: Optional[str] = None
    attempts: int = 0
    max_attempts: int = 3

    def __post_init__(self):
        if not self.created_at:
            self.created_at = datetime.now().isoformat()
        if not self.task_id:
            self.task_id = hashlib.md5(
                json.dumps(self.payload, sort_keys=True).encode()
            ).hexdigest()[:16]

class RedisQueueAdapter:
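    """Redis queue adapter (simulated)."""

    def __init__(self, redis_url: str):
        self.redis_url = redis_url
        self.connected = False

    async def connect(self) -> None:
        """Connect to Redis."""
        print(f"Connecting to Redis: {self.redis_url}")
        # A real implementation would use redis-py's asyncio client (redis.asyncio)
        self.connected = True
        print("Connected")

    async def enqueue(self, task: DistributedTask) -> None:
        """Push a task onto the queue."""
        task_json = json.dumps(task.__dict__, ensure_ascii=False)
        # A real implementation would LPUSH or RPUSH task_json onto a list
        print(f"Task enqueued: {task.task_id} ({task.task_type})")

    async def dequeue(self) -> Optional[DistributedTask]:
        """Pop a task from the queue."""
        # A real implementation would block on BRPOP or BLPOP;
        # this simulation never returns a task
        print("Waiting for tasks...")
        return None

    async def get_queue_size(self) -> int:
        """Return the queue length (a real implementation would use LLEN)."""
        return 0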

class CrawlerWorker:
    """Crawler worker node."""

    def __init__(self, worker_id: str, queue: RedisQueueAdapter):
        self.worker_id = worker_id
        self.queue = queue
        self.processed_count = 0
        self.running = False

    async def start(self) -> None:
        """Start the worker loop."""
        print(f"Worker {self.worker_id} started")
        self.running = True

        while self.running:
            task = await self.queue.dequeue()
            if task:
                await self.process_task(task)
            else:
                await asyncio.sleep(1)  # idle back-off when the queue is empty

    async def process_task(self, task: DistributedTask) -> None:
        """Process one task."""
        print(f"[{self.worker_id}] Processing task: {task.task_id}")

        try:
            # Dispatch on task type
            if task.task_type == "scrape_url":
                await self.handle_scrape_task(task)
            elif task.task_type == "crawl_depth":
                await self.handle_depth_task(task)
            elif task.task_type == "parse_content":
                await self.handle_parse_task(task)

            self.processed_count += 1

        except Exception as e:
            print(f"Task failed: {e}")
            task.attempts += 1
            if task.attempts < task.max_attempts:
                # Re-enqueue for another attempt
                await self.queue.enqueue(task)

    async def handle_scrape_task(self, task: DistributedTask) -> None:
        """Handle a page-scrape task."""
        url = task.payload.get("url")
        print(f"  Scraping page: {url}")
        await asyncio.sleep(0.5)  # stand-in for the actual fetch

    async def handle_depth_task(self, task: DistributedTask) -> None:
        """Handle a depth-limited crawl task."""
        url = task.payload.get("url")
        depth = task.payload.get("depth", 1)
        print(f"  Depth crawl: {url} (depth: {depth})")

    async def handle_parse_task(self, task: DistributedTask) -> None:
        """Handle a content-parsing task."""
        content = task.payload.get("content", "")  # default avoids len(None)
        selectors = task.payload.get("selectors", {})
        print(f"  Parsing content: {len(content)} characters")

    async def stop(self) -> None:
        """Stop the worker."""
        print(f"Worker {self.worker_id} stopping")
        self.running = False

class DistributedCoordinator:
    """Distributed coordinator."""

    def __init__(self, queue: RedisQueueAdapter):
        self.queue = queue
        self.stats = {
            "total_tasks": 0,
            "completed_tasks": 0,
            "failed_tasks": 0,
        }

    async def submit_urls(self, urls: List[str]) -> None:
        """Submit a list of URLs."""
        for url in urls:
            task = DistributedTask(
                task_type="scrape_url",
                payload={"url": url}
            )
            await self.queue.enqueue(task)
            self.stats["total_tasks"] += 1

        print(f"Submitted {len(urls)} URLs to the task queue")

    async def submit_depth_crawl(self, seed_url: str, max_depth: int) -> None:
        """Submit a depth-limited crawl task."""
        task = DistributedTask(
            task_type="crawl_depth",
            payload={
                "url": seed_url,
                "depth": 0,
                "max_depth": max_depth
            },
            priority=10
        )
        await self.queue.enqueue(task)
        self.stats["total_tasks"] += 1
        print(f"Submitted depth crawl task: {seed_url} (max depth: {max_depth})")

    async def get_status(self) -> Dict[str, Any]:
        """Return the system status."""
        queue_size = await self.queue.get_queue_size()
        return {
            **self.stats,
            "queue_size": queue_size,
            "completion_rate": (
                self.stats["completed_tasks"] / self.stats["total_tasks"]
                if self.stats["total_tasks"] > 0 else 0
            )
        }

# =====================================
# Distributed architecture demo
# =====================================

async def demo_distributed_crawler():
    """Demonstrate the distributed crawler architecture."""

    print("=== Distributed crawler architecture demo ===\n")

    # Initialize components
    queue = RedisQueueAdapter("redis://localhost:6379")
    await queue.connect()

    # Create the coordinator
    coordinator = DistributedCoordinator(queue)

    # Submit tasks
    urls = [f"https://example.com/page-{i}" for i in range(1, 11)]
    await coordinator.submit_urls(urls)

    # Submit a depth-crawl task
    await coordinator.submit_depth_crawl("https://example.com", max_depth=3)

    # Create worker nodes
    workers = [
        CrawlerWorker(f"worker-{i}", queue)
        for i in range(1, 4)
    ]

    print("\nStarting workers...")
    # Note: real workers would run concurrently, e.g. with
    # asyncio.gather(*(w.start() for w in workers)); here we only
    # simulate a few steps of a single worker
    for worker in workers[:1]:
        # Simulate processing a few tasks
        for _ in range(3):
            await asyncio.sleep(0.2)
            worker.processed_count += 1
            print(f"  [{worker.worker_id}] processed {worker.processed_count} tasks")

    # Get the status
    status = await coordinator.get_status()
    print(f"\nSystem status: {status}")

if __name__ == "__main__":
    asyncio.run(demo_distributed_crawler())
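The demo above only simulates one worker, because `RedisQueueAdapter.dequeue` never returns a task. To see the coordinator/worker pattern actually run, here is a single-process sketch in which `asyncio.Queue` stands in for Redis and several worker coroutines drain it concurrently. It is a stand-in under stated assumptions: in a multi-machine deployment the queue would be Redis, RabbitMQ or SQS, and `worker` / `run_workers` are hypothetical names.

```python
import asyncio
from typing import List


async def worker(name: str, queue: "asyncio.Queue[str]", results: List[str]) -> None:
    """Pull URLs until cancelled; one coroutine per simulated worker node."""
    while True:
        url = await queue.get()
        try:
            await asyncio.sleep(0.01)  # stand-in for the real fetch/parse step
            results.append(f"{name}:{url}")
        finally:
            queue.task_done()  # always mark done so queue.join() can return


async def run_workers(urls: List[str], num_workers: int = 3) -> List[str]:
    queue: "asyncio.Queue[str]" = asyncio.Queue()
    for url in urls:
        queue.put_nowait(url)  # the coordinator's "submit" step

    results: List[str] = []
    tasks = [
        asyncio.create_task(worker(f"worker-{i}", queue, results))
        for i in range(1, num_workers + 1)
    ]
    await queue.join()  # wait until every URL has been processed
    for t in tasks:
        t.cancel()  # workers loop forever, so stop them explicitly
    await asyncio.gather(*tasks, return_exceptions=True)
    return results


if __name__ == "__main__":
    done = asyncio.run(run_workers([f"https://example.com/page-{i}" for i in range(1, 11)]))
    print(len(done))  # 10
```

`queue.join()` plus `task_done()` gives the coordinator a clean "all work finished" signal; with an external queue you would need an equivalent, such as polling the queue length and an in-flight counter.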

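Anti-Scraping Countermeasures and Advanced Configuration

In production, many websites deploy anti-scraping measures to protect their data. As responsible data collectors, we need to understand these mechanisms and respond to them only within reasonable and legal limits.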
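A basic part of staying within reasonable limits is honoring a site's robots.txt. Python's standard `urllib.robotparser` can evaluate the rules; this sketch parses an already-downloaded robots.txt instead of calling `read()`, so it runs offline. The sample file, the `MyCrawler` agent name and the helper `build_robot_rules` are made up for illustration.

```python
from urllib import robotparser


def build_robot_rules(robots_txt: str) -> robotparser.RobotFileParser:
    """Parse robots.txt text that was already fetched (e.g. by the crawler)."""
    rp = robotparser.RobotFileParser()
    rp.parse(robots_txt.splitlines())
    return rp


SAMPLE_ROBOTS = """\
User-agent: *
Disallow: /private/
Crawl-delay: 2
"""

rules = build_robot_rules(SAMPLE_ROBOTS)
print(rules.can_fetch("MyCrawler", "https://example.com/news/1"))     # True
print(rules.can_fetch("MyCrawler", "https://example.com/private/x"))  # False
print(rules.crawl_delay("MyCrawler"))                                 # 2
```

`crawl_delay` returns `None` when the file does not specify one, so fall back to your own minimum delay in that case.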

# anti_detection.py

import asyncio
import random
import hashlib
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime

# =====================================
# Anti-scraping mechanisms and countermeasures
# =====================================

@dataclass
class AntiDetectionConfig:
    """Anti-detection settings."""
    # Browser fingerprint
    randomize_user_agent: bool = True
    randomize_viewport: bool = True
    randomize_timezone: bool = True

    # Behavior simulation
    human_like_scrolling: bool = True
    random_mouse_movement: bool = True
    variable_delays: bool = True
    min_delay: float = 1.0
    max_delay: float = 3.0

    # Request characteristics
    spoof_accept_encoding: bool = True
    remove_selenium_markers: bool = True
    use_real_headers: bool = True

    # IP-related
    rotate_proxies: bool = True
    use_residential_proxies: bool = False

class UserAgentRotator:
    """User-Agent rotator."""

    USER_AGENTS = [
        # Chrome on Windows
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",

        # Chrome on macOS
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",

        # Firefox on Windows
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 "
        "Firefox/121.0",

        # Firefox on Linux
        "Mozilla/5.0 (X11; Linux x86_64; rv:121.0) Gecko/20100101 Firefox/121.0",

        # Safari on macOS
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 "
        "(KHTML, like Gecko) Version/17.2 Safari/605.1.15",

        # Safari on iOS
        "Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) AppleWebKit/605.1.15 "
        "(KHTML, like Gecko) Version/17.2 Mobile/15E148 Safari/604.1",

        # Edge on Windows
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
    ]

    @classmethod
    def get_random(cls) -> str:
        """Return a random User-Agent."""
        return random.choice(cls.USER_AGENTS)

    @classmethod
    def get_by_platform(cls, platform: str) -> str:
        """Return a User-Agent for the given platform (falls back to any if none match)."""
        platform_map = {
            "windows": [ua for ua in cls.USER_AGENTS if "Windows" in ua],
            "macos": [ua for ua in cls.USER_AGENTS if "Mac OS" in ua],
            "linux": [ua for ua in cls.USER_AGENTS if "X11" in ua],
            "ios": [ua for ua in cls.USER_AGENTS if "iPhone" in ua],
            "android": [ua for ua in cls.USER_AGENTS if "Android" in ua],
        }
        candidates = platform_map.get(platform.lower(), cls.USER_AGENTS)
        return random.choice(candidates) if candidates else cls.get_random()

class BrowserFingerprintGenerator:
    """Browser fingerprint generator."""

    @staticmethod
    def generate_viewport() -> Dict[str, int]:
        """Pick a random common viewport size."""
        viewports = [
            {"width": 1920, "height": 1080},
            {"width": 1366, "height": 768},
            {"width": 1536, "height": 864},
            {"width": 1440, "height": 900},
            {"width": 1280, "height": 800},
            {"width": 2560, "height": 1440},
        ]
        return random.choice(viewports)

    @staticmethod
    def generate_timezone() -> Dict[str, Any]:
        """Pick a random timezone (offsets are fixed examples and ignore daylight saving time)."""
        timezones = [
            {"timezone": "Asia/Shanghai", "offset": "+08:00"},
            {"timezone": "America/New_York", "offset": "-05:00"},
            {"timezone": "Europe/London", "offset": "+00:00"},
            {"timezone": "Asia/Tokyo", "offset": "+09:00"},
            {"timezone": "Australia/Sydney", "offset": "+11:00"},
        ]
        return random.choice(timezones)

    @staticmethod
    def generate_webgl_vendor() -> Dict[str, str]:
        """Pick a random WebGL vendor/renderer pair."""
        vendors = [
            {"vendor": "Intel Inc.", "renderer": "Intel Iris OpenGL Engine"},
            {"vendor": "NVIDIA Corporation", "renderer": "NVIDIA GeForce GTX 1060"},
            {"vendor": "AMD", "renderer": "AMD Radeon Pro 5500M"},
        ]
        return random.choice(vendors)

    @classmethod
    def generate_complete_fingerprint(cls) -> Dict[str, Any]:
        """Assemble a complete fingerprint."""
        return {
            "user_agent": UserAgentRotator.get_random(),
            "viewport": cls.generate_viewport(),
            "timezone": cls.generate_timezone(),
            "webgl": cls.generate_webgl_vendor(),
            "languages": ["zh-CN", "zh", "en-US", "en"],
            "device_memory": random.choice([2, 4, 8, 16]),
            "hardware_concurrency": random.choice([2, 4, 8, 16]),
            "max_touch_points": random.choice([0, 1, 2, 5, 10]),
        }

class HumanBehaviorSimulator:
    """Human behavior simulator."""

    def __init__(self, config: AntiDetectionConfig):
        self.config = config
        self.last_action_time = datetime.now()

    async def random_delay(self) -> None:
        """Sleep for a random, human-like interval."""
        if not self.config.variable_delays:
            await asyncio.sleep(self.config.min_delay)
            return

        delay = random.uniform(
            self.config.min_delay,
            self.config.max_delay
        )
        await asyncio.sleep(delay)

    async def scroll_like_human(self, page, scroll_count: int = 5) -> None:
        """Scroll down in uneven steps with pauses, the way a reader would."""
        if not self.config.human_like_scrolling:
            # Jump straight to the bottom
            await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
            return

        viewport_height = await page.evaluate("window.innerHeight")

        for _ in range(scroll_count):
            # Random pause between scrolls
            await asyncio.sleep(random.uniform(0.5, 2.0))

            # Scroll a random distance (not exactly one page) plus a little jitter,
            # relative to the current position
            scroll_amount = int(viewport_height * random.uniform(0.5, 1.5))
            jitter = random.randint(-50, 50)
            await page.evaluate(f"window.scrollBy(0, {scroll_amount + jitter})")

            # Occasionally scroll back a bit (as if re-reading)
            if random.random() < 0.3:
                await asyncio.sleep(random.uniform(0.3, 1.0))
                back_scroll = int(scroll_amount * random.uniform(0.1, 0.3))
                await page.evaluate(
                    f"window.scrollBy(0, -{back_scroll})"
                )

    async def move_mouse_like_human(self, page,
                                    start_x: int, start_y: int,
                                    end_x: int, end_y: int) -> None:
        """Move the mouse along a noisy path instead of jumping."""
        if not self.config.random_mouse_movement:
            await page.mouse.move(end_x, end_y)
            return

        # Linear interpolation with random per-step offsets
        # (a simple stand-in for a Bezier-curve path)
        steps = random.randint(10, 30)
        for i in range(steps):
            t = i / steps
            offset_x = random.uniform(-10, 10)
            offset_y = random.uniform(-10, 10)

            x = start_x + (end_x - start_x) * t + offset_x
            y = start_y + (end_y - start_y) * t + offset_y

            await page.mouse.move(int(x), int(y))
            await asyncio.sleep(random.uniform(0.01, 0.05))

        # Finish exactly on the target so a following click lands where intended
        await page.mouse.move(end_x, end_y)

    async def click_like_human(self, page, selector: str) -> None:
        """Click an element the way a person would."""
        # Locate the element
        element = await page.query_selector(selector)
        if not element:
            return

        box = await element.bounding_box()
        if not box:
            return

        # Pick a random point inside the element's bounding box
        x = box["x"] + random.uniform(0, box["width"])
        y = box["y"] + random.uniform(0, box["height"])

        # Move the mouse there from a nearby starting point
        await self.move_mouse_like_human(
            page,
            int(x + random.uniform(-50, 50)),
            int(y + random.uniform(-50, 50)),
            int(x), int(y)
        )

        # Short random pause, then click
        await asyncio.sleep(random.uniform(0.1, 0.3))
        await page.mouse.click(int(x), int(y))

class SeleniumMarkerRemover:
    """Removes common automation markers (Selenium/WebDriver-style checks)."""

    @staticmethod
    def inject_stealth_scripts(page) -> None:
        """Inject anti-detection scripts (only printed in this demo)."""
        scripts = [
            # Hide the navigator.webdriver flag
            """
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            });
            """,

            # Fake a non-empty plugin list
            """
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3, 4, 5]
            });
            """,

            # Fake the language list
            """
            Object.defineProperty(navigator, 'languages', {
                get: () => ['zh-CN', 'zh', 'en-US', 'en']
            });
            """,

            # Add the window.chrome object that headless Chrome lacks
            """
            window.chrome = { runtime: {} };
            """,

            # Make the notifications permission query agree with Notification.permission
            """
            const originalQuery = window.navigator.permissions.query;
            window.navigator.permissions.query = (parameters) => (
                parameters.name === 'notifications' ?
                    Promise.resolve({ state: Notification.permission }) :
                    originalQuery(parameters)
            );
            """,
        ]

        for script in scripts:
            # In real use with Playwright: await page.add_init_script(script),
            # which runs the script before any of the page's own scripts
            print(f"Injecting script: {script.strip()[:50]}...")

    @staticmethod
    def remove_automation_properties() -> str:
        """Return a script that masks automation properties."""
        return """
        // Hide the Selenium/automation flag
        Object.defineProperty(navigator, 'webdriver', {
            get: () => undefined
        });

        // Patch the permissions API
        const originalQuery = window.navigator.permissions.query;
        window.navigator.permissions.query = (parameters) => (
            parameters.name === 'notifications' ?
                Promise.resolve({ state: Notification.permission }) :
                originalQuery(parameters)
        );

        // Report no touch support, as on a typical desktop
        Object.defineProperty(navigator, 'maxTouchPoints', {
            get: () => 0
        });
        """

# =====================================
# Complete stealth crawler example
# =====================================

class StealthCrawler:
    """Stealth crawler."""

    def __init__(self):
        self.config = AntiDetectionConfig()
        self.fingerprint = BrowserFingerprintGenerator.generate_complete_fingerprint()
        self.behavior_simulator = HumanBehaviorSimulator(self.config)

    async def setup_browser(self, page) -> None:
        """Configure the page so it looks less like an automated browser."""
        print("Configuring browser environment...")

        # Set the User-Agent header. This only affects HTTP requests; navigator.userAgent
        # changes only if the context is created with browser.new_context(user_agent=...)
        await page.set_extra_http_headers({
            "User-Agent": self.fingerprint["user_agent"]
        })

        # Set the viewport (Playwright's set_viewport_size takes a single
        # {"width": ..., "height": ...} dict)
        await page.set_viewport_size(self.fingerprint["viewport"])

        # Inject anti-detection scripts
        SeleniumMarkerRemover.inject_stealth_scripts(page)

        print("Browser configured")

    async def visit_page(self, page, url: str) -> None:
        """Visit a page with human-like behavior."""
        print(f"\nVisiting page: {url}")

        # Random delay (simulated "thinking" time)
        await self.behavior_simulator.random_delay()

        # Navigate
        # await page.goto(url, wait_until="networkidle")

        # Scroll through the page
        # await self.behavior_simulator.scroll_like_human(page, scroll_count=3)

        print("Page visit complete")

# =====================================
# Demo of the anti-detection features
# =====================================

def demo_anti_detection():
    """Demonstrate the anti-detection features"""

    print("=== Anti-scraping countermeasures and advanced configuration demo ===\n")

    # User-Agent rotation
    print("User-Agent examples:")
    for i in range(3):
        ua = UserAgentRotator.get_random()
        platform = "Windows" if "Windows" in ua else "macOS" if "Mac" in ua else "Linux"
        print(f"  {i+1}. [{platform}] {ua[:60]}...")

    print()

    # Browser fingerprint generation
    print("Generated browser fingerprint:")
    fp = BrowserFingerprintGenerator.generate_complete_fingerprint()
    print(f"  User-Agent: {fp['user_agent'][:60]}...")
    print(f"  Viewport: {fp['viewport']}")
    print(f"  Time zone: {fp['timezone']}")
    print(f"  WebGL: {fp['webgl']['vendor']}")
    print(f"  Device memory: {fp['device_memory']} GB")
    print(f"  CPU cores: {fp['hardware_concurrency']}")

    print()

    # Configuration options
    print("Anti-detection configuration options:")
    config = AntiDetectionConfig()
    print(f"  Randomize User-Agent: {config.randomize_user_agent}")
    print(f"  Randomize viewport: {config.randomize_viewport}")
    print(f"  Human-like scrolling: {config.human_like_scrolling}")
    print(f"  Random mouse movement: {config.random_mouse_movement}")
    print(f"  Variable delay: {config.min_delay}-{config.max_delay} s")

demo_anti_detection()
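One caveat about `setup_browser` above: in Playwright, `set_extra_http_headers` changes only the header sent over the network, while `navigator.userAgent` seen by page scripts keeps the browser's real value, and that mismatch can itself be a detection signal. Scripts run through `page.evaluate` also affect only the currently loaded document. A minimal sketch of the alternative, assuming the fingerprint dict has the `user_agent`, `viewport` (with `width`/`height`) and `timezone` keys printed by `demo_anti_detection`, and that `timezone` holds an IANA name: map the fingerprint onto the keyword arguments of Playwright's `browser.new_context()`. The helper name `fingerprint_to_context_options` is hypothetical.

```python
from typing import Any, Dict


def fingerprint_to_context_options(fingerprint: Dict[str, Any]) -> Dict[str, Any]:
    """Map a fingerprint dict onto keyword arguments for Playwright's browser.new_context().

    Assumes the keys "user_agent", "viewport" (with "width"/"height") and "timezone"
    (an IANA name such as "Asia/Shanghai"); this helper is hypothetical.
    """
    viewport = fingerprint["viewport"]
    return {
        # Setting the UA on the context keeps the HTTP header and navigator.userAgent consistent
        "user_agent": fingerprint["user_agent"],
        # Playwright expects the viewport as a dict with "width" and "height"
        "viewport": {"width": int(viewport["width"]), "height": int(viewport["height"])},
        # Playwright's option name for the emulated time zone
        "timezone_id": fingerprint["timezone"],
    }


if __name__ == "__main__":
    sample = {
        "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
        "viewport": {"width": 1920, "height": 1080},
        "timezone": "Asia/Shanghai",
    }
    print(fingerprint_to_context_options(sample))
```

With Playwright's async API this would be used roughly as `context = await browser.new_context(**fingerprint_to_context_options(fp))`, followed by `await context.add_init_script(SeleniumMarkerRemover.remove_automation_properties())`, so the stealth script runs before each page's own scripts instead of after the page has loaded.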

Best Practices and Performance Optimization

Code Organization and Project Structure

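Good code organization not only speeds up development but also makes a project easier to maintain and extend. Here is a recommended layout for a crawler project: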

# project_structure_best_practices.py

# =====================================
# Recommended project structure
# =====================================

PROJECT_STRUCTURE = """
# Recommended directory layout for a crawler project
#
# my_crawler_project/
# ├── src/
# │   ├── __init__.py
# │   ├── main.py                # Program entry point
# │   ├── config.py              # Configuration management
# │   ├── spiders/               # Spider modules
# │   │   ├── __init__.py
# │   │   ├── base.py            # Spider base class
# │   │   ├── ecommerce.py       # E-commerce spider
# │   │   ├── social.py          # Social media spider
# │   │   └── news.py            # News spider
# │   ├── parsers/               # Parser modules
# │   │   ├── __init__.py
# │   │   ├── html_parser.py     # HTML parsing
# │   │   ├── json_parser.py     # JSON parsing
# │   │   └── text_parser.py     # Text processing
# │   ├── pipelines/             # Data pipelines
# │   │   ├── __init__.py
# │   │   ├── validators.py      # Data validation
# │   │   ├── cleaners.py        # Data cleaning
# │   │   └── storage.py         # Data storage
# │   ├── models/                # Data models
# │   │   ├── __init__.py
# │   │   └── entities.py        # Entity definitions
# │   ├── utils/                 # Utility functions
# │   │   ├── __init__.py
# │   │   ├── http_client.py     # HTTP client
# │   │   ├── proxy_manager.py   # Proxy management
# │   │   └── logger.py          # Logging helpers
# │   └── services/              # Business services
# │       ├── __init__.py
# │       └── scheduler.py       # Task scheduling
# ├── tests/                     # Tests
# │   ├── __init__.py
# │   ├── test_spiders.py
# │   ├── test_parsers.py
# │   └── fixtures/              # Test data
# ├── data/                      # Data directory
# │   ├── input/                 # Input data
# │   └── output/                # Output data
# ├── logs/                      # Log files
# ├── config.yaml                # YAML configuration
# ├── .env                       # Environment variables
# ├── requirements.txt           # Python dependencies
# └── README.md                  # Project description
"""

print(PROJECT_STRUCTURE)

# =====================================
# Configuration management best practices
# =====================================

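CONFIG_MANAGEMENT = """
# Configuration management recommendations
#
# 1. Keep sensitive values in environment variables
#    - Database passwords
#    - API keys
#    - Proxy credentials
#    - Third-party service credentials
#
# 2. Separate development and production configuration
#    - config.dev.yaml
#    - config.prod.yaml
#
# 3. Validate configuration with Pydantic or a similar library
#
# 4. Ship a configuration template together with documentation
"""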

print(CONFIG_MANAGEMENT)

# =====================================
# Error handling best practices
# =====================================

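ERROR_HANDLING = '''
# Error handling pattern example

import asyncio
import logging
from typing import Optional

class CrawlerError(Exception):
    """Base exception for the crawler"""
    pass

class ProxyError(CrawlerError):
    """Proxy-related error"""
    pass

class ParsingError(CrawlerError):
    """Parsing error"""
    pass

class RateLimitError(CrawlerError):
    """Rate-limit error"""
    pass

class AuthenticationError(CrawlerError):
    """Authentication error"""
    pass

# Usage example (make_request and switch_to_next_proxy are placeholders for your own helpers)

async def safe_request(url: str) -> Optional[str]:
    """Request wrapper that maps each error type to a recovery strategy"""
    max_retries = 3

    for attempt in range(max_retries):
        try:
            response = await make_request(url)
            return response
        except RateLimitError:
            # Rate limited: back off longer on each attempt (no point sleeping after the last one)
            if attempt < max_retries - 1:
                await asyncio.sleep(60 * (attempt + 1))
        except ProxyError:
            # Switch to the next proxy, then retry
            await switch_to_next_proxy()
        except AuthenticationError:
            # Authentication failed: retrying will not help, so log and skip
            logging.error(f"Authentication failed: {url}")
            return None
        except Exception as e:
            # Unknown error: log it and retry until attempts run out
            logging.error(f"Request error ({url}): {e}")
            if attempt == max_retries - 1:
                return None

    return None
'''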

print(ERROR_HANDLING)

# =====================================
# Logging best practices
# =====================================

LOGGING_PRACTICES = '''
# Logging best practices

import logging
from pathlib import Path
from datetime import datetime

def setup_logging(log_dir: str = "./logs", 
                  level: int = logging.INFO) -> logging.Logger:
    """Configure the logging system"""

    # Create the log directory
    log_path = Path(log_dir)
    log_path.mkdir(parents=True, exist_ok=True)

    # Build a dated log file name
    log_file = log_path / f"crawler_{datetime.now().strftime('%Y%m%d')}.log"

    # Create the logger
    logger = logging.getLogger("crawler")
    logger.setLevel(level)

    # Remove existing handlers (avoids duplicate output if this is called twice)
    logger.handlers.clear()

    # File handler
    file_handler = logging.FileHandler(log_file, encoding="utf-8")
    file_handler.setLevel(level)

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.WARNING)  # console shows WARNING and above only

    # Formatting
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    return logger

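# Suggested use of log levels
# DEBUG: detailed debugging information (request parameters, response bodies, etc.)
# INFO: normal progress information (task start, progress updates, etc.)
# WARNING: warnings (retries, degraded operation, etc.)
# ERROR: errors (failed requests, parsing errors, etc.)
# CRITICAL: severe errors (the program cannot continue, etc.)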
'''

print(LOGGING_PRACTICES)

# =====================================
# Performance optimization tips
# =====================================

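PERFORMANCE_TIPS = '''
# Performance optimization tips

1. Concurrency control
   - Set max_concurrency deliberately
   - Avoid concurrency so high that it gets you banned
   - Tune it per target site

2. Connection reuse
   - Use HTTP keep-alive
   - Size the connection pool sensibly
   - Avoid repeatedly opening and closing connections

3. Caching
   - Cache resources that rarely change
   - Choose sensible cache lifetimes
   - Use different caching policies for different kinds of resources

4. Resource cleanup
   - Close resources as soon as they are no longer needed
   - Avoid memory leaks
   - Clean up temporary data regularly

5. Async optimization
   - Make full use of asyncio
   - Never run blocking synchronous calls inside async functions
   - Use an appropriate concurrency-control mechanism

6. Data processing
   - Process large datasets with generators
   - Write to the database in batches
   - Avoid accumulating too much data in memory

Example code:

import asyncio
from typing import AsyncGenerator, Sequence, Tuple

# process_single_item and batch_store are placeholders for your own coroutines

async def process_items(items: Sequence) -> AsyncGenerator[Tuple[int, int], None]:
    """Process data in batches with an async generator, yielding progress"""
    batch_size = 100

    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]

        # Process the items of one batch concurrently (at most batch_size at a time)
        results = await asyncio.gather(*(process_single_item(item) for item in batch))

        # Store the whole batch in a single write
        await batch_store(results)

        # Report progress as (items processed so far, total items)
        yield i + len(batch), len(items)

async def main():
    async for processed, total in process_items(range(1000)):
        print(f"Progress: {processed}/{total} ({processed/total:.1%})")
'''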

print(PERFORMANCE_TIPS)
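Tips 1 and 5 (cap concurrency, use an appropriate concurrency-control mechanism) can be made concrete with `asyncio.Semaphore`: every task is created up front, but at most `limit` of them run the guarded section at the same time. A minimal stdlib sketch; `fetch` is a hypothetical stand-in for a real request, simulated with `asyncio.sleep`. Inside Crawlee, the crawler's own concurrency settings play this role, so the pattern is mainly useful for your own helper code such as storage or third-party API calls.

```python
import asyncio
from typing import Iterable, List


async def fetch(url: str, state: dict) -> str:
    """Hypothetical request, simulated with a short sleep while tracking concurrency."""
    state["active"] += 1
    state["peak"] = max(state["peak"], state["active"])
    try:
        await asyncio.sleep(0.01)  # stands in for network I/O
        return f"<html>{url}</html>"
    finally:
        state["active"] -= 1


async def crawl_bounded(urls: Iterable[str], limit: int = 5) -> tuple:
    """Fetch all URLs concurrently, but never more than `limit` at the same time."""
    semaphore = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}

    async def worker(url: str) -> str:
        async with semaphore:  # waits here while `limit` fetches are already running
            return await fetch(url, state)

    # gather returns results in the same order as the input URLs
    pages: List[str] = await asyncio.gather(*(worker(u) for u in urls))
    return pages, state["peak"]


if __name__ == "__main__":
    urls = [f"https://example.com/page/{i}" for i in range(20)]
    pages, peak = asyncio.run(crawl_bounded(urls, limit=5))
    print(f"fetched {len(pages)} pages, peak concurrency {peak}")
```

Compared with a plain `asyncio.gather` over all URLs, the semaphore keeps the number of in-flight requests bounded no matter how long the URL list is.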

Testing Strategy

Testing is key to keeping a crawler reliable. A good testing strategy covers three levels: unit tests, integration tests, and end-to-end tests.
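Because most crawler code is asynchronous, the test layer has to actually await coroutines. Since Python 3.8 the standard library's `unittest.IsolatedAsyncioTestCase` runs `async def test_*` methods in their own event loop, and `unittest.mock.AsyncMock` stands in for async dependencies such as network calls. A minimal sketch; `PageFetcher` and `count_links` are hypothetical names for illustration, not part of Crawlee.

```python
import unittest
from unittest.mock import AsyncMock


class PageFetcher:
    """Hypothetical async dependency; a real one would perform HTTP requests."""

    async def fetch(self, url: str) -> str:
        raise NotImplementedError


async def count_links(fetcher: PageFetcher, url: str) -> int:
    """Code under test: fetch a page and count its anchor tags."""
    html = await fetcher.fetch(url)
    return html.count("<a ")


class TestCountLinks(unittest.IsolatedAsyncioTestCase):
    async def test_counts_anchor_tags(self):
        fetcher = PageFetcher()
        # Replace the network call with an awaitable mock that returns canned HTML
        fetcher.fetch = AsyncMock(return_value='<a href="/1">1</a><a href="/2">2</a>')
        self.assertEqual(await count_links(fetcher, "https://example.com"), 2)
        fetcher.fetch.assert_awaited_once_with("https://example.com")
```

Run it with `python -m unittest`. With a plain `unittest.TestCase`, an `async def` test method would only create a coroutine that is never awaited, so it would pass without checking anything.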

# test_strategy.py

import asyncio
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import unittest
from unittest.mock import Mock, patch, AsyncMock

# =====================================
# Unit test examples
# =====================================

class TestDataModels(unittest.TestCase):
    """Data model tests"""

    def test_product_validation(self):
        """Test product data validation"""
        from ecommerce_crawler import Product

        # Valid product
        valid_product = Product(
            product_id="SKU001",
            title="Test product",
            price=99.99
        )
        self.assertTrue(valid_product.validate())

        # Product without a title
        invalid_product = Product(
            product_id="SKU002",
            title="",
            price=99.99
        )
        self.assertFalse(invalid_product.validate())

    def test_price_parsing(self):
        """Test price parsing"""
        from ecommerce_crawler import HTMLParser

        self.assertEqual(HTMLParser.parse_price("$99.99"), 99.99)
        self.assertEqual(HTMLParser.parse_price("¥299.00"), 299.00)
        self.assertEqual(HTMLParser.parse_price("invalid"), 0.0)

    def test_rating_parsing(self):
        """Test rating parsing"""
        from ecommerce_crawler import HTMLParser

        self.assertEqual(HTMLParser.parse_rating("4.5 out of 5"), 4.5)
        self.assertEqual(HTMLParser.parse_rating("3.5/5 stars"), 3.5)
        self.assertEqual(HTMLParser.parse_rating("invalid"), 0.0)

class TestProxyPool(unittest.TestCase):
    """Proxy pool tests"""

    def test_proxy_health_check(self):
        """Test proxy health status"""
        from proxy_management import ProxyInfo, ProxyStatus

        healthy_proxy = ProxyInfo(
            url="http://proxy1.com:8080",
            success_count=10,
            failure_count=1
        )
        self.assertTrue(healthy_proxy.is_healthy)
        self.assertAlmostEqual(healthy_proxy.success_rate, 0.91, places=2)

        unhealthy_proxy = ProxyInfo(
            url="http://proxy2.com:8080",
            success_count=1,
            failure_count=10,
            status=ProxyStatus.FAILED
        )
        self.assertFalse(unhealthy_proxy.is_healthy)

# =====================================
# Integration test examples
# =====================================

class TestCrawlerIntegration(unittest.IsolatedAsyncioTestCase):
    """Crawler integration tests (IsolatedAsyncioTestCase makes sure async test methods are awaited)"""

    def setUp(self):
        """Prepare before each test"""
        from ecommerce_crawler import CrawlerConfig

        self.config = CrawlerConfig(
            start_url="https://example.com",
            max_concurrency=1,
            max_retries=1
        )

    @patch("ecommerce_crawler.Crawler.fetch_product_listings")
    async def test_crawl_flow(self, mock_fetch):
        """Test the full crawl flow"""
        from ecommerce_crawler import EcommerceCrawler

        mock_fetch.return_value = [
            "https://example.com/product/1",
            "https://example.com/product/2"
        ]

        crawler = EcommerceCrawler(self.config)
        await crawler.crawl()

        self.assertGreater(crawler.stats["pages_visited"], 0)

# =====================================
# Mocks and fixtures
# =====================================

class MockResponse:
    """Mock HTTP response"""

    def __init__(self, status_code: int = 200, 
                 content: str = "", json_data: Optional[Dict] = None):
        self.status_code = status_code
        self.content = content.encode() if content else b""
        self._json_data = json_data

    def json(self):
        return self._json_data

def create_mock_html_response(title: str, content: str, 
                              author: str = "Test Author") -> MockResponse:
    """Build a mock HTML response"""
    html = f'''
    <html>
    <head><title>{title}</title></head>
    <body>
        <article>
            <h1>{title}</h1>
            <p class="author">{author}</p>
            <div class="content">{content}</div>
        </article>
    </body>
    </html>
    '''
    return MockResponse(status_code=200, content=html)

# =====================================
# Test runner
# =====================================

def run_tests():
    """Run all tests"""
    # Load the listed test cases explicitly (no automatic discovery)
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()

    # Add test cases
    suite.addTests(loader.loadTestsFromTestCase(TestDataModels))
    suite.addTests(loader.loadTestsFromTestCase(TestProxyPool))

    # Run the tests
    runner = unittest.TextTestRunner(verbosity=2)
    result = runner.run(suite)

    # Print a summary
    print("\nTests finished:")
    print(f"  Run: {result.testsRun}")
    print(f"  Passed: {result.testsRun - len(result.failures) - len(result.errors)}")
    print(f"  Failed: {len(result.failures)}")
    print(f"  Errors: {len(result.errors)}")

    return result.wasSuccessful()

# =====================================
# Test coverage recommendations
# =====================================

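TEST_COVERAGE = """
Test coverage recommendations:

1. Data model tests
   - Required-field validation
   - Type checks
   - Boundary conditions

2. Parser tests
   - Well-formed HTML
   - Malformed HTML
   - Encoding issues

3. Pipeline tests
   - Normal data flow
   - Handling of bad data
   - Edge cases

4. Crawler logic tests
   - Request scheduling
   - Retry mechanism
   - Deduplication

5. Integration tests
   - End-to-end flow
   - Configuration loading
   - Error recovery

6. Performance tests
   - Concurrent processing
   - Memory usage
   - Response time

Use pytest as the test framework,
with pytest-asyncio for async tests
and pytest-cov for coverage reports.
"""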

print(TEST_COVERAGE)
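The unit tests above import `HTMLParser` from the article's `ecommerce_crawler` module. To run them in isolation, or simply to see what behavior they pin down, here is a hypothetical regex-based stand-in that satisfies the same expectations: take the first number in the string and return `0.0` when there is none. It is an assumption for illustration, not that module's actual implementation; in particular, stripping commas as thousands separators would misread European-style prices such as "99,99".

```python
import re

# Matches the first integer or decimal number, e.g. "99.99" in "$99.99"
_NUMBER = re.compile(r"\d+(?:\.\d+)?")


class HTMLParser:
    """Hypothetical stand-in for ecommerce_crawler.HTMLParser."""

    @staticmethod
    def parse_price(text: str) -> float:
        # Treat commas as thousands separators so "1,299.00" parses as 1299.0
        match = _NUMBER.search(text.replace(",", ""))
        return float(match.group()) if match else 0.0

    @staticmethod
    def parse_rating(text: str) -> float:
        # "4.5 out of 5" and "3.5/5 stars" both start with the rating itself
        match = _NUMBER.search(text)
        return float(match.group()) if match else 0.0


print(HTMLParser.parse_price("$99.99"))
```

Keeping the fallback at `0.0` rather than raising matches the tests' expectation for `"invalid"`, at the cost of hiding parse failures; a stricter parser could return `None` instead and let the validation pipeline reject the record.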

Common Problems and Solutions

When using Crawlee in practice, you may run into a variety of problems. Here are solutions to some common ones:

# troubleshooting.py

# =====================================
# Common problems and solutions
# =====================================

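TROUBLESHOOTING_GUIDE = """
=== Common Problems and Solutions ===

Problem 1: Page load timeouts

Causes:
- Unstable network connection
- Slow responses from the target site
- JavaScript rendering takes too long

Solutions:
- Increase the request_timeout setting
- Use a longer initial_delay
- For slow JavaScript rendering, consider a headless browser instead of a plain HTTP client
- Check the network connection and retry

Code example:

config = CrawlerConfig(
    request_timeout=60,          # raise to 60 seconds
    max_retries=5,               # allow more retries
)
"""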

print(TROUBLESHOOTING_GUIDE)

# =====================================

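PROBLEM_2 = """
Problem 2: Getting blocked by the target site

Causes:
- Request rate too high
- User-Agent identified as a bot
- IP address flagged
- Missing required cookies/session

Solutions:
- Lower concurrency and request frequency
- Rotate User-Agents
- Use a proxy pool
- Simulate complete browser behavior
- Handle cookies and sessions

Code example:

# Proxy rotation
class RotatingProxyMiddleware:
    def __init__(self, proxy_pool):
        self.proxy_pool = proxy_pool

    async def process_request(self, request):
        proxy = self.proxy_pool.get_next_proxy()
        if proxy:
            request.proxy = proxy.url

# Cookie handling
class CookieAwareCrawler(BeautifulSoupCrawler):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)  # the base crawler still has to be initialized
        self.cookies = {}

    async def request_handler(self, context):
        # Save cookies returned by the server (the attribute layout depends on the HTTP client)
        for cookie in context.response.cookies:
            self.cookies[cookie.name] = cookie.value

    def cookie_header(self) -> str:
        # Build a Cookie header for requests enqueued later; the current request
        # has already been sent, so editing its headers would have no effect
        return "; ".join(f"{k}={v}" for k, v in self.cookies.items())
"""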

print(PROBLEM_2)

# =====================================

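PROBLEM_3 = """
Problem 3: JavaScript-rendered content is missing

Causes:
- Page content is loaded dynamically by JavaScript
- Lazy loading
- Reliance on WebSockets or other real-time channels

Solutions:
- Use a browser automation tool such as Playwright or Puppeteer
- Wait until the content has loaded
- Trigger the necessary interactions (scrolling, clicking)
- Inspect network requests and fetch the API data directly

Code example:

# Import path for Crawlee for Python 0.5+; older releases used crawlee.playwright_crawler
from crawlee.crawlers import PlaywrightCrawler, PlaywrightCrawlingContext

class DynamicPageCrawler(PlaywrightCrawler):
    async def request_handler(self, context: PlaywrightCrawlingContext) -> None:
        # Wait for the content to load
        await context.page.wait_for_selector(
            ".content-loaded",
            timeout=30000  # milliseconds
        )

        # Trigger infinite scrolling: page.evaluate invokes the async arrow function,
        # and the round limit (an arbitrary 20) keeps endless feeds from looping forever
        await context.page.evaluate('''
            async () => {
                for (let i = 0; i < 20; i++) {
                    const oldHeight = document.body.scrollHeight;
                    window.scrollTo(0, oldHeight);
                    await new Promise(r => setTimeout(r, 1000));
                    if (document.body.scrollHeight === oldHeight) break;
                }
            }
        ''')

        # Wait for any remaining content to load
        await context.page.wait_for_load_state("networkidle")

        # Extract the content
        content = await context.page.inner_text(".content")
"""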

print(PROBLEM_3)

# =====================================

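PROBLEM_4 = """
Problem 4: Excessive memory usage

Causes:
- Too many pending requests have piled up
- Large downloaded files are not released promptly
- Data accumulates in memory

Solutions:
- Process in batches and clean up periodically
- Stream large files
- Configure a sensible queue size
- Trigger garbage collection periodically

Code example:

import gc

import psutil
from crawlee import ConcurrencySettings

class MemoryEfficientCrawler(BeautifulSoupCrawler):
    def __init__(self):
        super().__init__(
            # Crawlee for Python takes the concurrency cap through ConcurrencySettings
            concurrency_settings=ConcurrencySettings(max_concurrency=10),
            max_requests_per_crawl=1000,  # cap the total number of requests
        )
        self.processed_count = 0
        self.batch_size = 100

    async def request_handler(self, context):
        # Process the data (extract_data and save_data are your own helpers)
        data = self.extract_data(context)
        await self.save_data(data)

        self.processed_count += 1

        # Clean up and report periodically
        if self.processed_count % self.batch_size == 0:
            gc.collect()  # trigger garbage collection
            memory_mb = self.get_memory_usage()
            print(f"Processed {self.processed_count} items, memory: {memory_mb:.1f} MB")

    def get_memory_usage(self) -> float:
        # Resident set size of the current process, in MB
        process = psutil.Process()
        return process.memory_info().rss / 1024 / 1024
"""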

print(PROBLEM_4)

# =====================================

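PROBLEM_5 = """
Problem 5: Data storage fails

Causes:
- Database connection problems
- Insufficient disk space
- Permission problems

Solutions:
- Implement a retry mechanism
- Check the status of the storage backend
- Reserve enough disk space
- Buffer writes through temporary files

Code example:

import asyncio
import os
from typing import Dict

class RobustStorage:
    def __init__(self):
        self.retry_count = 3
        self.retry_delay = 5

    async def save(self, data: Dict, filepath: str) -> bool:
        # write_file and save_to_error_queue are placeholders for your own helpers
        for attempt in range(self.retry_count):
            try:
                # Write to a temporary file first
                temp_path = f"{filepath}.tmp"
                await self.write_file(temp_path, data)

                # Atomically replace the target file
                os.replace(temp_path, filepath)
                return True

            except Exception as e:
                print(f"Storage failed (attempt {attempt + 1}/{self.retry_count}): {e}")
                if attempt < self.retry_count - 1:
                    await asyncio.sleep(self.retry_delay)
                else:
                    # Hand the data to an error queue for later reprocessing
                    await self.save_to_error_queue(data, filepath)
                    return False
"""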

print(PROBLEM_5)

# =====================================

DEBUGGING_TIPS = """
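=== Debugging Tips ===

1. Enable verbose logging
   - Set LOG_LEVEL=DEBUG
   - Emit detailed information with the logging module
   - Log request and response contents

2. Debug through a proxy
   - Configure an HTTP proxy (such as mitmproxy)
   - Capture and analyze network traffic
   - Inspect request headers and response bodies

3. Step through the code
   - Set breakpoints at key points
   - Print intermediate variables
   - Check the data flow

4. Mock the environment
   - Mock HTTP responses with the responses library (for requests; respx does the same for httpx)
   - Provide test data with pytest fixtures
   - Build minimal reproducible cases

5. Monitor resource usage
   - Watch CPU and memory usage
   - Check the number of open file descriptors
   - Track network connection state

Debugging code example: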

import logging

logging.basicConfig(level=logging.DEBUG)

logger = logging.getLogger(__name__)

async def debug_request(url: str, response):
    logger.debug(f"URL: {url}")
    logger.debug(f"Status: {response.status_code}")
    logger.debug(f"Headers: {response.headers}")
    logger.debug(f"Content length: {len(response.content)}")

    if response.status_code != 200:
        logger.error(f"Request failed: {response.text}")
"""

print(DEBUGGING_TIPS)
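The temporary-file-plus-`os.replace` idea from problem 5 can be shown end to end with the standard library alone. `os.replace` swaps the file atomically when source and target live on the same filesystem, so readers see either the old file or the complete new one, never a half-written file. A minimal synchronous sketch; the name `save_json_atomic` and the choice of JSON output are assumptions for illustration.

```python
import json
import os
import tempfile
import time
from typing import Any


def save_json_atomic(data: Any, filepath: str, retries: int = 3, delay: float = 0.5) -> bool:
    """Write `data` as JSON to `filepath` atomically, retrying on OSError."""
    directory = os.path.dirname(os.path.abspath(filepath))
    for attempt in range(1, retries + 1):
        tmp_path = None
        try:
            # Create the temp file next to the target so os.replace stays on one filesystem
            fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False)
                f.flush()
                os.fsync(f.fileno())  # make sure the bytes reach the disk before the swap
            os.replace(tmp_path, filepath)  # atomic rename over any existing file
            return True
        except OSError as exc:
            print(f"Save failed (attempt {attempt}/{retries}): {exc}")
            if tmp_path and os.path.exists(tmp_path):
                os.remove(tmp_path)  # do not leave partial temp files behind
            if attempt < retries:
                time.sleep(delay)
    return False
```

Using `tempfile.mkstemp` instead of a fixed `filepath + ".tmp"` name also prevents two concurrent writers from clobbering each other's temporary file.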

Summary and Resources

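In this article we worked through Crawlee, a powerful web scraping framework, step by step. From basic concepts to advanced applications, and from simple single-page crawlers to complex distributed systems, we covered most of what you need to build production-grade crawlers.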

Key Takeaways

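Crawlee's design goal is to make web scraping simple and reliable. Its event-driven architecture, pipeline-style data processing, and intelligent request scheduling substantially lower the barrier to building high-quality crawlers. Whether you need to grab some web data quickly or build an enterprise-grade data collection system, Crawlee offers a suitable solution.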

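In practice, the most important lessons are: first, tune concurrency and delay parameters to balance efficiency against stability; second, make good use of proxy pools and anti-detection strategies to raise the success rate; third, design a sound data pipeline to ensure data quality and consistency; fourth, build thorough error handling and monitoring so problems are detected and fixed quickly.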

Related Resources

If you want to go further with Crawlee and web scraping, the following resources are helpful:

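The official Crawlee documentation provides a complete API reference and tutorials, and is the first place to go when learning the framework. The Apify platform also publishes many articles on best practices for web scraping and data processing. CSDN and Zhihu host many Chinese-language community write-ups of Crawlee experience and case studies. For deeper technical details, see the official Playwright and Puppeteer documentation, since Crawlee's browser automation is built on these libraries (Crawlee for Python uses Playwright; the Node.js version supports both).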

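A few other open-source projects are worth following: Scrapy, another popular crawling framework in the Python ecosystem with a different design philosophy from Crawlee; and Playwright and Puppeteer, which provide powerful browser automation. For evasion, the community-maintained puppeteer-extra project offers a Stealth plugin (also usable with Playwright through playwright-extra) that helps get past some common bot-detection checks; it is not an official Playwright component.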

Looking Ahead

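Both web technologies and anti-scraping techniques keep evolving, so crawler development demands continuous learning. Future crawlers may rely more on machine learning to recognize page structure and solve CAPTCHAs, or lean more on cloud-native architectures for elastic scaling. As AI advances, intelligent data extraction and natural language processing are also likely to become important parts of crawler systems.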

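However the technology changes, understanding the fundamentals, mastering best practices, and continuing to learn remain the keys to becoming a good crawler developer. I hope this article gives you a solid starting point and helps you go further in web scraping.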

If you have any questions or ideas, feel free to discuss them in the comments. Happy scraping!
