0

0

Dask DataFrame groupby 模式(Mode)聚合的实现指南

碧海醫心

碧海醫心

发布时间:2025-11-15 12:13:37

|

700人浏览过

|

来源于php中文网

原创

Dask DataFrame groupby 模式(Mode)聚合的实现指南

本教程详细阐述了如何在 dask dataframe 中对分组数据执行模式(mode)聚合。由于 dask 不直接提供 `groupby.agg` 的模式函数,文章通过自定义 `dask.dataframe.aggregation` 类,实现 `chunk`、`agg` 和 `finalize` 阶段的逻辑,从而有效地在分布式环境中计算分组模式,并提供完整的示例代码和注意事项。

引言:Dask Groupby 模式聚合的挑战

在数据分析中,查找一组数据的众数(mode)是一项常见操作。Pandas DataFrame 提供了 Series.mode() 方法,并且可以方便地与 groupby().agg() 结合使用,以计算每个分组的众数。然而,在处理大规模数据集时,Dask DataFrame 成为一个强大的分布式计算工具。尽管 Dask 提供了丰富的聚合功能,但其内置的 groupby().aggregate() 方法并不直接支持像 Pandas Series.mode 这样的聚合操作。这意味着,如果我们需要在 Dask DataFrame 中进行分组众数计算,就需要自定义聚合逻辑。

Pandas 中的模式聚合(作为参考)

在深入 Dask 的自定义聚合之前,我们首先回顾一下在 Pandas 中如何轻松实现这一功能。这有助于理解我们希望在 Dask 中复制的行为。

import pandas as pd
import numpy as np

# 示例数据
data_pandas = pd.DataFrame({
    'status': ['pending', 'pending', 'pending', 'canceled', 'canceled', 'canceled', 'confirmed', 'confirmed', 'confirmed'],
    'clientId': ['A', 'B', 'C', 'A', 'D', 'C', 'A', 'B', 'C'],
    'partner': ['A', np.nan, 'C', 'A', np.nan, 'C', 'A', np.nan, 'C'],
    'product': ['afiliates', 'pre-paid', 'giftcard', 'afiliates', 'pre-paid', 'giftcard', 'afiliates', 'pre-paid', 'giftcard'],
    'brand': ['brand_4', 'brand_2', 'brand_3', 'brand_1', 'brand_2', 'brand_3', 'brand_1', 'brand_3', 'brand_3'],
    'gmv': [100, 100, 100, 100, 100, 100, 100, 100, 100]
})

data_pandas = data_pandas.astype({
    "partner": "category",
    "status": "category",
    "product": "category",
    "brand": "category"
})

# 使用 Pandas 计算分组模式
mode_pandas = data_pandas.groupby(["clientId", "product"], observed=True).agg({"brand": pd.Series.mode})
print("Pandas Groupby Mode Result:")
print(mode_pandas)

Pandas 的 Series.mode 能够返回一个 Series,其中包含所有频率最高的值(如果存在多个众数)。

自定义 Dask 聚合函数:dask.dataframe.Aggregation

Dask 提供了一个 dask.dataframe.Aggregation 类,允许用户定义自定义的分布式聚合操作。这个类需要三个核心函数:chunk、agg 和 finalize,它们分别对应分布式计算的不同阶段。

  1. chunk 函数:局部计数chunk 函数在 Dask 的每个分区(chunk)上独立运行。它的目标是为每个分组键计算目标列中每个值的频率。对于众数计算,这意味着在每个分区内,我们需要统计每个 brand 值出现的次数。pd.Series.value_counts() 是实现这一目标的理想工具。

    def chunk(s):
        """
        在每个 Dask 分区上执行,计算每个值的频率。
        输入是一个 Pandas Series。
        """
        return s.value_counts()
  2. agg 函数:合并中间结果agg 函数负责合并 chunk 函数在不同分区上产生的中间结果。由于 chunk 函数返回的是每个值及其计数的 Series,agg 函数需要将这些 Series 合并,并对相同的值的计数进行求和,从而得到全局的频率统计。

    def agg(s0):
        """
        合并来自不同分区的中间结果(频率计数)。
        输入是一个 Pandas Series,其索引包含分组键和值,值是计数。
        """
        # _selected_obj 是 Dask 内部结构,代表了聚合的 Series。
        # groupby(level=s0._selected_obj.index.names) 确保按原始分组键和值进行求和。
        _intermediate = s0._selected_obj.groupby(level=s0._selected_obj.index.names).sum()
        # 过滤掉计数为0或负数的情况
        _intermediate = _intermediate[_intermediate > 0]
        return _intermediate
  3. finalize 函数:确定最终模式finalize 函数在所有 agg 操作完成后运行,它接收合并后的全局频率计数,并从中确定最终的众数。这个函数需要能够处理可能存在多个众数的情况,即返回所有频率最高的值。

    LogoMaker
    LogoMaker

    免费在线制作Logo,在几分钟内完成标志设计

    下载
    def finalize(s):
        """
        从合并后的频率计数中确定最终的众数。
        输入是一个 Pandas Series,其索引包含分组键和值,值是合并后的计数。
        """
        # 获取原始分组键的层级(不包括聚合列的值本身)
        level = list(range(s.index.nlevels - 1))
        # 对每个分组,找出频率最高的项
        # s.groupby(level=level) 按原始分组键重新分组
        # apply(lambda x: x[x == x.max()]) 找出每个组内频率等于最大频率的所有项
        return s.groupby(level=level, group_keys=False).apply(lambda x: x[x == x.max()])

