答案:使用Python多线程和queue.Queue可实现生产者-消费者模型,生产者生成任务并放入队列,消费者从队列取出任务处理,通过put和get的阻塞机制保证线程安全,生产者结束后向队列发送None作为结束信号,消费者接收到后退出,配合task_done和join确保所有任务完成,适用于爬虫、日志处理等异步任务场景。

使用Python多线程实现任务队列(生产者-消费者模型)
在Python中,利用多线程和队列可以轻松实现“生产者-消费者”模型。该模型适用于需要异步处理任务的场景,比如爬虫、日志处理、后台任务调度等。
Python标准库中的 queue.Queue 是线程安全的,非常适合在多线程环境中作为任务队列使用。配合 threading 模块,我们可以创建多个生产者线程生成任务,多个消费者线程处理任务。
立即学习“Python免费学习笔记(深入)”;
1. 核心组件说明
queue.Queue:用于存放任务的线程安全队列,支持阻塞式 put 和 get 操作。
threading.Thread:创建线程来运行生产者和消费者函数。
threading.Event 或通过发送结束信号控制消费者退出。
通常做法是:当生产者完成任务后,向队列中放入特定数量的“结束标志”(如 None),消费者取到该标志后退出循环。
2. 基础实现代码示例
import threading import queue import time import random创建一个容量为10的任务队列
task_queue = queue.Queue(maxsize=10)
消费者停止数量(用于等待所有消费者结束)
num_workers = 3
def producer(): """生产者:生成任务并放入队列""" for i in range(10): task = f"任务-{i}" print(f"生产者:生成 {task}") task_queue.put(task) time.sleep(random.uniform(0.1, 0.5)) # 模拟生产间隔
# 发送结束信号给每个消费者 for _ in range(num_workers): task_queue.put(None) print("生产者完成,已发送结束信号")def consumer(worker_id): """消费者:从队列取出任务并处理""" while True: task = task_queue.get() if task is None: # 结束信号 print(f"消费者-{worker_id} 收到结束信号,退出") task_queue.task_done() break print(f"消费者-{worker_id} 正在处理 {task}") time.sleep(random.uniform(0.2, 0.8)) # 模拟处理时间 task_queue.task_done()
启动生产者和消费者线程
if name == "main":
创建并启动消费者线程
threads = [] for i in range(num_workers): t = threading.Thread(target=consumer, args=(i,)) t.start() threads.append(t) # 启动生产者线程 pt = threading.Thread(target=producer) pt.start() # 等待生产者完成 pt.join() # 等待所有消费者完成 for t in threads: t.join() print("所有任务处理完毕")3. 关键点解析
- 线程安全:Queue 内部使用锁机制,确保多线程访问安全,无需手动加锁。
- 阻塞操作:put() 和 get() 默认阻塞,队列满时生产者等待,队列空时消费者等待。
- 任务完成通知:调用 task_done() 表示一个任务已被处理,配合 join() 可等待所有任务完成。
- 优雅退出:通过向队列放入 None 来通知消费者退出,避免死循环。
4. 扩展建议
- 可使用 queue.PriorityQueue 实现优先级任务调度。
- 使用 concurrent.futures.ThreadPoolExecutor + 队列可更方便管理线程池。
- 加入异常处理,防止某个消费者因错误退出导致任务堆积。
- 监控队列长度,防止内存溢出(特别是无界队列)。
基本上就这些。这个模型简单高效,适合大多数IO密集型任务的并发处理。










