
1. 问题背景与低效实现
在数据分析中,我们经常需要对dataframe中的每一行,根据其特定属性(如商店id)和时间戳,回溯查询满足特定条件的历史数据并进行聚合计算。例如,对于每一笔交易,我们可能需要计算在过去15天内同一商店某种特定商品的出现次数。
传统的实现方式通常涉及使用循环(如iterrows())遍历DataFrame的每一行,然后在循环内部对DataFrame进行过滤和计算。这种方法虽然直观,但对于大型数据集而言,其性能瓶颈非常明显,因为它涉及大量的重复数据筛选操作,且Python循环的效率远低于底层优化的Pandas操作。
考虑以下一个典型的低效实现示例:
import pandas as pd
import numpy as np
from tqdm import tqdm
# 示例数据
data = pd.DataFrame({
'shop': [1, 2, 1, 2, 1, 2, 1, 2, 1, 2] * 3,
'execution_date': pd.to_datetime(pd.date_range(start='2023-11-01', periods=30, freq='D')),
'item': np.random.choice(['stuff', 'other', 'another'], size=30)
})
data.loc[data.index % 3 == 0, 'item'] = 'stuff'
data.loc[data.index % 7 == 0, 'item'] = 'stuff'
cum_items = []
data.execution_date = pd.to_datetime(data.execution_date, errors="coerce")
subset = data[["shop", "execution_date", "item"]]
# 预处理:将需要计数的条件转换为数值(1或0)
# 假设我们要计算 'item' 列中 'stuff' 的出现次数
subset['is_stuff'] = (subset['item'] == 'stuff').astype(int)
for i, row in tqdm(subset.iterrows(), total=len(subset)):
# 过滤条件:同一商店,且日期严格早于当前日期减去15天
fdf = (subset
.loc[(subset.shop == row["shop"])]
.loc[subset.execution_date < (row["execution_date"] - pd.Timedelta(15, unit='d'))]
)
if len(fdf) == 0:
cum_items.append(np.nan)
else:
cum_items.append(fdf['is_stuff'].sum()) # 计算 'stuff' 的出现次数
data["cum_items_iter"] = cum_items
print("迭代计算结果 (部分):")
print(data[['shop', 'execution_date', 'item', 'cum_items_iter']].head(10))上述代码通过逐行遍历,为每行构建一个子DataFrame并执行过滤和求和操作。对于包含数万甚至数百万行的数据集,这种方法将导致极长的运行时间。
2. 使用 groupby_rolling 进行向量化优化
为了显著提升性能,我们需要利用Pandas的向量化操作。groupby_rolling是处理此类分组-时间窗口聚合问题的强大工具。它允许我们对DataFrame进行分组,并在每个组内定义一个滑动时间窗口进行计算,从而避免显式循环。
触发式加载精美特效企业网站源码使用jquery实现了很多精美的触发式加载特效,网站首页在随着访客的滚动条滚动过程中会出现很多触发式加载的特殊效果,让这个网站的风格瞬间显得非常的高大上,让你的企业品牌在访客心中留下更深的影响。当然,我们在使用jquery特效的同时也要注意程序对搜索引擎的友好型,所以这一点儿作者也有考虑到,已经尽可能的对js和css脚本进行精简和优化,尽可能的加快网站加载速度,同时也
核心思想:
- 数据预处理: 将需要计数的条件(如item == 'stuff')转换为数值类型(1或0),以便进行求和。
- 排序: 确保数据按分组键和时间列进行排序,这是rolling操作正确定义时间窗口的前提。
- 分组与滚动: 使用groupby()指定分组键,然后使用rolling()定义时间窗口,并指定基于哪一列进行滚动。
- 聚合: 在每个滚动窗口内执行聚合操作(如sum())。
- 结果对齐: 将rolling操作的结果重新对齐到原始DataFrame的索引。
下面是使用groupby_rolling实现上述需求的向量化代码:
import pandas as pd
import numpy as np
# 示例数据(与上文相同)
data = pd.DataFrame({
'shop': [1, 2, 1, 2, 1, 2, 1, 2, 1, 2] * 3,
'execution_date': pd.to_datetime(pd.date_range(start='2023-11-01', periods=30, freq='D')),
'item': np.random.choice(['stuff', 'other', 'another'], size=30)
})
data.loc[data.index % 3 == 0, 'item'] = 'stuff'
data.loc[data.index % 7 == 0, 'item'] = 'stuff'
# 1. 确保日期列为datetime类型
data.execution_date = pd.to_datetime(data.execution_date, errors="coerce")
# 2. 预处理:将需要计数的条件转换为数值(1或0)
# 假设我们要计算 'item' 列中 'stuff' 的出现次数
data['is_stuff'] = (data['item'] == 'stuff').astype(int)
# 3. 排序:按商店和日期排序,这对于滚动操作至关重要
# 注意:groupby_rolling 可能会打乱原始顺序,因此在计算前排序并保留原始索引是好习惯
subset_sorted = data.sort_values(['shop', 'execution_date']).copy()
# 4. 使用 groupby_rolling 进行向量化计算
# '15D' 定义了一个15天的窗口,包括当前日期并向前追溯15天
# on='execution_date' 指定了滚动基于的日期列
data['cum_items_vec'] = (subset_sorted.groupby('shop')
.rolling('15D', on='execution_date')['is_stuff']
.sum()
.reset_index(level=0, drop=True) # 移除groupby引入的shop级别索引
.reindex(data.index) # 将结果重新对齐到原始DataFrame的索引
)
# 由于滚动窗口默认包含当前点,如果需要排除当前点,可以进一步处理
# 例如,如果'is_stuff'在当前日期为1,则减去它
# data['cum_items_vec'] = data['cum_items_vec'] - data['is_stuff']
print("\n向量化计算结果 (部分):")
print(data[['shop', 'execution_date', 'item', 'is_stuff', 'cum_items_vec']].head(10))
# 比较迭代和向量化结果(如果窗口定义一致)
# 注意:本例中,迭代方法和向量化方法对时间窗口的定义略有不同
# 迭代方法:D' < D - 15天 (严格早于15天前)
# 向量化方法:[D - 15天, D] (过去15天,包含当前日期)
# 因此,结果通常不会完全相同,但向量化方法解决了“过去N天”的常见需求关于时间窗口的说明: 原问题中迭代方法的条件是 subset.execution_date
3. 关键注意事项
- 数据类型: 确保用于定义时间窗口的列是Pandas的datetime类型。如果不是,需要使用pd.to_datetime()进行转换。
- 数据排序: 在应用groupby_rolling之前,务必根据分组键和时间列对DataFrame进行排序。rolling操作依赖于数据的顺序来正确地构建时间窗口。
- 窗口定义: 理解rolling()函数中时间偏移字符串(如'15D')的含义。它定义了一个以当前点为终点的固定长度时间窗口。例如,'15D'表示从当前日期开始,向前追溯15天的窗口,包括当前日期。
- 结果对齐: groupby_rolling操作会生成一个带有MultiIndex的Series。为了将结果添加回原始DataFrame,需要使用reset_index(level=0, drop=True)移除分组键作为索引级别,然后使用reindex(data.index)将结果重新对齐到原始DataFrame的索引。
- 条件计数转换为数值: 如果需要对特定条件(如字符串值)进行计数,应先将该条件转换为布尔值(True/False),再转换为整数(1/0),然后进行sum()操作。
4. 总结
通过将低效的iterrows()循环替换为Pandas的groupby_rolling函数,我们能够实现对DataFrame中条件滚动累加计算的显著性能提升。这种向量化方法不仅代码更简洁,而且利用了Pandas底层C语言优化,极大地加快了大数据集的处理速度。掌握groupby_rolling是进行高效时间序列数据分析的关键技能之一。









