0

0

Pandas数据处理:高效筛选重复记录并保留指定数量的最新数据

心靈之曲

心靈之曲

发布时间:2025-09-05 15:57:08

|

670人浏览过

|

来源于php中文网

原创

Pandas数据处理:高效筛选重复记录并保留指定数量的最新数据

本教程旨在指导用户如何高效地从数据集中筛选重复记录,并为每个重复组保留指定数量(例如最后N条)的数据。我们将重点介绍Pandas中简洁高效的groupby().tail()方法,并与PySpark中基于窗口函数的方法进行对比,通过详细代码示例和最佳实践,帮助读者优化数据清洗流程。

问题场景描述

在数据清洗和预处理过程中,我们经常会遇到包含重复记录的数据集。这些重复记录可能基于一个或多个列的组合,但我们往往需要为每个重复组保留特定数量的记录,例如,只保留每个重复组中最新的n条记录。例如,在一个包含用户活动记录的dataframe中,我们可能希望针对每个用户(由first_name, last_name, sex等列定义),只保留其最新的3条活动记录。

假设我们有如下一个DataFrame:

id first_name last_name sex country
01 John Doe Male USA
02 John Doe Male Canada
03 John Doe Male Mexico
04 Mark Kay Male Italy
05 John Doe Male Spain
06 Mark Kay Male France
07 John Doe Male Peru
08 Mark Kay Male India
09 Mark Kay Male Laos
10 John Doe Male Benin

目标是基于first_name、last_name和sex列的组合识别重复项,并为每个组合保留最新的3条记录(根据id列的降序)。

基于Pandas的解决方案:使用groupby().tail()

对于在内存中操作的Pandas DataFrame,groupby().tail()方法提供了一种非常简洁且高效的解决方案。

核心思路

  1. 排序数据: 首先,确保DataFrame按照定义“最新”的标准进行排序。在我们的例子中,id越大表示越新,因此需要按id升序排序。
  2. 分组: 按照用于识别重复项的列(例如first_name, last_name, sex)进行分组。
  3. 选取尾部记录: 对每个分组应用tail(n)方法,这将返回该分组中最后N条记录。由于我们已经提前排序,这些“最后N条”就是我们想要的“最新N条”。

示例代码

import pandas as pd

# 原始DataFrame数据
data = {
    'id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    'first_name': ['John', 'John', 'John', 'Mark', 'John', 'Mark', 'John', 'Mark', 'Mark', 'John'],
    'last_name': ['Doe', 'Doe', 'Doe', 'Kay', 'Doe', 'Kay', 'Doe', 'Kay', 'Kay', 'Doe'],
    'sex': ['Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male', 'Male'],
    'country': ['USA', 'Canada', 'Mexico', 'Italy', 'Spain', 'France', 'Peru', 'India', 'Laos', 'Benin']
}

df = pd.DataFrame(data)

# 1. 根据'id'列对DataFrame进行排序,确保'tail(3)'能获取到最新的3条记录
# 如果'id'本身就是递增的,此步骤可确保正确性。
df_sorted = df.sort_values(by='id').copy()

# 2. 按照指定列进行分组,并为每个组保留最后3条记录
result_df = df_sorted.groupby(['first_name', 'last_name', 'sex']).tail(3)

# 3. (可选)重置索引,使索引连续
result_df = result_df.reset_index(drop=True)

# 显示结果DataFrame
print("处理后的DataFrame:")
print(result_df)

输出结果:

Timely
Timely

一款AI时间跟踪管理工具!

下载
处理后的DataFrame:
   id first_name last_name   sex country
0   5       John       Doe  Male   Spain
1   6       Mark       Kay  Male  France
2   7       John       Doe  Male    Peru
3   8       Mark       Kay  Male   India
4   9       Mark       Kay  Male    Laos
5  10       John       Doe  Male   Benin

基于PySpark的解决方案:使用窗口函数

对于大规模分布式数据集,例如在Apache Spark环境中使用PySpark,groupby().tail()方法不再适用。此时,窗口函数(Window Functions)是实现此功能的标准且高效的方式。

核心思路

  1. 定义窗口规范: 使用Window.partitionBy()定义分组的列,并使用orderBy()定义组内排序的列。为了获取“最新”的记录,排序通常是降序(例如,id降序)。
  2. 生成行号: 在每个分区(即每个重复组)内,根据排序规则为每条记录生成一个行号(row_number())。
  3. 过滤: 筛选出row_number小于或等于N的记录。

示例代码(PySpark风格)

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 假设df是一个Spark DataFrame
# 这里为了示例,我们创建一个模拟的SparkSession和DataFrame
spark = SparkSession.builder.appName("FilterDuplicates").getOrCreate()

data = [
    (1, 'John', 'Doe', 'Male', 'USA'),
    (2, 'John', 'Doe', 'Male', 'Canada'),
    (3, 'John', 'Doe', 'Male', 'Mexico'),
    (4, 'Mark', 'Kay', 'Male', 'Italy'),
    (5, 'John', 'Doe', 'Male', 'Spain'),
    (6, 'Mark', 'Kay', 'Male', 'France'),
    (7, 'John', 'Doe', 'Male', 'Peru'),
    (8, 'Mark', 'Kay', 'Male', 'India'),
    (9, 'Mark', 'Kay', 'Male', 'Laos'),
    (10, 'John', 'Doe', 'Male', 'Benin')
]
columns = ['id', 'first_name', 'last_name', 'sex', 'country']
df_spark = spark.createDataFrame(data, columns)

# 定义窗口规范:按first_name, last_name, sex分组,按id降序排序
window_spec = Window.partitionBy('first_name', 'last_name', 'sex').orderBy(F.desc('id'))

# 为每个分区内的记录生成行号
df_with_row_number = df_spark.withColumn('row_number', F.row_number().over(window_spec))

