别再手动筛选日期了!这个开源项目让「最近30天」数据分析效率提升10倍

别再手动筛选日期了!这个开源项目让「最近30天」数据分析效率提升10倍

别再手动筛选日期了!这个开源项目让「最近30天」数据分析效率提升10倍


为什么这个项目值得关注

在日常的数据分析工作中,「最近30天」几乎是最常见的时间窗口。无论是运营报表、销售统计,还是用户行为分析,我们总是需要频繁地筛选和计算最近一个月的数据。然而,你是否注意到这样一个现象:每次写SQL或者Python代码时,我们都要重复编写类似 WHERE date >= DATE_SUB(CURDATE(), INTERVAL 30 DAY) 这样的日期过滤逻辑?

这种重复性工作不仅枯燥,还容易出错。不同的数据源可能有不同的日期格式,不同的业务场景可能需要灵活调整时间窗口,而且一旦需求变化,修改起来也相当麻烦。

mvanhorn/last30days-skill 正是为了解决这一痛点而生。这个开源项目提供了一个通用的时间窗口处理框架,让你能够以一种优雅、灵活的方式处理「最近N天」类型的数据分析需求。无论是30天、7天还是90天,只需要简单配置,就能轻松切换。

这个项目能解决什么问题

想象一下这样的场景:运营团队需要一份日报,展示过去7天的用户活跃数据;销售总监要求一份周报,统计过去30天的订单情况;产品经理想要一份月度报告,分析过去90天的功能使用情况。如果每次都要重新编写日期过滤逻辑,那将是一场噩梦。

last30days-skill 的核心价值在于:

统一的接口设计:无论数据来源是MySQL、PostgreSQL,还是CSV文件、API接口,都使用相同的调用方式。

灵活的时间窗口:不只是30天,你可以轻松设置为任意天数,还能支持工作日、自然日等不同计算方式。

智能的日期处理:自动处理时区问题、日期格式差异,还能识别相对日期和绝对日期。

开箱即用的模板:内置多种常见场景的模板,让你从繁琐的日期处理中解放出来。


环境搭建

系统要求

在开始之前,确保你的开发环境满足以下要求:

  • Python 3.8 或更高版本
  • pip 包管理器
  • 稳定的网络连接(用于下载依赖)

安装步骤

第一步:创建虚拟环境(推荐)

为项目创建一个独立的Python环境是一个好习惯,这样可以避免依赖冲突:

# 创建名为 last30days 的虚拟环境
python -m venv last30days_env

# 激活虚拟环境
# 在 Windows 系统上:
last30days_env\Scripts\activate

# 在 macOS 或 Linux 系统上:
source last30days_env/bin/activate

第二步:安装项目依赖

项目依赖可以通过pip直接安装。如果你想要安装最新版本:

pip install last30days-skill

如果你希望安装特定版本以保持稳定性:

pip install last30days-skill==1.2.0

安装完成后,验证一下是否成功:

python -c "import last30days; print(last30days.__version__)"

第三步:配置数据源(可选)

如果你的数据存储在数据库中,需要进行相应的配置。项目支持多种数据库,配置文件通常命名为 config.yaml

database:
  type: mysql
  host: localhost
  port: 3306
  user: your_username
  password: your_password
  database: your_database

date_settings:
  default_window: 30
  timezone: Asia/Shanghai
  format: "%Y-%m-%d"

核心功能详解

1. 时间窗口配置器(TimeWindow)

这是项目最核心的组件,用于定义和管理时间范围。TimeWindow 类提供了丰富的配置选项:

基本用法

from last30days import TimeWindow

# 创建一个最近30天的时间窗口(默认值)
tw = TimeWindow()
print(tw.start_date)  # 输出:30天前的日期
print(tw.end_date)    # 输出:今天

# 创建最近7天的时间窗口
tw_7days = TimeWindow(days=7)
print(f"最近7天:{tw_7days.start_date}{tw_7days.end_date}")

# 创建最近90天的时间窗口
tw_90days = TimeWindow(days=90)
print(f"最近90天:{tw_90days.start_date}{tw_90days.end_date}")

指定具体日期范围

from datetime import datetime
from last30days import TimeWindow

# 指定开始和结束日期
tw_custom = TimeWindow(
    start_date="2024-01-01",
    end_date="2024-01-31"
)
print(f"自定义范围:{tw_custom.start_date}{tw_custom.end_date}")

使用相对日期表达式

from last30days import TimeWindow

# 上个月第一天至今
tw_last_month = TimeWindow(
    start_date="first_day_of_last_month",
    relative=True
)