在 Dask DataFrame 中应用自定义模式聚合

定义好 chunk、agg 和 finalize 函数后,我们可以将它们封装到 dask.dataframe.Aggregation 对象中,然后将其传递给 Dask DataFrame 的 groupby().aggregate() 方法。

import dask.dataframe as dd
from dask.dataframe import Aggregation

# 将 Pandas DataFrame 转换为 Dask DataFrame
df_dask = dd.from_pandas(data_pandas, npartitions=1) # npartitions=1 简化示例,实际应用中可根据数据大小调整

# 定义自定义的 Dask 模式聚合
mode_dask_agg = Aggregation(
    name="mode", # 聚合的名称
    chunk=chunk,
    agg=agg,
    finalize=finalize,
)

# 应用自定义聚合
mode_dask_result = df_dask.groupby(["clientId", "product"], observed=True, dropna=True).aggregate(
    {"brand": mode_dask_agg}
).compute() # .compute() 触发计算并返回 Pandas DataFrame

print("\nDask Groupby Mode Result:")
print(mode_dask_result)

完整示例代码

以下是包含所有步骤的完整示例代码:

from pandas import DataFrame, Series, NA
import pandas as pd
from dask.dataframe import from_pandas, Aggregation
import dask.dataframe as dd
import numpy as np

# 1. 准备数据
data = DataFrame(
    {
        "status": [
            "pending", "pending", "pending", "canceled", "canceled", "canceled", "confirmed", "confirmed", "confirmed",
        ],
        "clientId": ["A", "B", "C", "A", "D", "C", "A", "B", "C"],
        "partner": ["A", NA, "C", "A", NA, "C", "A", NA, "C"],
        "product": [
            "afiliates", "pre-paid", "giftcard", "afiliates", "pre-paid", "giftcard", "afiliates", "pre-paid", "giftcard",
        ],
        "brand": [
            "brand_4", "brand_2", "brand_3", "brand_1", "brand_2", "brand_3", "brand_1", "brand_3", "brand_3",
        ],
        "gmv": [100, 100, 100, 100, 100, 100, 100, 100, 100],
    }
)

data = data.astype(
    {
        "partner": "category",
        "status": "category",
        "product": "category",
        "brand": "category",
    }
)

# 2. Pandas 模式聚合(作为对比)
mode_pandas = data.groupby(["clientId", "product"], observed=True).agg(
    {"brand": Series.mode}
)
print("Pandas Groupby Mode Result:")
print(mode_pandas)

# 3. 转换为 Dask DataFrame
df_dask = from_pandas(data, npartitions=1)

# 4. 定义 Dask 自定义聚合函数的三个阶段
def chunk(s):
    """在每个 Dask 分区上执行,计算每个值的频率。"""
    return s.value_counts()

def agg(s0):
    """合并来自不同分区的中间结果(频率计数)。"""
    _intermediate = s0._selected_obj.groupby(level=s0._selected_obj.index.names).sum()
    _intermediate = _intermediate[_intermediate > 0]
    return _intermediate

def finalize(s):
    """从合并后的频率计数中确定最终的众数。"""
    level = list(range(s.index.nlevels - 1))
    return s.groupby(level=level, group_keys=False).apply(lambda x: x[x == x.max()])

