0

0

PySpark DataFrame缺失值智能填充策略:基于多条件连接

聖光之護

聖光之護

发布时间:2025-10-12 13:31:17

|

1031人浏览过

|

来源于php中文网

原创

PySpark DataFrame缺失值智能填充策略:基于多条件连接

本文详细介绍了如何在pyspark中通过多条件连接(multiple conditional joins)和`coalesce`函数,智能地填充一个dataframe中依赖于另一个dataframe的缺失值。教程演示了如何针对不同缺失字段(如序列号和邮件)采用不同的连接键进行分步填充,并处理无匹配情况,确保数据完整性和准确性。

问题描述

在数据处理过程中,我们经常遇到需要从一个数据源(或DataFrame)中获取信息来补充另一个数据源中的缺失值的情况。本教程将解决一个具体场景:给定两个DataFrame,persons 和 people,我们需要根据特定的业务逻辑填充 persons DataFrame中 serial_no 和 mail 列的缺失值。

具体要求如下:

  1. 如果 persons DataFrame中的 serial_no 缺失,则尝试通过 mail 列与 people DataFrame的 e_mail 列进行连接,以获取 people DataFrame中的 s_no 值来填充 serial_no。
  2. 如果 persons DataFrame中的 mail 缺失,则尝试通过 serial_no 列(可能是原始值,也可能是第一步填充后的值)与 people DataFrame的 s_no 列进行连接,以获取 people DataFrame中的 e_mail 值来填充 mail。
  3. 如果以上两种连接都未能找到匹配值,则将缺失值填充为字符串 "NA"。

数据准备

首先,我们创建两个示例PySpark DataFrame来模拟 persons 和 people 数据。

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce, lit

# 初始化SparkSession
spark = SparkSession.builder.appName("DataFrameMissingValueFill").getOrCreate()

# 创建persons DataFrame
data_persons = [
    ("John", 25, 100483, "john@example.com"),
    ("Sam", 49, 448900, "sam@example.com"),
    ("Will", 63, None, "will@example.com"), # serial_no 缺失
    ("Robert", 20, 299011, None), # mail 缺失
    ("Hill", 78, None, "hill@example.com") # serial_no 缺失
]
columns_persons = ["name", "age", "serial_no", "mail"]
persons = spark.createDataFrame(data_persons, columns_persons)

# 创建people DataFrame
data_people = [
    ("John", 100483, "john@example.com"),
    ("Sam", 448900, "sam@example.com"),
    ("Will", 229809, "will@example.com"),
    ("Robert", 299011, None),
    ("Hill", 567233, "hill@example.com")
]
columns_people = ["name", "s_no", "e_mail"]
people = spark.createDataFrame(data_people, columns_people)

print("原始 persons DataFrame:")
persons.show()
print("原始 people DataFrame:")
people.show()

输出的原始DataFrame如下:

原始 persons DataFrame:
+------+---+---------+----------------+
|  name|age|serial_no|            mail|
+------+---+---------+----------------+
|  John| 25|   100483|john@example.com|
|   Sam| 49|   448900| sam@example.com|
|  Will| 63|     null|will@example.com|
|Robert| 20|   299011|            null|
|  Hill| 78|     null|hill@example.com|
+------+---+---------+----------------+

原始 people DataFrame:
+------+------+----------------+
|  name|  s_no|          e_mail|
+------+------+----------------+
|  John|100483|john@example.com|
|   Sam|448900| sam@example.com|
|  Will|229809|will@example.com|
|Robert|299011|            null|
|  Hill|567233|hill@example.com|
+------+------+----------------+

解决方案:分步连接与合并

为了满足上述复杂的填充逻辑,我们将采用分步连接(Sequential Joins)的方法。首先填充 serial_no,然后利用可能已更新的 serial_no 信息填充 mail。coalesce 函数在这里扮演了关键角色,它能够返回其参数列表中的第一个非空表达式。

步骤一:填充缺失的 serial_no

在这一步中,我们关注 persons DataFrame中 serial_no 列的缺失值。根据需求,如果 serial_no 缺失,我们尝试通过 persons.mail 与 people.e_mail 进行左连接来获取 people.s_no。

# 步骤一:通过邮件地址连接,填充缺失的 serial_no
# 使用别名避免列名冲突
serials_enriched = persons.alias("p").join(
    people.alias("pe"),
    col("p.mail") == col("pe.e_mail"), # 连接条件:persons的mail与people的e_mail
    "left" # 左连接,保留persons所有行
).select(
    col("p.name"),
    col("p.age"),
    # 使用coalesce函数:优先选择p.serial_no,其次是pe.s_no,最后是"NA"
    coalesce(col("p.serial_no"), col("pe.s_no"), lit("NA")).alias("serial_no"),
    col("p.mail")
)

