背景
刚开始写 Tokio 程序时,很多人都会觉得异步特别轻:
- 一个请求一个 task
- 来一个任务就
tokio::spawn - channel 一接就处理
代码看起来很流畅,吞吐也不错。
但一到高负载场景,问题很快就出来了:
- 任务堆积越来越多
- 内存不断上涨
- 下游数据库或 HTTP 依赖被打爆
- 延迟从毫秒飙到秒级
这时候根问题通常不是 Tokio 不够快,而是系统没有背压。
什么是背压
背压的本质是:当下游处理不过来时,上游必须感知并减速。
如果没有这层机制,异步系统就很容易变成“把问题排队排到内存里”。
一个最典型的错误写法:
loop {
let job = accept_job().await;
tokio::spawn(async move {
process_job(job).await;
});
}
这段代码的意思其实是:
- 来多少任务都收
- 能不能处理完以后再说
如果生产速度持续高于消费速度,系统一定会失控。
最简单的背压:有界 channel
相比无脑 spawn,更稳妥的起点通常是 bounded channel。
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<Job>(1024);
tokio::spawn(async move {
while let Some(job) = rx.recv().await {
process_job(job).await;
}
});
loop {
let job = accept_job().await;
if tx.send(job).await.is_err() {
break;
}
}
}
struct Job;
async fn accept_job() -> Job {
Job
}
async fn process_job(_job: Job) {}
mpsc::channel(1024) 的关键不是“1024 这个数字”,而是它有上限。
队列满了以后,send().await 会阻塞,上游自然就被减速了。这就是最直接的背压传播。
只做排队不够,还要限制并发数
有界队列解决的是“无限堆积”,但没解决“同时打下游太猛”的问题。
比如你从 channel 收到任务后,又每个都 spawn 出去,那数据库照样会被瞬时打满。
这时候需要再加一层并发限制。
use std::sync::Arc;
use tokio::sync::{mpsc, Semaphore};
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel::<Job>(1024);
let semaphore = Arc::new(Semaphore::new(64));
tokio::spawn(async move {
while let Some(job) = rx.recv().await {
let permit = semaphore.clone().acquire_owned().await.unwrap();
tokio::spawn(async move {
let _permit = permit;
process_job(job).await;
});
}
});
loop {
let job = accept_job().await;
if tx.send(job).await.is_err() {
break;
}
}
}
这样即使队列里积压了很多任务,同时运行的处理任务也不会超过 64 个。
更现实的策略:过载时拒绝,而不是永远等
有些业务不能无限等待。
例如:
- 在线接口请求
- 用户实时操作
- 消息消费有明确超时要求
如果系统已经过载,继续排队只会让用户等得更久,最终还是失败。这时候直接拒绝反而更诚实。
use tokio::sync::mpsc;
fn try_enqueue(tx: &mpsc::Sender<Job>, job: Job) -> Result<(), &'static str> {
tx.try_send(job).map_err(|_| "queue is full")
}
这种模式很适合对延迟敏感的服务:
- 队列有余量就收
- 队列满了就快速失败
- 客户端或上游决定是否重试
这比把请求闷在系统里拖 20 秒再超时,通常更可控。
下游依赖要单独限流
异步系统里还有一个常见误区:整个服务总并发限制住了,就以为安全了。
实际上不同下游的承受能力可能完全不同:
- Redis 能扛 5000 并发
- PostgreSQL 连接池可能只能扛 50
- 第三方 HTTP 接口可能只能接受 10 个并发请求
这时最好按依赖类型做独立限制。
struct DependencyLimiter {
db: Arc<Semaphore>,
cache: Arc<Semaphore>,
payment_api: Arc<Semaphore>,
}
impl DependencyLimiter {
fn new() -> Self {
Self {
db: Arc::new(Semaphore::new(32)),
cache: Arc::new(Semaphore::new(256)),
payment_api: Arc::new(Semaphore::new(8)),
}
}
}
这比一个全局并发阈值更贴近真实系统约束。
背压设计的几个观察指标
如果你已经做了背压,建议再把这些指标打出来:
- 队列当前长度
- 队列满导致的拒绝次数
- 正在执行的任务数
- 等待 semaphore 的耗时
- 下游超时和错误率
不然你只知道系统“变慢了”,但不知道是排队太多、并发太高,还是下游本身已经退化。
常见坑点
1. channel 设成无界
无界队列短期看起来省事,长期几乎一定会把压力转成内存问题。
2. 看到异步就默认能无限扩展
异步只是在减少阻塞等待,不是在消灭容量上限。
3. 没有拒绝策略
系统过载时如果只会一直排队,最终用户体验通常更差。
4. 只限服务,不限依赖
服务自身稳定了,不代表数据库、缓存、第三方接口也稳定。
总结
Tokio 很强,但异步系统的稳定性,关键不在于你能起多少任务,而在于你如何面对处理不过来的时刻。
一个成熟的背压设计,至少要回答这几个问题:
- 队列能积压多少
- 同时允许多少任务运行
- 队列满了怎么办
- 哪些下游需要单独保护
这些问题没有标准答案,但不回答它们,系统迟早会在高峰流量下替你回答。
异步让系统更高效,背压才让系统在高负载下依然像个系统。