# 5. 创建 dask.dataframe.Aggregation 对象
mode_dask_agg = Aggregation(
    name="mode",
    chunk=chunk,
    agg=agg,
    finalize=finalize,
)

# 6. 在 Dask DataFrame 上应用自定义聚合
mode_dask_result = df_dask.groupby(["clientId", "product"], observed=True, dropna=True).aggregate(
    {"brand": mode_dask_agg}
).compute()

print("\nDask Groupby Mode Result:")
print(mode_dask_result)

注意事项与 Dask/Pandas 差异

尽管上述自定义聚合旨在模拟 Pandas Series.mode 的行为,但在某些特定情况下,Dask 的结果可能与 Pandas 略有不同。这通常发生在以下情况:

  • 多个众数(Multi-mode): 当一个分组中存在多个值具有相同的最高频率时,Pandas 的 Series.mode 会返回一个包含所有这些众数的 Series。我们自定义的 finalize 函数也尝试处理这种情况,返回所有具有最大频率的值。
  • 数据类型和 NaN 处理: Dask 和 Pandas 在处理分类数据或 NaN 值时可能存在细微差异。dropna=True 参数在 Dask 的 groupby 中可以控制是否在分组前删除 NaN 值。在自定义聚合函数中,也需要确保对 NaN 的处理逻辑符合预期。
  • 性能考量: 自定义聚合虽然功能强大,但其性能可能不如 Dask 内置的、高度优化的聚合函数。对于非常大的数据集,应评估其性能影响。

在实际应用中,建议对比 Dask 和 Pandas 在小规模数据集上的结果,以验证自定义聚合的正确性,并理解任何潜在的差异。

总结

通过 dask.dataframe.Aggregation 类,我们成功地为 Dask DataFrame 的 groupby 操作实现了自定义的模式聚合功能。这种方法不仅解决了 Dask 不直接支持 Series.mode 的问题,也展示了 Dask 框架在处理复杂分布式聚合任务时的灵活性和可扩展性。理解 chunk、agg 和 finalize 三个阶段的工作原理是构建高效、正确自定义聚合的关键。

相关专题

更多
什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

318

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

225

2023.10.07

Python 时间序列分析与预测
Python 时间序列分析与预测

本专题专注讲解 Python 在时间序列数据处理与预测建模中的实战技巧,涵盖时间索引处理、周期性与趋势分解、平稳性检测、ARIMA/SARIMA 模型构建、预测误差评估,以及基于实际业务场景的时间序列项目实操,帮助学习者掌握从数据预处理到模型预测的完整时序分析能力。

48

2025.12.04

数据类型有哪几种
数据类型有哪几种

数据类型有整型、浮点型、字符型、字符串型、布尔型、数组、结构体和枚举等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

293

2023.10.31

php数据类型
php数据类型

本专题整合了php数据类型相关内容,阅读专题下面的文章了解更多详细内容。

216

2025.10.31

数据分析的方法
数据分析的方法

数据分析的方法有:对比分析法,分组分析法,预测分析法,漏斗分析法,AB测试分析法,象限分析法,公式拆解法,可行域分析法,二八分析法,假设性分析法。php中文网为大家带来了数据分析的相关知识、以及相关文章等内容。

447

2023.07.04

数据分析方法有哪几种
数据分析方法有哪几种

数据分析方法有:1、描述性统计分析;2、探索性数据分析;3、假设检验;4、回归分析;5、聚类分析。本专题为大家提供数据分析方法的相关的文章、下载、课程内容,供大家免费下载体验。

259

2023.08.07

网站建设功能有哪些
网站建设功能有哪些

网站建设功能包括信息发布、内容管理、用户管理、搜索引擎优化、网站安全、数据分析、网站推广、响应式设计、社交媒体整合和电子商务等功能。这些功能可以帮助网站管理员创建一个具有吸引力、可用性和商业价值的网站,实现网站的目标。

716

2023.10.16

笔记本电脑卡反应很慢处理方法汇总
笔记本电脑卡反应很慢处理方法汇总

本专题整合了笔记本电脑卡反应慢解决方法,阅读专题下面的文章了解更多详细内容。

1

2025.12.25

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Go 教程
Go 教程

共32课时 | 2.9万人学习

Go语言实战之 GraphQL
Go语言实战之 GraphQL

共10课时 | 0.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号