0

0

PySpark中高效移除重复数据的两种策略

聖光之護

聖光之護

发布时间:2025-08-30 16:01:11

|

358人浏览过

|

来源于php中文网

原创

PySpark中高效移除重复数据的两种策略

本文详细阐述了在PySpark环境中处理重复数据的两种主要方法:针对原生PySpark SQL DataFrame的dropDuplicates()和针对PySpark Pandas DataFrame的drop_duplicates()。文章深入分析了这两种函数的用法、适用场景及关键区别,并通过代码示例和注意事项,指导用户根据其DataFrame类型选择最合适的去重策略,确保数据处理的准确性和效率。

PySpark中重复数据处理概述

在数据处理和分析中,移除重复记录是数据清洗的关键步骤之一,尤其是在处理大规模数据集时。pyspark作为大数据处理的强大框架,提供了高效的机制来识别和消除dataframe中的重复行。然而,由于pyspark生态系统的发展,目前存在两种主要的dataframe类型,它们各自拥有不同的去重api:原生的pyspark.sql.dataframe和基于pandas api的pyspark.pandas.dataframe。理解这两种类型的差异及其对应的去重方法,对于编写健壮且高效的pyspark代码至关重要。

使用 pyspark.sql.DataFrame.dropDuplicates() 进行去重

pyspark.sql.DataFrame是PySpark的核心数据结构,它提供了类似于关系型数据库表的操作接口。对于这种类型的DataFrame,去重操作通过dropDuplicates()方法实现。

函数签名与用法

dropDuplicates()函数可以接受一个可选的列名列表作为参数,用于指定在哪些列上进行重复检查。如果不指定任何列,则默认会检查所有列。

DataFrame.dropDuplicates(subset=None)
  • subset: 可选参数,一个字符串列表,指定用于识别重复行的列。如果为None,则所有列都将用于去重。

示例代码

假设我们有一个包含客户ID的PySpark SQL DataFrame,我们希望移除重复的客户ID。

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# 初始化SparkSession
spark = SparkSession.builder.appName("DropDuplicatesSQL").getOrCreate()

# 创建一个示例PySpark SQL DataFrame
data = [("C001", "Alice"), ("C002", "Bob"), ("C001", "Alice"), ("C003", "Charlie"), ("C002", "Bob")]
columns = ["CUSTOMER_ID", "NAME"]
df_sql = spark.createDataFrame(data, columns)

print("原始 PySpark SQL DataFrame:")
df_sql.show()

# 1. 对所有列进行去重
df_distinct_all = df_sql.dropDuplicates()
print("所有列去重后的 DataFrame:")
df_distinct_all.show()

# 2. 仅根据 'CUSTOMER_ID' 列进行去重
# 注意:当仅根据子集去重时,对于重复的子集行,Spark会保留其中任意一行,其非子集列的值可能不确定。
# 在此示例中,由于(C001, Alice)是完全重复的,所以行为一致。
# 但如果数据是 (C001, Alice) 和 (C001, David),则去重后会保留其中一个。
df_distinct_id = df_sql.dropDuplicates(subset=["CUSTOMER_ID"])
print("根据 'CUSTOMER_ID' 列去重后的 DataFrame:")
df_distinct_id.show()

# 停止SparkSession
spark.stop()

输出示例:

Rationale
Rationale

Rationale 是一款可帮助企业主、经理和个人做出艰难的决定的AI工具

下载
原始 PySpark SQL DataFrame:
+-----------+-------+
|CUSTOMER_ID|   NAME|
+-----------+-------+
|       C001|  Alice|
|       C002|    Bob|
|       C001|  Alice|
|       C003|Charlie|
|       C002|    Bob|
+-----------+-------+

所有列去重后的 DataFrame:
+-----------+-------+
|CUSTOMER_ID|   NAME|
+-----------+-------+
|       C001|  Alice|
|       C002|    Bob|
|       C003|Charlie|
+-----------+-------+

根据 'CUSTOMER_ID' 列去重后的 DataFrame:
+-----------+-------+
|CUSTOMER_ID|   NAME|
+-----------+-------+
|       C001|  Alice|
|       C002|    Bob|
|       C003|Charlie|
+-----------+-------+

使用 pyspark.pandas.DataFrame.drop_duplicates() 进行去重

PySpark Pandas API(pyspark.pandas)旨在为熟悉Pandas库的用户提供一个在Spark上运行的相似接口。对于通过pyspark.pandas创建或转换而来的DataFrame,其去重方法与Pandas中的drop_duplicates()保持一致。

函数签名与用法

