Rust Async 并发编程:处理任意数量的 Future
1. 使用 join!
处理多个未知数量的 Future
当我们只需要等待两个或三个异步任务完成时,可以使用 trpl::join
、trpl::join3
或更通用的 join!
宏:
join!(fut1, fut2, fut3);
它们可以并行地等待多个异步任务完成,并将结果打包成一个元组。这对于固定数量的任务非常方便。然而,如果需要处理的是动态数量的 Future,则无法简单地在编译期写下所有的 Future。
2. join_all
与装箱的 Future
在真实场景中,常常需要将若干 Future
保存在一个集合中,然后等待它们全部结束。比如:
use trpl::{join_all};
use std::future::Future;
let futures = vec![
future_a(),
future_b(),
future_c()
];
join_all(futures).await;
这种写法在某些情况下可能报错,原因是:
- 匿名 Future:使用
async
块创建的 Future 类型是匿名的,每个async
块都对应独特的编译器生成类型,编译器无法把它们看作同一种类型。 - Rust 需要在编译期就确定
Vec
里元素类型一致,因此需要 “装箱(Box)” + “类型擦除(Trait Object)”。 - 另外,默认情况下,这些匿名 Future 不一定实现
Unpin
,会导致在使用join_all
时出现编译错误。
2.1 解决方案:Pin
+ Box::pin
要让不同的匿名 Future 能放进同一个 Vec
里,可以将它们转换为 Pin<Box<dyn Future<Output = ()>>>
,类似:
use std::{pin::Pin, future::Future};
let fut_a = Box::pin(async move {
// ...
});
let fut_b = Box::pin(async move {
// ...
});
let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![fut_a, fut_b];
trpl::join_all(futures).await;
这样做可以:
- 用
Pin<Box<dyn Future<Output = T>>>
来进行类型擦除,将所有匿名的 Future 看作相同的 “Future trait object”。 Box::pin
使得我们可以安全地 “固定(Pin)” 该 Future,避免它在内存中移动,也满足了join_all
对Unpin
的约束。
2.2 进一步优化:pin!
宏
如果想避免在堆上分配,可以使用标准库的宏 pin!
(或者类似的工具)对每一个 Future 进行 “固定(Pin)”,再将其装入集合:
use std::{pin::pin, future::Future};
let fut_a = pin!(async move {
// ...
});
let fut_b = pin!(async move {
// ...
});
let futures: Vec<Pin<&mut (dyn Future<Output = ()>)>> = vec![fut_a, fut_b];
trpl::join_all(futures).await;
原理类似,只是避免了堆分配,在一些场景会有更好的性能。
3. “竞赛”执行:race
如果我们只需让其中 任意一个 Future 先完成就返回结果,而不是等待全部完成,可以使用 trpl::race
之类的函数:
let slow = async {
println!("Slow future started.");
sleep(Duration::from_secs(2)).await;
"slow"
};
let fast = async {
println!("Fast future started.");
sleep(Duration::from_millis(500)).await;
"fast"
};
match trpl::race(slow, fast).await {
Either::Left(val) => println!("Slow finished first: {}", val),
Either::Right(val) => println!("Fast finished first: {}", val),
}
这里的 race
会并行运行两个 Future,并在第一个完成的那一刻立即返回。如果要同时处理多个并且只关心先结束的一个,则可以使用嵌套的方式,或在外部用别的抽象工具。
4. yield_now
与手动让出执行权
在某些计算密集(CPU-bound)的场景下,我们可能需要在 async
块中显式地让出执行权:
- 如果使用
std::thread::sleep
会阻塞线程,这并不是异步。 - 如果使用
trpl::sleep
,实际上是让 runtime 进行定时器管理,但这会导致最小粒度往往是毫秒级。 - 如果只想给其他任务一个公平的机会,可以使用
yield_now
立即返回给 runtime,由 runtime 去调度其他任务。
例如:
use trpl::yield_now;
let fut = async {
do_some_cpu_work();
yield_now().await; // 主动让出
do_more_cpu_work();
};
这样可以让其他任务在此时机获得执行权,从而提升系统整体的并发性。
5. timeout
:组合异步工具构建更复杂逻辑
异步编程的优势在于可以方便地组合不同的异步构件来实现更高阶的功能,比如 timeout
:
- 启动一个任务
future_to_try
。 - 另一个任务启动一个
timer
,在设定时长后完成。 - 用
race
同时等待两个任务:如果future_to_try
先完成则成功,否则超时。
伪代码如下:
async fn timeout<F, T>(future_to_try: F, max_time: Duration)
-> Result<T, Duration>
where
F: Future<Output = T>,
{
let timer = trpl::sleep(max_time);
match trpl::race(future_to_try, timer).await {
Either::Left(output) => Ok(output),
Either::Right(_) => Err(max_time),
}
}
通过这类组合技术,我们能够在保证高并发的同时灵活地处理超时、重试、限流等实际需求。
6.总结
join!
和join_all
:分别在编译期或运行期管理多个 Future;前者适合已知固定数量,后者处理动态数量。- 使用
Pin
及Box::pin
:解决匿名 Future 的类型擦除及移动问题。 race
适合只关心先结束的异步任务。yield_now
可在 CPU 密集任务中主动让出执行权,提高系统并发度。- 自定义
timeout
等高阶抽象:通过异步组合实现复杂并发需求。
在实际开发中,选择何种并发模式需要取决于具体场景。Rust 提供了从语言层级到运行时的全栈异步能力,结合社区的强大库生态,让我们能够用更少的心智负担来编写高性能的异步并发代码。