
并发任务管理中的挑战
在asyncio异步编程中,我们经常需要同时运行多个并发任务。asyncio.gather是一个常用的工具,它能够并发地运行多个协程,并等待它们全部完成。然而,当某些任务可能因为等待外部事件(如网络数据、消息队列消息)而长时间阻塞,甚至无限期地不返回时,asyncio.gather的默认行为就显得力不从心了。它会一直等待所有任务完成,导致整个程序无法在预设的时间内退出,即使我们通过某种机制(如设置全局标志)尝试通知任务停止,如果任务内部的await操作本身是阻塞的,任务也无法及时响应停止信号。
asyncio.wait:实现精确超时控制
为了解决上述问题,asyncio提供了更为灵活的asyncio.wait方法。与asyncio.gather不同,asyncio.wait允许我们为一组并发任务设置一个整体的超时时间。它不会等待所有任务完成,而是在达到指定超时时间后立即返回,并告知哪些任务已完成,哪些仍在等待中。
asyncio.wait函数的基本签名如下: asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
- aws: 一个可迭代对象,包含要等待的Future或协程对象。
- timeout: 可选参数,指定等待的最长时间(秒)。如果在此时间内所有任务未能完成,wait将提前返回。
- return_when: 可选参数,指定何时返回。默认为ALL_COMPLETED(所有任务完成或超时),其他选项包括FIRST_COMPLETED(第一个任务完成时返回)和FIRST_EXCEPTION(第一个任务抛出异常时返回)。
asyncio.wait返回两个集合:done和pending。
- done: 包含在等待期间已完成(正常完成或抛出异常)的任务。
- pending: 包含在等待期间尚未完成的任务。
以下是一个示例,展示如何使用asyncio.wait来管理带有超时限制的并发任务:
立即学习“Python免费学习笔记(深入)”;
import asyncio
import time
# 模拟长时间运行或阻塞的网络I/O任务
async def watch_task_data():
print(f"[{time.time():.2f}] watch_task_data: 启动,模拟等待数据...")
try:
# 模拟长时间等待网络数据,可能永不返回
await asyncio.sleep(100) # 模拟阻塞100秒
print(f"[{time.time():.2f}] watch_task_data: 收到数据并完成。")
except asyncio.CancelledError:
print(f"[{time.time():.2f}] watch_task_data: 任务被取消。")
except Exception as e:
print(f"[{time.time():.2f}] watch_task_data: 发生异常: {e}")
finally:
print(f"[{time.time():.2f}] watch_task_data: 结束。")
async def watch_task_news():
print(f"[{time.time():.2f}] watch_task_news: 启动,模拟等待新闻...")
try:
# 模拟另一个长时间等待新闻的任务
await asyncio.sleep(100) # 模拟阻塞100秒
print(f"[{time.time():.2f}] watch_task_news: 收到新闻并完成。")
except asyncio.CancelledError:
print(f"[{time.time():.2f}] watch_task_news: 任务被取消。")
except Exception as e:
print(f"[{time.time():.2f}] watch_task_news: 发生异常: {e}")
finally:
print(f"[{time.time():.2f}] watch_task_news: 结束。")
async def main():
tasks = [
watch_task_data(),
watch_task_news(),
]
print(f"[{time.time():.2f}] 主程序:开始等待任务,最长等待5秒...")
# 设置整体超时为5秒
done, pending = await asyncio.wait(tasks, timeout=5)
print(f"[{time.time():.2f}] 主程序:等待结束。已完成任务数: {len(done)}, 未完成任务数: {len(pending)}")
# 1. 处理已完成的任务 (done 集合)
# 这些任务可能已正常完成,也可能在超时前抛出异常
for task in done:
try:
result = task.result() # 获取任务结果,如果任务抛出异常,这里会重新抛出
print(f"[{time.time():.2f}] 已完成任务结果: {result if result is not None else '无'}")
except asyncio.CancelledError:
# 理论上,done集合中的任务不应该被取消,除非在wait返回前被外部取消
print(f"[{time.time():.2f}] 已完成任务被取消 (异常情况)")
except Exception as e:
print(f"[{time.time():.2f}] 已完成任务发生异常: {e}")
# 2. 处理未完成的任务 (pending 集合)
# 这些任务在超时时仍未完成,通常需要显式取消它们以释放资源
for task in pending:
print(f"[{time.time():.2f}] 主程序:正在取消未完成任务: {task.get_name() if hasattr(task, 'get_name') else task}")
task.cancel() # 发送取消信号
try:
# 推荐等待任务真正结束,以确保任务有机会执行清理逻辑
await task
except asyncio.CancelledError:
print(f"[{time.time():.2f}] 未完成任务 {task.get_name() if hasattr(task, 'get_name') else task} 已确认取消。")
except Exception as e:
print(f"[{time.time():.2f}] 取消未完成任务 {task.get_name() if hasattr(task, 'get_name') else task} 时发生异常: {e}")
print(f"[{time.time():.2f}] 主程序:所有任务处理完毕。")
if __name__ == "__main__":
asyncio.run(main())运行上述代码,你会发现尽管watch_task_data和watch_task_news内部模拟了100秒的阻塞,但整个main函数会在大约5秒后退出,并且会打印出任务被取消的信息。
优雅地处理已完成与未完成任务
在使用asyncio.wait后,正确处理done和pending集合至关重要:
处理 done 集合: 对于done集合中的每个任务,你可以调用task.result()来获取任务的返回值。如果任务在执行过程中抛出了异常,task.result()会重新抛出该异常,因此需要使用try...except块来捕获并处理。
-
处理 pending 集合: pending集合中的任务是在超时时仍未完成的任务。为了避免资源泄露或程序僵死,通常需要显式地取消这些任务。
- task.cancel(): 调用task.cancel()会向目标任务发送一个asyncio.CancelledError异常。这个异常会在任务内部下一个await点被抛出。
- 任务内部的响应: 被取消的任务必须能够捕获asyncio.CancelledError,并在其except块中执行必要的资源清理工作(例如关闭文件、网络连接等)。如果任务不处理CancelledError,它将向上冒泡,可能导致程序崩溃或行为异常。
- await task: 在调用task.cancel()之后,最佳实践是再次await task。这样做可以确保任务有机会完成其清理逻辑并真正终止。如果任务内部正确处理了CancelledError,那么await task将再次抛出CancelledError,我们可以捕获它。
替代方案:asyncio.wait_for
除了asyncio.wait,asyncio.wait_for也是一个有用的工具,它用于为单个协程或Future设置超时。如果指定的协程在超时时间内没有完成,asyncio.wait_for会取消该协程并抛出asyncio.TimeoutError。
示例:
async def limited_task():
try:
await asyncio.wait_for(some_long_running_coroutine(), timeout=10)
print("limited_task: 任务在10秒内完成。")
except asyncio.TimeoutError:
print("limited_task: 任务超时。")asyncio.wait_for适用于只需要对特定单个任务进行超时控制的场景,而asyncio.wait则更适合对一组任务进行整体超时管理。
注意事项与最佳实践
- 任务内部的取消处理: 任何可能被取消的异步任务都应该在其内部的try...except asyncio.CancelledError块中实现资源清理逻辑。这是异步编程中确保健壮性的关键一环。
- 资源清理: 无论是正常完成还是被取消,确保任务所持有的所有外部资源(如数据库连接、文件句柄、网络套接字等)都能被妥善关闭和释放。finally块是执行清理操作的理想位置。
- 异常处理: 当从done集合中的任务获取结果时,务必捕获task.result()可能抛出的任何异常。
-
asyncio.gather与asyncio.wait的选择:
- 如果需要等待所有任务完成,并且不需要整体超时控制,或者希望所有任务的异常都被聚合抛出,可以使用asyncio.gather(配合return_exceptions=True可以避免单个任务异常导致整个gather失败)。
- 如果需要对一组任务设置整体超时,并希望在超时后能够处理已完成和未完成的任务,那么asyncio.wait是更合适的选择。
总结
在asyncio应用中,有效管理并发任务的生命周期,特别是处理可能无限期阻塞的任务并实施超时控制,是构建稳定、响应迅速系统的关键。asyncio.wait提供了强大的能力,允许开发者精确控制任务组的等待行为,区分已完成和未完成的任务,并通过显式取消机制优雅地终止长时间运行的任务。结合任务内部对CancelledError的妥善处理,我们可以确保即使在复杂的异步场景下,程序也能按预期及时响应并释放资源。