drop_duplicates()函数提供了更丰富的参数,以控制去重行为,例如保留哪个重复项(第一个、最后一个或不保留)。

DataFrame.drop_duplicates(subset=None, keep='first', inplace=False, ignore_index=False)
  • subset: 可选参数,一个字符串列表,指定用于识别重复行的列。如果为None,则所有列都将用于去重。
  • keep: 字符串,可选值有'first'、'last'或False。
    • 'first': 保留第一个出现的重复行。
    • 'last': 保留最后一个出现的重复行。
    • False: 删除所有重复行(即,如果某行有重复,则该行及其所有重复项都会被删除)。
  • inplace: 布尔值,如果为True,则在原始DataFrame上进行操作并返回None;如果为False,则返回一个新DataFrame。
  • ignore_index: 布尔值,如果为True,则重置结果DataFrame的索引。

示例代码

import pyspark.pandas as ps
from pyspark.sql import SparkSession

# 初始化SparkSession (pyspark.pandas 会自动使用现有的SparkSession)
spark = SparkSession.builder.appName("DropDuplicatesPandas").getOrCreate()

# 创建一个示例PySpark Pandas DataFrame
data = {"CUSTOMER_ID": ["C001", "C002", "C001", "C003", "C002"],
        "NAME": ["Alice", "Bob", "Alice", "Charlie", "Bob"]}
psdf = ps.DataFrame(data)

print("原始 PySpark Pandas DataFrame:")
print(psdf)

# 1. 对所有列进行去重 (默认 keep='first')
psdf_distinct_all = psdf.drop_duplicates()
print("所有列去重后的 DataFrame:")
print(psdf_distinct_all)

# 2. 仅根据 'CUSTOMER_ID' 列进行去重,保留第一个
psdf_distinct_id_first = psdf.drop_duplicates(subset=["CUSTOMER_ID"], keep='first')
print("根据 'CUSTOMER_ID' 列去重 (保留第一个) 后的 DataFrame:")
print(psdf_distinct_id_first)

# 3. 仅根据 'CUSTOMER_ID' 列进行去重,保留最后一个
psdf_distinct_id_last = psdf.drop_duplicates(subset=["CUSTOMER_ID"], keep='last')
print("根据 'CUSTOMER_ID' 列去重 (保留最后一个) 后的 DataFrame:")
print(psdf_distinct_id_last)

# 4. 仅根据 'CUSTOMER_ID' 列进行去重,删除所有重复项
psdf_distinct_id_false = psdf.drop_duplicates(subset=["CUSTOMER_ID"], keep=False)
print("根据 'CUSTOMER_ID' 列去重 (删除所有重复项) 后的 DataFrame:")
print(psdf_distinct_id_false)

# 停止SparkSession (如果需要,但通常在脚本结束时自动停止)
spark.stop()

输出示例:

原始 PySpark Pandas DataFrame:
  CUSTOMER_ID     NAME
0        C001    Alice
1        C002      Bob
2        C001    Alice
3        C003  Charlie
4        C002      Bob

所有列去重后的 DataFrame:
  CUSTOMER_ID     NAME
0        C001    Alice
1        C002      Bob
3        C003  Charlie

根据 'CUSTOMER_ID' 列去重 (保留第一个) 后的 DataFrame:
  CUSTOMER_ID     NAME
0        C001    Alice
1        C002      Bob
3        C003  Charlie

根据 'CUSTOMER_ID' 列去重 (保留最后一个) 后的 DataFrame:
  CUSTOMER_ID     NAME
2        C001    Alice
4        C002      Bob
3        C003  Charlie

根据 'CUSTOMER_ID' 列去重 (删除所有重复项) 后的 DataFrame:
  CUSTOMER_ID     NAME
3        C003  Charlie

选择正确的去重方法:关键区别与注意事项

