worker pool 的核心价值是可控并发,通过固定数量 worker 从共享通道取任务实现限流与复用;需用 sync.WaitGroup 和 done 通道实现优雅退出,避免任务丢失。

为什么不用 go f() 直接起 goroutine
直接用 go f() 启动大量任务,容易导致 goroutine 泛滥,内存暴涨甚至 OOM。尤其当任务来自网络请求、文件读取或数据库查询时,数量不可控。worker pool 的核心价值不是“并发”,而是“可控并发”——把任务排队、限流、复用 goroutine。
- 典型错误:每来一个 HTTP 请求就
go handle(req),QPS 上千时可能创建上万个 goroutine - 正确思路:固定 N 个长期运行的 worker,从共享
chan *Task拿任务,处理完继续取下一个 - 注意:worker 数量不等于 CPU 核心数;IO 密集型任务(如 HTTP 调用)可设为 10–100,CPU 密集型建议 ≤
runtime.NumCPU()
如何设计任务通道与退出机制
通道类型和关闭时机决定 worker 是否能干净退出。别用 chan Task(值拷贝开销大),优先用 chan *Task 或 chan func();退出不能靠 panic 或 os.Exit,必须支持 graceful shutdown。
- 任务通道用
chan interface{}灵活但 lose type safety,推荐chan Job(定义具体接口或结构体) - 不要在主 goroutine 关闭通道后立刻 return —— worker 可能还在读,要等所有 worker 空转退出
- 标准做法:用
sync.WaitGroup计数活跃 worker,配合done chan struct{}通知停止取新任务 - 示例中常见坑:
close(jobChan)后没等 worker 处理完剩余任务就退出,导致任务丢失
workerPool.Run() 的典型实现要点
一个健壮的 Run() 方法要处理启动、任务分发、错误传播、超时控制和回收。它不该阻塞主流程,但要提供同步等待入口(如 Wait())。
func (p *WorkerPool) Run() {
for i := 0; i < p.workers; i++ {
go func() {
defer p.wg.Done()
for {
select {
case job, ok := <-p.jobs:
if !ok {
return // 通道关闭,退出
}
job.Do()
case <-p.done:
return // 强制退出
}
}
}()
}
}-
jobs通道建议带缓冲(如make(chan Job, 1000)),避免生产者阻塞 - 每个 worker 必须用
defer p.wg.Done(),否则Wait()永远不返回 - 不要在 job.Do() 中 recover panic —— 应由上层统一处理,否则错误静默丢失
- 如果任务需返回结果,别用全局 map 存,改用
job.ResultChan 或回调函数
实际项目中容易被忽略的边界情况
真实场景下,worker pool 很少只跑“理想任务”。超时、重试、优先级、任务取消这些需求一加,逻辑复杂度指数上升。
立即学习“go语言免费学习笔记(深入)”;
- 任务带 context?必须把
ctx传进 job 结构体,并在Do()中 select 判断ctx.Done() - 需要动态扩缩容?别在运行时改 worker 数量,改用两级队列:先入内存队列,再由调度器按负载分发到不同 pool
- panic 发生在 job.Do() 中?外层要加
defer func(){ if r := recover(); r != nil { p.errCh - 日志打点缺失?每个 worker 应有唯一 ID(如
worker-3),否则并发日志无法归因
最麻烦的从来不是启动一堆 goroutine,而是让它们在出错、中断、扩容、监控全链路里都保持可观察、可终止、不丢任务。写完 Run() 只是开始,压测时看 goroutine 数是否稳定、pprof 查 block 链路、日志查 timeout 分布,这些才决定 pool 是否真可用。










