
本文介绍如何通过 `acks_late=true` 和 `reject_on_worker_lost=true` 配合使用,使 celery 在 worker 异常终止(如被 sigkill 杀死)时,自动将未完成任务重新放回队列,避免任务丢失,无需依赖长时 `visibility_timeout`。
Celery 默认采用“预取确认”(ack on receipt)机制:任务一旦被 worker 拉取,即向 Broker 发送 ACK,即使 worker 后续崩溃,Broker 也认为该任务已成功处理,从而导致任务丢失。为解决这一问题,需启用延迟确认(late acknowledgment)与worker 失联拒绝机制,二者协同可实现毫秒级故障感知与任务回滚。
✅ 核心配置说明
| 配置项 | 作用 | 推荐值 |
|---|---|---|
| task_acks_late=True | 延迟 ACK 至任务执行完成后发送(而非拉取时),确保失败/中断时任务仍保留在队列中 | True |
| task_reject_on_worker_lost=True | 当 worker 进程意外退出(如 SIGKILL、OOM Kill、崩溃)且任务尚未完成时,主动向 Broker 发送 REJECT 并设置 requeue=True,使任务立即重回队列头部 | True |
⚠️ 注意:reject_on_worker_lost=True 仅在 acks_late=True 生效时起作用;若未启用 acks_late,任务早已被 ACK,Broker 不再管理其生命周期,此参数无效。
? 使用方式(推荐粒度:任务级)
你可在单个任务装饰器中精准控制容错行为,避免全局配置影响其他任务:
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379//')
@app.task(acks_late=True, reject_on_worker_lost=True)
def process_payment(order_id: str) -> dict:
# 模拟可能被中断的长时间操作
import time
time.sleep(30) # 若此时 worker 被 kill -9,任务将自动重入队
return {"status": "completed", "order_id": order_id}也可在应用级别统一启用(适用于所有任务):
app.conf.update(
task_acks_late=True,
task_reject_on_worker_lost=True,
)? 补充说明与最佳实践
- Broker 兼容性:该机制在 Redis 和 RabbitMQ 上均稳定支持;若使用 Redis,请确保版本 ≥ 5.0 且未禁用 client-output-buffer-limit 等关键配置。
- 幂等性必须前置:因任务可能被重复执行,业务逻辑(如扣款、发信)务必设计为幂等,建议结合唯一任务 ID(task_id)或业务单号做去重校验。
- 不替代健康监控:此方案解决的是「瞬时故障」下的任务兜底,不能替代对 worker 进程、资源、心跳的主动监控(如 Prometheus + Celery Exporter)。
- 避免滥用 requeue=True:频繁重入队可能导致任务雪崩,建议配合 max_retries=3 和指数退避(countdown)提升鲁棒性:
@app.task(
acks_late=True,
reject_on_worker_lost=True,
autoretry_for=(Exception,),
retry_kwargs={'max_retries': 3},
default_retry_delay=60 # 首次重试延迟 60 秒
)
def fetch_external_data(url: str):
...通过合理组合 acks_late 与 reject_on_worker_lost,你可以在不牺牲吞吐的前提下,显著提升 Celery 任务系统的可靠性与弹性——真正实现“worker 可死,任务不死”。