# 本季度第一天至今
tw_this_quarter = TimeWindow(
    start_date="first_day_of_current_quarter",
    relative=True
)

# 指定工作日数(自动排除周末)
tw_workdays = TimeWindow(
    days=30,
    workdays_only=True  # 只计算工作日
)

2. 数据过滤器(DateFilter)

DateFilter 类负责将时间窗口应用到实际数据上,支持多种数据源和格式。

SQL查询中的应用

from last30days import TimeWindow, DateFilter

# 创建时间窗口
tw = TimeWindow(days=30)

# 创建过滤器
df = DateFilter(timewindow=tw)

# 生成SQL条件
sql_condition = df.as_sql_condition(
    date_column="order_date",
    dialect="mysql"
)
print(sql_condition)
# 输出: order_date >= '2024-01-01' AND order_date <= '2024-01-31'

完整的SQL查询示例

from last30days import TimeWindow, DateFilter

tw = TimeWindow(days=30)
df = DateFilter(timewindow=tw)

date_condition = df.as_sql_condition("created_at", dialect="postgresql")

query = f"""
SELECT 
    product_id,
    COUNT(*) as order_count,
    SUM(amount) as total_amount
FROM orders
WHERE {date_condition}
GROUP BY product_id
ORDER BY total_amount DESC
"""
print(query)

DataFrame过滤

import pandas as pd
from last30days import TimeWindow, DateFilter

# 假设我们有一个包含日期列的DataFrame
df = pd.DataFrame({
    'date': pd.date_range('2024-01-01', periods=60),
    'value': range(60)
})

# 创建时间窗口
tw = TimeWindow(days=30)

# 过滤DataFrame
filtered_df = df[tw.filter_date(df['date'])]
print(f"原始数据行数: {len(df)}")
print(f"过滤后行数: {len(filtered_df)}")

3. 聚合计算器(Aggregator)

在获取到时间窗口内的数据后,往往需要进行各种聚合计算。Aggregator 类提供了常用的聚合函数封装。

基本聚合操作

from last30days import TimeWindow, Aggregator
import pandas as pd

# 创建示例数据
df = pd.DataFrame({
    'date': pd.date_range('2024-01-01', periods=60),
    'category': ['A', 'B'] * 30,
    'sales': [100 + i for i in range(60)]
})

# 创建时间窗口
tw = TimeWindow(days=30)

# 使用聚合器
agg = Aggregator(df, timewindow=tw)

# 按天聚合
daily_result = agg.aggregate_by_day('sales')
print("按天聚合结果:")
print(daily_result)

# 按周聚合
weekly_result = agg.aggregate_by_week('sales')
print("\n按周聚合结果:")
print(weekly_result)

# 按月聚合
monthly_result = agg.aggregate_by_month('sales')
print("\n按月聚合结果:")
print(monthly_result)

自定义聚合函数

from last30days import TimeWindow, Aggregator
import pandas as pd

df = pd.DataFrame({
    'date': pd.date_range('2024-01-01', periods=60),
    'category': ['A', 'B'] * 30,
    'sales': [100 + i for i in range(60)],
    'quantity': [1 + i % 5 for i in range(60)]
})

tw = TimeWindow(days=30)
agg = Aggregator(df, timewindow=tw)

# 自定义聚合:同时计算销售额和销量
result = agg.aggregate(
    group_by='category',
    metrics={
        'sales': ['sum', 'mean', 'max'],
        'quantity': ['sum', 'mean']
    }
)
print(result)

4. 报表生成器(ReportBuilder)

ReportBuilder 类可以将分析结果快速生成为格式化的报表,支持多种输出格式。

生成文本报表

from last30days import TimeWindow, ReportBuilder
import pandas as pd

# 准备数据
df = pd.DataFrame({
    'date': pd.date_range('2024-01-01', periods=30),
    'revenue': [1000 + i * 50 for i in range(30)]
})

tw = TimeWindow(days=30)
report = ReportBuilder(df, timewindow=tw)

# 生成文本格式报表
text_report = report.build_text_report(
    title="月度营收报表",
    metrics=['revenue'],
    include_summary=True
)
print(text_report)

生成JSON格式报表

from last30days import TimeWindow, ReportBuilder
import json

# 继续使用上面的数据...

json_report = report.build_json_report(
    title="月度营收报表",
    metrics=['revenue']
)
print(json.dumps(json_report, indent=2, ensure_ascii=False))

生成HTML报表

