Python多线程任务调度需结合Lock与Queue:Lock保护共享资源避免竞态,Queue实现线程安全的任务分发;二者协同确保并发稳而准。

Python多线程中,任务调度不是简单地开几个Thread就完事——关键在控制并发节奏、避免资源争抢、保证任务有序执行。核心靠两样:锁(threading.Lock)管“谁来改”,队列(queue.Queue)管“谁来干”。二者配合,才能让多线程真正稳而准。
用Lock防止共享数据被乱改
多个线程同时读写同一变量(比如计数器、缓存字典、文件句柄),不加锁极易出错——看似+1的操作实际分“读-改-写”三步,中间可能被其他线程插队,导致结果丢失。
正确做法是:把临界区(即访问共享资源的代码段)用lock.acquire()和lock.release()包住;更推荐用with lock:自动管理,不怕忘记释放。
- 不要锁整个函数,只锁真正需要同步的几行,否则拖慢并发效率
- 避免嵌套加锁或跨函数传锁,容易引发死锁;如必须多资源,按固定顺序获取锁
- 对只读操作通常不用锁,但若读操作依赖某状态(如“先检查再创建”),仍需加锁保证原子性
用Queue实现线程安全的任务分发
queue.Queue天生线程安全,内部已用锁保护,适合做生产者-消费者模型:主线程或生产者线程往队列塞任务,工作线程从队列取任务执行。它比手动维护列表+Lock更简洁、更可靠。
立即学习“Python免费学习笔记(深入)”;
常用模式是启动固定数量的工作线程,持续调用q.get()阻塞等待任务,处理完调用q.task_done();主程序用q.join()等待所有任务完成。
- 设
maxsize可限制队列长度,防止内存爆掉(比如上传任务积压时主动限流) - 用
q.put_nowait()或q.get_nowait()做非阻塞尝试,配合try/except queue.Full或queue.Empty处理瞬时无任务情况 - 队列里放什么?建议放轻量任务对象(如元组
(func, args, kwargs)或自定义任务类),别直接塞大数据或打开的文件句柄
调度逻辑:按优先级/延迟/周期分发任务
基础队列是FIFO,但真实调度常需更智能策略。Python标准库提供queue.PriorityQueue(按数字优先级排序)和queue.LifoQueue(栈式),满足多数场景。
若需延迟执行(如5秒后发邮件)或周期执行(每30秒查一次状态),Queue本身不支持,需结合threading.Timer或外部调度器(如APScheduler);但注意:Timer回调仍在新线程中运行,涉及共享资源仍要加锁。
-
PriorityQueue里存元组(priority_num, task_data),数字越小优先级越高;注意priority相同时,task_data必须可比较,否则报错 - 避免在任务函数里长时间阻塞(如sleep、requests请求),会卡住整个工作线程;应设timeout并捕获异常
- 任务失败怎么办?建议在工作线程内捕获异常、记录日志,并决定是否重试(可把任务重新put回队列,加重试计数防无限循环)
实战小例子:带锁统计+队列分发的爬虫调度器
假设要并发抓100个网页,统计成功/失败数,并保存HTML到本地——共享的统计字典和文件写入需锁保护,URL列表用队列分发:
import threading import queue import requestsurl_queue = queue.Queue() stats = {"success": 0, "fail": 0} stats_lock = threading.Lock()
def worker(): while True: try: url = url_queue.get(timeout=1) resp = requests.get(url, timeout=5) with stats_lock: if resp.status_code == 200: stats["success"] += 1 else: stats["fail"] += 1
保存文件等操作……
except queue.Empty: break except Exception as e: with stats_lock: stats["fail"] += 1 finally: url_queue.task_done()启动5个工人线程
for _ in range(5): t = threading.Thread(target=worker) t.start()
塞入100个URL
for i in range(100): url_queue.put(f"https://www.php.cn/link/e82a88d937e60267fd2c866b01131ada % 3}")
url_queue.join() # 等待全部完成 print(stats)
这个结构清晰分离了任务分发、并发执行、状态同步三部分,扩展性强,也便于加日志、监控或熔断逻辑。










