当前位置: 首页 > article >正文

Rust Async 并发编程:处理任意数量的 Future

1. 使用 join! 处理多个未知数量的 Future

当我们只需要等待两个或三个异步任务完成时,可以使用 trpl::jointrpl::join3 或更通用的 join! 宏:

join!(fut1, fut2, fut3);

它们可以并行地等待多个异步任务完成,并将结果打包成一个元组。这对于固定数量的任务非常方便。然而,如果需要处理的是动态数量的 Future,则无法简单地在编译期写下所有的 Future。

2. join_all 与装箱的 Future

在真实场景中,常常需要将若干 Future 保存在一个集合中,然后等待它们全部结束。比如:

use trpl::{join_all};
use std::future::Future;

let futures = vec![
    future_a(),
    future_b(),
    future_c()
];

join_all(futures).await;

这种写法在某些情况下可能报错,原因是:

  • 匿名 Future:使用 async 块创建的 Future 类型是匿名的,每个 async 块都对应独特的编译器生成类型,编译器无法把它们看作同一种类型。
  • Rust 需要在编译期就确定 Vec 里元素类型一致,因此需要 “装箱(Box)” + “类型擦除(Trait Object)”。
  • 另外,默认情况下,这些匿名 Future 不一定实现 Unpin,会导致在使用 join_all 时出现编译错误。

2.1 解决方案:Pin + Box::pin

要让不同的匿名 Future 能放进同一个 Vec 里,可以将它们转换为 Pin<Box<dyn Future<Output = ()>>>,类似:

use std::{pin::Pin, future::Future};

let fut_a = Box::pin(async move {
    // ...
});

let fut_b = Box::pin(async move {
    // ...
});

let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![fut_a, fut_b];

trpl::join_all(futures).await;

这样做可以:

  • Pin<Box<dyn Future<Output = T>>> 来进行类型擦除,将所有匿名的 Future 看作相同的 “Future trait object”。
  • Box::pin 使得我们可以安全地 “固定(Pin)” 该 Future,避免它在内存中移动,也满足了 join_allUnpin 的约束。

2.2 进一步优化:pin!

如果想避免在堆上分配,可以使用标准库的宏 pin!(或者类似的工具)对每一个 Future 进行 “固定(Pin)”,再将其装入集合:

use std::{pin::pin, future::Future};

let fut_a = pin!(async move {
    // ...
});

let fut_b = pin!(async move {
    // ...
});

let futures: Vec<Pin<&mut (dyn Future<Output = ()>)>> = vec![fut_a, fut_b];

trpl::join_all(futures).await;

原理类似,只是避免了堆分配,在一些场景会有更好的性能。

3. “竞赛”执行:race

如果我们只需让其中 任意一个 Future 先完成就返回结果,而不是等待全部完成,可以使用 trpl::race 之类的函数:

let slow = async {
    println!("Slow future started.");
    sleep(Duration::from_secs(2)).await;
    "slow"
};

let fast = async {
    println!("Fast future started.");
    sleep(Duration::from_millis(500)).await;
    "fast"
};

match trpl::race(slow, fast).await {
    Either::Left(val) => println!("Slow finished first: {}", val),
    Either::Right(val) => println!("Fast finished first: {}", val),
}

这里的 race 会并行运行两个 Future,并在第一个完成的那一刻立即返回。如果要同时处理多个并且只关心先结束的一个,则可以使用嵌套的方式,或在外部用别的抽象工具。


4. yield_now 与手动让出执行权

在某些计算密集(CPU-bound)的场景下,我们可能需要在 async 块中显式地让出执行权:

  • 如果使用 std::thread::sleep 会阻塞线程,这并不是异步。
  • 如果使用 trpl::sleep,实际上是让 runtime 进行定时器管理,但这会导致最小粒度往往是毫秒级。
  • 如果只想给其他任务一个公平的机会,可以使用 yield_now 立即返回给 runtime,由 runtime 去调度其他任务。

例如:

use trpl::yield_now;

let fut = async {
    do_some_cpu_work();
    yield_now().await; // 主动让出
    do_more_cpu_work();
};

这样可以让其他任务在此时机获得执行权,从而提升系统整体的并发性。


5. timeout:组合异步工具构建更复杂逻辑

异步编程的优势在于可以方便地组合不同的异步构件来实现更高阶的功能,比如 timeout

  1. 启动一个任务 future_to_try
  2. 另一个任务启动一个 timer,在设定时长后完成。
  3. race 同时等待两个任务:如果 future_to_try 先完成则成功,否则超时。

伪代码如下:

async fn timeout<F, T>(future_to_try: F, max_time: Duration)
    -> Result<T, Duration>
where
    F: Future<Output = T>,
{
    let timer = trpl::sleep(max_time);
    match trpl::race(future_to_try, timer).await {
        Either::Left(output) => Ok(output),
        Either::Right(_) => Err(max_time),
    }
}

通过这类组合技术,我们能够在保证高并发的同时灵活地处理超时、重试、限流等实际需求。

6.总结

  • join!join_all:分别在编译期或运行期管理多个 Future;前者适合已知固定数量,后者处理动态数量。
  • 使用 PinBox::pin:解决匿名 Future 的类型擦除及移动问题。
  • race 适合只关心先结束的异步任务。
  • yield_now 可在 CPU 密集任务中主动让出执行权,提高系统并发度。
  • 自定义 timeout 等高阶抽象:通过异步组合实现复杂并发需求。

在实际开发中,选择何种并发模式需要取决于具体场景。Rust 提供了从语言层级到运行时的全栈异步能力,结合社区的强大库生态,让我们能够用更少的心智负担来编写高性能的异步并发代码。


http://www.kler.cn/a/567161.html

相关文章:

  • rust web框架actix和axum比较
  • AI编程Cursor高级技巧之Rules配置指南
  • Python接口测试实践:参数化测试、数据驱动测试和断言的使用
  • Transformer 代码剖析3 - 参数配置 (pytorch实现)
  • 蓝桥杯单片机第16届4T模拟赛三思路讲解
  • 基于Spring Boot的产业园区智慧公寓管理系统设计与实现(LW+源码+讲解)
  • Ansys Zemax | 使用衍射光学器件模拟增强现实 (AR) 系统的出瞳扩展器 (EPE):第 3 部分
  • Linux云计算SRE-第十五周
  • 机器学习基础概念详解:从入门到应用
  • 《OpenCV》——人脸检测
  • Linux上用C++和GCC开发程序实现两个不同MySQL实例下单个Schema稳定高效的数据迁移到其它MySQL实例
  • 【Java项目】基于SpringBoot和Vue的“智慧食堂”系统
  • Android 布局系列(五):GridLayout 网格布局的使用
  • 一文掌握 Scrapy 框架的详细使用,包括实战案例
  • 两数之和 Hot100
  • Mysql 语法再巩固
  • GitHub 语析 - 基于大模型的知识库与知识图谱问答平台
  • 从零搭建Tomcat:深入理解Java Web服务器的工作原理
  • 【Linux基础】Linux下的C编程指南
  • redis slaveof 命令 执行后为什么需要清库重新同步