0

0

使用Python多进程优化大数据量匹配与筛选性能

心靈之曲

心靈之曲

发布时间:2025-10-02 15:09:01

|

402人浏览过

|

来源于php中文网

原创

使用Python多进程优化大数据量匹配与筛选性能

本文旨在解决Python处理大数据量列表匹配与筛选时遇到的性能瓶颈,特别是当传统多线程方案效果不佳时。我们将深入探讨如何利用Python的multiprocessing模块,结合Manager实现进程间数据共享,以及合理的任务分块策略,显著提升CPU密集型任务的执行效率,从而将耗时数十分钟的操作缩短至可接受的范围。

1. 问题背景与挑战

在处理大规模数据集时,例如需要在一个包含数万条记录的json列表中(json_list)查找并匹配另一个包含数千个标记(marking)的列表中的元素,性能往往成为一个关键挑战。具体场景是,json_list中的每个字典包含一个code字段,我们需要将marking列表中的每个字符串与json_list中元素的code字段进行相似度匹配。匹配规则是使用difflib.sequencematcher计算相似度,当相似度为1(完全匹配)或介于0.98到0.99之间时,认为匹配成功。

原始的单线程或简单的多线程(threading)实现,在数据量庞大时(json_list超过23,000条,marking超过3,000条),可能需要20分钟甚至更长时间才能完成。这主要是因为Python的全局解释器锁(GIL)限制了多线程在CPU密集型任务上的并行执行能力。即使创建了多个线程,它们也无法同时在多个CPU核心上运行Python字节码,导致性能提升不明显。

2. 多进程(Multiprocessing)的解决方案

为了克服GIL的限制,Python提供了multiprocessing模块,它允许创建独立的进程,每个进程都有自己的Python解释器和内存空间。这意味着不同的进程可以在不同的CPU核心上真正并行执行CPU密集型任务,从而显著提高性能。

本教程将展示如何利用multiprocessing库来优化上述数据匹配和筛选过程。

2.1 核心组件介绍

  • multiprocessing.Process: 用于创建和管理新的进程。
  • multiprocessing.Manager: 用于创建可以在不同进程之间共享的数据结构(如列表、字典等)。这是解决进程间通信和数据共享的关键,因为普通Python对象在进程间默认不共享。
  • difflib.SequenceMatcher: 用于计算两个序列(字符串)的相似度。

2.2 匹配逻辑函数 find_marking

这个函数负责执行单个标记与JSON数据项的匹配逻辑。它保持不变,因为它是一个纯计算函数,不涉及并发问题。

立即学习Python免费学习笔记(深入)”;

iWebShop开源商城系统
iWebShop开源商城系统

iWebShop是一款基于PHP语言及MYSQL数据库开发的B2B2C多用户开源免费的商城系统,系统支持自营和多商家入驻、集成微信商城、手机商城、移动端APP商城、三级分销、视频电商直播、微信小程序等于一体,它可以承载大数据量且性能优良,还可以跨平台,界面美观功能丰富是电商建站首选源码。iWebShop开源商城系统 v5.14 更新日志:新增商品编辑页面规格图片上传优化商品详情页面规格图片与主图切

下载
from difflib import SequenceMatcher

def find_marking(x: str, y: dict) -> dict | None:
    """
    比较标记字符串x与JSON数据项y中的'code'字段的相似度。
    如果相似度为1或在0.98到0.99之间,则返回y,否则返回None。
    """
    text_match = SequenceMatcher(None, x, y.get('code', '')).ratio()
    if text_match == 1 or (0.98 <= text_match < 0.99):
        return y
    return None

注意: 确保y字典中包含'code'键,否则y.get('code', '')可以提供一个默认值,避免KeyError。

2.3 多进程筛选主函数 eliminate_marking

这个函数是整个解决方案的核心,它协调多个进程来并行处理匹配任务。

import math
from multiprocessing import Process, Manager