选择dropDuplicates()还是drop_duplicates()的核心在于你正在操作的DataFrame类型。

  1. DataFrame类型识别:

    • 如果你通过spark.createDataFrame()或读取Spark数据源(如Parquet、CSV)创建DataFrame,你得到的是pyspark.sql.DataFrame。此时应使用dropDuplicates()。
    • 如果你通过pyspark.pandas.DataFrame()构造函数创建DataFrame,或者将pyspark.sql.DataFrame通过df.to_pandas_on_spark()(或旧版df.to_pandas())转换为pyspark.pandas.DataFrame,那么你应该使用drop_duplicates()。

    你可以通过type(df)或df.__class__.__name__来检查DataFrame的类型。

  2. API一致性:

    • dropDuplicates()是Spark原生的API,其行为和性能优化是基于Spark分布式计算模型设计的。
    • drop_duplicates()则遵循Pandas的API规范,对于熟悉Pandas的用户来说更直观。它在底层会转换为Spark操作,但其接口与Pandas保持高度一致。
  3. 功能差异:

    • dropDuplicates()相对简洁,主要关注去重本身。当基于子集去重时,它保留哪个重复项是不确定的(通常是Spark内部优化决定的任意一个)。
    • drop_duplicates()提供了keep参数,允许你精确控制保留第一个、最后一个还是删除所有重复项,这在某些业务场景下非常有用。
  4. 性能考量: 两种方法在底层都会触发Spark的distinct或groupBy操作,这通常涉及到数据的shuffle(混洗),对于大规模数据集而言,shuffle是计算密集型操作。因此,无论使用哪种方法,都应注意其对性能的影响。

总结

PySpark提供了两种强大且高效的方法来处理DataFrame中的重复数据:pyspark.sql.DataFrame的dropDuplicates()和pyspark.pandas.DataFrame的drop_duplicates()。理解它们各自的适用场景和功能特性是编写高效PySpark代码的关键。在实践中,务必根据你当前操作的DataFrame类型来选择正确的去重函数。当需要更精细地控制重复项的保留策略时,pyspark.pandas.DataFrame.drop_duplicates()的keep参数提供了更大的灵活性。始终牢记,去重操作可能涉及数据混洗,因此在处理超大规模数据集时,应评估其性能影响。

相关专题

更多
数据分析工具有哪些
数据分析工具有哪些

数据分析工具有Excel、SQL、Python、R、Tableau、Power BI、SAS、SPSS和MATLAB等。详细介绍:1、Excel,具有强大的计算和数据处理功能;2、SQL,可以进行数据查询、过滤、排序、聚合等操作;3、Python,拥有丰富的数据分析库;4、R,拥有丰富的统计分析库和图形库;5、Tableau,提供了直观易用的用户界面等等。

676

2023.10.12

SQL中distinct的用法
SQL中distinct的用法

SQL中distinct的语法是“SELECT DISTINCT column1, column2,...,FROM table_name;”。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

320

2023.10.27

SQL中months_between使用方法
SQL中months_between使用方法

在SQL中,MONTHS_BETWEEN 是一个常见的函数,用于计算两个日期之间的月份差。想了解更多SQL的相关内容,可以阅读本专题下面的文章。

346

2024.02.23

SQL出现5120错误解决方法
SQL出现5120错误解决方法

SQL Server错误5120是由于没有足够的权限来访问或操作指定的数据库或文件引起的。想了解更多sql错误的相关内容,可以阅读本专题下面的文章。

1094

2024.03.06

sql procedure语法错误解决方法
sql procedure语法错误解决方法

sql procedure语法错误解决办法:1、仔细检查错误消息;2、检查语法规则;3、检查括号和引号;4、检查变量和参数;5、检查关键字和函数;6、逐步调试;7、参考文档和示例。想了解更多语法错误的相关内容,可以阅读本专题下面的文章。

357

2024.03.06

oracle数据库运行sql方法
oracle数据库运行sql方法

运行sql步骤包括:打开sql plus工具并连接到数据库。在提示符下输入sql语句。按enter键运行该语句。查看结果,错误消息或退出sql plus。想了解更多oracle数据库的相关内容,可以阅读本专题下面的文章。

675

2024.04.07

sql中where的含义
sql中where的含义

sql中where子句用于从表中过滤数据,它基于指定条件选择特定的行。想了解更多where的相关内容,可以阅读本专题下面的文章。

571

2024.04.29

sql中删除表的语句是什么
sql中删除表的语句是什么

sql中用于删除表的语句是drop table。语法为drop table table_name;该语句将永久删除指定表的表和数据。想了解更多sql的相关内容,可以阅读本专题下面的文章。

414

2024.04.29

PHP 表单处理与文件上传安全实战
PHP 表单处理与文件上传安全实战

本专题聚焦 PHP 在表单处理与文件上传场景中的实战与安全问题,系统讲解表单数据获取与校验、XSS 与 CSRF 防护、文件类型与大小限制、上传目录安全配置、恶意文件识别以及常见安全漏洞的防范策略。通过贴近真实业务的案例,帮助学习者掌握 安全、规范地处理用户输入与文件上传的完整开发流程。

5

2026.01.13

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
CSS3 教程
CSS3 教程

共18课时 | 4.5万人学习

PostgreSQL 教程
PostgreSQL 教程

共48课时 | 7万人学习

Django 教程
Django 教程

共28课时 | 3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号