直接用go启动大量goroutine易崩,因内存暴涨、调度器过载、OOM;worker pool通过固定worker数、任务队列、复用执行者实现可控并发,50行内可构建生产级池子。

为什么直接用 go 启动大量 goroutine 容易崩
不是 goroutine 本身贵,而是没节制地创建会导致内存暴涨、调度器过载、甚至触发 OOM。比如读取 10 万行日志并逐行解析,如果每行都 go parseLine(line),瞬间可能起 10 万个 goroutine,而多数任务实际是 I/O 等待或 CPU 轻量计算,根本不需要这么多并发单元。
真正需要的是可控的并发度 + 任务排队 + 复用执行者。这正是 worker pool 的核心价值:用固定数量的长期运行 goroutine 消费任务队列,避免资源抖动。
- 默认 goroutine 栈初始仅 2KB,但频繁创建/销毁仍带来调度开销
- 无缓冲 channel 作为任务队列时,发送方会阻塞,天然实现背压
- worker 数量通常设为
runtime.NumCPU()或略高(如 ×1.5),而非硬写死 100
用 chan + for range 实现最简 worker pool
不依赖第三方库,50 行内可搭出生产可用的池子。关键在于任务通道类型定义、worker 死循环消费、以及主协程关闭信号的传递方式。
下面是最小可行示例,支持优雅关闭:
立即学习“go语言免费学习笔记(深入)”;
type Task func()
type WorkerPool struct {
tasks chan Task
done chan struct{}
}
func NewWorkerPool(workerCount int) *WorkerPool {
return &WorkerPool{
tasks: make(chan Task, 100), // 缓冲区防主协程阻塞
done: make(chan struct{}),
}
}
func (wp *WorkerPool) Start() {
for i := 0; i < cap(wp.tasks); i++ {
go wp.worker()
}
}
func (wp *WorkerPool) worker() {
for {
select {
case task := <-wp.tasks:
task()
case <-wp.done:
return
}
}
}
func (wp *WorkerPool) Submit(task Task) {
wp.tasks <- task
}
func (wp *WorkerPool) Shutdown() {
close(wp.done)
}
注意:cap(wp.tasks) 是通道容量,不是 worker 数量 —— 这里故意写错来提醒你:worker 数量应独立传入,别和通道容量混用。
sync.WaitGroup 和 context.Context 哪个更适合控制生命周期
WaitGroup 只解决“等所有任务结束”,不解决“中途取消”;Context 支持取消、超时、值传递,但需每个 worker 主动检查 ctx.Done()。二者常组合使用。
- 纯批处理(如导入 CSV):用
WaitGroup+ 关闭 channel 就够了 - 带用户中断或服务级超时(如 HTTP handler 中调用):必须用
context.WithTimeout,并在每个task内部检查ctx.Err() - 不要在 worker 内部用
select同时监听tasks和ctx.Done()—— 这会导致任务丢失;应在Submit时就拒绝已取消 context 的任务
任务分组的实际约束:如何让同组任务串行执行
常见需求:用户 A 的 10 个订单更新必须按顺序处理,但用户 B 的订单可与 A 并发。这不是靠加锁能解的 —— 锁会全局串行化,违背并发初衷。
正确做法是哈希分组 + 每组独占一个 channel:
- 对 group key(如
userID)做hash % poolSize,映射到固定 worker 子集 - 更稳妥是为每组建独立 channel + 单 worker,用
map[string]chan Task管理,配合sync.RWMutex读写 - 注意 map 并发写 panic:初始化分组 channel 必须在首次提交时原子完成,或预热所有可能的 key
分组逻辑一旦写死,就很难动态扩缩容。线上遇到热点 key(如大 V 用户请求暴增),单 worker 会成为瓶颈,此时需要二级分片或降级为异步重试。










