Worker Pool通过固定数量的goroutine从任务队列中取任务执行,避免频繁创建goroutine,控制并发量,提升系统吞吐量与稳定性。

在高并发场景下,Golang 中直接为每个任务启动一个 goroutine 会带来调度开销和资源竞争问题。使用 Worker Pool(工作池)模式可以有效控制并发数量,复用 goroutine,从而提高系统的吞吐量和稳定性。
Worker Pool 是什么?
Worker Pool 模式通过预先创建一组固定数量的 worker(工作协程),从一个任务队列中不断读取任务并执行。这种方式避免了频繁创建和销毁 goroutine 的开销,同时能限制最大并发数,防止系统资源被耗尽。
典型结构包括:
- 一个任务 channel,用于接收待处理的任务
- 一组固定数量的 worker goroutine,监听任务 channel
- 一个结果 channel(可选),用于收集执行结果
如何实现一个简单的 Worker Pool
以下是一个基础但实用的 Worker Pool 实现示例:
立即学习“go语言免费学习笔记(深入)”;
func main() {
taskCh := make(chan func(), 100)
workers := 4
// 启动 worker 池
for i := 0; i
go func() {
for task := range taskCh {
task()
}
}()
}
// 提交任务
for i := 0; i
num := i
taskCh
time.Sleep(500 time.Millisecond)
fmt.Printf("Task %d done by %s\n", num, time.Now().Format("15:04:05"))
}
}
close(taskCh)
// 等待所有 worker 结束(生产环境建议用 sync.WaitGroup)
time.Sleep(2 time.Second)
}
这个例子中,我们创建了 4 个 worker,它们共享同一个任务 channel。任务以函数形式提交,worker 取出后立即执行。
优化:带缓冲和等待机制的 Worker Pool
为了更安全地管理生命周期,可以引入 sync.WaitGroup 来确保所有任务完成后再退出:
type WorkerPool struct {
taskCh chan func()
workers int
wg sync.WaitGroup
}
func NewWorkerPool(workers, queueSize int) WorkerPool {
return &WorkerPool{
taskCh: make(chan func(), queueSize),
workers: workers,
}
}
func (wp WorkerPool) Start() {
for i := 0; i
wp.wg.Add(1)
go func() {
defer wp.wg.Done()
for task := range wp.taskCh {
task()
}
}()
}
}
func (wp WorkerPool) Submit(task func()) {
wp.taskCh
}
func (wp WorkerPool) Stop() {
close(wp.taskCh)
wp.wg.Wait()
}
使用方式:
pool := NewWorkerPool(4, 100)pool.Start()
for i := 0; i i := i
pool.Submit(func() {
time.Sleep(300 * time.Millisecond)
fmt.Printf("处理任务 %d\n", i)
})
}
pool.Stop()
适用场景与性能提升点
Worker Pool 特别适合以下场景:
- I/O 密集型任务,如 HTTP 请求、文件读写、数据库操作
- 大量短时任务需要并发处理
- 需要控制资源使用上限,避免系统过载
带来的性能优势包括:
- 减少 goroutine 创建/销毁开销
- 降低调度器压力
- 防止因并发过高导致内存溢出或连接数超限
- 更容易做速率控制和监控
基本上就这些。合理设置 worker 数量(通常参考 CPU 核心数或 I/O 延迟特性),配合适当的任务队列长度,Worker Pool 能显著提升 Golang 程序的吞吐能力和稳定性。