from last30days import TimeWindow, ReportBuilder

html_report = report.build_html_report(
    title="月度营收报表",
    metrics=['revenue'],
    include_chart=True
)

# 保存为HTML文件
with open('report.html', 'w', encoding='utf-8') as f:
    f.write(html_report)

实战教程:从需求到实现

场景一:电商订单数据分析

让我们通过一个完整的实战案例来学习如何使用这个项目。假设你是电商平台的运营分析师,需要生成一份「最近30天」的销售数据分析报告。

第一步:准备数据环境

import pandas as pd
from datetime import datetime, timedelta
from last30days import TimeWindow, DateFilter, Aggregator, ReportBuilder

# 模拟电商订单数据
def generate_sample_orders():
    """生成模拟订单数据"""
    dates = pd.date_range(
        start=datetime.now() - timedelta(days=60),
        end=datetime.now(),
        freq='H'
    )

    orders = []
    for i, date in enumerate(dates):
        if i % 10 != 0:  # 模拟部分时间点没有订单
            orders.append({
                'order_id': f'ORD{date.strftime("%Y%m%d")}{i:04d}',
                'order_date': date,
                'customer_id': f'CUST{(i % 100):03d}',
                'product_id': f'PROD{(i % 50):03d}',
                'category': ['电子产品', '服装', '食品', '家居'][i % 4],
                'quantity': 1 + (i % 3),
                'unit_price': 50 + (i % 100),
                'discount': 0 if i % 5 != 0 else 10,
                'status': ['已完成', '已发货', '处理中'][i % 3]
            })

    return pd.DataFrame(orders)

# 生成数据
orders_df = generate_sample_orders()
print(f"生成订单数据: {len(orders_df)} 条记录")
print(orders_df.head())

运行结果示例:

生成订单数据: 1296 条记录
      order_id          order_date customer_id product_id category  quantity  \
0  ORD20240101  2024-01-01 00:00:00      CUST000      PROD000        电子产品        1   
1  ORD20240101  2024-01-01 01:00:00      CUST010      PROD010          食品        3   
2  ORD20240101  2024-01-01 02:00:00      CUST020      PROD020         服装        2   
...

第二步:创建时间窗口并过滤数据

# 创建最近30天的时间窗口
tw = TimeWindow(days=30)

# 使用DateFilter过滤数据
df_filtered = orders_df[tw.filter_date(orders_df['order_date'])]

print(f"时间窗口: {tw.start_date}{tw.end_date}")
print(f"过滤后订单数: {len(df_filtered)} 条")
print(f"过滤后订单占比: {len(df_filtered) / len(orders_df) * 100:.1f}%")

第三步:计算关键指标

# 计算订单总金额
df_filtered = df_filtered.copy()
df_filtered['total_amount'] = (
    df_filtered['quantity'] * df_filtered['unit_price'] - df_filtered['discount']
)

# 基本统计
print("=" * 50)
print("最近30天销售概况")
print("=" * 50)
print(f"总订单数: {len(df_filtered)}")
print(f"总销售额: ¥{df_filtered['total_amount'].sum():,.2f}")
print(f"平均客单价: ¥{df_filtered['total_amount'].mean():,.2f}")
print(f"总销售数量: {df_filtered['quantity'].sum()}")
print(f"去重客户数: {df_filtered['customer_id'].nunique()}")

第四步:按维度分析

# 按类目分析
category_analysis = df_filtered.groupby('category').agg({
    'order_id': 'count',
    'total_amount': 'sum',
    'quantity': 'sum'
}).rename(columns={
    'order_id': 'order_count',
    'total_amount': 'revenue',
    'quantity': 'total_quantity'
})

category_analysis['avg_order_value'] = (
    category_analysis['revenue'] / category_analysis['order_count']
)

print("\n按类目销售分析:")
print("-" * 50)
print(category_analysis.sort_values('revenue', ascending=False))

第五步:生成日报表

# 使用ReportBuilder生成完整报表
report = ReportBuilder(df_filtered, timewindow=tw)

# 构建分析结果字典
analysis_result = {
    'report_title': '电商平台最近30天销售分析报告',
    'generated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    'time_window': {
        'start': str(tw.start_date.date()),
        'end': str(tw.end_date.date()),
        'days': 30
    },
    'summary': {
        'total_orders': len(df_filtered),
        'total_revenue': float(df_filtered['total_amount'].sum()),
        'avg_order_value': float(df_filtered['total_amount'].mean()),
        'unique_customers': df_filtered['customer_id'].nunique()
    },
    'category_breakdown': category_analysis.to_dict('index')
}

