0

0

Python Asyncio:优雅地管理与终止长时间运行的任务

霞舞

霞舞

发布时间:2025-07-22 14:40:14

|

246人浏览过

|

来源于php中文网

原创

python asyncio:优雅地管理与终止长时间运行的任务

本文旨在探讨在Python asyncio异步编程中,如何有效管理和终止可能长时间阻塞的任务,以避免程序无限期等待。我们将重点介绍 asyncio.wait 和 asyncio.wait_for 这两个关键工具,它们提供了设置任务超时机制的能力。通过详细的代码示例和最佳实践,您将学会如何确保异步应用程序在预设时间内响应并关闭,即使某些I/O操作不活跃。

1. 问题背景:asyncio.gather 的局限性

在 asyncio 应用中,asyncio.gather(*coros) 是一个常用的工具,用于并发运行多个协程或任务,并等待它们全部完成。然而,当这些任务中包含长时间阻塞的I/O操作(例如,等待网络消息但消息不频繁)时,gather 会无限期地等待下去,即使您希望程序在一定时间后停止。

考虑以下场景:

import asyncio

stop = False

async def watch_task1(client):
    while not stop:
        # 假设 client.ws.get_data() 可能长时间没有数据返回
        await client.ws.get_data()
        print("Task 1 received data")

async def watch_task2(client):
    while not stop:
        # 假设 client.ws.get_news() 也可能长时间没有数据返回
        await client.ws.get_news()
        print("Task 2 received news")

async def stop_after(delay_seconds):
    global stop
    await asyncio.sleep(delay_seconds)
    print(f"Stopping after {delay_seconds} seconds...")
    stop = True

class MockClient:
    async def get_data(self):
        await asyncio.sleep(100) # 模拟长时间阻塞
    async def get_news(self):
        await asyncio.sleep(100) # 模拟长时间阻塞
    async def sleep(self, delay):
        await asyncio.sleep(delay)

async def main_gather():
    client = MockClient()
    tasks = [
        watch_task1(client),
        watch_task2(client),
        stop_after(5), # 尝试在5秒后停止
    ]

    try:
        # 使用 gather,即使 stop 变为 True,阻塞的 get_data/get_news 仍会阻止 gather 完成
        await asyncio.gather(*tasks, return_exceptions=True)
    except Exception as e:
        print(f"An exception occurred: {e}")
    print("Main gather finished.")

# 运行 main_gather() 会发现程序在5秒后并不会立即停止,而是会等待 get_data/get_news 结束

在这个例子中,即使 stop_after 在5秒后将 stop 标志设置为 True,watch_task1 和 watch_task2 中的 await client.ws.get_data() 或 await client.ws.get_news() 仍然可能处于阻塞状态,导致 asyncio.gather 无法按时完成。

2. 解决方案:使用 asyncio.wait 进行超时控制

为了解决上述问题,asyncio 提供了更灵活的等待机制:asyncio.wait。它允许您设置一个总体的超时时间,并在超时后返回已完成和未完成的任务集。

立即学习Python免费学习笔记(深入)”;

2.1 asyncio.wait 概述

asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED) 函数用于并发运行 aws(一个可等待对象集合),并等待它们中的一部分或全部完成。

DreamGen
DreamGen

一个AI驱动的角色扮演和故事写作的平台

下载
  • aws: 一个由协程、任务或 Future 组成的集合。
  • timeout: 可选参数,指定等待的最大秒数。如果超时,函数会立即返回。
  • return_when: 可选参数,定义何时返回。常用值包括:
    • asyncio.ALL_COMPLETED (默认): 等待所有任务完成。
    • asyncio.FIRST_COMPLETED: 只要有一个任务完成就返回。
    • asyncio.FIRST_EXCEPTION: 只要有一个任务抛出异常就返回。

asyncio.wait 返回两个集合:done(已完成的任务)和 pending(未完成的任务)。

2.2 实现超时停止逻辑

import asyncio

stop = False # 这是一个共享状态,用于控制协程的内部循环

async def watch_task1(client):
    try:
        while not stop:
            print("Task 1: Waiting for data...")
            await client.ws.get_data() # 可能会阻塞
            print("Task 1: Data received.")
    except asyncio.CancelledError:
        print("Task 1: Cancelled.")
    finally:
        print("Task 1: Exiting.")