print("填充 serial_no 后的 DataFrame:")
serials_enriched.show()

输出结果:

SlidesAI
SlidesAI

使用SlidesAI的AI在几秒钟内创建演示文稿幻灯片

下载
填充 serial_no 后的 DataFrame:
+------+---+---------+----------------+
|  name|age|serial_no|            mail|
+------+---+---------+----------------+
|  John| 25|   100483|john@example.com|
|   Sam| 49|   448900| sam@example.com|
|  Will| 63|   229809|will@example.com|
|Robert| 20|   299011|            null|
|  Hill| 78|   567233|hill@example.com|
+------+---+---------+----------------+

可以看到,Will 和 Hill 的 serial_no 已经成功从 people DataFrame中获取并填充。Robert 的 serial_no 本身不缺失,所以保持不变。

步骤二:填充缺失的 mail

现在,我们使用在步骤一中已经填充了 serial_no 的 serials_enriched DataFrame。我们将它与 people DataFrame再次进行左连接,这次的连接条件是 serial_no 与 s_no。目标是填充 mail 列的缺失值。

# 步骤二:通过序列号连接,填充缺失的 mail
# 注意:这里使用上一步生成的 serials_enriched DataFrame
final_df = serials_enriched.alias("se").join(
    people.alias("pe"),
    col("se.serial_no") == col("pe.s_no"), # 连接条件:serials_enriched的serial_no与people的s_no
    "left" # 左连接,保留serials_enriched所有行
).select(
    col("se.name"),
    col("se.age"),
    col("se.serial_no"),
    # 使用coalesce函数:优先选择se.mail,其次是pe.e_mail,最后是"NA"
    coalesce(col("se.mail"), col("pe.e_mail"), lit("NA")).alias("mail")
)

print("最终填充后的 DataFrame:")
final_df.show()

# 停止SparkSession
spark.stop()

输出结果:

最终填充后的 DataFrame:
+------+---+---------+----------------+
|  name|age|serial_no|            mail|
+------+---+---------+----------------+
|  John| 25|   100483|john@example.com|
|   Sam| 49|   448900| sam@example.com|
|  Will| 63|   229809|will@example.com|
|Robert| 20|   299011|              NA|
|  Hill| 78|   567233|hill@example.com|
+------+---+---------+----------------+

最终结果显示,Robert 的 mail 列被填充为 "NA",因为 people DataFrame中与 Robert 的 s_no (299011) 对应的 e_mail 也是缺失的。其他行的 mail 值保持不变或已成功填充。

完整代码示例

为了方便,以下是整合了所有步骤的完整PySpark代码:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce, lit

# 初始化SparkSession
spark = SparkSession.builder.appName("DataFrameMissingValueFill").getOrCreate()

# 创建persons DataFrame
data_persons = [
    ("John", 25, 100483, "john@example.com"),
    ("Sam", 49, 448900, "sam@example.com"),
    ("Will", 63, None, "will@example.com"),
    ("Robert", 20, 299011, None),
    ("Hill", 78, None, "hill@example.com")
]
columns_persons = ["name", "age", "serial_no", "mail"]
persons = spark.createDataFrame(data_persons, columns_persons)

# 创建people DataFrame
data_people = [
    ("John", 100483, "john@example.com"),
    ("Sam", 448900, "sam@example.com"),
    ("Will", 229809, "will@example.com"),
    ("Robert", 299011, None),
    ("Hill", 567233, "hill@example.com")
]
columns_people = ["name", "s_no", "e_mail"]
people = spark.createDataFrame(data_people, columns_people)

print("原始 persons DataFrame:")
persons.show()
print("原始 people DataFrame:")
people.show()

# 步骤一:通过邮件地址连接,填充缺失的 serial_no
serials_enriched = persons.alias("p").join(
    people.alias("pe"),
    col("p.mail") == col("pe.e_mail"),
    "left"
).select(
    col("p.name"),
    col("p.age"),
    coalesce(col("p.serial_no"), col("pe.s_no"), lit("NA")).alias("serial_no"),
    col("p.mail")
)

print("填充 serial_no 后的 DataFrame:")
serials_enriched.show()

# 步骤二:通过序列号连接,填充缺失的 mail
final_df = serials_enriched.alias("se").join(
    people.alias("pe"),
    col("se.serial_no") == col("pe.s_no"),
    "left"
).select(
    col("se.name"),
    col("se.age"),
    col("se.serial_no"),
    coalesce(col("se.mail"), col("pe.e_mail"), lit("NA")).alias("mail")
)

