从手动跑批到自动化流水线:DataTalksClub Zoomcamp 如何带你系统攻破现代数据工程全链路
引言:为什么这门课值得你投入时间
在数据工程领域,简历上写着“熟悉 ETL”、“会用 Spark”的候选人多如牛毛,但真正能独立搭建一套完整数据流水线、处理生产环境常见问题的开发者却凤毛麟角。这种供需错配的背后,是传统学习路径的致命缺陷:要么只学理论不动手,要么东学一点西学一点始终形不成体系。
DataTalksClub 的 data-engineering-zoomcamp 正是为解决这个痛点而生。这门课程不是教你背几个命令、跑几个 Demo那么简单,而是让你从零开始,亲手搭建一套完整的数据工程系统:从本地开发环境到云端部署,从单表处理到流式计算,从手动脚本到自动化编排,覆盖了现代数据工程师日常工作的全部核心场景。
这门课程由 DataTalks.Club 社区维护,背后是欧洲数据领域资深从业者组成的团队。课程内容持续更新,配套的 Slack 社区活跃着来自全球的学习者和导师。与某些商业培训课程不同,这门课完全免费,但质量却毫不逊色于付费课程——甚至在 GitHub 上获得了超过两万颗星,被认为是数据工程领域最受欢迎的入门级实战课程之一。
为什么值得学习
数据工程师的日常工作到底是什么?这个问题困扰着很多想转行的新人。打开招聘网站,岗位描述里写满了“数据仓库”、“实时流处理”、“ETL 流程优化”等术语,但你很难找到一份清晰的答案告诉你这些技术如何串联起来、如何在实际项目中协同工作。
Zoomcamp 的课程设计完美回答了这个问题。整个学习路径模拟了一条真实的数据流水线:从数据源获取原始数据,到数据湖存储,再到数据仓库转换,最后到分析层输出。每个模块都不是孤立的知识点,而是整条流水线上的一个环节。你学完所有内容后,回过头来看,就能清晰地理解为什么需要 Docker(环境隔离和一致性)、为什么需要工作流编排工具(任务依赖和调度)、为什么需要流处理框架(实时数据需求)。这种系统化的认知,是单纯看文档或教程很难获得的。
另一个值得关注的点是课程的工具选型。Zoomcamp 选择了当今业界主流的工具栈:Terraform 进行基础设施编排、Terraform 进行云端资源管理、GCP BigQuery 作为数据仓库、Mage 作为工作流编排工具、Apache Spark 处理大规模数据、Kafka 实现流式处理、DBT 完成数据转换。这些工具在生产环境中被广泛使用,掌握它们等于拿到了进入数据工程领域的入场券。
更重要的是,这门课不追求“大而全”的理论灌输,而是坚持“做中学”的理念。每个模块都设计了明确的实战项目:搭建一个完整的 GCP 数据平台、实现航班数据的数据管道、构建实时 Taxi 数据分析系统、开发一个端到端的机器学习特征工程流水线。这些项目不是玩具级别的 Demo,而是足够接近生产环境的实战练习。
环境搭建:从零开始配置开发环境
学习任何技术的第一步都是搭建一个可靠的开发环境。Zoomcamp 使用 Docker 作为环境管理的核心工具,这解决了数据工程领域最令人头疼的“环境不一致”问题:本地跑得好好的代码,部署到服务器就报错;换了台电脑所有依赖都要重新装;团队成员的开发环境各有差异难以对齐。Docker 让你把应用程序和它的全部依赖打包成一个镜像,确保在任何地方都以相同的方式运行。
首先需要在你的电脑上安装 Docker Desktop。Windows 用户建议使用 WSL2 作为后端,macOS 用户直接使用默认配置即可。安装完成后,打开终端验证安装是否成功:
docker --version
docker-compose --version
如果看到版本号输出,说明 Docker 已经正确安装。接下来创建一个专门用于课程项目的目录结构:
mkdir de-zoomcamp
cd de-zoomcamp
mkdir 01-docker-terraform
mkdir 02-gcp
mkdir 03-data-ingestion
mkdir 04-analytics-engineering
mkdir 05-streaming
mkdir 06-capstone
课程的所有练习和项目都将在这个目录结构下展开。建议使用 VS Code 作为编辑器,配合 Docker 插件可以获得很好的开发体验:直接在容器内编辑代码、查看日志、调试问题。
对于需要使用 GCP 的模块,还需要配置 Google Cloud SDK。首先在 GCP 控制台创建一个免费账户和项目,然后在本地安装 gcloud CLI:
brew install google-cloud-sdk # macOS
# 或者从 https://cloud.google.com/sdk/docs/install 下载安装
安装完成后进行身份验证和项目配置:
gcloud auth login
gcloud config set project YOUR_PROJECT_ID
gcloud services enable compute.googleapis.com bigquery.googleapis.com
最后一步是启用必要的 GCP API。打开 GCP 控制台的 API 和服务面板,确保以下 API 处于启用状态:BigQuery API、Compute Engine API、Cloud Storage API。这些服务在免费层配额内基本可以完成所有课程练习。
模块一:Docker 与 Terraform 基础
Docker 的核心概念是镜像和容器。镜像是一个只读的模板,包含了运行应用程序所需的全部内容:代码、运行时环境、系统工具、库文件。你可以从 Dockerfile 构建镜像,也可以从 Docker Hub 拉取他人分享的镜像。容器则是镜像的运行实例,可以理解为镜像的一个进程。
来看一个典型的 Dockerfile 示例,这是课程中用于运行 Python 数据处理任务的容器配置:
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
ENV PYTHONUNBUFFERING=1
CMD ["python", "pipeline.py"]
这个 Dockerfile 的工作流程是这样的:首先指定基础镜像 python:3.9-slim,这是一个精简版的 Python 运行环境;其次设置工作目录为 /app;然后复制依赖文件并安装 Python 包;接着复制全部代码文件;最后设置环境变量并指定启动命令。
构建镜像使用 docker build 命令:
docker build -t de-zoomcamp:latest .
构建完成后,可以运行容器:
docker run -it de-zoomcamp:latest bash
参数 -it 表示交互式终端模式,让你能够进入容器的 shell 环境。退出容器后,容器会自动停止。
docker-compose 是 Docker 的编排工具,用于定义和运行多容器应用。在数据工程场景中,通常需要同时运行多个服务:PostgreSQL 数据库、Redis 缓存、Spark 工作节点等。docker-compose.yml 文件让你在一个 YAML 文件中定义所有服务,然后通过一条命令启动整个系统。
version: '3.8'
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: root
POSTGRES_PASSWORD: root
POSTGRES_DB: ny_taxi
ports:
- "5432:5432"
volumes:
- ny_taxi_data:/var/lib/postgresql/data
pgadmin:
image: dpage/pgadmin4
environment:
PGADMIN_DEFAULT_EMAIL: admin@admin.com
PGADMIN_DEFAULT_PASSWORD: root
ports:
- "8080:80"
volumes:
ny_taxi_data:
这个配置定义了两个服务:PostgreSQL 数据库和 pgAdmin 管理界面。通过 volumes 配置,数据被持久化存储,即使容器被删除也不会丢失。运行 docker-compose up -d 启动所有服务,docker-compose down 停止并删除容器。
Terraform 是基础设施即代码(Infrastructure as Code)工具,用于声明式地管理云端资源。使用 Terraform,你可以用代码定义 GCP 项目中的所有资源,然后通过几条命令创建、更新或销毁这些资源,确保环境配置的一致性和可重复性。
provider "google" {
project = "your-project-id"
region = "us-central1"
}
resource "google_storage_bucket" "data_lake_bucket" {
name = "your-data-lake-bucket"
location = "US"
lifecycle_rule {
action {
type = "Delete"
}
condition {
age = 30 # 30天后自动删除旧文件
}
}
}
resource "google_bigquery_dataset" "data_warehouse" {
dataset_id = "trips_data_all"
location = "US"
}
这个 Terraform 配置声明了两个资源:一个 GCS 存储桶用作数据湖,一个 BigQuery 数据集用作数据仓库。运行 terraform init 初始化工作目录,terraform plan 预览将要执行的操作,terraform apply 执行实际变更。每次修改 .tf 文件后重新运行 plan 和 apply,就能以受控的方式更新云端基础设施。
模块二:数据摄取与 PostgreSQL 实战
数据摄取是数据流水线的第一步,负责从各种来源提取数据并加载到目标系统。Zoomcamp 使用纽约出租车数据集作为主要的数据源,这个数据集在数据工程领域非常经典,因为它足够大(数亿条记录)、结构清晰、又贴近真实业务场景。
课程首先带你用 pandas 将 CSV 和 Parquet 格式的数据加载到 PostgreSQL:
import pandas as pd
from sqlalchemy import create_engine
import os
# 建立数据库连接
engine = create_engine(
'postgresql://root:root@localhost:5432/ny_taxi'
)
# 读取本地 CSV 文件
df = pd.read_csv('taxi_zone_lookup.csv')
# 写入 PostgreSQL 表
df.to_sql(
name='zones',
con=engine,
if_exists='replace',
dtype={'zone_id': sqlalchemy.Integer}
)
# 验证数据写入
query = "SELECT COUNT(*) FROM zones"
result = pd.read_sql(query, con=engine)
print(f"zones 表包含 {result.iloc[0, 0]} 条记录")
这段代码展示了数据摄取的基本模式:连接目标数据库、读取源数据、写入目标表。对于小规模数据(几十 MB 以内),这种方法是可行的。但当数据量达到 GB 甚至 TB 级别时,pandas 的内存消耗会成为严重问题,需要使用分块读取或者直接使用数据库的批量导入工具。
PostgreSQL 的 COPY 命令是高效批量导入数据的选择:
-- 先创建表结构
CREATE TABLE IF NOT EXISTS zones (
locationid INTEGER,
borough VARCHAR(255),
zone VARCHAR(255),
service_zone VARCHAR(255)
);
-- 使用 COPY 命令批量导入
COPY zones
FROM '/path/to/taxi_zone_lookup.csv'
WITH (FORMAT csv, HEADER true);
-- 验证导入结果
SELECT COUNT(*) FROM zones;
COPY 命令在数据库服务器端执行,数据不需要经过客户端内存中转,导入速度比 INSERT 快几十倍。对于更大的数据集,可以考虑使用 pg_bulkload 这样的外部工具进一步优化。
课程还介绍了如何使用 pgAdmin 可视化工具管理数据库。pgAdmin 是一个功能完整的 PostgreSQL 管理界面,支持查询执行、数据浏览、备份恢复等操作。通过 docker-compose 启动 pgAdmin 后,访问 http://localhost:8080,使用配置中设置的邮箱和密码登录,就能看到一个类似 Navicat 的图形化数据库管理界面。
模块三:工作流编排实战
当数据管道变得复杂,涉及多个相互依赖的任务时,手动执行脚本的方式就不可行了。工作流编排工具解决了这个问题:它允许你定义任务之间的依赖关系,自动处理执行顺序,支持任务失败重试、历史记录查看、定时调度等生产级功能。
Zoomcamp 使用 Mage 作为主要的工作流编排工具。Mage 是一个开源的现代数据管道平台,以代码为中心,支持 Python、SQL、R、TensorFlow 等多种语言,特别适合数据工程师和分析师使用。
Mage 的项目结构非常清晰:
mage_demo/
├── magic/
│ ├── etl/
│ │ ├── load_data.py
│ │ ├── transform_data.py
│ │ └── export_data.py
│ └── pipelines/
│ └── etl_pipeline/
│ ├── metadata.yaml
│ └── block_1.py
├── io/
├── transformers/
└── profiles/
每个管道(Pipeline)由多个块(Block)组成,每个块是一个独立的计算单元。块与块之间可以定义依赖关系,Mage 会自动计算执行顺序。
# block_1.py - 数据加载块
from mage_ai.data_preparation.variable_manager import get_variable
from mage_ai.io.postgres import PostgresLoader
import pandas as pd
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
@data_loader
def load_data_from_postgres(*args, **kwargs):
query = 'SELECT * FROM yellow_taxi_trips LIMIT 10000'
loader = PostgresLoader()
df = loader.load(query=query)
return df
# block_2.py - 数据转换块
import pandas as pd
if 'transformer' not in globals():
from mage_ai.data_preparation.decorators import transformer
@transformer
def transform_data(df, *args, **kwargs):
# 移除异常数据
df = df[df['passenger_count'] > 0]
df = df[df['trip_distance'] > 0]
# 添加时间维度特征
df['pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])
df['trip_duration_minutes'] = (
df['dropoff_datetime'] - df['pickup_datetime']
).dt.total_seconds() / 60
return df
# block_3.py - 数据导出块
from mage_ai.io.bigquery import BigQueryLoader
if 'data_exporter' not in globals():
from mage_ai.data_preparation.decorators import data_exporter
@data_exporter
def export_data_to_bigquery(df, **kwargs):
loader = BigQueryLoader.with_credentials(
credentials_path='/path/to/service_account.json',
project_id='your-project-id'
)
loader.export(
df,
uri='bigquery://your-project.dataset.yellow_taxi_trips',
if_exists='replace'
)
return df
Mage 的 Web UI 提供了完整的开发体验:编写代码、预览数据、执行管道、查看日志都可以在浏览器中完成。更重要的是,Mage 支持触发器(Trigger)配置,可以设置定时执行或者依赖外部事件执行。
trigger_name: daily_pipeline
description: 每天凌晨2点运行ETL管道
schedule:
cron: "0 2 * * *" # UTC时间,每天02:00
pipeline_uuid: etl_pipeline
配置好触发器后,Mage 会自动按照 cron 表达式调度管道运行,完全无需人工干预。
模块四:云平台与 BigQuery 实战
Google BigQuery 是一个无服务器的数据仓库服务,擅长处理大规模数据集的分析查询。与传统需要管理集群的数据仓库不同,BigQuery 自动扩展计算资源,用户只需要为实际查询的数据量付费。
Zoomcamp 带你将本地 PostgreSQL 中的数据迁移到 BigQuery。迁移的策略是:首先将数据导出为 Parquet 格式(压缩率高、查询速度快),上传到 GCS 存储桶,然后通过 BigQuery 的外部表功能或者加载操作将数据导入 BigQuery。
from google.cloud import storage
from google.cloud import bigquery
import pandas as pd
# 初始化客户端
storage_client = storage.Client()
bigquery_client = bigquery.Client()
# 上传 Parquet 文件到 GCS
bucket = storage_client.bucket('your-bucket-name')
blob = bucket.blob('yellow_taxi_trips.parquet')
# 使用 pandas 读取数据并保存为 Parquet
df = pd.read_parquet('yellow_taxi_trips.parquet')
df.to_parquet('/tmp/trips.parquet', engine='pyarrow', compression='snappy')
blob.upload_from_filename('/tmp/trips.parquet')
# 定义 BigQuery 表Schema
table_id = "your-project.your_dataset.yellow_taxi_trips"
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("VendorID", "INTEGER"),
bigquery.SchemaField("tpep_pickup_datetime", "TIMESTAMP"),
bigquery.SchemaField("tpep_dropoff_datetime", "TIMESTAMP"),
bigquery.SchemaField("passenger_count", "FLOAT"),
bigquery.SchemaField("trip_distance", "FLOAT"),
bigquery.SchemaField("RatecodeID", "INTEGER"),
bigquery.SchemaField("store_and_fwd_flag", "STRING"),
bigquery.SchemaField("PULocationID", "INTEGER"),
bigquery.SchemaField("DOLocationID", "INTEGER"),
bigquery.SchemaField("payment_type", "INTEGER"),
bigquery.SchemaField("fare_amount", "FLOAT"),
bigquery.SchemaField("extra", "FLOAT"),
bigquery.SchemaField("mta_tax", "FLOAT"),
bigquery.SchemaField("tip_amount", "FLOAT"),
bigquery.SchemaField("tolls_amount", "FLOAT"),
bigquery.SchemaField("improvement_surcharge", "FLOAT"),
bigquery.SchemaField("total_amount", "FLOAT"),
],
source_format=bigquery.SourceFormat.PARQUET,
)
# 加载数据
uri = "gs://your-bucket-name/yellow_taxi_trips.parquet"
load_job = bigquery_client.load_table_from_uri(uri, table_id, job_config=job_config)
load_job.result() # 等待作业完成
table = bigquery_client.get_table(table_id)
print(f"加载了 {table.num_rows} 行数据到 {table_id}")
数据加载完成后,就可以在 BigQuery 中执行 SQL 分析了。BigQuery 的 SQL 语法与标准 ANSI SQL 基本兼容,但对窗口函数、分区表等高级特性有很好的支持。
-- 按月统计乘客数量和收入
SELECT
DATE_TRUNC(DATE(tpep_pickup_datetime), MONTH) AS pickup_month,
COUNT(*) AS trip_count,
SUM(passenger_count) AS total_passengers,
ROUND(SUM(total_amount), 2) AS total_revenue,
ROUND(AVG(trip_distance), 2) AS avg_distance
FROM `your-project.your_dataset.yellow_taxi_trips`
WHERE tpep_pickup_datetime >= '2023-01-01'
GROUP BY pickup_month
ORDER BY pickup_month;
-- 使用窗口函数计算累计收入
WITH monthly_revenue AS (
SELECT
DATE_TRUNC(DATE(tpep_pickup_datetime), MONTH) AS month,
SUM(total_amount) AS revenue
FROM `your-project.your_dataset.yellow_taxi_trips`
GROUP BY month
)
SELECT
month,
revenue,
SUM(revenue) OVER (ORDER BY month) AS cumulative_revenue,
ROUND(100.0 * revenue / SUM(revenue) OVER (), 2) AS revenue_share_pct
FROM monthly_revenue
ORDER BY month;
BigQuery 的分区表功能对查询性能和成本控制非常重要。通过将表按时间字段分区,BigQuery 可以只扫描相关分区的数据,大幅减少查询的数据量和费用:
CREATE TABLE IF NOT EXISTS `your-project.your_dataset.yellow_taxi_partitioned`
PARTITION BY DATE(tpep_pickup_datetime)
AS
SELECT * FROM `your-project.your_dataset.yellow_taxi_trips`;
对于高基数字段,还可以使用聚簇(Clustering)功能进一步优化:
CREATE TABLE IF NOT EXISTS `your-project.your_dataset.yellow_taxi_clustered`
PARTITION BY DATE(tpep_pickup_datetime)
CLUSTER BY PULocationID, DOLocationID
AS
SELECT * FROM `your-project.your_dataset.yellow_taxi_trips`;
聚簇表会根据指定的列对数据进行排序,使得按这些列过滤的查询更加高效。
模块五:Spark 与 Databricks 实战
当数据规模达到单机无法处理的级别时,Apache Spark 成为必然选择。Spark 是一个分布式计算引擎,将数据分散到集群的多个节点上并行处理,从而实现水平扩展。Databricks 是 Spark 的商业化平台,提供了托管的 Spark 环境、笔记本界面和集群管理功能,大大降低了使用 Spark 的门槛。
Zoomcamp 使用 Spark 处理出租车数据,演示大规模数据处理的基本操作:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
# 初始化 Spark Session
spark = SparkSession.builder \
.appName('taxi-data-analysis') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.28.0') \
.getOrCreate()
# 定义 Schema(避免 Spark 推测类型提高性能)
schema = StructType([
StructField("VendorID", IntegerType(), True),
StructField("tpep_pickup_datetime", StringType(), True),
StructField("tpep_dropoff_datetime", StringType(), True),
StructField("passenger_count", DoubleType(), True),
StructField("trip_distance", DoubleType(), True),
StructField("RatecodeID", DoubleType(), True),
StructField("store_and_fwd_flag", StringType(), True),
StructField("PULocationID", IntegerType(), True),
StructField("DOLocationID", IntegerType(), True),
StructField("payment_type", IntegerType(), True),
StructField("fare_amount", DoubleType(), True),
StructField("extra", DoubleType(), True),
StructField("mta_tax", DoubleType(), True),
StructField("tip_amount", DoubleType(), True),
StructField("tolls_amount", DoubleType(), True),
StructField("improvement_surcharge", DoubleType(), True),
StructField("total_amount", DoubleType(), True),
])
# 读取 Parquet 文件
df = spark.read \
.option("header", "true") \
.schema(schema) \
.parquet('data/yellow_taxi_trips/*.parquet')
print(f"读取了 {df.count()} 条记录")
df.printSchema()
Spark DataFrame API 与 pandas 的操作方式类似,但背后是分布式计算引擎,处理方式有本质不同:
# 数据清洗
df_clean = df \
.filter(F.col("passenger_count") > 0) \
.filter(F.col("trip_distance") > 0) \
.filter(F.col("fare_amount") > 0) \
.filter(F.col("tpep_pickup_datetime").isNotNull()) \
.filter(F.col("tpep_dropoff_datetime").isNotNull())
# 添加计算列
df_clean = df_clean.withColumn(
"pickup_date",
F.to_date(F.col("tpep_pickup_datetime"))
).withColumn(
"trip_duration_hours",
(F.unix_timestamp(F.col("tpep_dropoff_datetime")) -
F.unix_timestamp(F.col("tpep_pickup_datetime"))) / 3600
)
# 按日期聚合统计
daily_stats = df_clean.groupBy("pickup_date") \
.agg(
F.count("*").alias("trip_count"),
F.sum("passenger_count").alias("total_passengers"),
F.round(F.avg("trip_distance"), 2).alias("avg_distance"),
F.round(F.avg("fare_amount"), 2).alias("avg_fare"),
F.round(F.sum("total_amount"), 2).alias("total_revenue")
) \
.orderBy("pickup_date")
daily_stats.show(10)
Spark 支持将数据直接写入 BigQuery:
# 配置 BigQuery 连接
spark.conf.set("credentialsFile", "/path/to/service-account.json")
spark.conf.set("parentProject", "your-project-id")
# 写入 BigQuery
daily_stats.write \
.format("bigquery") \
.option("table", "your-project.dataset.daily_taxi_stats") \
.option("temporaryGcsBucket", "your-temp-bucket") \
.mode("overwrite") \
.save()
Databricks 环境下,可以使用笔记本单元格逐块执行代码,并添加 Markdown 注释说明每一步的目的。这种笔记本式的开发方式特别适合数据探索和原型开发阶段。
模块六:流式处理与 Kafka 实战
批处理适合对历史数据做离线分析,但很多业务场景需要实时数据:欺诈检测需要秒级响应、实时推荐需要最新用户行为、监控系统需要即时告警。流式处理解决的就是这类问题。
Apache Kafka 是流式处理架构的核心组件,扮演着数据总线和消息队列的双重角色。Kafka 的核心概念包括:Topic(数据主题,类似于数据库的表)、Producer(生产者,向 Topic 写入数据)、Consumer(消费者,从 Topic 读取数据)、Partition(分区,实现数据并行处理和水平扩展)。
# Kafka Producer - 模拟 Taxi 数据实时产生
from kafka import KafkaProducer
import json
import time
import random
from datetime import datetime
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# 生成模拟 Taxi 数据
def generate_taxi_event():
return {
'vendor_id': random.randint(1, 2),
'pickup_time': datetime.now().isoformat(),
'dropoff_time': datetime.now().isoformat(),
'passenger_count': random.randint(1, 6),
'trip_distance': round(random.uniform(0.5, 20.0), 2),
'pulocationid': random.randint(1, 265),
'dolocationid': random.randint(1, 265),
'fare_amount': round(random.uniform(5.0, 50.0), 2),
}
# 持续发送事件
while True:
event = generate_taxi_event()
producer.send('taxi-rides', event)
print(f"发送事件: {event}")
time.sleep(random.uniform(0.1, 1.0))
# Kafka Consumer - 实时消费和处理 Taxi 数据
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'taxi-rides',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='taxi-consumer-group',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
# 实时计算统计
trip_count = 0
total_fare = 0.0
for message in consumer:
event = message.value
trip_count += 1
total_fare += event['fare_amount']
# 每100条打印一次统计
if trip_count % 100 == 0:
print(f"已处理 {trip_count} 条记录,平均票价: ${total_fare / trip_count:.2f}")
实际生产环境中,Kafka 后面通常会接流处理框架(如 Apache Flink 或 Kafka Streams)来做复杂的实时计算。课程使用 Spark Structured Streaming 展示如何实现实时聚合:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, count, avg, sum as spark_sum
spark = SparkSession.builder \
.appName('streaming-taxi-analysis') \
.config('spark.jars.packages', 'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.28.0') \
.getOrCreate()
# 读取 Kafka 流数据
lines = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "taxi-rides") \
.load()
# 解析 JSON
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import from_json, col
schema = StructType([
StructField("vendor_id", IntegerType(), True),
StructField("pickup_time", StringType(), True),
StructField("dropoff_time", StringType(), True),
StructField("passenger_count", IntegerType(), True),
StructField("trip_distance", DoubleType(), True),
StructField("pulocationid", IntegerType(), True),
StructField("dolocationid", IntegerType(), True),
StructField("fare_amount", DoubleType(), True),
])
events = lines.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")
# 窗口聚合(每分钟统计一次)
windowed_stats = events \
.withWatermark("pickup_time", "10 minutes") \
.groupBy(
window(col("pickup_time"), "1 minute"),
col("pulocationid")
) \
.agg(
count("*").alias("trip_count"),
spark_sum("passenger_count").alias("total_passengers"),
avg("fare_amount").alias("avg_fare"),
avg("trip_distance").alias("avg_distance")
)
# 输出到控制台(调试用)或者 BigQuery(生产用)
query = windowed_stats \
.writeStream \
.format("bigquery") \
.outputMode("append") \
.option("checkpointLocation", "gs://your-bucket/checkpoints/") \
.option("table", "your-project.dataset.streaming_stats") \
.start()
这段代码展示了流处理的核心模式:读取 Kafka 数据、解析 JSON、定义时间窗口、执行聚合、输出到外部系统。withWatermark 是处理乱序数据的关键机制,允许系统延迟一段时间后关闭窗口,确保即使数据到达顺序有偏差也能正确计算。
模块七:分析工程与 DBT 实战
DBT(Data Build Tool)是数据转换层的革命性工具,它将 SQL 文件组织成模型,通过依赖关系图管理转换逻辑,支持测试、文档、增量加载等高级功能。在现代数据架构中,DBT 通常位于数据湖或数仓之上,负责将原始数据转换为业务可用的分析模型。
Zoomcamp 的 DBT 模块带你构建一个完整的指标层:
-- models/marts/dim_trips.sql
{{ config(materialized='table') }}
WITH source_trips AS (
SELECT *
FROM {{ ref('stg_yellow_taxi_trips') }}
),
enriched_trips AS (
SELECT
trip_id,
vendor_id,
pickup_datetime,
dropoff_datetime,
passenger_count,
trip_distance,
pickup_location_id,
dropoff_location_id,
fare_amount,
extra,
mta_tax,
tip_amount,
tolls_amount,
improvement_surcharge,
total_amount,
-- 计算派生字段
DATE_TRUNC(pickup_datetime, HOUR) AS pickup_hour,
DATE_TRUNC(pickup_datetime, DAY) AS pickup_date,
EXTRACT(YEAR FROM pickup_datetime) AS pickup_year,
EXTRACT(MONTH FROM pickup_datetime) AS pickup_month,
EXTRACT(DAYOFWEEK FROM pickup_datetime) AS pickup_day_of_week,
-- 计算行程时长(分钟)
TIMESTAMP_DIFF(dropoff_datetime, pickup_datetime, MINUTE) AS trip_duration_minutes
FROM source_trips
)
SELECT * FROM enriched_trips
DBT 的 ref 函数用于建立模型之间的依赖关系。当执行 dbt run 时,DBT 会自动分析依赖图,按照正确的顺序执行模型。
DBT 的种子(Seed)功能用于加载静态数据:
-- data/taxi_zones.csv
location_id,borough,zone,service_zone
1,East Harlem North,East Harlem North,Boro Zone
2,East Harlem South,East Harlem South,Boro Zone
3,Chelsea,Chelsea,Boro Zone
...
# dbt_project.yml
seeds:
de_zoomcamp:
taxi_zones:
schema: staging
运行 dbt seed 将 CSV 文件加载到数据仓库中作为表,供其他模型引用。
DBT 还支持数据测试,确保数据质量:
-- models/marts/dim_trips.yml
version: 2
models:
- name: dim_trips
description: 出租车行程事实表
columns:
- name: trip_id
tests:
- unique
- not_null
- name: pickup_datetime
tests:
- not_null
- name: passenger_count
tests:
- not_null
- accepted_values:
values: [1, 2, 3, 4, 5, 6]
- name: trip_duration_minutes
tests:
- not_null
- dbt_utils.expression_is_true:
expression: ">= 0"
运行 dbt test 执行所有测试,DBT 会报告哪些行没有通过测试,帮助你发现数据质量问题。
综合项目实战
课程的最后阶段是一个综合项目,要求你独立完成一个端到端的数据管道设计、开发和部署。这个项目没有标准答案,你需要综合运用课程中学到的所有技能:环境配置、数据摄取、转换处理、工作流编排、云端部署。
项目要求包括:使用 Docker 容器化所有组件、使用 Terraform 在 GCP 创建基础设施、使用 Mage 或其他编排工具调度任务、数据存储在 GCS 和 BigQuery 中、最终结果可以通过 BI 工具(如 Metabase)可视化展示。
一个参考的项目结构:
capstone-project/
├── terraform/
│ ├── main.tf
│ ├── variables.tf
│ └── outputs.tf
├── mage/
│ ├── pipelines/
│ │ └── data_pipeline/
│ └── io_config.yaml
├── dbt/
│ └── models/
├── docker/
│ └── docker-compose.yml
└── README.md
这个项目是检验学习成果的最佳方式。建议按照敏捷开发的方式迭代推进:首先完成核心功能的 MVP,然后逐步添加错误处理、监控告警、性能优化等高级特性。
实战技巧与最佳实践
数据工程领域有很多“坑”是教科书上不会教的,课程和社区总结了宝贵的经验教训。
关于 Docker 的最佳实践,首先是镜像大小控制。使用多阶段构建(Multi-stage Build)可以显著减小镜像体积:构建阶段使用完整的开发环境,应用阶段只复制编译产物。例如 Python 应用可以使用 alpine 基础镜像并删除 build 依赖。其次是善用 .dockerignore 文件,避免把 node_modules、.git 等不必要的文件复制到镜像中,减少构建上下文传输时间。
# 多阶段构建示例
FROM python:3.9 AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --user -r requirements.txt
FROM python:3.9-slim
WORKDIR /app
COPY --from=builder /root/.local /root/.local
COPY . .
ENV PATH=/root/.local/bin:$PATH
CMD ["python", "main.py"]
关于 BigQuery 的最佳实践,关键是理解其计费模型。BigQuery 按查询扫描的数据量计费,而不是按查询执行时间。因此,SELECT * 是一条非常昂贵的操作,在分析前应该只选择需要的列。使用 PARTITION BY 和 CLUSTER BY 可以让 BigQuery 只扫描相关分区的数据,大幅减少费用。另外,频繁查询的表应该考虑使用物化视图(Materialized View)预计算结果。
关于 Kafka 的最佳实践,首先是分区数的设置。分区数决定了消费者的最大并行度,但也会影响文件句柄和内存占用。通常建议将分区数设置为消费者数量的整数倍,并根据吞吐量目标调整:目标吞吐量 T、分区数 = 消费者数 × 期望每个消费者处理速度。其次是消费者组管理。同一个消费者组的实例共享分区,协调器会自动分配分区给各实例,实现负载均衡。不同消费者组相互独立,都会收到全量消息。
关于 DBT 的最佳实践,模型命名应该清晰表达业务含义。stg_ 前缀表示临时模型、dim_ 表示维度表、fct_ 表示事实表。复杂逻辑应该拆分为多个模型,通过 ref() 建立依赖链,而不是写一个巨大的 SQL 文件。另外,善用宏(Macro)封装重复逻辑,例如日期处理、字符串格式化等。
调试与排错技巧
数据管道出问题是常态,能够快速定位和解决问题是数据工程师的核心能力。
管道失败时,首先检查日志。DBT 和 Mage 都提供详细的执行日志,包含 SQL 语句、错误堆栈、数据样本等关键信息:
# DBT 调试模式 - 显示更多信息
dbt --debug run -s dim_trips
# Mage 查看特定块执行日志
mage run /path/to/pipeline block_1
PostgreSQL 连接问题通常由几个原因导致:容器网络配置错误、服务未启动、认证配置不正确。使用 pg_isready 验证服务状态,使用 psql 直接连接测试:
pg_isready -h localhost -p 5432
psql -h localhost -p 5432 -U root -d ny_taxi
BigQuery 权限问题需要检查服务账号的 IAM 角色配置,确保有 BigQuery Admin 或足够的细粒度权限:
gcloud projects add-iam-policy-binding your-project-id \
--member="serviceAccount:your-service-account@your-project.iam.gserviceaccount.com" \
--role="roles/bigquery.dataEditor"
Kafka 消费者没有收到消息的排查步骤:检查 Topic 是否存在(kafka-topics –list)、检查消费者组状态(kafka-consumer-groups –describe)、检查生产者是否正常发送、确认消费者组名正确(不同消费者组名会各自从 offset 0 开始消费)。
扩展学习资源
完成 Zoomcamp 课程后,可以继续深入学习以下方向:
数据质量与可观测性方向,可以研究 Great Expectations、dbt tests、Soda.io 等工具。Soda SQL 是专门为数据仓库设计的质量检测工具,可以配置数据列的各种检查规则,自动发现数据异常。
高级流处理方向,可以学习 Apache Flink。Flink 提供了比 Spark Structured Streaming 更强大的窗口函数和状态管理能力,是大规模实时计算的首选框架。
数据平台工程方向,可以研究现代数据栈(Modern Data Stack)的其他组件:Airbyte 用于数据集成(EL 部分)、Metabase 或 Apache Superset 用于可视化、dbt Cloud 用于托管 DBT 运行。
机器学习工程方向,可以学习 MLflow 用于实验跟踪和模型管理、Kubeflow 用于机器学习流水线编排、Tecton 用于特征平台。
结语:开启你的数据工程之旅
DataTalksClub 的 data-engineering-zoomcamp 不仅仅是几周的学习内容,更是一套完整的方法论和工具链的起点。课程设计者精心选择的工具组合——Docker、Terraform、Mage、Spark、Kafka、BigQuery、DBT——代表了现代数据平台的主流技术选型。掌握这些工具,你就有能力构建从数据摄取到分析产出的完整流水线。
更重要的是,这门课培养的思维方式:如何将业务需求转化为数据模型、如何设计可靠的自动化流程、如何确保数据质量和系统稳定性——这些软技能比任何具体工具都更有价值。因为工具会不断演进,但解决问题的底层逻辑是相通的。
学习数据工程没有捷径,但有正确的方法。选择一门系统化的课程,亲手敲代码、调 Bug、解决问题,比看一百篇博客文章更有收获。Zoomcamp 提供了这样的机会,而且完全免费。现在就打开 GitHub 仓库,开始你的数据工程之旅吧。
相关链接
- GitHub 仓库:https://github.com/DataTalksClub/data-engineering-zoomcamp
- DataTalks.Club 社区:https://datatalks.club
- Slack 交流群:https://datatalks.club/slack.html
- 课程视频(B站搬运):搜索“DataTalksClub 数据工程zoomcamp”
评论区