
引言:多文件加载与自定义处理的挑战
在数据分析领域,我们经常需要处理存储在多个文件中的数据集,例如按产品、日期或区域划分的 csv 文件。一个常见的需求是在合并这些文件时,为每行数据添加一个标识其来源的列(例如,哪个产品的数据)。虽然 polars 提供了 pl.read_csv("data_*.csv") 这样的便捷方法来批量读取文件,但它默认不会在结果 dataframe 中包含文件名信息。如果我们需要为每个文件添加一个基于文件名的自定义列(如 product_code),传统的做法可能是逐一读取、添加列、然后合并,但这可能无法充分利用 polars 的性能优势,尤其是在处理大量文件或大型文件时。
本教程将展示如何利用 Polars 的惰性计算(LazyFrame)机制,以一种高效且内存友好的方式解决这一问题,实现并行文件处理并动态添加自定义元数据。
利用 Polars LazyFrame 实现高效加载与自定义处理
Polars 的 scan_csv 函数是解决此问题的关键。它返回一个 LazyFrame,而不是立即加载数据到内存。LazyFrame 允许我们构建一系列数据转换操作,这些操作直到调用 collect() 方法时才会被执行。当处理多个 LazyFrame 并使用 pl.concat 合并它们时,Polars 能够并行地读取和处理这些文件,从而显著提高效率。
核心步骤
- 识别目标文件: 使用 pathlib 模块方便地查找符合特定模式的所有 CSV 文件。
- 创建 LazyFrame 并添加自定义列: 对于每个找到的文件,使用 pl.scan_csv 创建一个 LazyFrame。然后,利用 with_columns 方法添加一个新列,其值来源于当前文件名。
- 合并 LazyFrame 并收集结果: 将所有带有自定义列的 LazyFrame 放入一个列表中,然后使用 pl.concat 将它们合并。最后,调用 collect() 触发计算并获取最终的 DataFrame。
示例代码
假设我们有以下结构的 CSV 文件:
- data_product_1.csv
- data_product_2.csv
- data_product_3.csv
每个文件内容类似: data_product_1.csv:
data,value 2000-01-01,1 2000-01-02,2
我们的目标是得到一个合并后的 DataFrame,其中包含一个名为 product_code 的新列,显示例如 product_1 或 product_2。
首先,确保你已经安装了 Polars:pip install polars
import polars as pl
from pathlib import Path
# 模拟创建示例 CSV 文件
# 在实际应用中,这些文件应已存在
Path("data_product_1.csv").write_text("data,value\n2000-01-01,1\n2000-01-02,2")
Path("data_product_2.csv").write_text("data,value\n2000-01-01,3\n2000-01-02,4")
Path("data_product_3.csv").write_text("data,value\n2000-01-01,4\n2000-01-02,5")
# 1. 查找所有符合模式的 CSV 文件
# Path().glob("data_*.csv") 将返回一个迭代器,包含当前目录下所有匹配的文件路径
file_paths = Path().glob("data_*.csv")
# 2. 为每个文件创建 LazyFrame 并添加自定义列
# 我们希望 product_code 是 'product_1' 而不是 'data_product_1.csv'
lazy_frames = []
for f_path in file_paths:
# 从文件名中提取 'product_X' 部分
# f_path.name 获取文件名,如 'data_product_1.csv'
# .replace(".csv", "") 移除文件扩展名
# .replace("data_", "") 移除前缀 'data_'
product_code = f_path.name.replace(".csv", "").replace("data_", "")
# 创建 LazyFrame 并添加 product_code 列
lf = pl.scan_csv(f_path).with_columns(
pl.lit(product_code).alias("product_code")
)
lazy_frames.append(lf)
# 3. 合并 LazyFrame 并收集结果
# pl.concat 默认会对 LazyFrames 进行并行计算
if lazy_frames: # 确保有文件被找到
df_combined = pl.concat(lazy_frames).collect()
print(df_combined)
else:
print("没有找到匹配的 CSV 文件。")
# 清理模拟文件 (可选)
Path("data_product_1.csv").unlink()
Path("data_product_2.csv").unlink()
Path("data_product_3.csv").unlink()输出结果
执行上述代码后,你将得到一个类似以下结构的 Polars DataFrame:
shape: (6, 3) ┌────────────┬───────┬──────────────┐ │ data ┆ value ┆ product_code │ │ --- ┆ --- ┆ --- │ │ str ┆ i64 ┆ str │ ╞════════════╪═══════╪══════════════╡ │ 2000-01-01 ┆ 1 ┆ product_1 │ │ 2000-01-02 ┆ 2 ┆ product_1 │ │ 2000-01-01 ┆ 3 ┆ product_2 │ │ 2000-01-02 ┆ 4 ┆ product_2 │ │ 2000-01-01 ┆ 4 ┆ product_3 │ │ 2000-01-02 ┆ 5 ┆ product_3 │ └────────────┴───────┴──────────────┘
注意事项与最佳实践
- 惰性求值与并行化: scan_csv 创建 LazyFrame,它只记录操作而不立即执行。pl.concat 在处理 LazyFrame 列表时,能够将每个文件的读取和初步处理并行化,从而显著提升性能。collect() 是触发所有操作执行的最终步骤。
- 内存管理: 对于非常大的数据集,LazyFrame 机制尤其有用,因为它避免了一次性将所有数据加载到内存中,而是按需处理数据块。
- 文件名解析: 示例中使用了 replace() 方法从文件名中提取 product_code。如果文件名模式更复杂,可以考虑使用正则表达式 (re 模块) 进行更灵活的字符串解析。
- 错误处理: 在生产环境中,考虑添加文件不存在或文件损坏时的错误处理机制。
- Schema 一致性: pl.concat 假定所有输入的 DataFrame(或 LazyFrame)具有兼容的列名和数据类型。如果文件结构不一致,可能会导致错误或意外的结果。
- Polars 的发展: Polars 社区活跃,未来可能会直接在 pl.read_csv 或 pl.scan_csv 中添加类似 DuckDB filename=true 的参数,以更简洁的方式实现此功能。但在当前版本中,上述 LazyFrame 方法是推荐的高效解决方案。
总结
通过结合 Polars 的 scan_csv、with_columns 和 concat 方法,我们能够优雅且高效地解决多文件加载、自定义处理和合并的需求。这种基于 LazyFrame 的方法不仅提供了强大的并行处理能力,还优化了内存使用,使其成为处理大规模多文件数据集的理想选择。掌握这一模式将极大地提升您使用 Polars 进行数据处理的效率和灵活性。










