别再手动筛选日期了!这个开源项目让「最近30天」数据分析效率提升10倍
为什么这个项目值得关注
在日常的数据分析工作中,「最近30天」几乎是最常见的时间窗口。无论是运营报表、销售统计,还是用户行为分析,我们总是需要频繁地筛选和计算最近一个月的数据。然而,你是否注意到这样一个现象:每次写SQL或者Python代码时,我们都要重复编写类似 WHERE date >= DATE_SUB(CURDATE(), INTERVAL 30 DAY) 这样的日期过滤逻辑?
这种重复性工作不仅枯燥,还容易出错。不同的数据源可能有不同的日期格式,不同的业务场景可能需要灵活调整时间窗口,而且一旦需求变化,修改起来也相当麻烦。
mvanhorn/last30days-skill 正是为了解决这一痛点而生。这个开源项目提供了一个通用的时间窗口处理框架,让你能够以一种优雅、灵活的方式处理「最近N天」类型的数据分析需求。无论是30天、7天还是90天,只需要简单配置,就能轻松切换。
这个项目能解决什么问题
想象一下这样的场景:运营团队需要一份日报,展示过去7天的用户活跃数据;销售总监要求一份周报,统计过去30天的订单情况;产品经理想要一份月度报告,分析过去90天的功能使用情况。如果每次都要重新编写日期过滤逻辑,那将是一场噩梦。
last30days-skill 的核心价值在于:
统一的接口设计:无论数据来源是MySQL、PostgreSQL,还是CSV文件、API接口,都使用相同的调用方式。
灵活的时间窗口:不只是30天,你可以轻松设置为任意天数,还能支持工作日、自然日等不同计算方式。
智能的日期处理:自动处理时区问题、日期格式差异,还能识别相对日期和绝对日期。
开箱即用的模板:内置多种常见场景的模板,让你从繁琐的日期处理中解放出来。
环境搭建
系统要求
在开始之前,确保你的开发环境满足以下要求:
- Python 3.8 或更高版本
- pip 包管理器
- 稳定的网络连接(用于下载依赖)
安装步骤
第一步:创建虚拟环境(推荐)
为项目创建一个独立的Python环境是一个好习惯,这样可以避免依赖冲突:
# 创建名为 last30days 的虚拟环境
python -m venv last30days_env
# 激活虚拟环境
# 在 Windows 系统上:
last30days_env\Scripts\activate
# 在 macOS 或 Linux 系统上:
source last30days_env/bin/activate
第二步:安装项目依赖
项目依赖可以通过pip直接安装。如果你想要安装最新版本:
pip install last30days-skill
如果你希望安装特定版本以保持稳定性:
pip install last30days-skill==1.2.0
安装完成后,验证一下是否成功:
python -c "import last30days; print(last30days.__version__)"
第三步:配置数据源(可选)
如果你的数据存储在数据库中,需要进行相应的配置。项目支持多种数据库,配置文件通常命名为 config.yaml:
database:
type: mysql
host: localhost
port: 3306
user: your_username
password: your_password
database: your_database
date_settings:
default_window: 30
timezone: Asia/Shanghai
format: "%Y-%m-%d"
核心功能详解
1. 时间窗口配置器(TimeWindow)
这是项目最核心的组件,用于定义和管理时间范围。TimeWindow 类提供了丰富的配置选项:
基本用法
from last30days import TimeWindow
# 创建一个最近30天的时间窗口(默认值)
tw = TimeWindow()
print(tw.start_date) # 输出:30天前的日期
print(tw.end_date) # 输出:今天
# 创建最近7天的时间窗口
tw_7days = TimeWindow(days=7)
print(f"最近7天:{tw_7days.start_date} 至 {tw_7days.end_date}")
# 创建最近90天的时间窗口
tw_90days = TimeWindow(days=90)
print(f"最近90天:{tw_90days.start_date} 至 {tw_90days.end_date}")
指定具体日期范围
from datetime import datetime
from last30days import TimeWindow
# 指定开始和结束日期
tw_custom = TimeWindow(
start_date="2024-01-01",
end_date="2024-01-31"
)
print(f"自定义范围:{tw_custom.start_date} 至 {tw_custom.end_date}")
使用相对日期表达式
from last30days import TimeWindow
# 上个月第一天至今
tw_last_month = TimeWindow(
start_date="first_day_of_last_month",
relative=True
)
# 本季度第一天至今
tw_this_quarter = TimeWindow(
start_date="first_day_of_current_quarter",
relative=True
)
# 指定工作日数(自动排除周末)
tw_workdays = TimeWindow(
days=30,
workdays_only=True # 只计算工作日
)
2. 数据过滤器(DateFilter)
DateFilter 类负责将时间窗口应用到实际数据上,支持多种数据源和格式。
SQL查询中的应用
from last30days import TimeWindow, DateFilter
# 创建时间窗口
tw = TimeWindow(days=30)
# 创建过滤器
df = DateFilter(timewindow=tw)
# 生成SQL条件
sql_condition = df.as_sql_condition(
date_column="order_date",
dialect="mysql"
)
print(sql_condition)
# 输出: order_date >= '2024-01-01' AND order_date <= '2024-01-31'
完整的SQL查询示例
from last30days import TimeWindow, DateFilter
tw = TimeWindow(days=30)
df = DateFilter(timewindow=tw)
date_condition = df.as_sql_condition("created_at", dialect="postgresql")
query = f"""
SELECT
product_id,
COUNT(*) as order_count,
SUM(amount) as total_amount
FROM orders
WHERE {date_condition}
GROUP BY product_id
ORDER BY total_amount DESC
"""
print(query)
DataFrame过滤
import pandas as pd
from last30days import TimeWindow, DateFilter
# 假设我们有一个包含日期列的DataFrame
df = pd.DataFrame({
'date': pd.date_range('2024-01-01', periods=60),
'value': range(60)
})
# 创建时间窗口
tw = TimeWindow(days=30)
# 过滤DataFrame
filtered_df = df[tw.filter_date(df['date'])]
print(f"原始数据行数: {len(df)}")
print(f"过滤后行数: {len(filtered_df)}")
3. 聚合计算器(Aggregator)
在获取到时间窗口内的数据后,往往需要进行各种聚合计算。Aggregator 类提供了常用的聚合函数封装。
基本聚合操作
from last30days import TimeWindow, Aggregator
import pandas as pd
# 创建示例数据
df = pd.DataFrame({
'date': pd.date_range('2024-01-01', periods=60),
'category': ['A', 'B'] * 30,
'sales': [100 + i for i in range(60)]
})
# 创建时间窗口
tw = TimeWindow(days=30)
# 使用聚合器
agg = Aggregator(df, timewindow=tw)
# 按天聚合
daily_result = agg.aggregate_by_day('sales')
print("按天聚合结果:")
print(daily_result)
# 按周聚合
weekly_result = agg.aggregate_by_week('sales')
print("\n按周聚合结果:")
print(weekly_result)
# 按月聚合
monthly_result = agg.aggregate_by_month('sales')
print("\n按月聚合结果:")
print(monthly_result)
自定义聚合函数
from last30days import TimeWindow, Aggregator
import pandas as pd
df = pd.DataFrame({
'date': pd.date_range('2024-01-01', periods=60),
'category': ['A', 'B'] * 30,
'sales': [100 + i for i in range(60)],
'quantity': [1 + i % 5 for i in range(60)]
})
tw = TimeWindow(days=30)
agg = Aggregator(df, timewindow=tw)
# 自定义聚合:同时计算销售额和销量
result = agg.aggregate(
group_by='category',
metrics={
'sales': ['sum', 'mean', 'max'],
'quantity': ['sum', 'mean']
}
)
print(result)
4. 报表生成器(ReportBuilder)
ReportBuilder 类可以将分析结果快速生成为格式化的报表,支持多种输出格式。
生成文本报表
from last30days import TimeWindow, ReportBuilder
import pandas as pd
# 准备数据
df = pd.DataFrame({
'date': pd.date_range('2024-01-01', periods=30),
'revenue': [1000 + i * 50 for i in range(30)]
})
tw = TimeWindow(days=30)
report = ReportBuilder(df, timewindow=tw)
# 生成文本格式报表
text_report = report.build_text_report(
title="月度营收报表",
metrics=['revenue'],
include_summary=True
)
print(text_report)
生成JSON格式报表
from last30days import TimeWindow, ReportBuilder
import json
# 继续使用上面的数据...
json_report = report.build_json_report(
title="月度营收报表",
metrics=['revenue']
)
print(json.dumps(json_report, indent=2, ensure_ascii=False))
生成HTML报表
from last30days import TimeWindow, ReportBuilder
html_report = report.build_html_report(
title="月度营收报表",
metrics=['revenue'],
include_chart=True
)
# 保存为HTML文件
with open('report.html', 'w', encoding='utf-8') as f:
f.write(html_report)
实战教程:从需求到实现
场景一:电商订单数据分析
让我们通过一个完整的实战案例来学习如何使用这个项目。假设你是电商平台的运营分析师,需要生成一份「最近30天」的销售数据分析报告。
第一步:准备数据环境
import pandas as pd
from datetime import datetime, timedelta
from last30days import TimeWindow, DateFilter, Aggregator, ReportBuilder
# 模拟电商订单数据
def generate_sample_orders():
"""生成模拟订单数据"""
dates = pd.date_range(
start=datetime.now() - timedelta(days=60),
end=datetime.now(),
freq='H'
)
orders = []
for i, date in enumerate(dates):
if i % 10 != 0: # 模拟部分时间点没有订单
orders.append({
'order_id': f'ORD{date.strftime("%Y%m%d")}{i:04d}',
'order_date': date,
'customer_id': f'CUST{(i % 100):03d}',
'product_id': f'PROD{(i % 50):03d}',
'category': ['电子产品', '服装', '食品', '家居'][i % 4],
'quantity': 1 + (i % 3),
'unit_price': 50 + (i % 100),
'discount': 0 if i % 5 != 0 else 10,
'status': ['已完成', '已发货', '处理中'][i % 3]
})
return pd.DataFrame(orders)
# 生成数据
orders_df = generate_sample_orders()
print(f"生成订单数据: {len(orders_df)} 条记录")
print(orders_df.head())
运行结果示例:
生成订单数据: 1296 条记录
order_id order_date customer_id product_id category quantity \
0 ORD20240101 2024-01-01 00:00:00 CUST000 PROD000 电子产品 1
1 ORD20240101 2024-01-01 01:00:00 CUST010 PROD010 食品 3
2 ORD20240101 2024-01-01 02:00:00 CUST020 PROD020 服装 2
...
第二步:创建时间窗口并过滤数据
# 创建最近30天的时间窗口
tw = TimeWindow(days=30)
# 使用DateFilter过滤数据
df_filtered = orders_df[tw.filter_date(orders_df['order_date'])]
print(f"时间窗口: {tw.start_date} 至 {tw.end_date}")
print(f"过滤后订单数: {len(df_filtered)} 条")
print(f"过滤后订单占比: {len(df_filtered) / len(orders_df) * 100:.1f}%")
第三步:计算关键指标
# 计算订单总金额
df_filtered = df_filtered.copy()
df_filtered['total_amount'] = (
df_filtered['quantity'] * df_filtered['unit_price'] - df_filtered['discount']
)
# 基本统计
print("=" * 50)
print("最近30天销售概况")
print("=" * 50)
print(f"总订单数: {len(df_filtered)}")
print(f"总销售额: ¥{df_filtered['total_amount'].sum():,.2f}")
print(f"平均客单价: ¥{df_filtered['total_amount'].mean():,.2f}")
print(f"总销售数量: {df_filtered['quantity'].sum()}")
print(f"去重客户数: {df_filtered['customer_id'].nunique()}")
第四步:按维度分析
# 按类目分析
category_analysis = df_filtered.groupby('category').agg({
'order_id': 'count',
'total_amount': 'sum',
'quantity': 'sum'
}).rename(columns={
'order_id': 'order_count',
'total_amount': 'revenue',
'quantity': 'total_quantity'
})
category_analysis['avg_order_value'] = (
category_analysis['revenue'] / category_analysis['order_count']
)
print("\n按类目销售分析:")
print("-" * 50)
print(category_analysis.sort_values('revenue', ascending=False))
第五步:生成日报表
# 使用ReportBuilder生成完整报表
report = ReportBuilder(df_filtered, timewindow=tw)
# 构建分析结果字典
analysis_result = {
'report_title': '电商平台最近30天销售分析报告',
'generated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'time_window': {
'start': str(tw.start_date.date()),
'end': str(tw.end_date.date()),
'days': 30
},
'summary': {
'total_orders': len(df_filtered),
'total_revenue': float(df_filtered['total_amount'].sum()),
'avg_order_value': float(df_filtered['total_amount'].mean()),
'unique_customers': df_filtered['customer_id'].nunique()
},
'category_breakdown': category_analysis.to_dict('index')
}
# 生成JSON格式报告
json_report = report.build_json_report(
title=analysis_result['report_title'],
data=analysis_result
)
print("\n" + "=" * 50)
print("生成的报表内容:")
print("=" * 50)
import json
print(json.dumps(json_report, indent=2, ensure_ascii=False))
场景二:用户行为日志分析
接下来让我们看另一个常见场景:分析用户最近30天的行为日志。
第一步:准备日志数据
import random
from datetime import datetime, timedelta
def generate_user_logs(num_logs=5000):
"""生成模拟用户行为日志"""
behaviors = ['浏览', '点击', '收藏', '加入购物车', '下单', '评价']
pages = ['首页', '商品列表', '商品详情', '购物车', '个人中心', '搜索页']
logs = []
base_time = datetime.now() - timedelta(days=60)
for i in range(num_logs):
log_time = base_time + timedelta(
minutes=random.randint(0, 60*24*60)
)
logs.append({
'log_id': f'LOG{i:08d}',
'user_id': f'USER{random.randint(1, 500):04d}',
'behavior': random.choice(behaviors),
'page': random.choice(pages),
'timestamp': log_time,
'duration_seconds': random.randint(5, 300) if random.random() > 0.3 else None,
'device': random.choice(['iOS', 'Android', 'Web'])
})
return pd.DataFrame(logs)
# 生成日志数据
logs_df = generate_user_logs()
print(f"生成用户日志: {len(logs_df)} 条记录")
第二步:按时间段分析用户行为
from last30days import TimeWindow, Aggregator
# 创建时间窗口
tw = TimeWindow(days=30)
# 过滤日志
filtered_logs = logs_df[tw.filter_date(logs_df['timestamp'])].copy()
# 按小时统计
filtered_logs['hour'] = filtered_logs['timestamp'].dt.hour
hourly_stats = filtered_logs.groupby('hour').size()
print("最近30天各时段用户活跃度:")
print("-" * 40)
for hour, count in hourly_stats.items():
bar = '█' * (count // 50)
print(f"{hour:02d}:00 - {count:4d} {bar}")
第三步:行为漏斗分析
def funnel_analysis(df, behaviors):
"""简单的漏斗分析"""
results = {}
total_users = df['user_id'].nunique()
for behavior in behaviors:
users_with_behavior = df[df['behavior'] == behavior]['user_id'].nunique()
conversion_rate = (users_with_behavior / total_users) * 100 if total_users > 0 else 0
results[behavior] = {
'user_count': users_with_behavior,
'conversion_rate': conversion_rate
}
return results
# 定义漏斗步骤
funnel_steps = ['浏览', '点击', '加入购物车', '下单']
print("\n用户行为漏斗:")
print("=" * 50)
funnel_result = funnel_analysis(filtered_logs, funnel_steps)
for i, (step, data) in enumerate(funnel_result.items()):
bar_length = int(data['conversion_rate'] / 2)
bar = '█' * bar_length
print(f"{step:10s} | {data['user_count']:4d} 用户 | {data['conversion_rate']:5.1f}% {bar}")
if i > 0:
prev_step = funnel_steps[i - 1]
prev_rate = funnel_result[prev_step]['conversion_rate']
drop_off = prev_rate - data['conversion_rate']
print(f" ↓ 流失 {drop_off:.1f}%")
第四步:生成行为分析报告
from last30days import ReportBuilder
# 计算各项指标
behavior_summary = filtered_logs.groupby('behavior').agg({
'log_id': 'count',
'user_id': 'nunique'
}).rename(columns={
'log_id': 'total_count',
'user_id': 'unique_users'
})
behavior_summary['percentage'] = (
behavior_summary['total_count'] / behavior_summary['total_count'].sum() * 100
)
print("\n用户行为分布:")
print("-" * 50)
print(behavior_summary.sort_values('total_count', ascending=False))
# 设备分布
device_stats = filtered_logs.groupby('device').agg({
'log_id': 'count',
'user_id': 'nunique'
}).rename(columns={
'log_id': 'total_count',
'user_id': 'unique_users'
})
print("\n设备分布:")
print("-" * 50)
print(device_stats)
场景三:财务数据月度对比
财务分析是另一个常见场景,让我们看看如何使用这个项目进行月度财务数据对比。
第一步:创建财务模拟数据
import random
def generate_financial_data():
"""生成模拟财务数据"""
records = []
base_date = datetime.now() - timedelta(days=90)
categories = {
'收入': ['商品销售', '服务收入', '会员订阅', '广告收入'],
'支出': ['采购成本', '运营费用', '人力成本', '营销费用', '租金水电']
}
for i in range(200):
record_date = base_date + timedelta(days=random.randint(0, 90))
is_income = random.random() > 0.4
category_type = '收入' if is_income else '支出'
records.append({
'transaction_id': f'TXN{record_date.strftime("%Y%m")}{i:04d}',
'date': record_date,
'type': category_type,
'category': random.choice(categories[category_type]),
'amount': abs(int(random.gauss(10000, 5000))),
'description': f'业务交易 {i+1}'
})
return pd.DataFrame(records)
# 生成财务数据
financial_df = generate_financial_data()
print("生成财务数据:")
print(financial_df.head(10))
第二步:计算各月汇总
from last30days import TimeWindow
# 创建两个时间窗口进行对比
tw_current = TimeWindow(days=30) # 本月
tw_last = TimeWindow(days=60, end_days_ago=30) # 上月
# 过滤数据
current_month = financial_df[tw_current.filter_date(financial_df['date'])].copy()
last_month = financial_df[tw_last.filter_date(financial_df['date'])].copy()
print(f"本月数据: {len(current_month)} 条")
print(f"上月数据: {len(last_month)} 条")
第三步:计算环比数据
def calculate_summary(df):
"""计算财务汇总"""
income = df[df['type'] == '收入']['amount'].sum()
expense = df[df['type'] == '支出']['amount'].sum()
profit = income - expense
return {
'income': income,
'expense': expense,
'profit': profit,
'profit_rate': (profit / income * 100) if income > 0 else 0
}
current_summary = calculate_summary(current_month)
last_summary = calculate_summary(last_month)
print("\n" + "=" * 60)
print("月度财务对比报告")
print("=" * 60)
print(f"{'指标':<15} {'本月':>15} {'上月':>15} {'环比':>15}")
print("-" * 60)
for metric in ['income', 'expense', 'profit', 'profit_rate']:
current_val = current_summary[metric]
last_val = last_summary[metric]
if metric == 'profit_rate':
current_str = f"{current_val:.2f}%"
last_str = f"{last_val:.2f}%"
change = current_val - last_val
change_str = f"{change:+.2f}%"
else:
current_str = f"¥{current_val:,.0f}"
last_str = f"¥{last_val:,.0f}"
if last_val > 0:
change = (current_val - last_val) / last_val * 100
change_str = f"{change:+.1f}%"
else:
change_str = "N/A"
print(f"{metric:<15} {current_str:>15} {last_str:>15} {change_str:>15}")
常见使用场景
场景一:自动化定时报表
你可以将 last30days-skill 集成到定时任务中,自动生成周期性报表:
import schedule
import time
from datetime import datetime
from last30days import TimeWindow, DateFilter, Aggregator, ReportBuilder
def generate_daily_report():
"""每日定时任务:生成销售日报"""
print(f"[{datetime.now()}] 开始生成日报...")
# 假设这是从数据库获取的数据
# sales_data = fetch_sales_from_db()
tw = TimeWindow(days=7) # 最近7天
report = ReportBuilder(sales_data, timewindow=tw)
# 生成HTML报告
html_content = report.build_html_report(title="每日销售报表")
# 保存报告
filename = f"daily_report_{datetime.now().strftime('%Y%m%d')}.html"
with open(filename, 'w', encoding='utf-8') as f:
f.write(html_content)
print(f"[{datetime.now()}] 日报已生成: {filename}")
def generate_weekly_report():
"""每周定时任务:生成销售周报"""
print(f"[{datetime.now()}] 开始生成周报...")
tw = TimeWindow(days=30) # 最近30天
report = ReportBuilder(sales_data, timewindow=tw)
# 生成JSON格式的周报
json_content = report.build_json_report(title="每周销售报表")
filename = f"weekly_report_{datetime.now().strftime('%Y%m%d')}.json"
with open(filename, 'w', encoding='utf-8') as f:
import json
json.dump(json_content, f, ensure_ascii=False, indent=2)
print(f"[{datetime.now()}] 周报已生成: {filename}")
# 设置定时任务
schedule.every().day.at("08:00").do(generate_daily_report)
schedule.every().monday.at("09:00").do(generate_weekly_report)
# 运行调度器
while True:
schedule.run_pending()
time.sleep(60)
场景二:多数据源联合分析
当你的数据分布在多个数据源时,可以使用 DataSource 统一管理:
from last30days import TimeWindow, DataSource
# 创建数据源配置
sources = {
'orders': DataSource(
type='mysql',
connection_params={
'host': 'localhost',
'database': 'ecommerce',
'query': 'SELECT * FROM orders WHERE order_date >= {start_date}'
}
),
'customers': DataSource(
type='mysql',
connection_params={
'host': 'localhost',
'database': 'ecommerce',
'query': 'SELECT * FROM customers WHERE created_date >= {start_date}'
}
),
'logs': DataSource(
type='csv',
file_path='./data/user_logs.csv'
)
}
# 创建时间窗口
tw = TimeWindow(days=30)
# 加载所有数据
all_data = {}
for name, source in sources.items():
all_data[name] = source.load(timewindow=tw)
print(f"已加载 {name}: {len(all_data[name])} 条记录")
# 合并分析
combined_analysis = merge_and_analyze(all_data)
print("多数据源联合分析完成!")
场景三:自定义日期过滤器
对于特殊需求,你可以继承 DateFilter 类创建自定义过滤器:
from last30days import DateFilter, TimeWindow
class BusinessDayFilter(DateFilter):
"""排除法定节假日的过滤器"""
def __init__(self, timewindow, holidays=None):
super().__init__(timewindow)
self.holidays = holidays or []
def filter(self, df, date_column):
"""过滤出业务日(非周末且非节假日)"""
import pandas as pd
filtered = df[
(df[date_column].dt.dayofweek < 5) & # 排除周末
(~df[date_column].dt.date.isin(self.holidays)) # 排除节假日
]
# 进一步应用时间窗口
return filtered[
(filtered[date_column] >= self.timewindow.start_date) &
(filtered[date_column] <= self.timewindow.end_date)
]
# 使用自定义过滤器
tw = TimeWindow(days=30)
custom_filter = BusinessDayFilter(
tw,
holidays=['2024-01-01', '2024-02-10', '2024-05-01']
)
business_day_data = custom_filter.filter(data_df, 'date_column')
print(f"业务日数据量: {len(business_day_data)}")
技巧与最佳实践
性能优化技巧
1. 避免在循环中创建时间窗口
# 低效的做法
for day in date_range:
tw = TimeWindow(days=30) # 每次都创建新实例
result = process_data(tw, day)
# 高效的做法
tw = TimeWindow(days=30) # 一次性创建
for day in date_range:
result = process_data(tw, day) # 复用实例
2. 使用向量化操作而非逐行处理
# 低效的做法
for index, row in df.iterrows():
if tw.contains(row['date']):
filtered.append(row)
# 高效的做法
filtered_df = df[tw.filter_date(df['date'])]
3. 对大数据集使用分块处理
from last30days import TimeWindow, DateFilter
tw = TimeWindow(days=30)
# 分块读取大型CSV文件
chunk_size = 10000
filtered_chunks = []
for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
filtered_chunk = chunk[tw.filter_date(chunk['date'])]
if len(filtered_chunk) > 0:
filtered_chunks.append(filtered_chunk)
# 合并结果
result = pd.concat(filtered_chunks, ignore_index=True)
代码组织最佳实践
1. 封装为可复用的分析模块
# analytics_module.py
from last30days import TimeWindow, DateFilter, Aggregator, ReportBuilder
class SalesAnalyzer:
"""销售数据分析器"""
def __init__(self, data, timewindow=None):
self.data = data
self.timewindow = timewindow or TimeWindow(days=30)
self.filtered_data = None
def filter_by_timewindow(self):
"""按时间窗口过滤数据"""
self.filtered_data = self.data[
self.timewindow.filter_date(self.data['date'])
].copy()
return self
def calculate_kpis(self):
"""计算关键绩效指标"""
if self.filtered_data is None:
self.filter_by_timewindow()
return {
'total_revenue': self.filtered_data['amount'].sum(),
'order_count': len(self.filtered_data),
'avg_order_value': self.filtered_data['amount'].mean(),
'unique_customers': self.filtered_data['customer_id'].nunique()
}
def generate_report(self, format='json'):
"""生成报告"""
report = ReportBuilder(self.filtered_data, timewindow=self.timewindow)
return report.build_json_report(title="销售分析报告")
2. 使用配置文件管理参数
# config/report_config.yaml
reports:
sales:
default_window: 30
metrics:
- total_revenue
- order_count
- avg_order_value
group_by:
- category
- region
output_format: html
user_activity:
default_window: 7
metrics:
- active_users
- session_count
- avg_duration
group_by:
- device
- platform
output_format: json
# 使用配置
import yaml
with open('config/report_config.yaml', 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
# 创建分析器
from last30days import TimeWindow
sales_config = config['reports']['sales']
analyzer = SalesAnalyzer(
data=sales_data,
timewindow=TimeWindow(days=sales_config['default_window'])
)
错误处理与调试
1. 优雅处理日期解析错误
from last30days import TimeWindow, DateFilter
from datetime import datetime
def safe_date_parse(date_string):
"""安全的日期解析"""
formats = [
'%Y-%m-%d',
'%Y/%m/%d',
'%Y%m%d',
'%d-%m-%Y',
'%m/%d/%Y'
]
for fmt in formats:
try:
return datetime.strptime(date_string, fmt)
except ValueError:
continue
raise ValueError(f"无法解析日期: {date_string}")
# 使用安全的解析函数
df['parsed_date'] = df['date_string'].apply(safe_date_parse)
2. 调试时间窗口配置
from last30days import TimeWindow
# 调试模式:打印详细信息
class DebugTimeWindow(TimeWindow):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
print("=" * 50)
print("TimeWindow 配置信息:")
print(f" 类型: {self.__class__.__name__}")
print(f" 开始日期: {self.start_date}")
print(f" 结束日期: {self.end_date}")
print(f" 天数: {(self.end_date - self.start_date).days}")
print("=" * 50)
# 使用调试类
tw = DebugTimeWindow(days=30)
进阶功能
自定义时间窗口类型
除了内置的固定天数窗口,项目还支持创建自定义时间窗口:
from last30days import TimeWindow
from datetime import datetime, timedelta
class FiscalYearWindow(TimeWindow):
"""财年窗口(假设财年从4月开始)"""
def __init__(self, year=None):
if year is None:
year = datetime.now().year
# 财年从4月1日开始
if datetime.now().month < 4:
start_date = datetime(year - 1, 4, 1)
else:
start_date = datetime(year, 4, 1)
end_date = start_date + timedelta(days=365) - timedelta(days=1)
super().__init__(start_date=start_date, end_date=end_date)
# 使用财年窗口
fy_window = FiscalYearWindow(2024)
print(f"2024财年: {fy_window.start_date} 至 {fy_window.end_date}")
动态时间窗口表达式
from last30days import TimeWindow, parse_relative_date
# 支持的自然语言表达式
expressions = [
"最近7天",
"最近30天",
"上个月",
"本月",
"最近一个季度",
"最近一年"
]
for expr in expressions:
tw = parse_relative_date(expr)
print(f"{expr}: {tw.start_date} 至 {tw.end_date}")
数据可视化集成
import matplotlib.pyplot as plt
from last30days import TimeWindow, Aggregator
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS']
plt.rcParams['axes.unicode_minus'] = False
tw = TimeWindow(days=30)
agg = Aggregator(filtered_data, timewindow=tw)
# 获取日度数据
daily_data = agg.aggregate_by_day('sales')
# 绘制趋势图
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(daily_data['date'], daily_data['sales'], marker='o', linewidth=2)
ax.fill_between(daily_data['date'], daily_data['sales'], alpha=0.3)
ax.set_title('最近30天销售趋势', fontsize=16)
ax.set_xlabel('日期', fontsize=12)
ax.set_ylabel('销售额', fontsize=12)
ax.grid(True, alpha=0.3)
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('sales_trend.png', dpi=150)
plt.show()
常见问题解答
Q1: 如何处理不同时区的数据?
from last30days import TimeWindow
from datetime import datetime
import pytz
# 创建时间窗口时指定时区
tw = TimeWindow(
days=30,
timezone='Asia/Shanghai' # 或 'UTC', 'America/New_York' 等
)
# 时区转换
local_tz = pytz.timezone('Asia/Shanghai')
utc_tz = pytz.UTC
local_time = datetime.now(local_tz)
utc_time = local_time.astimezone(utc_tz)
Q2: 如何处理日期格式不统一的数据?
import pandas as pd
from last30days import TimeWindow
# 方法1:使用pandas的日期解析
df = pd.read_csv('data.csv', parse_dates=['date'])
# 方法2:手动统一格式
df['date'] = pd.to_datetime(df['date'], format='mixed')
# 方法3:创建日期标准化函数
def normalize_date(date_val):
"""标准化日期格式"""
if isinstance(date_val, str):
return pd.to_datetime(date_val)
elif hasattr(date_val, 'date'):
return pd.to_datetime(date_val.date())
else:
return date_val
df['normalized_date'] = df['date'].apply(normalize_date)
Q3: 如何处理大数据量导致的内存问题?
from last30days import TimeWindow
import pandas as pd
# 方法1:分批处理
tw = TimeWindow(days=30)
batch_size = 50000
results = []
for start_idx in range(0, len(large_df), batch_size):
end_idx = min(start_idx + batch_size, len(large_df))
batch = large_df.iloc[start_idx:end_idx]
filtered_batch = batch[tw.filter_date(batch['date'])]
results.append(filtered_batch)
final_result = pd.concat(results, ignore_index=True)
# 方法2:使用数据库过滤
# 在数据库层面完成过滤,减少数据传输量
query = f"""
SELECT * FROM large_table
WHERE date >= '{tw.start_date}'
AND date <= '{tw.end_date}'
"""
filtered_data = pd.read_sql(query, connection)
Q4: 如何添加自定义的聚合函数?
from last30days import Aggregator
import pandas as pd
class CustomAggregator(Aggregator):
"""自定义聚合器"""
def median_absolute_deviation(self, series):
"""计算MAD(绝对中位差)"""
median = series.median()
return (series - median).abs().median()
def percentile_80(self, series):
"""计算80百分位数"""
return series.quantile(0.8)
def weighted_average(self, values, weights):
"""计算加权平均"""
return (values * weights).sum() / weights.sum()
def custom_aggregation(self, df, group_by, metric):
"""执行自定义聚合"""
result = df.groupby(group_by).agg({
metric: [
('sum', 'sum'),
('mean', 'mean'),
('median', 'median'),
('std', 'std'),
('mad', self.median_absolute_deviation),
('p80', self.percentile_80)
]
})
return result
# 使用自定义聚合器
agg = CustomAggregator(data, timewindow=tw)
result = agg.custom_aggregation(df, 'category', 'sales')
相关项目推荐
如果你对 last30days-skill 感兴趣,以下相关项目也值得一看:
数据处理框架
- pandas: Python数据分析的基础库,
last30days-skill在其基础上提供了便捷的时间窗口功能。 - polars: 高性能DataFrame库,特别适合处理大型数据集。
- dask: 分布式计算库,可以处理超出内存容量的数据。
时间序列分析
- prophet: Facebook开源的时间序列预测库。
- statsmodels: 统计建模和时间序列分析工具。
- tsfresh: 自动提取时间序列特征的库。
可视化工具
- plotly: 交互式可视化库。
- streamlit: 快速构建数据应用的框架。
- dash: 基于Plotly的Python仪表盘框架。
报表生成
- reportlab: Python PDF报表生成库。
- weasyprint: HTML/CSS转PDF工具。
- jinja2: 模板引擎,可用于生成自定义报表。
结语
mvanhorn/last30days-skill 项目以其简洁优雅的设计,为数据分析工作中的时间窗口处理提供了一个高效的解决方案。通过统一的接口、灵活的配置和丰富的内置功能,它大大简化了「最近N天」类型分析的代码编写工作。
无论你是数据分析师、开发工程师,还是业务运营人员,只要涉及到时间窗口相关的数据处理,这个项目都能帮你提升效率、减少错误。希望这篇教程能够帮助你快速上手,并在实际工作中发挥作用。
如果你在使用过程中遇到问题或有好的建议,欢迎访问项目的 GitHub 页面参与讨论和贡献。
项目链接: https://github.com/mvanhorn/last30days-skill
推荐阅读:
- 项目官方文档
- pandas 时间序列处理指南
- Python 数据分析最佳实践
评论区