背景

Go 的并发模型是其最强大的特性之一。goroutine + channel 的组合让我们能以极低的成本构建高性能的并发系统。

今天聊聊 Pipeline 模式——一种将数据处理流程抽象为一系列阶段的编程范式。

什么是 Pipeline

想象工厂流水线:原料从一端进入,经过多个工序处理,最终成品从另一端出来。

func main() {
    // 生成数据
    data := generate(1, 2, 3, 4, 5)

    // 流水线:平方 -> 过滤偶数 -> 输出
    result := pipeline(data,
        square,
        filterEven,
        printResult,
    )

    <-result.done // 等待完成
}

实战:图片处理流水线

假设我们要处理一批图片:下载 → 缩放 → 添加水印 → 上传。

type Image struct {
    URL  string
    Data []byte
}

func ProcessImages(urls []string) error {
    downloads := make(chan Image, 100)
    resized := make(chan Image, 100)
    watermarked := make(chan Image, 100)

    var wg sync.WaitGroup

    // 下载阶段
    wg.Add(1)
    go func() {
        defer wg.Done()
        for _, url := range urls {
            img, err := download(url)
            if err != nil {
                log.Printf("下载失败: %v", err)
                continue
            }
            downloads <- img
        }
        close(downloads)
    }()

    // 缩放阶段 (3个worker)
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for img := range downloads {
                resizedImg, _ := resize(img, 800, 600)
                resized <- resizedImg
            }
        }()
    }

    // 水印阶段 (2个worker)
    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for img := range resized {
                watermarkedImg, _ := watermark(img, "© My Blog")
                watermarked <- watermarkedImg
            }
        }()
    }

    // 上传阶段
    wg.Add(1)
    go func() {
        defer wg.Done()
        for img := range watermarked {
            if err := upload(img); err != nil {
                log.Printf("上传失败: %v", err)
            }
        }
    }()

    wg.Wait()
    return nil
}

优雅的错误处理

Pipeline 中如何处理错误?一个不错的方案是用错误 channel

func ProcessWithErrors(urls []string) {
    data := generate(urls)
    result := make(chan Result)
    errs := make(chan error, 10)

    for item := range data {
        go func(item Item) {
            res, err := process(item)
            if err != nil {
                errs <- err
                return
            }
            result <- res
        }(item)
    }

    for r := range result {
        fmt.Println("结果:", r)
    }

    close(errs)
    for err := range errs {
        fmt.Println("错误:", err)
    }
}

总结

Pipeline 模式让复杂的数据处理流程变得清晰、可控、可扩展。

优点:

  • 解耦各处理阶段
  • 天然支持并发
  • 易于理解和维护

注意点:

  • 合理的 buffer 大小
  • 做好错误处理
  • 记得 close channel 或用 WaitGroup

好的架构不是一蹴而就的,是逐步迭代出来的。