
1. 问题背景:asyncio.gather 的局限性
在 asyncio 应用中,asyncio.gather(*coros) 是一个常用的工具,用于并发运行多个协程或任务,并等待它们全部完成。然而,当这些任务中包含长时间阻塞的I/O操作(例如,等待网络消息但消息不频繁)时,gather 会无限期地等待下去,即使您希望程序在一定时间后停止。
考虑以下场景:
import asyncio
stop = False
async def watch_task1(client):
while not stop:
# 假设 client.ws.get_data() 可能长时间没有数据返回
await client.ws.get_data()
print("Task 1 received data")
async def watch_task2(client):
while not stop:
# 假设 client.ws.get_news() 也可能长时间没有数据返回
await client.ws.get_news()
print("Task 2 received news")
async def stop_after(delay_seconds):
global stop
await asyncio.sleep(delay_seconds)
print(f"Stopping after {delay_seconds} seconds...")
stop = True
class MockClient:
async def get_data(self):
await asyncio.sleep(100) # 模拟长时间阻塞
async def get_news(self):
await asyncio.sleep(100) # 模拟长时间阻塞
async def sleep(self, delay):
await asyncio.sleep(delay)
async def main_gather():
client = MockClient()
tasks = [
watch_task1(client),
watch_task2(client),
stop_after(5), # 尝试在5秒后停止
]
try:
# 使用 gather,即使 stop 变为 True,阻塞的 get_data/get_news 仍会阻止 gather 完成
await asyncio.gather(*tasks, return_exceptions=True)
except Exception as e:
print(f"An exception occurred: {e}")
print("Main gather finished.")
# 运行 main_gather() 会发现程序在5秒后并不会立即停止,而是会等待 get_data/get_news 结束在这个例子中,即使 stop_after 在5秒后将 stop 标志设置为 True,watch_task1 和 watch_task2 中的 await client.ws.get_data() 或 await client.ws.get_news() 仍然可能处于阻塞状态,导致 asyncio.gather 无法按时完成。
2. 解决方案:使用 asyncio.wait 进行超时控制
为了解决上述问题,asyncio 提供了更灵活的等待机制:asyncio.wait。它允许您设置一个总体的超时时间,并在超时后返回已完成和未完成的任务集。
立即学习“Python免费学习笔记(深入)”;
2.1 asyncio.wait 概述
asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED) 函数用于并发运行 aws(一个可等待对象集合),并等待它们中的一部分或全部完成。
- aws: 一个由协程、任务或 Future 组成的集合。
- timeout: 可选参数,指定等待的最大秒数。如果超时,函数会立即返回。
- return_when: 可选参数,定义何时返回。常用值包括:
- asyncio.ALL_COMPLETED (默认): 等待所有任务完成。
- asyncio.FIRST_COMPLETED: 只要有一个任务完成就返回。
- asyncio.FIRST_EXCEPTION: 只要有一个任务抛出异常就返回。
asyncio.wait 返回两个集合:done(已完成的任务)和 pending(未完成的任务)。
2.2 实现超时停止逻辑
import asyncio
stop = False # 这是一个共享状态,用于控制协程的内部循环
async def watch_task1(client):
try:
while not stop:
print("Task 1: Waiting for data...")
await client.ws.get_data() # 可能会阻塞
print("Task 1: Data received.")
except asyncio.CancelledError:
print("Task 1: Cancelled.")
finally:
print("Task 1: Exiting.")
async def watch_task2(client):
try:
while not stop:
print("Task 2: Waiting for news...")
await client.ws.get_news() # 可能会阻塞
print("Task 2: News received.")
except asyncio.CancelledError:
print("Task 2: Cancelled.")
finally:
print("Task 2: Exiting.")
# MockClient 保持不变
class MockClient:
async def get_data(self):
# 模拟长时间阻塞,但为了演示,将其缩短
await asyncio.sleep(5)
async def get_news(self):
# 模拟长时间阻塞
await asyncio.sleep(5)
async def sleep(self, delay):
await asyncio.sleep(delay)
async def main_wait_timeout():
client = MockClient()
tasks_to_run = [
asyncio.create_task(watch_task1(client)), # 显式创建任务
asyncio.create_task(watch_task2(client)),
]
print("Starting tasks with a 3-second timeout...")
# 设置一个全局超时,例如3秒
done, pending = await asyncio.wait(tasks_to_run, timeout=3, return_when=asyncio.ALL_COMPLETED)
print("\n--- After asyncio.wait ---")
print(f"Completed tasks ({len(done)}):")
for task in done:
try:
# 获取任务结果,如果任务抛出异常,这里会重新抛出
result = task.result()
print(f" Task finished successfully: {task.get_name()}, Result: {result}")
except asyncio.CancelledError:
print(f" Task {task.get_name()} was cancelled (expected for pending tasks).")
except Exception as e:
print(f" Task {task.get_name()} raised an exception: {e}")
print(f"\nPending tasks ({len(pending)}):")
for task in pending:
print(f" Task {task.get_name()} is still pending. Cancelling...")
task.cancel() # 取消未完成的任务
try:
await task # 等待任务真正结束,以便其处理 CancelledError
except asyncio.CancelledError:
print(f" Task {task.get_name()} successfully cancelled and cleaned up.")
except Exception as e:
print(f" Task {task.get_name()} raised an exception during cancellation cleanup: {e}")
print("Main wait_timeout finished.")
# 运行主函数
if __name__ == "__main__":
asyncio.run(main_wait_timeout())代码解释:
- 显式创建任务: 在 main_wait_timeout 中,我们使用 asyncio.create_task() 将协程包装成任务。这是推荐的做法,因为 asyncio.wait 接受任务或 Future 对象。
- 设置超时: await asyncio.wait(tasks_to_run, timeout=3) 会在3秒后返回,无论 watch_task 是否完成其内部的 await client.ws.get_data()。
- 处理 done 集合: 遍历 done 集合中的任务,通过 task.result() 获取其结果或捕获可能抛出的异常。
- 处理 pending 集合: 遍历 pending 集合中的任务。这些任务在超时时仍未完成。为了确保资源被释放,必须对它们调用 task.cancel()。
- 协程中的 CancelledError: 当一个任务被 cancel() 时,它会向任务内部抛出一个 asyncio.CancelledError。任务内部的协程应该捕获这个异常,并执行必要的清理工作(例如关闭文件句柄、网络连接等)。在上述 watch_task 示例中,我们添加了 try...except asyncio.CancelledError...finally 块来演示这一点。
- 等待任务真正结束: 在 task.cancel() 之后,最好 await task 一下,确保任务有时间处理 CancelledError 并完成其清理逻辑。
3. 替代方案:asyncio.wait_for
如果只需要为单个协程设置超时,可以使用 asyncio.wait_for(aw, timeout)。
async def example_wait_for():
client = MockClient()
try:
print("Attempting to get data with a 2-second timeout...")
# watch_task1 内部的循环会因为超时而中断
await asyncio.wait_for(watch_task1(client), timeout=2)
print("watch_task1 finished within timeout.")
except asyncio.TimeoutError:
print("watch_task1 timed out!")
except Exception as e:
print(f"watch_task1 raised an unexpected exception: {e}")
print("Example wait_for finished.")
# 如果在 main_wait_timeout 之后运行
# asyncio.run(example_wait_for())asyncio.wait_for 会在超时时抛出 asyncio.TimeoutError。如果被包装的协程内部没有处理 CancelledError,那么在 TimeoutError 抛出后,被包装的协程实际上可能还在后台运行,直到其下一个 await 点。因此,在使用 wait_for 时,被包装的协程也应该能够响应取消。
4. 注意事项与最佳实践
- 任务取消的响应: 这是 asyncio 中非常重要的一点。当一个任务被 cancel() 时,它不会立即停止,而是在其下一个 await 点抛出 asyncio.CancelledError。任务的编写者必须在协程内部捕获 CancelledError,并执行必要的清理工作。如果协程不处理 CancelledError,则在取消后可能会继续运行,直到自然结束或遇到下一个 await 点。
- 资源清理: 在 finally 块中进行资源清理是良好的实践,无论任务是正常完成还是被取消。
-
选择 wait 还是 wait_for:
- asyncio.wait_for 适用于为单个协程设置超时,并在超时时抛出 TimeoutError。
- asyncio.wait 适用于管理一组协程/任务,并提供更细粒度的控制,如 return_when 参数,以及返回已完成和未完成任务的集合,方便后续处理(如取消未完成的任务)。
-
return_exceptions=True 与 task.result():
- asyncio.gather 的 return_exceptions=True 会将协程中抛出的异常作为结果返回,而不是直接抛出。
- 对于 asyncio.wait 返回的 done 任务,调用 task.result() 会重新抛出任务内部发生的任何异常,或者返回其结果。这是检查任务是否成功完成的推荐方式。
5. 总结
在 asyncio 应用程序中,有效管理和终止长时间运行或可能阻塞的任务至关重要。asyncio.wait 提供了一个强大的机制,允许您在指定超时后获取已完成和未完成的任务,并通过 task.cancel() 优雅地终止未完成的任务。结合任务内部对 asyncio.CancelledError 的处理,您可以构建出更加健壮、响应迅速的异步应用程序。记住,正确的取消处理和资源清理是编写高质量 asyncio 代码的关键。










