C++ 无锁队列:从 CAS 到内存序

背景 只要做过高并发服务、游戏引擎或者低延迟组件,迟早会碰到一个问题:锁太重了。 典型场景包括: 生产者线程持续推消息 消费者线程高频拉取任务 临界区很短,但锁竞争很激烈 延迟指标对尾部抖动非常敏感 这时候很多人第一反应是“上无锁队列”。 方向没错,但无锁代码最危险的地方在于:看起来能跑,不代表一定正确。 尤其在 C++ 里,只会用 compare_exchange_weak 还不够,真正决定正确性的往往是内存序。 无锁不等于没有同步 先澄清一个常见误区: mutex 是同步 原子变量也是同步 无锁结构只是把同步方式从“阻塞锁”换成了“原子操作 + 内存可见性约束”。 也就是说,你不是不需要同步了,而是需要更精确地控制同步。 一个最简单的 SPSC 环形队列 先从单生产者、单消费者模型说起。这个模型更适合作为理解内存序的起点。 #include <atomic> #include <array> #include <cstddef> template <typename T, std::size_t N> class SpscQueue { public: bool push(const T& value) { const auto tail = tail_.load(std::memory_order_relaxed); const auto next = (tail + 1) % N; if (next == head_.load(std::memory_order_acquire)) { return false; } buffer_[tail] = value; tail_.store(next, std::memory_order_release); return true; } bool pop(T& value) { const auto head = head_.load(std::memory_order_relaxed); if (head == tail_.load(std::memory_order_acquire)) { return false; } value = buffer_[head]; head_.store((head + 1) % N, std::memory_order_release); return true; } private: std::array<T, N> buffer_{}; std::atomic<std::size_t> head_{0}; std::atomic<std::size_t> tail_{0}; }; 这个实现不复杂,但已经体现了两个关键点: ...

2026年4月16日 · 2 分钟 · BvBeJ

Rust Tokio 背压控制:异步系统别只会拼命 spawn

背景 刚开始写 Tokio 程序时,很多人都会觉得异步特别轻: 一个请求一个 task 来一个任务就 tokio::spawn channel 一接就处理 代码看起来很流畅,吞吐也不错。 但一到高负载场景,问题很快就出来了: 任务堆积越来越多 内存不断上涨 下游数据库或 HTTP 依赖被打爆 延迟从毫秒飙到秒级 这时候根问题通常不是 Tokio 不够快,而是系统没有背压。 什么是背压 背压的本质是:当下游处理不过来时,上游必须感知并减速。 如果没有这层机制,异步系统就很容易变成“把问题排队排到内存里”。 一个最典型的错误写法: loop { let job = accept_job().await; tokio::spawn(async move { process_job(job).await; }); } 这段代码的意思其实是: 来多少任务都收 能不能处理完以后再说 如果生产速度持续高于消费速度,系统一定会失控。 最简单的背压:有界 channel 相比无脑 spawn,更稳妥的起点通常是 bounded channel。 use tokio::sync::mpsc; #[tokio::main] async fn main() { let (tx, mut rx) = mpsc::channel::<Job>(1024); tokio::spawn(async move { while let Some(job) = rx.recv().await { process_job(job).await; } }); loop { let job = accept_job().await; if tx.send(job).await.is_err() { break; } } } struct Job; async fn accept_job() -> Job { Job } async fn process_job(_job: Job) {} mpsc::channel(1024) 的关键不是“1024 这个数字”,而是它有上限。 ...

2026年4月16日 · 2 分钟 · BvBeJ

Go 并发模式:Pipeline 实战

背景 Go 的并发模型是其最强大的特性之一。goroutine + channel 的组合让我们能以极低的成本构建高性能的并发系统。 今天聊聊 Pipeline 模式——一种将数据处理流程抽象为一系列阶段的编程范式。 什么是 Pipeline 想象工厂流水线:原料从一端进入,经过多个工序处理,最终成品从另一端出来。 func main() { // 生成数据 data := generate(1, 2, 3, 4, 5) // 流水线:平方 -> 过滤偶数 -> 输出 result := pipeline(data, square, filterEven, printResult, ) <-result.done // 等待完成 } 实战:图片处理流水线 假设我们要处理一批图片:下载 → 缩放 → 添加水印 → 上传。 type Image struct { URL string Data []byte } func ProcessImages(urls []string) error { downloads := make(chan Image, 100) resized := make(chan Image, 100) watermarked := make(chan Image, 100) var wg sync.WaitGroup // 下载阶段 wg.Add(1) go func() { defer wg.Done() for _, url := range urls { img, err := download(url) if err != nil { log.Printf("下载失败: %v", err) continue } downloads <- img } close(downloads) }() // 缩放阶段 (3个worker) for i := 0; i < 3; i++ { wg.Add(1) go func() { defer wg.Done() for img := range downloads { resizedImg, _ := resize(img, 800, 600) resized <- resizedImg } }() } // 水印阶段 (2个worker) for i := 0; i < 2; i++ { wg.Add(1) go func() { defer wg.Done() for img := range resized { watermarkedImg, _ := watermark(img, "© My Blog") watermarked <- watermarkedImg } }() } // 上传阶段 wg.Add(1) go func() { defer wg.Done() for img := range watermarked { if err := upload(img); err != nil { log.Printf("上传失败: %v", err) } } }() wg.Wait() return nil } 优雅的错误处理 Pipeline 中如何处理错误?一个不错的方案是用错误 channel: ...

2026年4月10日 · 2 分钟 · BvBeJ