print("最终填充后的 DataFrame:")
final_df.show()

# 停止SparkSession
spark.stop()

注意事项

  1. 连接顺序的重要性: 在本例中,填充 serial_no 的操作依赖于 mail,而填充 mail 的操作可能依赖于新填充的 serial_no。因此,连接的顺序至关重要。如果先尝试填充 mail,那么它可能无法利用到尚未填充的 serial_no 信息。
  2. people DataFrame中的重复值: 如果 people DataFrame中 e_mail 或 s_no 存在重复值,那么左连接可能会导致 persons DataFrame中的行被复制。在实际应用中,如果 people DataFrame可能包含重复的连接键,通常需要先对其进行去重或聚合,以确保一对一或一对多连接的预期行为。例如,可以使用 people.dropDuplicates(["e_mail"]) 或 people.groupBy("e_mail").agg(...)。
  3. coalesce 函数的灵活性: coalesce 函数非常强大,可以处理多个备选值。它会从左到右评估参数,并返回第一个非 null 的值。这使得在填充缺失值时能够设定优先级。
  4. 数据类型一致性: 确保在连接和合并操作中涉及的列具有兼容的数据类型。例如,serial_no (整数) 和 s_no (整数) 应该匹配,mail (字符串) 和 e_mail (字符串) 也应匹配。
  5. 性能考量: 对于非常大的DataFrame,多次连接操作可能会影响性能。可以考虑使用 broadcast hint (people.hint("broadcast")) 来优化小型DataFrame的连接,以减少数据混洗。

总结

本教程展示了如何利用PySpark的强大功能,通过多步左连接和 coalesce 函数,优雅且高效地解决DataFrame中复杂条件的缺失值填充问题。这种方法不仅能够处理多字段、多条件的填充需求,还能灵活应对无匹配的情况,确保最终数据的完整性和业务逻辑的准确性。掌握这种技术对于处理真实世界中的数据集成和清洗任务至关重要。

相关专题

更多
数据类型有哪几种
数据类型有哪几种

数据类型有整型、浮点型、字符型、字符串型、布尔型、数组、结构体和枚举等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

297

2023.10.31

php数据类型
php数据类型

本专题整合了php数据类型相关内容,阅读专题下面的文章了解更多详细内容。

216

2025.10.31

c语言中null和NULL的区别
c语言中null和NULL的区别

c语言中null和NULL的区别是:null是C语言中的一个宏定义,通常用来表示一个空指针,可以用于初始化指针变量,或者在条件语句中判断指针是否为空;NULL是C语言中的一个预定义常量,通常用来表示一个空值,用于表示一个空的指针、空的指针数组或者空的结构体指针。

229

2023.09.22

java中null的用法
java中null的用法

在Java中,null表示一个引用类型的变量不指向任何对象。可以将null赋值给任何引用类型的变量,包括类、接口、数组、字符串等。想了解更多null的相关内容,可以阅读本专题下面的文章。

433

2024.03.01

js 字符串转数组
js 字符串转数组

js字符串转数组的方法:1、使用“split()”方法;2、使用“Array.from()”方法;3、使用for循环遍历;4、使用“Array.split()”方法。本专题为大家提供js字符串转数组的相关的文章、下载、课程内容,供大家免费下载体验。

248

2023.08.03

js截取字符串的方法
js截取字符串的方法

js截取字符串的方法有substring()方法、substr()方法、slice()方法、split()方法和slice()方法。本专题为大家提供字符串相关的文章、下载、课程内容,供大家免费下载体验。

205

2023.09.04

java基础知识汇总
java基础知识汇总

java基础知识有Java的历史和特点、Java的开发环境、Java的基本数据类型、变量和常量、运算符和表达式、控制语句、数组和字符串等等知识点。想要知道更多关于java基础知识的朋友,请阅读本专题下面的的有关文章,欢迎大家来php中文网学习。

1435

2023.10.24

字符串介绍
字符串介绍

字符串是一种数据类型,它可以是任何文本,包括字母、数字、符号等。字符串可以由不同的字符组成,例如空格、标点符号、数字等。在编程中,字符串通常用引号括起来,如单引号、双引号或反引号。想了解更多字符串的相关内容,可以阅读本专题下面的文章。

609

2023.11.24

桌面文件位置介绍
桌面文件位置介绍

本专题整合了桌面文件相关教程,阅读专题下面的文章了解更多内容。

0

2025.12.30

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Java 教程
Java 教程

共578课时 | 39.6万人学习

国外Web开发全栈课程全集
国外Web开发全栈课程全集

共12课时 | 0.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号