def eliminate_marking(marking_list: list[str], json_list: list[dict]) -> tuple[list[str], list[dict]]:
    """
    使用多进程并行地从json_list中匹配和筛选marking_list中的标记。

    Args:
        marking_list: 待匹配的标记字符串列表。
        json_list: 包含'code'字段的JSON字典列表。

    Returns:
        一个元组,包含两个列表:
        - result_mark: 成功匹配的标记列表。
        - result: 成功匹配的JSON数据项列表。
    """
    # 1. 初始化Manager和共享数据结构
    # Manager用于创建可在进程间共享的列表,以收集结果。
    manager = Manager()
    result_mark = manager.list()  # 共享列表,用于存储成功匹配的标记
    result = manager.list()       # 共享列表,用于存储成功匹配的JSON数据项

    def __process_eliminate(sub_marking_list: list[str], data_scrap: list[dict],
                            shared_result_mark: Manager.list, shared_result: Manager.list):
        """
        每个进程执行的任务函数。
        它遍历分配给它的标记子列表,并尝试在data_scrap中找到匹配项。
        """
        # data_scrap是json_list的一个副本,每个进程独立操作。
        # 注意:这里的data_scrap是json_list的浅拷贝,对其内部字典的修改会影响原始字典
        # 但对其列表结构(如remove操作)的修改仅影响当前进程的副本。
        # 鉴于我们的目标是收集匹配项,这种拷贝方式是安全的。

        for marking_item in sub_marking_list: # 遍历当前进程负责的标记子列表
            for data in data_scrap:           # 遍历json_list的副本
                result_data = find_marking(marking_item, data)
                if result_data:
                    # 找到匹配项后,将其添加到共享列表中
                    shared_result_mark.append(marking_item)
                    shared_result.append(result_data)
                    # 这里的remove操作只影响当前进程的data_scrap副本,
                    # 并不影响其他进程的副本或原始json_list。
                    # 如果目标是真正从原始json_list中移除,需要更复杂的同步机制。
                    # 在当前场景下,我们主要关注收集匹配结果。
                    # data_scrap.remove(data) 
                    # 如果一个标记只需要匹配一次,可以在找到后跳出内层循环
                    break # 一个marking_item找到一个匹配后就跳出,避免重复匹配

    # 2. 任务分块与进程创建
    processes = []
    chunk_size = 100  # 每个进程处理的marking_list块的大小

    # 计算需要创建的进程数量
    # 这里将marking_list分成块,每个进程处理一个或多个块。
    # 另一种常见策略是基于CPU核心数创建进程。
    num_chunks = math.ceil(len(marking_list) / chunk_size)

    for i in range(num_chunks):
        start_idx = i * chunk_size
        end_idx = min((i + 1) * chunk_size, len(marking_list))
        sub_marking_list = marking_list[start_idx:end_idx]

        if not sub_marking_list:
            continue # 避免创建空任务的进程

        p = Process(
            target=__process_eliminate,
            # args参数传递给目标函数。
            # json_list[:] 创建了一个json_list的浅拷贝,确保每个进程有独立的副本。
            args=(sub_marking_list, json_list[:], result_mark, result)
        )
        processes.append(p)
        p.start() # 启动进程

    # 3. 等待所有进程完成
    for p in processes:
        p.join() # 阻塞主进程,直到当前进程执行完毕

    # 4. 关闭Manager并返回结果
    manager.shutdown() # 在所有进程完成后关闭Manager
    return list(result_mark), list(result) # 将Manager.list转换为普通list返回

2.4 完整示例代码

为了方便测试,我们创建一些模拟数据:

import math
import time
import random
import string
from difflib import SequenceMatcher
from multiprocessing import Process, Manager

