背景

刚开始写 Tokio 程序时,很多人都会觉得异步特别轻:

  • 一个请求一个 task
  • 来一个任务就 tokio::spawn
  • channel 一接就处理

代码看起来很流畅,吞吐也不错。

但一到高负载场景,问题很快就出来了:

  • 任务堆积越来越多
  • 内存不断上涨
  • 下游数据库或 HTTP 依赖被打爆
  • 延迟从毫秒飙到秒级

这时候根问题通常不是 Tokio 不够快,而是系统没有背压。

什么是背压

背压的本质是:当下游处理不过来时,上游必须感知并减速。

如果没有这层机制,异步系统就很容易变成“把问题排队排到内存里”。

一个最典型的错误写法:

loop {
    let job = accept_job().await;

    tokio::spawn(async move {
        process_job(job).await;
    });
}

这段代码的意思其实是:

  • 来多少任务都收
  • 能不能处理完以后再说

如果生产速度持续高于消费速度,系统一定会失控。

最简单的背压:有界 channel

相比无脑 spawn,更稳妥的起点通常是 bounded channel。

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Job>(1024);

    tokio::spawn(async move {
        while let Some(job) = rx.recv().await {
            process_job(job).await;
        }
    });

    loop {
        let job = accept_job().await;
        if tx.send(job).await.is_err() {
            break;
        }
    }
}

struct Job;

async fn accept_job() -> Job {
    Job
}

async fn process_job(_job: Job) {}

mpsc::channel(1024) 的关键不是“1024 这个数字”,而是它有上限。

队列满了以后,send().await 会阻塞,上游自然就被减速了。这就是最直接的背压传播。

只做排队不够,还要限制并发数

有界队列解决的是“无限堆积”,但没解决“同时打下游太猛”的问题。

比如你从 channel 收到任务后,又每个都 spawn 出去,那数据库照样会被瞬时打满。

这时候需要再加一层并发限制。

use std::sync::Arc;
use tokio::sync::{mpsc, Semaphore};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Job>(1024);
    let semaphore = Arc::new(Semaphore::new(64));

    tokio::spawn(async move {
        while let Some(job) = rx.recv().await {
            let permit = semaphore.clone().acquire_owned().await.unwrap();

            tokio::spawn(async move {
                let _permit = permit;
                process_job(job).await;
            });
        }
    });

    loop {
        let job = accept_job().await;
        if tx.send(job).await.is_err() {
            break;
        }
    }
}

这样即使队列里积压了很多任务,同时运行的处理任务也不会超过 64 个。

更现实的策略:过载时拒绝,而不是永远等

有些业务不能无限等待。

例如:

  • 在线接口请求
  • 用户实时操作
  • 消息消费有明确超时要求

如果系统已经过载,继续排队只会让用户等得更久,最终还是失败。这时候直接拒绝反而更诚实。

use tokio::sync::mpsc;

fn try_enqueue(tx: &mpsc::Sender<Job>, job: Job) -> Result<(), &'static str> {
    tx.try_send(job).map_err(|_| "queue is full")
}

这种模式很适合对延迟敏感的服务:

  • 队列有余量就收
  • 队列满了就快速失败
  • 客户端或上游决定是否重试

这比把请求闷在系统里拖 20 秒再超时,通常更可控。

下游依赖要单独限流

异步系统里还有一个常见误区:整个服务总并发限制住了,就以为安全了。

实际上不同下游的承受能力可能完全不同:

  • Redis 能扛 5000 并发
  • PostgreSQL 连接池可能只能扛 50
  • 第三方 HTTP 接口可能只能接受 10 个并发请求

这时最好按依赖类型做独立限制。

struct DependencyLimiter {
    db: Arc<Semaphore>,
    cache: Arc<Semaphore>,
    payment_api: Arc<Semaphore>,
}

impl DependencyLimiter {
    fn new() -> Self {
        Self {
            db: Arc::new(Semaphore::new(32)),
            cache: Arc::new(Semaphore::new(256)),
            payment_api: Arc::new(Semaphore::new(8)),
        }
    }
}

这比一个全局并发阈值更贴近真实系统约束。

背压设计的几个观察指标

如果你已经做了背压,建议再把这些指标打出来:

  • 队列当前长度
  • 队列满导致的拒绝次数
  • 正在执行的任务数
  • 等待 semaphore 的耗时
  • 下游超时和错误率

不然你只知道系统“变慢了”,但不知道是排队太多、并发太高,还是下游本身已经退化。

常见坑点

1. channel 设成无界

无界队列短期看起来省事,长期几乎一定会把压力转成内存问题。

2. 看到异步就默认能无限扩展

异步只是在减少阻塞等待,不是在消灭容量上限。

3. 没有拒绝策略

系统过载时如果只会一直排队,最终用户体验通常更差。

4. 只限服务,不限依赖

服务自身稳定了,不代表数据库、缓存、第三方接口也稳定。

总结

Tokio 很强,但异步系统的稳定性,关键不在于你能起多少任务,而在于你如何面对处理不过来的时刻。

一个成熟的背压设计,至少要回答这几个问题:

  • 队列能积压多少
  • 同时允许多少任务运行
  • 队列满了怎么办
  • 哪些下游需要单独保护

这些问题没有标准答案,但不回答它们,系统迟早会在高峰流量下替你回答。


异步让系统更高效,背压才让系统在高负载下依然像个系统。