必须用 sync.WaitGroup 等待或 channel + range + close() 控制退出,否则主 goroutine 提前结束导致数据丢失;管道串联时需各阶段独立启动 goroutine 并关闭输出 channel,下游用 for range 消费。

为什么直接用 goroutine 启动清洗函数会丢失数据
因为没有同步机制,主 goroutine 在子任务完成前就退出,导致 runtime.Goexit 或程序直接结束。常见现象是:输入 100 条数据,输出只有 0–3 条,且每次运行结果不一致。
- 必须用
sync.WaitGroup显式等待所有清洗 goroutine 结束 - 或改用 channel +
range配合close()控制消费端退出 - 避免在 goroutine 内直接向未缓冲的 channel 发送 —— 会永久阻塞(除非有接收者已就绪)
用管道(channel)串联清洗步骤时如何避免死锁
典型死锁场景:cleanStage1 向 stage2Ch 发送数据,但 cleanStage2 还没启动或已提前退出,发送方卡住;或者多个 stage 共用一个未关闭的 channel,range 永不退出。
- 每个清洗阶段应独立启 goroutine,并在处理完全部输入后
close(outputCh) - 下游 stage 必须用
for val := range inputCh,不能用for { select { case v := 无限循环 - 若某阶段需丢弃脏数据,仍要保证输出 channel 的发送次数与输入逻辑匹配,否则上游可能因阻塞而卡死
func cleanStage1(in <-chan string, out chan<- int) {
defer close(out)
for line := range in {
if len(line) == 0 { continue }
if n, err := strconv.Atoi(line); err == nil {
out <- n * 2 // 示例:数值翻倍
}
}
}goroutine 泄漏的三个高发点
并发清洗中 goroutine 不退出,内存持续上涨,最终 OOM。最常发生在错误的 channel 使用模式上。
- 向已关闭的 channel 发送数据 → panic,但若 recover 了却没终止 goroutine,它会继续空转
- 从无发送者的 channel 无限
→ goroutine 永久挂起,无法被调度器回收 - 用
time.After做超时但没配合selectdefault 分支,导致 channel 接收永远等下去
真实清洗流程中要不要加缓冲 channel
取决于清洗耗时是否波动大、各阶段吞吐是否均衡。不加缓冲不是错,但容易暴露性能瓶颈。
立即学习“go语言免费学习笔记(深入)”;
- IO 密集型清洗(如调外部 API 校验手机号)→ 建议
make(chan T, 100),防瞬时积压阻塞上游 - CPU 密集型(如正则替换、JSON 解析)→ 缓冲大小建议设为
runtime.NumCPU()的 2–4 倍,避免线程切换开销过大 - 绝对不要用
make(chan T, 0)(即无缓冲)串联多个 CPU 密集 stage,这等于强制串行化
缓冲太大也不行,比如设成 10000,可能把本该失败的脏数据全缓存住,延迟报错,掩盖数据质量问题。










