背景
Go 的并发模型是其最强大的特性之一。goroutine + channel 的组合让我们能以极低的成本构建高性能的并发系统。
今天聊聊 Pipeline 模式——一种将数据处理流程抽象为一系列阶段的编程范式。
什么是 Pipeline
想象工厂流水线:原料从一端进入,经过多个工序处理,最终成品从另一端出来。
func main() {
// 生成数据
data := generate(1, 2, 3, 4, 5)
// 流水线:平方 -> 过滤偶数 -> 输出
result := pipeline(data,
square,
filterEven,
printResult,
)
<-result.done // 等待完成
}
实战:图片处理流水线
假设我们要处理一批图片:下载 → 缩放 → 添加水印 → 上传。
type Image struct {
URL string
Data []byte
}
func ProcessImages(urls []string) error {
downloads := make(chan Image, 100)
resized := make(chan Image, 100)
watermarked := make(chan Image, 100)
var wg sync.WaitGroup
// 下载阶段
wg.Add(1)
go func() {
defer wg.Done()
for _, url := range urls {
img, err := download(url)
if err != nil {
log.Printf("下载失败: %v", err)
continue
}
downloads <- img
}
close(downloads)
}()
// 缩放阶段 (3个worker)
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for img := range downloads {
resizedImg, _ := resize(img, 800, 600)
resized <- resizedImg
}
}()
}
// 水印阶段 (2个worker)
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for img := range resized {
watermarkedImg, _ := watermark(img, "© My Blog")
watermarked <- watermarkedImg
}
}()
}
// 上传阶段
wg.Add(1)
go func() {
defer wg.Done()
for img := range watermarked {
if err := upload(img); err != nil {
log.Printf("上传失败: %v", err)
}
}
}()
wg.Wait()
return nil
}
优雅的错误处理
Pipeline 中如何处理错误?一个不错的方案是用错误 channel:
func ProcessWithErrors(urls []string) {
data := generate(urls)
result := make(chan Result)
errs := make(chan error, 10)
for item := range data {
go func(item Item) {
res, err := process(item)
if err != nil {
errs <- err
return
}
result <- res
}(item)
}
for r := range result {
fmt.Println("结果:", r)
}
close(errs)
for err := range errs {
fmt.Println("错误:", err)
}
}
总结
Pipeline 模式让复杂的数据处理流程变得清晰、可控、可扩展。
优点:
- 解耦各处理阶段
- 天然支持并发
- 易于理解和维护
注意点:
- 合理的 buffer 大小
- 做好错误处理
- 记得 close channel 或用 WaitGroup
好的架构不是一蹴而就的,是逐步迭代出来的。