背景 Go 的并发模型是其最强大的特性之一。goroutine + channel 的组合让我们能以极低的成本构建高性能的并发系统。
今天聊聊 Pipeline 模式——一种将数据处理流程抽象为一系列阶段的编程范式。
什么是 Pipeline 想象工厂流水线:原料从一端进入,经过多个工序处理,最终成品从另一端出来。
func main() { // 生成数据 data := generate(1, 2, 3, 4, 5) // 流水线:平方 -> 过滤偶数 -> 输出 result := pipeline(data, square, filterEven, printResult, ) <-result.done // 等待完成 } 实战:图片处理流水线 假设我们要处理一批图片:下载 → 缩放 → 添加水印 → 上传。
type Image struct { URL string Data []byte } func ProcessImages(urls []string) error { downloads := make(chan Image, 100) resized := make(chan Image, 100) watermarked := make(chan Image, 100) var wg sync.WaitGroup // 下载阶段 wg.Add(1) go func() { defer wg.Done() for _, url := range urls { img, err := download(url) if err != nil { log.Printf("下载失败: %v", err) continue } downloads <- img } close(downloads) }() // 缩放阶段 (3个worker) for i := 0; i < 3; i++ { wg.Add(1) go func() { defer wg.Done() for img := range downloads { resizedImg, _ := resize(img, 800, 600) resized <- resizedImg } }() } // 水印阶段 (2个worker) for i := 0; i < 2; i++ { wg.Add(1) go func() { defer wg.Done() for img := range resized { watermarkedImg, _ := watermark(img, "© My Blog") watermarked <- watermarkedImg } }() } // 上传阶段 wg.Add(1) go func() { defer wg.Done() for img := range watermarked { if err := upload(img); err != nil { log.Printf("上传失败: %v", err) } } }() wg.Wait() return nil } 优雅的错误处理 Pipeline 中如何处理错误?一个不错的方案是用错误 channel:
...