# 生成JSON格式报告
json_report = report.build_json_report(
    title=analysis_result['report_title'],
    data=analysis_result
)

print("\n" + "=" * 50)
print("生成的报表内容:")
print("=" * 50)
import json
print(json.dumps(json_report, indent=2, ensure_ascii=False))

场景二:用户行为日志分析

接下来让我们看另一个常见场景:分析用户最近30天的行为日志。

第一步:准备日志数据

import random
from datetime import datetime, timedelta

def generate_user_logs(num_logs=5000):
    """生成模拟用户行为日志"""
    behaviors = ['浏览', '点击', '收藏', '加入购物车', '下单', '评价']
    pages = ['首页', '商品列表', '商品详情', '购物车', '个人中心', '搜索页']

    logs = []
    base_time = datetime.now() - timedelta(days=60)

    for i in range(num_logs):
        log_time = base_time + timedelta(
            minutes=random.randint(0, 60*24*60)
        )
        logs.append({
            'log_id': f'LOG{i:08d}',
            'user_id': f'USER{random.randint(1, 500):04d}',
            'behavior': random.choice(behaviors),
            'page': random.choice(pages),
            'timestamp': log_time,
            'duration_seconds': random.randint(5, 300) if random.random() > 0.3 else None,
            'device': random.choice(['iOS', 'Android', 'Web'])
        })

    return pd.DataFrame(logs)

# 生成日志数据
logs_df = generate_user_logs()
print(f"生成用户日志: {len(logs_df)} 条记录")

第二步:按时间段分析用户行为

from last30days import TimeWindow, Aggregator

# 创建时间窗口
tw = TimeWindow(days=30)

# 过滤日志
filtered_logs = logs_df[tw.filter_date(logs_df['timestamp'])].copy()

# 按小时统计
filtered_logs['hour'] = filtered_logs['timestamp'].dt.hour
hourly_stats = filtered_logs.groupby('hour').size()

