管道模式利用goroutine和channel实现数据的多阶段处理,适用于ETL、图像处理等场景。示例中通过gen生成数据、square计算平方,最后消费结果,形成“生产-传输-消费”流程。可扩展为多阶段,并通过扇出(多个worker并行)和扇入(合并结果)提升性能。使用定向channel增强类型安全,合理使用buffer和context控制生命周期,避免goroutine泄漏。

在Go语言中,channel 和 goroutine 是并发编程的核心。将它们结合使用可以轻松实现管道模式(Pipeline Pattern),这是一种将数据流经多个处理阶段的设计方式,每个阶段由一个或多个goroutine执行,通过channel传递数据。
什么是管道模式
管道模式将任务拆分为多个连续的处理阶段,前一阶段的输出作为下一阶段的输入。这种模式适合数据处理、ETL流程、图像处理等场景。Go中的channel天然支持这种“生产-传输-消费”结构。
基本结构:三步构建管道
一个典型的管道包含三个部分:生成数据、处理数据、消费结果。每个部分运行在独立的goroutine中,用channel连接。
示例:整数平方管道
立即学习“go语言免费学习笔记(深入)”;
package main
import "fmt"
// 阶段1:生成数字
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 阶段2:计算平方
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// 阶段3:消费并打印结果
func main() {
// 构建管道:gen → square → print
numbers := gen(2, 3, 4, 5)
squares := square(numbers)
for result := range squares {
fmt.Println(result)
}
}多阶段与扇出/扇入优化
复杂管道可包含多个处理阶段,也可通过“扇出”提升性能(多个goroutine并行处理),再“扇入”合并结果。
示例:扇出+扇入
// 扇出:启动多个worker并行处理
func merge(cs []<-chan int) <-chan int {
var inputs []<-chan int
for _, c := range cs {
inputs = append(inputs, c)
}
out := make(chan int)
go func() {
defer close(out)
for _, c := range inputs {
for val := range c {
out <- val
}
}
}()
return out
}
// 使用多个square worker
workers := 3
var chans []<-chan int
for i := 0; i < workers; i++ {
chans = append(chans, square(numbers))
}
merged := merge(chans)注意事项与最佳实践
- 始终关闭发送端的channel,避免接收方死锁
- 使用
<-chan T
和chan<- T
限定channel方向,提高类型安全 - 合理设置buffered channel大小,平衡性能与内存
- 配合
context.Context
实现超时或取消控制 - 避免goroutine泄漏:确保所有goroutine能正常退出
基本上就这些。管道模式利用Go的并发原语,让数据流动清晰自然,代码易于理解与扩展。