async def watch_task2(client):
    try:
        while not stop:
            print("Task 2: Waiting for news...")
            await client.ws.get_news() # 可能会阻塞
            print("Task 2: News received.")
    except asyncio.CancelledError:
        print("Task 2: Cancelled.")
    finally:
        print("Task 2: Exiting.")

# MockClient 保持不变
class MockClient:
    async def get_data(self):
        # 模拟长时间阻塞,但为了演示,将其缩短
        await asyncio.sleep(5)
    async def get_news(self):
        # 模拟长时间阻塞
        await asyncio.sleep(5)
    async def sleep(self, delay):
        await asyncio.sleep(delay)

async def main_wait_timeout():
    client = MockClient()
    tasks_to_run = [
        asyncio.create_task(watch_task1(client)), # 显式创建任务
        asyncio.create_task(watch_task2(client)),
    ]

    print("Starting tasks with a 3-second timeout...")
    # 设置一个全局超时,例如3秒
    done, pending = await asyncio.wait(tasks_to_run, timeout=3, return_when=asyncio.ALL_COMPLETED)

    print("\n--- After asyncio.wait ---")
    print(f"Completed tasks ({len(done)}):")
    for task in done:
        try:
            # 获取任务结果,如果任务抛出异常,这里会重新抛出
            result = task.result()
            print(f"  Task finished successfully: {task.get_name()}, Result: {result}")
        except asyncio.CancelledError:
            print(f"  Task {task.get_name()} was cancelled (expected for pending tasks).")
        except Exception as e:
            print(f"  Task {task.get_name()} raised an exception: {e}")

    print(f"\nPending tasks ({len(pending)}):")
    for task in pending:
        print(f"  Task {task.get_name()} is still pending. Cancelling...")
        task.cancel() # 取消未完成的任务
        try:
            await task # 等待任务真正结束,以便其处理 CancelledError
        except asyncio.CancelledError:
            print(f"  Task {task.get_name()} successfully cancelled and cleaned up.")
        except Exception as e:
            print(f"  Task {task.get_name()} raised an exception during cancellation cleanup: {e}")

    print("Main wait_timeout finished.")

# 运行主函数
if __name__ == "__main__":
    asyncio.run(main_wait_timeout())

代码解释:

  1. 显式创建任务: 在 main_wait_timeout 中,我们使用 asyncio.create_task() 将协程包装成任务。这是推荐的做法,因为 asyncio.wait 接受任务或 Future 对象。
  2. 设置超时: await asyncio.wait(tasks_to_run, timeout=3) 会在3秒后返回,无论 watch_task 是否完成其内部的 await client.ws.get_data()。
  3. 处理 done 集合: 遍历 done 集合中的任务,通过 task.result() 获取其结果或捕获可能抛出的异常。
  4. 处理 pending 集合: 遍历 pending 集合中的任务。这些任务在超时时仍未完成。为了确保资源被释放,必须对它们调用 task.cancel()。
  5. 协程中的 CancelledError: 当一个任务被 cancel() 时,它会向任务内部抛出一个 asyncio.CancelledError。任务内部的协程应该捕获这个异常,并执行必要的清理工作(例如关闭文件句柄、网络连接等)。在上述 watch_task 示例中,我们添加了 try...except asyncio.CancelledError...finally 块来演示这一点。
  6. 等待任务真正结束: 在 task.cancel() 之后,最好 await task 一下,确保任务有时间处理 CancelledError 并完成其清理逻辑。

3. 替代方案:asyncio.wait_for

如果只需要为单个协程设置超时,可以使用 asyncio.wait_for(aw, timeout)。

async def example_wait_for():
    client = MockClient()
    try:
        print("Attempting to get data with a 2-second timeout...")
        # watch_task1 内部的循环会因为超时而中断
        await asyncio.wait_for(watch_task1(client), timeout=2)
        print("watch_task1 finished within timeout.")
    except asyncio.TimeoutError:
        print("watch_task1 timed out!")
    except Exception as e:
        print(f"watch_task1 raised an unexpected exception: {e}")
    print("Example wait_for finished.")

# 如果在 main_wait_timeout 之后运行
# asyncio.run(example_wait_for())