print("最近30天各时段用户活跃度:")
print("-" * 40)
for hour, count in hourly_stats.items():
    bar = '█' * (count // 50)
    print(f"{hour:02d}:00 - {count:4d} {bar}")

第三步:行为漏斗分析

def funnel_analysis(df, behaviors):
    """简单的漏斗分析"""
    results = {}
    total_users = df['user_id'].nunique()

    for behavior in behaviors:
        users_with_behavior = df[df['behavior'] == behavior]['user_id'].nunique()
        conversion_rate = (users_with_behavior / total_users) * 100 if total_users > 0 else 0
        results[behavior] = {
            'user_count': users_with_behavior,
            'conversion_rate': conversion_rate
        }

    return results

# 定义漏斗步骤
funnel_steps = ['浏览', '点击', '加入购物车', '下单']

print("\n用户行为漏斗:")
print("=" * 50)
funnel_result = funnel_analysis(filtered_logs, funnel_steps)

for i, (step, data) in enumerate(funnel_result.items()):
    bar_length = int(data['conversion_rate'] / 2)
    bar = '█' * bar_length
    print(f"{step:10s} | {data['user_count']:4d} 用户 | {data['conversion_rate']:5.1f}% {bar}")

    if i > 0:
        prev_step = funnel_steps[i - 1]
        prev_rate = funnel_result[prev_step]['conversion_rate']
        drop_off = prev_rate - data['conversion_rate']
        print(f"            ↓ 流失 {drop_off:.1f}%")

第四步:生成行为分析报告

from last30days import ReportBuilder

# 计算各项指标
behavior_summary = filtered_logs.groupby('behavior').agg({
    'log_id': 'count',
    'user_id': 'nunique'
}).rename(columns={
    'log_id': 'total_count',
    'user_id': 'unique_users'
})

behavior_summary['percentage'] = (
    behavior_summary['total_count'] / behavior_summary['total_count'].sum() * 100
)

print("\n用户行为分布:")
print("-" * 50)
print(behavior_summary.sort_values('total_count', ascending=False))

# 设备分布
device_stats = filtered_logs.groupby('device').agg({
    'log_id': 'count',
    'user_id': 'nunique'
}).rename(columns={
    'log_id': 'total_count',
    'user_id': 'unique_users'
})

print("\n设备分布:")
print("-" * 50)
print(device_stats)

场景三:财务数据月度对比

财务分析是另一个常见场景,让我们看看如何使用这个项目进行月度财务数据对比。

第一步:创建财务模拟数据

import random

def generate_financial_data():
    """生成模拟财务数据"""
    records = []
    base_date = datetime.now() - timedelta(days=90)

    categories = {
        '收入': ['商品销售', '服务收入', '会员订阅', '广告收入'],
        '支出': ['采购成本', '运营费用', '人力成本', '营销费用', '租金水电']
    }

    for i in range(200):
        record_date = base_date + timedelta(days=random.randint(0, 90))
        is_income = random.random() > 0.4
        category_type = '收入' if is_income else '支出'

        records.append({
            'transaction_id': f'TXN{record_date.strftime("%Y%m")}{i:04d}',
            'date': record_date,
            'type': category_type,
            'category': random.choice(categories[category_type]),
            'amount': abs(int(random.gauss(10000, 5000))),
            'description': f'业务交易 {i+1}'
        })

    return pd.DataFrame(records)

# 生成财务数据
financial_df = generate_financial_data()
print("生成财务数据:")
print(financial_df.head(10))

第二步:计算各月汇总

from last30days import TimeWindow

# 创建两个时间窗口进行对比
tw_current = TimeWindow(days=30)  # 本月
tw_last = TimeWindow(days=60, end_days_ago=30)  # 上月

# 过滤数据
current_month = financial_df[tw_current.filter_date(financial_df['date'])].copy()
last_month = financial_df[tw_last.filter_date(financial_df['date'])].copy()

print(f"本月数据: {len(current_month)} 条")
print(f"上月数据: {len(last_month)} 条")

第三步:计算环比数据

def calculate_summary(df):
    """计算财务汇总"""
    income = df[df['type'] == '收入']['amount'].sum()
    expense = df[df['type'] == '支出']['amount'].sum()
    profit = income - expense

    return {
        'income': income,
        'expense': expense,
        'profit': profit,
        'profit_rate': (profit / income * 100) if income > 0 else 0
    }

current_summary = calculate_summary(current_month)
last_summary = calculate_summary(last_month)

print("\n" + "=" * 60)
print("月度财务对比报告")
print("=" * 60)
print(f"{'指标':<15} {'本月':>15} {'上月':>15} {'环比':>15}")
print("-" * 60)

for metric in ['income', 'expense', 'profit', 'profit_rate']:
    current_val = current_summary[metric]
    last_val = last_summary[metric]

    if metric == 'profit_rate':
        current_str = f"{current_val:.2f}%"
        last_str = f"{last_val:.2f}%"
        change = current_val - last_val
        change_str = f"{change:+.2f}%"
    else:
        current_str = f{current_val:,.0f}"
        last_str = f{last_val:,.0f}"
        if last_val > 0:
            change = (current_val - last_val) / last_val * 100
            change_str = f"{change:+.1f}%"
        else:
            change_str = "N/A"

    print(f"{metric:<15} {current_str:>15} {last_str:>15} {change_str:>15}")

常见使用场景

场景一:自动化定时报表

你可以将 last30days-skill 集成到定时任务中,自动生成周期性报表:

import schedule
import time
from datetime import datetime

from last30days import TimeWindow, DateFilter, Aggregator, ReportBuilder

def generate_daily_report():
    """每日定时任务:生成销售日报"""
    print(f"[{datetime.now()}] 开始生成日报...")

    # 假设这是从数据库获取的数据
    # sales_data = fetch_sales_from_db()

    tw = TimeWindow(days=7)  # 最近7天
    report = ReportBuilder(sales_data, timewindow=tw)

    # 生成HTML报告
    html_content = report.build_html_report(title="每日销售报表")

    # 保存报告
    filename = f"daily_report_{datetime.now().strftime('%Y%m%d')}.html"
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(html_content)

    print(f"[{datetime.now()}] 日报已生成: {filename}")

def generate_weekly_report():
    """每周定时任务:生成销售周报"""
    print(f"[{datetime.now()}] 开始生成周报...")

    tw = TimeWindow(days=30)  # 最近30天
    report = ReportBuilder(sales_data, timewindow=tw)

    # 生成JSON格式的周报
    json_content = report.build_json_report(title="每周销售报表")

    filename = f"weekly_report_{datetime.now().strftime('%Y%m%d')}.json"
    with open(filename, 'w', encoding='utf-8') as f:
        import json
        json.dump(json_content, f, ensure_ascii=False, indent=2)

    print(f"[{datetime.now()}] 周报已生成: {filename}")

# 设置定时任务
schedule.every().day.at("08:00").do(generate_daily_report)
schedule.every().monday.at("09:00").do(generate_weekly_report)

# 运行调度器
while True:
    schedule.run_pending()
    time.sleep(60)

场景二:多数据源联合分析

当你的数据分布在多个数据源时,可以使用 DataSource 统一管理:

from last30days import TimeWindow, DataSource

# 创建数据源配置
sources = {
    'orders': DataSource(
        type='mysql',
        connection_params={
            'host': 'localhost',
            'database': 'ecommerce',
            'query': 'SELECT * FROM orders WHERE order_date >= {start_date}'
        }
    ),
    'customers': DataSource(
        type='mysql',
        connection_params={
            'host': 'localhost',
            'database': 'ecommerce',
            'query': 'SELECT * FROM customers WHERE created_date >= {start_date}'
        }
    ),
    'logs': DataSource(
        type='csv',
        file_path='./data/user_logs.csv'
    )
}

# 创建时间窗口
tw = TimeWindow(days=30)

# 加载所有数据
all_data = {}
for name, source in sources.items():
    all_data[name] = source.load(timewindow=tw)
    print(f"已加载 {name}: {len(all_data[name])} 条记录")

# 合并分析
combined_analysis = merge_and_analyze(all_data)
print("多数据源联合分析完成!")

场景三:自定义日期过滤器

对于特殊需求,你可以继承 DateFilter 类创建自定义过滤器:

from last30days import DateFilter, TimeWindow

class BusinessDayFilter(DateFilter):
    """排除法定节假日的过滤器"""

    def __init__(self, timewindow, holidays=None):
        super().__init__(timewindow)
        self.holidays = holidays or []

    def filter(self, df, date_column):
        """过滤出业务日(非周末且非节假日)"""
        import pandas as pd

        filtered = df[
            (df[date_column].dt.dayofweek < 5) &  # 排除周末
            (~df[date_column].dt.date.isin(self.holidays))  # 排除节假日
        ]

        # 进一步应用时间窗口
        return filtered[
            (filtered[date_column] >= self.timewindow.start_date) &
            (filtered[date_column] <= self.timewindow.end_date)
        ]

# 使用自定义过滤器
tw = TimeWindow(days=30)
custom_filter = BusinessDayFilter(
    tw,
    holidays=['2024-01-01', '2024-02-10', '2024-05-01']
)

business_day_data = custom_filter.filter(data_df, 'date_column')
print(f"业务日数据量: {len(business_day_data)}")

技巧与最佳实践

性能优化技巧

1. 避免在循环中创建时间窗口

# 低效的做法
for day in date_range:
    tw = TimeWindow(days=30)  # 每次都创建新实例
    result = process_data(tw, day)

# 高效的做法
tw = TimeWindow(days=30)  # 一次性创建
for day in date_range:
    result = process_data(tw, day)  # 复用实例

2. 使用向量化操作而非逐行处理

# 低效的做法
for index, row in df.iterrows():
    if tw.contains(row['date']):
        filtered.append(row)

# 高效的做法
filtered_df = df[tw.filter_date(df['date'])]

3. 对大数据集使用分块处理

from last30days import TimeWindow, DateFilter

tw = TimeWindow(days=30)

# 分块读取大型CSV文件
chunk_size = 10000
filtered_chunks = []

for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
    filtered_chunk = chunk[tw.filter_date(chunk['date'])]
    if len(filtered_chunk) > 0:
        filtered_chunks.append(filtered_chunk)

# 合并结果
result = pd.concat(filtered_chunks, ignore_index=True)

代码组织最佳实践

1. 封装为可复用的分析模块

# analytics_module.py
from last30days import TimeWindow, DateFilter, Aggregator, ReportBuilder

class SalesAnalyzer:
    """销售数据分析器"""

    def __init__(self, data, timewindow=None):
        self.data = data
        self.timewindow = timewindow or TimeWindow(days=30)
        self.filtered_data = None

    def filter_by_timewindow(self):
        """按时间窗口过滤数据"""
        self.filtered_data = self.data[
            self.timewindow.filter_date(self.data['date'])
        ].copy()
        return self

    def calculate_kpis(self):
        """计算关键绩效指标"""
        if self.filtered_data is None:
            self.filter_by_timewindow()

        return {
            'total_revenue': self.filtered_data['amount'].sum(),
            'order_count': len(self.filtered_data),
            'avg_order_value': self.filtered_data['amount'].mean(),
            'unique_customers': self.filtered_data['customer_id'].nunique()
        }

    def generate_report(self, format='json'):
        """生成报告"""
        report = ReportBuilder(self.filtered_data, timewindow=self.timewindow)
        return report.build_json_report(title="销售分析报告")

2. 使用配置文件管理参数

# config/report_config.yaml
reports:
  sales:
    default_window: 30
    metrics:
      - total_revenue
      - order_count
      - avg_order_value
    group_by:
      - category
      - region
    output_format: html

  user_activity:
    default_window: 7
    metrics:
      - active_users
      - session_count
      - avg_duration
    group_by:
      - device
      - platform
    output_format: json
# 使用配置
import yaml

with open('config/report_config.yaml', 'r', encoding='utf-8') as f:
    config = yaml.safe_load(f)

# 创建分析器
from last30days import TimeWindow

sales_config = config['reports']['sales']
analyzer = SalesAnalyzer(
    data=sales_data,
    timewindow=TimeWindow(days=sales_config['default_window'])
)

错误处理与调试

1. 优雅处理日期解析错误

from last30days import TimeWindow, DateFilter
from datetime import datetime

def safe_date_parse(date_string):
    """安全的日期解析"""
    formats = [
        '%Y-%m-%d',
        '%Y/%m/%d',
        '%Y%m%d',
        '%d-%m-%Y',
        '%m/%d/%Y'
    ]

    for fmt in formats:
        try:
            return datetime.strptime(date_string, fmt)
        except ValueError:
            continue

    raise ValueError(f"无法解析日期: {date_string}")

# 使用安全的解析函数
df['parsed_date'] = df['date_string'].apply(safe_date_parse)

2. 调试时间窗口配置

from last30days import TimeWindow

# 调试模式:打印详细信息
class DebugTimeWindow(TimeWindow):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        print("=" * 50)
        print("TimeWindow 配置信息:")
        print(f"  类型: {self.__class__.__name__}")
        print(f"  开始日期: {self.start_date}")
        print(f"  结束日期: {self.end_date}")
        print(f"  天数: {(self.end_date - self.start_date).days}")
        print("=" * 50)

# 使用调试类
tw = DebugTimeWindow(days=30)

进阶功能

自定义时间窗口类型

除了内置的固定天数窗口,项目还支持创建自定义时间窗口:

from last30days import TimeWindow
from datetime import datetime, timedelta

class FiscalYearWindow(TimeWindow):
    """财年窗口(假设财年从4月开始)"""

    def __init__(self, year=None):
        if year is None:
            year = datetime.now().year

        # 财年从4月1日开始
        if datetime.now().month < 4:
            start_date = datetime(year - 1, 4, 1)
        else:
            start_date = datetime(year, 4, 1)

        end_date = start_date + timedelta(days=365) - timedelta(days=1)

        super().__init__(start_date=start_date, end_date=end_date)

# 使用财年窗口
fy_window = FiscalYearWindow(2024)
print(f"2024财年: {fy_window.start_date}{fy_window.end_date}")

动态时间窗口表达式

from last30days import TimeWindow, parse_relative_date

# 支持的自然语言表达式
expressions = [
    "最近7天",
    "最近30天",
    "上个月",
    "本月",
    "最近一个季度",
    "最近一年"
]

for expr in expressions:
    tw = parse_relative_date(expr)
    print(f"{expr}: {tw.start_date}{tw.end_date}")

数据可视化集成

import matplotlib.pyplot as plt
from last30days import TimeWindow, Aggregator

# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS']
plt.rcParams['axes.unicode_minus'] = False

tw = TimeWindow(days=30)
agg = Aggregator(filtered_data, timewindow=tw)

# 获取日度数据
daily_data = agg.aggregate_by_day('sales')

# 绘制趋势图
fig, ax = plt.subplots(figsize=(12, 6))

ax.plot(daily_data['date'], daily_data['sales'], marker='o', linewidth=2)
ax.fill_between(daily_data['date'], daily_data['sales'], alpha=0.3)

ax.set_title('最近30天销售趋势', fontsize=16)
ax.set_xlabel('日期', fontsize=12)
ax.set_ylabel('销售额', fontsize=12)
ax.grid(True, alpha=0.3)

plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('sales_trend.png', dpi=150)
plt.show()

常见问题解答

Q1: 如何处理不同时区的数据?

from last30days import TimeWindow
from datetime import datetime
import pytz

# 创建时间窗口时指定时区
tw = TimeWindow(
    days=30,
    timezone='Asia/Shanghai'  # 或 'UTC', 'America/New_York' 等
)

# 时区转换
local_tz = pytz.timezone('Asia/Shanghai')
utc_tz = pytz.UTC

local_time = datetime.now(local_tz)
utc_time = local_time.astimezone(utc_tz)

Q2: 如何处理日期格式不统一的数据?

import pandas as pd
from last30days import TimeWindow

# 方法1:使用pandas的日期解析
df = pd.read_csv('data.csv', parse_dates=['date'])

# 方法2:手动统一格式
df['date'] = pd.to_datetime(df['date'], format='mixed')

# 方法3:创建日期标准化函数
def normalize_date(date_val):
    """标准化日期格式"""
    if isinstance(date_val, str):
        return pd.to_datetime(date_val)
    elif hasattr(date_val, 'date'):
        return pd.to_datetime(date_val.date())
    else:
        return date_val

df['normalized_date'] = df['date'].apply(normalize_date)

Q3: 如何处理大数据量导致的内存问题?

from last30days import TimeWindow
import pandas as pd

# 方法1:分批处理
tw = TimeWindow(days=30)
batch_size = 50000
results = []

for start_idx in range(0, len(large_df), batch_size):
    end_idx = min(start_idx + batch_size, len(large_df))
    batch = large_df.iloc[start_idx:end_idx]

    filtered_batch = batch[tw.filter_date(batch['date'])]
    results.append(filtered_batch)

final_result = pd.concat(results, ignore_index=True)

# 方法2:使用数据库过滤
# 在数据库层面完成过滤,减少数据传输量
query = f"""
SELECT * FROM large_table 
WHERE date >= '{tw.start_date}' 
  AND date <= '{tw.end_date}'
"""
filtered_data = pd.read_sql(query, connection)

Q4: 如何添加自定义的聚合函数?

from last30days import Aggregator
import pandas as pd

class CustomAggregator(Aggregator):
    """自定义聚合器"""

    def median_absolute_deviation(self, series):
        """计算MAD(绝对中位差)"""
        median = series.median()
        return (series - median).abs().median()

    def percentile_80(self, series):
        """计算80百分位数"""
        return series.quantile(0.8)

    def weighted_average(self, values, weights):
        """计算加权平均"""
        return (values * weights).sum() / weights.sum()

    def custom_aggregation(self, df, group_by, metric):
        """执行自定义聚合"""
        result = df.groupby(group_by).agg({
            metric: [
                ('sum', 'sum'),
                ('mean', 'mean'),
                ('median', 'median'),
                ('std', 'std'),
                ('mad', self.median_absolute_deviation),
                ('p80', self.percentile_80)
            ]
        })
        return result

# 使用自定义聚合器
agg = CustomAggregator(data, timewindow=tw)
result = agg.custom_aggregation(df, 'category', 'sales')

相关项目推荐

如果你对 last30days-skill 感兴趣,以下相关项目也值得一看:

数据处理框架

  • pandas: Python数据分析的基础库,last30days-skill 在其基础上提供了便捷的时间窗口功能。
  • polars: 高性能DataFrame库,特别适合处理大型数据集。
  • dask: 分布式计算库,可以处理超出内存容量的数据。

时间序列分析

  • prophet: Facebook开源的时间序列预测库。
  • statsmodels: 统计建模和时间序列分析工具。
  • tsfresh: 自动提取时间序列特征的库。

可视化工具

  • plotly: 交互式可视化库。
  • streamlit: 快速构建数据应用的框架。
  • dash: 基于Plotly的Python仪表盘框架。

报表生成

  • reportlab: Python PDF报表生成库。
  • weasyprint: HTML/CSS转PDF工具。
  • jinja2: 模板引擎,可用于生成自定义报表。

结语

mvanhorn/last30days-skill 项目以其简洁优雅的设计,为数据分析工作中的时间窗口处理提供了一个高效的解决方案。通过统一的接口、灵活的配置和丰富的内置功能,它大大简化了「最近N天」类型分析的代码编写工作。

无论你是数据分析师、开发工程师,还是业务运营人员,只要涉及到时间窗口相关的数据处理,这个项目都能帮你提升效率、减少错误。希望这篇教程能够帮助你快速上手,并在实际工作中发挥作用。

如果你在使用过程中遇到问题或有好的建议,欢迎访问项目的 GitHub 页面参与讨论和贡献。

项目链接: https://github.com/mvanhorn/last30days-skill

推荐阅读:

  • 项目官方文档
  • pandas 时间序列处理指南
  • Python 数据分析最佳实践

如果内容对您有帮助,欢迎打赏

您的支持是我继续创作的动力

前往打赏页面

评论区

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注