# 模拟数据
def generate_fake_data(num_json, num_marking):
    json_list = []
    for i in range(num_json):
        code_val = ''.join(random.choices(string.digits, k=6))
        json_list.append({
            "code": code_val,
            "phone_number": f"1{random.randint(1000000000, 9999999999)}",
            "email": f"user{i}@example.com",
            "address": f"address_fake_{i}",
            "note": f"note dummy {i}"
        })

    marking = []
    # 确保有一些匹配项
    for i in range(num_marking // 2):
        # 从json_list中随机取一个code作为marking
        marking.append(random.choice(json_list)['code'])
    # 添加一些不匹配的marking
    for i in range(num_marking // 2, num_marking):
        marking.append(''.join(random.choices(string.ascii_letters + string.digits, k=random.randint(5, 8))))

    random.shuffle(marking) # 打乱顺序
    return json_list, marking

# 假设的 find_marking 函数
def find_marking(x: str, y: dict) -> dict | None:
    text_match = SequenceMatcher(None, x, y.get('code', '')).ratio()
    if text_match == 1 or (0.98 <= text_match < 0.99):
        return y
    return None

# 假设的 eliminate_marking 函数(与上面定义的一致)
def eliminate_marking(marking_list: list[str], json_list: list[dict]) -> tuple[list[str], list[dict]]:
    manager = Manager()
    result_mark = manager.list()
    result = manager.list()

    def __process_eliminate(sub_marking_list: list[str], data_scrap: list[dict],
                            shared_result_mark: Manager.list, shared_result: Manager.list):
        for marking_item in sub_marking_list:
            for data in data_scrap:
                result_data = find_marking(marking_item, data)
                if result_data:
                    shared_result_mark.append(marking_item)
                    shared_result.append(result_data)
                    break # 一个marking_item找到一个匹配后就跳出

    processes = []
    # 这里的chunk_size可以根据实际CPU核心数和任务复杂度进行调整
    # 较小的chunk_size可能导致更多的进程创建和管理开销
    # 较大的chunk_size可能导致部分核心利用率不足
    chunk_size = 50 # 调整为50,以创建更多进程进行测试,更细粒度的任务分配

    # 优化:根据CPU核心数来决定进程数量,而不是简单地按chunk_size分块
    # 理想情况下,进程数不应超过CPU核心数
    # num_processes = os.cpu_count() or 1
    # marking_per_process = math.ceil(len(marking_list) / num_processes)
    # 
    # for i in range(num_processes):
    #     start_idx = i * marking_per_process
    #     end_idx = min((i + 1) * marking_per_process, len(marking_list))
    #     sub_marking_list = marking_list[start_idx:end_idx]
    #     ...

    # 当前实现是按chunk_size分块
    num_chunks = math.ceil(len(marking_list) / chunk_size)

    for i in range(num_chunks):
        start_idx = i * chunk_size
        end_idx = min((i + 1) * chunk_size, len(marking_list))
        sub_marking_list = marking_list[start_idx:end_idx]

        if not sub_marking_list:
            continue

        p = Process(
            target=__process_eliminate,
            args=(sub_marking_list, json_list[:], result_mark, result)
        )
        processes.append(p)
        p.start()

    for p in processes:
        p.join()
    manager.shutdown()
    return list(result_mark), list(result)

if __name__ == "__main__":
    # 生成模拟数据
    NUM_JSON = 23000
    NUM_MARKING = 3000
    print(f"生成 {NUM_JSON} 条JSON数据和 {NUM_MARKING} 条标记数据...")
    test_json_list, test_marking_list = generate_fake_data(NUM_JSON, NUM_MARKING)
    print("数据生成完毕。")

    start_time = time.time()
    eliminated_markings, eliminated_data = eliminate_marking(test_marking_list, test_json_list)
    end_time = time.time()

    print(f"\n多进程处理完成。")
    print(f"总耗时: {end_time - start_time:.2f} 秒")
    print(f"找到 {len(eliminated_markings)} 个匹配标记。")
    print(f"找到 {len(eliminated_data)} 条匹配数据。")

    # 验证部分结果
    if eliminated_markings:
        print(f"部分匹配标记示例: {eliminated_markings[:5]}")
    if eliminated_data:
        print(f"部分匹配数据示例: {eliminated_data[:2]}")

3. 注意事项与最佳实践

  1. GIL与多进程: 理解Python GIL是关键。对于CPU密集型任务,multiprocessing是比threading更好的选择,因为它绕过了GIL的限制,实现了真正的并行计算。
  2. 数据共享: 进程间数据共享比线程间复杂。普通Python对象在进程间默认不共享,需要使用multiprocessing.Manager创建共享数据结构,或者通过管道/队列进行通信。本例中Manager.list用于收集结果,避免了复杂的同步机制
  3. 数据拷贝: 在eliminate_marking函数中,我们通过json_list[:]将json_list的浅拷贝传递给每个进程。这意味着每个进程都在自己的json_list副本上进行查找。这样做的好处是避免了对同一共享json_list进行并发读写和删除操作的复杂同步问题。缺点是,如果目标是修改原始的json_list(例如,从中删除匹配项),这种方法不会直接实现。但对于“获取数据”的需求,收集匹配结果是更安全和常见的模式。
  4. 任务分块: 合理地划分任务(marking_list的chunk_size)对性能至关重要。过小的块可能导致过多的进程创建和管理开销;过大的块可能导致部分进程负载不均。通常,进程数量不应超过CPU的核心数。可以根据实际情况调整chunk_size或直接根据os.cpu_count()来分配任务。
  5. 进程开销: 进程的创建和销毁比线程更耗资源。对于非常短小的任务,多进程的开销可能抵消并行带来的收益。但在本例这种大数据量、CPU密集型任务中,多进程的优势非常明显。
  6. 错误处理: 在生产环境中,应考虑在进程中加入错误处理机制,例如使用try-except块,并记录异常,以提高程序的健壮性。
  7. Manager的生命周期: 确保在所有子进程完成后调用manager.shutdown()来清理Manager创建的资源。

4. 总结

通过将任务分解为独立的子任务并在多个进程中并行执行,结合multiprocessing.Manager实现结果的有效收集,我们成功地将大数据量列表匹配和筛选的性能提升了一个数量级。这种方法特别适用于那些受限于Python GIL的CPU密集型计算任务。理解multiprocessing的工作原理和最佳实践,能够帮助开发者在处理大规模数据时构建出更高效、更健壮的Python应用程序。

相关文章

数码产品性能查询
数码产品性能查询

该软件包括了市面上所有手机CPU,手机跑分情况,电脑CPU,电脑产品信息等等,方便需要大家查阅数码产品最新情况,了解产品特性,能够进行对比选择最具性价比的商品。

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

相关专题

更多
python开发工具
python开发工具

php中文网为大家提供各种python开发工具,好的开发工具,可帮助开发者攻克编程学习中的基础障碍,理解每一行源代码在程序执行时在计算机中的过程。php中文网还为大家带来python相关课程以及相关文章等内容,供大家免费下载使用。

715

2023.06.15

python打包成可执行文件
python打包成可执行文件

本专题为大家带来python打包成可执行文件相关的文章,大家可以免费的下载体验。

625

2023.07.20

python能做什么
python能做什么

python能做的有:可用于开发基于控制台的应用程序、多媒体部分开发、用于开发基于Web的应用程序、使用python处理数据、系统编程等等。本专题为大家提供python相关的各种文章、以及下载和课程。

739

2023.07.25

format在python中的用法
format在python中的用法

Python中的format是一种字符串格式化方法,用于将变量或值插入到字符串中的占位符位置。通过format方法,我们可以动态地构建字符串,使其包含不同值。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

617

2023.07.31

python教程
python教程

Python已成为一门网红语言,即使是在非编程开发者当中,也掀起了一股学习的热潮。本专题为大家带来python教程的相关文章,大家可以免费体验学习。

1235

2023.08.03

python环境变量的配置
python环境变量的配置

Python是一种流行的编程语言,被广泛用于软件开发、数据分析和科学计算等领域。在安装Python之后,我们需要配置环境变量,以便在任何位置都能够访问Python的可执行文件。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

547

2023.08.04

python eval
python eval

eval函数是Python中一个非常强大的函数,它可以将字符串作为Python代码进行执行,实现动态编程的效果。然而,由于其潜在的安全风险和性能问题,需要谨慎使用。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

575

2023.08.04

scratch和python区别
scratch和python区别

scratch和python的区别:1、scratch是一种专为初学者设计的图形化编程语言,python是一种文本编程语言;2、scratch使用的是基于积木的编程语法,python采用更加传统的文本编程语法等等。本专题为大家提供scratch和python相关的文章、下载、课程内容,供大家免费下载体验。

699

2023.08.11

php源码安装教程大全
php源码安装教程大全

本专题整合了php源码安装教程,阅读专题下面的文章了解更多详细内容。

7

2025.12.31

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 0.6万人学习

Django 教程
Django 教程

共28课时 | 2.6万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.0万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号