asyncio.wait_for 会在超时时抛出 asyncio.TimeoutError。如果被包装的协程内部没有处理 CancelledError,那么在 TimeoutError 抛出后,被包装的协程实际上可能还在后台运行,直到其下一个 await 点。因此,在使用 wait_for 时,被包装的协程也应该能够响应取消。

4. 注意事项与最佳实践

  • 任务取消的响应: 这是 asyncio 中非常重要的一点。当一个任务被 cancel() 时,它不会立即停止,而是在其下一个 await 点抛出 asyncio.CancelledError。任务的编写者必须在协程内部捕获 CancelledError,并执行必要的清理工作。如果协程不处理 CancelledError,则在取消后可能会继续运行,直到自然结束或遇到下一个 await 点。
  • 资源清理: 在 finally 块中进行资源清理是良好的实践,无论任务是正常完成还是被取消。
  • 选择 wait 还是 wait_for:
    • asyncio.wait_for 适用于为单个协程设置超时,并在超时时抛出 TimeoutError。
    • asyncio.wait 适用于管理一组协程/任务,并提供更细粒度的控制,如 return_when 参数,以及返回已完成和未完成任务的集合,方便后续处理(如取消未完成的任务)。
  • return_exceptions=True 与 task.result():
    • asyncio.gather 的 return_exceptions=True 会将协程中抛出的异常作为结果返回,而不是直接抛出。
    • 对于 asyncio.wait 返回的 done 任务,调用 task.result() 会重新抛出任务内部发生的任何异常,或者返回其结果。这是检查任务是否成功完成的推荐方式。

5. 总结

在 asyncio 应用程序中,有效管理和终止长时间运行或可能阻塞的任务至关重要。asyncio.wait 提供了一个强大的机制,允许您在指定超时后获取已完成和未完成的任务,并通过 task.cancel() 优雅地终止未完成的任务。结合任务内部对 asyncio.CancelledError 的处理,您可以构建出更加健壮、响应迅速的异步应用程序。记住,正确的取消处理和资源清理是编写高质量 asyncio 代码的关键。

相关专题

更多
python开发工具
python开发工具

php中文网为大家提供各种python开发工具,好的开发工具,可帮助开发者攻克编程学习中的基础障碍,理解每一行源代码在程序执行时在计算机中的过程。php中文网还为大家带来python相关课程以及相关文章等内容,供大家免费下载使用。

718

2023.06.15

python打包成可执行文件
python打包成可执行文件

本专题为大家带来python打包成可执行文件相关的文章,大家可以免费的下载体验。

627

2023.07.20

python能做什么
python能做什么

python能做的有:可用于开发基于控制台的应用程序、多媒体部分开发、用于开发基于Web的应用程序、使用python处理数据、系统编程等等。本专题为大家提供python相关的各种文章、以及下载和课程。

744

2023.07.25

format在python中的用法
format在python中的用法

Python中的format是一种字符串格式化方法,用于将变量或值插入到字符串中的占位符位置。通过format方法,我们可以动态地构建字符串,使其包含不同值。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

617

2023.07.31

python教程
python教程

Python已成为一门网红语言,即使是在非编程开发者当中,也掀起了一股学习的热潮。本专题为大家带来python教程的相关文章,大家可以免费体验学习。

1236

2023.08.03

python环境变量的配置
python环境变量的配置

Python是一种流行的编程语言,被广泛用于软件开发、数据分析和科学计算等领域。在安装Python之后,我们需要配置环境变量,以便在任何位置都能够访问Python的可执行文件。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

547

2023.08.04

python eval
python eval

eval函数是Python中一个非常强大的函数,它可以将字符串作为Python代码进行执行,实现动态编程的效果。然而,由于其潜在的安全风险和性能问题,需要谨慎使用。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

575

2023.08.04

scratch和python区别
scratch和python区别

scratch和python的区别:1、scratch是一种专为初学者设计的图形化编程语言,python是一种文本编程语言;2、scratch使用的是基于积木的编程语法,python采用更加传统的文本编程语法等等。本专题为大家提供scratch和python相关的文章、下载、课程内容,供大家免费下载体验。

700

2023.08.11

php源码安装教程大全
php源码安装教程大全

本专题整合了php源码安装教程,阅读专题下面的文章了解更多详细内容。

74

2025.12.31

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 0.6万人学习

Django 教程
Django 教程

共28课时 | 2.6万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.0万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号