# 过滤,只保留行号小于等于3的记录
filtered_df_spark = df_with_row_number.filter('row_number <= 3')

# 移除辅助列row_number
result_df_spark = filtered_df_spark.drop('row_number')

# 显示结果
print("处理后的Spark DataFrame:")
result_df_spark.show()

spark.stop()

输出结果:

处理后的Spark DataFrame:
+---+----------+---------+----+-------+
| id|first_name|last_name| sex|country|
+---+----------+---------+----+-------+
|  5|      John|      Doe|Male|  Spain|
|  7|      John|      Doe|Male|   Peru|
| 10|      John|      Doe|Male|  Benin|
|  6|      Mark|      Kay|Male| France|
|  8|      Kay |      Kay|Male|  India|
|  9|      Mark|      Kay|Male|   Laos|
+---+----------+---------+----+-------+

效率与选择考量

  • Pandas groupby().tail(): 对于数据集能够完全载入内存的情况,groupby().tail()方法通常非常高效且代码简洁。它是Pandas中处理此类问题的首选方法。其内部实现经过高度优化,能够有效处理分组和选择操作。
  • PySpark 窗口函数: 对于大规模分布式数据集,当数据量超出单机内存限制时,PySpark的窗口函数是唯一的选择。虽然代码可能比Pandas版本稍长,但它能在分布式集群上高效执行,避免了数据收集到单个节点的瓶颈。Spark的优化器能够智能地处理窗口操作,确保性能。

注意事项

  • 排序的重要性: 无论是Pandas还是PySpark,定义“最新”或“最旧”的关键在于正确的排序。如果“最新”是基于时间戳,则应按时间戳列排序。如果“最新”是基于某个ID,则按ID排序。
  • 性能优化:
    • 在Pandas中,如果DataFrame非常大,sort_values()可能会消耗较多内存和时间。确保你的系统有足够的资源。
    • 在PySpark中,窗口操作会涉及数据重分区(shuffle),这可能是一个耗时操作。合理选择partitionBy的列,避免创建过多或过少的分区,有助于优化性能。
  • reset_index(): 在Pandas中,groupby().tail()操作会保留原始索引。如果需要一个从0开始的连续索引,记得调用reset_index(drop=True)。

总结

本文详细介绍了在数据处理中,如何根据特定分组筛选重复记录并保留指定数量(N)的最新数据。对于内存中的数据集,Pandas的df.sort_values().groupby().tail(N)组合方法提供了一个简洁高效的解决方案。而对于分布式大数据集,PySpark的窗口函数(Window.partitionBy().orderBy().row_number())则是实现相同逻辑的标准且高性能途径。理解这两种方法的适用场景和实现原理,能帮助开发者根据实际需求选择最合适的工具和策略,从而高效地完成数据清洗任务。

相关专题

更多
什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

319

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

229

2023.10.07

Python 时间序列分析与预测
Python 时间序列分析与预测

本专题专注讲解 Python 在时间序列数据处理与预测建模中的实战技巧,涵盖时间索引处理、周期性与趋势分解、平稳性检测、ARIMA/SARIMA 模型构建、预测误差评估,以及基于实际业务场景的时间序列项目实操,帮助学习者掌握从数据预处理到模型预测的完整时序分析能力。

49

2025.12.04

常用的数据库软件
常用的数据库软件

常用的数据库软件有MySQL、Oracle、SQL Server、PostgreSQL、MongoDB、Redis、Cassandra、Hadoop、Spark和Amazon DynamoDB。更多关于数据库软件的内容详情请看本专题下面的文章。php中文网欢迎大家前来学习。

957

2023.11.02

apache是什么意思
apache是什么意思

Apache是Apache HTTP Server的简称,是一个开源的Web服务器软件。是目前全球使用最广泛的Web服务器软件之一,由Apache软件基金会开发和维护,Apache具有稳定、安全和高性能的特点,得益于其成熟的开发和广泛的应用实践,被广泛用于托管网站、搭建Web应用程序、构建Web服务和代理等场景。本专题为大家提供了Apache相关的各种文章、以及下载和课程,希望对各位有所帮助。

403

2023.08.23

apache启动失败
apache启动失败

Apache启动失败可能有多种原因。需要检查日志文件、检查配置文件等等。想了解更多apache启动的相关内容,可以阅读本专题下面的文章。

924

2024.01.16

PHP 高并发与性能优化
PHP 高并发与性能优化

本专题聚焦 PHP 在高并发场景下的性能优化与系统调优,内容涵盖 Nginx 与 PHP-FPM 优化、Opcode 缓存、Redis/Memcached 应用、异步任务队列、数据库优化、代码性能分析与瓶颈排查。通过实战案例(如高并发接口优化、缓存系统设计、秒杀活动实现),帮助学习者掌握 构建高性能PHP后端系统的核心能力。

96

2025.10.16

PHP 数据库操作与性能优化
PHP 数据库操作与性能优化

本专题聚焦于PHP在数据库开发中的核心应用,详细讲解PDO与MySQLi的使用方法、预处理语句、事务控制与安全防注入策略。同时深入分析SQL查询优化、索引设计、慢查询排查等性能提升手段。通过实战案例帮助开发者构建高效、安全、可扩展的PHP数据库应用系统。

71

2025.11.13

php源码安装教程大全
php源码安装教程大全

本专题整合了php源码安装教程,阅读专题下面的文章了解更多详细内容。

150

2025.12.31

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
RunnerGo从入门到精通
RunnerGo从入门到精通

共22课时 | 1.7万人学习

尚学堂Mahout视频教程
尚学堂Mahout视频教程

共18课时 | 3.2万人学习

Linux优化视频教程
Linux优化视频教程

共14课时 | 3.1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号