处理任意数量的 Future

在前一节中,当我们从使用两个 future 切换到三个时,我们也必须从使用 join 切换到使用 join3。每次我们更改想要 join 的 future 数量时都必须调用不同的函数,这会很烦人。幸运的是,我们有一个宏形式的 join,我们可以向其传递任意数量的参数。它还处理 future 本身的等待。因此,我们可以重写 Listing 17-13 中的代码,以使用 join! 而不是 join3,如 Listing 17-14 所示。

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; let tx_fut = async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; trpl::join!(tx1_fut, tx_fut, rx_fut); }); }
Listing 17-14: 使用 join! 等待多个 future

与在 joinjoin3join4 等之间切换相比,这绝对是一个进步!然而,即使是这种宏形式也仅在我们预先知道 future 的数量时才有效。但在现实世界的 Rust 中,将 future 推入集合,然后等待其中一些或所有 future 完成是一种常见的模式。

要检查集合中的所有 future,我们需要迭代并 join 所有 future。 trpl::join_all 函数接受任何实现 Iterator trait 的类型,您在 Iterator trait 和 next 方法第 13 章中学习过,因此它似乎正是我们需要的。让我们尝试将 future 放入 vector 中,并将 join! 替换为 join_all,如 Listing 17-15 所示。

extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; let tx_fut = async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let futures = vec![tx1_fut, rx_fut, tx_fut]; trpl::join_all(futures).await; }); }
Listing 17-15: 将匿名 future 存储在 vector 中并调用 join_all

不幸的是,此代码无法编译。相反,我们得到这个错误

error[E0308]: mismatched types --> src/main.rs:45:37 | 10 | let tx1_fut = async move { | ---------- the expected `async` block ... 24 | let rx_fut = async { | ----- the found `async` block ... 45 | let futures = vec![tx1_fut, rx_fut, tx_fut]; | ^^^^^^ expected `async` block, found a different `async` block | = note: expected `async` block `{async block@src/main.rs:10:23: 10:33}` found `async` block `{async block@src/main.rs:24:22: 24:27}` = note: no two async blocks, even if identical, have the same type = help: consider pinning your async block and casting it to a trait object

这可能令人惊讶。毕竟,没有一个 async 代码块返回任何内容,因此每个代码块都生成一个 Future<Output = ()>。请记住,Future 是一个 trait,但编译器会为每个 async 代码块创建一个唯一的枚举。您不能将两个不同的手写结构体放入 Vec 中,并且相同的规则适用于编译器生成的不同枚举。

为了使这项工作正常进行,我们需要使用trait 对象,就像我们在 “从 run 函数返回错误”第 12 章中所做的那样。(我们将在第 18 章中详细介绍 trait 对象。)使用 trait 对象使我们能够将这些类型生成的每个匿名 future 视为相同的类型,因为它们都实现了 Future trait。

注意:在第 8 章的 使用枚举存储多个值部分中,我们讨论了另一种将多种类型包含在 Vec 中的方法:使用枚举来表示 vector 中可能出现的每种类型。但是,我们在这里不能这样做。首先,我们无法命名不同的类型,因为它们是匿名的。其次,我们首先使用 vector 和 join_all 的原因是为了能够处理 future 的动态集合,我们只关心它们具有相同的输出类型。

我们首先将 vec! 中的每个 future 包装在 Box::new 中,如 Listing 17-16 所示。

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; let tx_fut = async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let futures = vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)]; trpl::join_all(futures).await; }); }
Listing 17-16: 使用 Box::new 对齐 Vec 中 future 的类型

不幸的是,此代码仍然无法编译。实际上,对于第二个和第三个 Box::new 调用,以及引用 Unpin trait 的新错误,我们都得到了与之前相同的基本错误。我们稍后会回到 Unpin 错误。首先,让我们通过显式注释 futures 变量的类型来修复 Box::new 调用中的类型错误(参见 Listing 17-17)。

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::{future::Future, time::Duration}; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; let tx_fut = async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let futures: Vec<Box<dyn Future<Output = ()>>> = vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)]; trpl::join_all(futures).await; }); }
Listing 17-17: 通过使用显式类型声明修复其余的类型不匹配错误

此类型声明有点复杂,因此让我们逐步了解它

  1. 最内层的类型是 future 本身。我们通过编写 Future<Output = ()> 显式地指出 future 的输出是 unit 类型 ()
  2. 然后,我们用 dyn 注释 trait,将其标记为动态。
  3. 整个 trait 引用都包装在 Box 中。
  4. 最后,我们显式声明 futures 是一个包含这些项的 Vec

这已经产生了很大的不同。现在,当我们运行编译器时,我们只得到提到 Unpin 的错误。虽然有三个错误,但它们的内容非常相似。

error[E0308]: mismatched types --> src/main.rs:46:46 | 10 | let tx1_fut = async move { | ---------- the expected `async` block ... 24 | let rx_fut = async { | ----- the found `async` block ... 46 | vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)]; | -------- ^^^^^^ expected `async` block, found a different `async` block | | | arguments to this function are incorrect | = note: expected `async` block `{async block@src/main.rs:10:23: 10:33}` found `async` block `{async block@src/main.rs:24:22: 24:27}` = note: no two async blocks, even if identical, have the same type = help: consider pinning your async block and casting it to a trait object note: associated function defined here --> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/alloc/src/boxed.rs:255:12 | 255 | pub fn new(x: T) -> Self { | ^^^ error[E0308]: mismatched types --> src/main.rs:46:64 | 10 | let tx1_fut = async move { | ---------- the expected `async` block ... 30 | let tx_fut = async move { | ---------- the found `async` block ... 46 | vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)]; | -------- ^^^^^^ expected `async` block, found a different `async` block | | | arguments to this function are incorrect | = note: expected `async` block `{async block@src/main.rs:10:23: 10:33}` found `async` block `{async block@src/main.rs:30:22: 30:32}` = note: no two async blocks, even if identical, have the same type = help: consider pinning your async block and casting it to a trait object note: associated function defined here --> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/alloc/src/boxed.rs:255:12 | 255 | pub fn new(x: T) -> Self { | ^^^ error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned --> src/main.rs:48:24 | 48 | trpl::join_all(futures).await; | -------------- ^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future` | | | required by a bound introduced by this call | = note: consider using the `pin!` macro consider using `Box::pin` if you need to access the pinned value outside of the current scope = note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future` note: required by a bound in `join_all` --> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:105:14 | 102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item> | -------- required by a bound in this function ... 105 | I::Item: Future, | ^^^^^^ required by this bound in `join_all` error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned --> src/main.rs:48:9 | 48 | trpl::join_all(futures).await; | ^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future` | = note: consider using the `pin!` macro consider using `Box::pin` if you need to access the pinned value outside of the current scope = note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future` note: required by a bound in `futures_util::future::join_all::JoinAll` --> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8 | 27 | pub struct JoinAll<F> | ------- required by a bound in this struct 28 | where 29 | F: Future, | ^^^^^^ required by this bound in `JoinAll` error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned --> src/main.rs:48:33 | 48 | trpl::join_all(futures).await; | ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future` | = note: consider using the `pin!` macro consider using `Box::pin` if you need to access the pinned value outside of the current scope = note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future` note: required by a bound in `futures_util::future::join_all::JoinAll` --> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8 | 27 | pub struct JoinAll<F> | ------- required by a bound in this struct 28 | where 29 | F: Future, | ^^^^^^ required by this bound in `JoinAll`

多内容需要消化,所以让我们分解一下。消息的第一部分告诉我们,第一个 async 代码块 (src/main.rs:8:23: 20:10) 未实现 Unpin trait,并建议使用 pin!Box::pin 来解决它。在本章的后面部分,我们将深入探讨有关 PinUnpin 的更多细节。但目前,我们可以按照编译器的建议来摆脱困境。在 Listing 17-18 中,我们首先使用 Pin 包装每个 Box 来更新 futures 的类型注释。其次,我们使用 Box::pin 来 pin future 本身。

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::{ future::Future, pin::{pin, Pin}, time::Duration, }; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = pin!(async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let rx_fut = pin!(async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }); let tx_fut = pin!(async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![Box::pin(tx1_fut), Box::pin(rx_fut), Box::pin(tx_fut)]; trpl::join_all(futures).await; }); }
Listing 17-18: 使用 PinBox::pin 使 Vec 类型检查通过

如果我们编译并运行此代码,我们最终会得到我们希望的输出

received 'hi' received 'more' received 'from' received 'messages' received 'the' received 'for' received 'future' received 'you'

呼!

这里还有更多内容需要探索。首先,使用 Pin<Box<T>> 会增加少量开销,因为使用 Box 将这些 future 放在堆上——而我们这样做只是为了使类型对齐。毕竟,我们实际上并不需要堆分配:这些 future 是此特定函数的局部变量。如前所述,Pin 本身是一种包装类型,因此我们可以获得 Vec 中具有单一类型的好处——我们最初使用 Box 的原因——而无需进行堆分配。我们可以直接将 Pin 与每个 future 一起使用,使用 std::pin::pin 宏。

但是,我们仍然必须显式声明 pinned 引用的类型;否则,Rust 仍然不知道将这些解释为动态 trait 对象,而这正是我们在 Vec 中需要的。因此,我们在定义每个 future 时使用 pin!,并将 futures 定义为包含 pinned 的可变引用到动态 future 类型的 Vec,如 Listing 17-19 所示。

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::{ future::Future, pin::{pin, Pin}, time::Duration, }; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = pin!(async move { // --snip-- let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let rx_fut = pin!(async { // --snip-- while let Some(value) = rx.recv().await { println!("received '{value}'"); } }); let tx_fut = pin!(async move { // --snip-- let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let futures: Vec<Pin<&mut dyn Future<Output = ()>>> = vec![tx1_fut, rx_fut, tx_fut]; trpl::join_all(futures).await; }); }
Listing 17-19: 直接将 Pinpin! 宏一起使用,以避免不必要的堆分配

到目前为止,我们忽略了我们可能具有不同的 Output 类型这一事实。例如,在 Listing 17-20 中,a 的匿名 future 实现了 Future<Output = u32>b 的匿名 future 实现了 Future<Output = &str>,而 c 的匿名 future 实现了 Future<Output = bool>

文件名: src/main.rs
extern crate trpl; // required for mdbook test fn main() { trpl::run(async { let a = async { 1u32 }; let b = async { "Hello!" }; let c = async { true }; let (a_result, b_result, c_result) = trpl::join!(a, b, c); println!("{a_result}, {b_result}, {c_result}"); }); }
Listing 17-20: 三个具有不同类型的 future

我们可以使用 trpl::join! 来等待它们,因为它允许我们传入多种 future 类型并生成这些类型的元组。我们不能使用 trpl::join_all,因为它要求传入的所有 future 都具有相同的类型。请记住,正是这个错误开始了我们关于 Pin 的冒险之旅!

这是一个基本的权衡:我们可以处理动态数量的 future 和 join_all,只要它们都具有相同的类型,或者我们可以处理固定数量的 future 和 join 函数或 join! 宏,即使它们具有不同的类型。这与我们在 Rust 中处理任何其他类型时面临的情况相同。Future 并不特殊,即使我们有一些不错的语法可以处理它们,这是一件好事。

竞速 Future

当我们使用 join 系列函数和宏 “join” future 时,我们需要所有 future 都完成才能继续。但是,有时我们只需要一组 future 中的一些 future 完成才能继续——有点类似于将一个 future 与另一个 future 竞速。

在 Listing 17-21 中,我们再次使用 trpl::race 来运行两个 future,slowfast,彼此竞争。

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let slow = async { println!("'slow' started."); trpl::sleep(Duration::from_millis(100)).await; println!("'slow' finished."); }; let fast = async { println!("'fast' started."); trpl::sleep(Duration::from_millis(50)).await; println!("'fast' finished."); }; trpl::race(slow, fast).await; }); }
Listing 17-21: 使用 race 获取先完成的 future 的结果

每个 future 在开始运行时都会打印一条消息,通过调用并等待 sleep 暂停一段时间,然后在完成时打印另一条消息。然后,我们将 slowfast 都传递给 trpl::race,并等待其中一个完成。(这里的结果并不令人意外:fast 获胜。)与我们在 “我们的第一个 Async 程序”中使用的 race 不同,我们在这里只是忽略了它返回的 Either 实例,因为所有有趣的行文都发生在 async 代码块的主体中。

请注意,如果您翻转 race 的参数顺序,即使 fast future 始终首先完成,“started” 消息的顺序也会发生变化。这是因为此特定 race 函数的实现是不公平的。它始终按照参数传递的顺序运行 future。其他实现公平的,并且会随机选择首先轮询哪个 future。无论我们使用的 race 实现是否公平,其中一个 future 都将在另一个任务可以启动之前运行到其主体中的第一个 await

回想一下 我们的第一个 Async 程序在每个 await 点,如果正在等待的 future 尚未准备好,Rust 都会给运行时一个暂停任务并切换到另一个任务的机会。反之亦然:Rust 在 await 点暂停 async 代码块并将控制权交还给运行时。await 点之间的所有内容都是同步的。

这意味着如果您在没有 await 点的 async 代码块中执行大量工作,则该 future 将阻止任何其他 future 取得进展。您有时可能会听到这被称为一个 future 饿死其他 future。在某些情况下,这可能没什么大不了的。但是,如果您正在进行某种昂贵的设置或长时间运行的工作,或者如果您有一个 future 将无限期地执行某个特定任务,则需要考虑何时何地将控制权交还给运行时。

同样,如果您有长时间运行的阻塞操作,async 可以成为一种有用的工具,用于提供程序的不同部分相互关联的方式。

但是,在这些情况下,您如何将控制权交还给运行时呢?

将控制权让给运行时

让我们模拟一个长时间运行的操作。Listing 17-22 引入了一个 slow 函数。

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { // We will call `slow` here later }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
Listing 17-22: 使用 thread::sleep 模拟慢速操作

此代码使用 std::thread::sleep 而不是 trpl::sleep,以便调用 slow 会阻塞当前线程一段时间(以毫秒为单位)。我们可以使用 slow 来代表真实世界中既长时间运行又阻塞的操作。

在 Listing 17-23 中,我们使用 slow 来模拟在一对 future 中执行此类 CPU 密集型工作。

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { let a = async { println!("'a' started."); slow("a", 30); slow("a", 10); slow("a", 20); trpl::sleep(Duration::from_millis(50)).await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); slow("b", 10); slow("b", 15); slow("b", 350); trpl::sleep(Duration::from_millis(50)).await; println!("'b' finished."); }; trpl::race(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
Listing 17-23: 使用 thread::sleep 模拟慢速操作

首先,每个 future 仅在执行完大量慢速操作才将控制权交还给运行时。如果您运行此代码,您将看到此输出

'a' started. 'a' ran for 30ms 'a' ran for 10ms 'a' ran for 20ms 'b' started. 'b' ran for 75ms 'b' ran for 10ms 'b' ran for 15ms 'b' ran for 350ms 'a' finished.

与我们之前的示例一样,race 仍然在 a 完成后立即完成。但是,两个 future 之间没有交错。 a future 执行其所有工作,直到 trpl::sleep 调用被等待,然后 b future 执行其所有工作,直到其自身的 trpl::sleep 调用被等待,最后 a future 完成。为了允许两个 future 在它们的慢速任务之间取得进展,我们需要 await 点,以便我们可以将控制权交还给运行时。这意味着我们需要一些可以 await 的东西!

我们已经可以在 Listing 17-23 中看到这种交接正在发生:如果我们删除 a future 末尾的 trpl::sleep,它将在 b future 完全不运行的情况下完成。让我们尝试使用 sleep 函数作为起点,让操作切换停止进展,如 Listing 17-24 所示。

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { let one_ms = Duration::from_millis(1); let a = async { println!("'a' started."); slow("a", 30); trpl::sleep(one_ms).await; slow("a", 10); trpl::sleep(one_ms).await; slow("a", 20); trpl::sleep(one_ms).await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); trpl::sleep(one_ms).await; slow("b", 10); trpl::sleep(one_ms).await; slow("b", 15); trpl::sleep(one_ms).await; slow("b", 35); trpl::sleep(one_ms).await; println!("'b' finished."); }; trpl::race(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
Listing 17-24: 使用 sleep 让操作切换停止进展

在 Listing 17-24 中,我们在每次调用 slow 之间添加了带有 await 点的 trpl::sleep 调用。现在,两个 future 的工作是交错的

'a' started. 'a' ran for 30ms 'b' started. 'b' ran for 75ms 'a' ran for 10ms 'b' ran for 10ms 'a' ran for 20ms 'b' ran for 15ms 'a' finished.

a future 仍然会运行一段时间,然后将控制权交给 b,因为它在调用 trpl::sleep 之前调用了 slow,但在此之后,future 会在其中一个 future 命中 await 点时来回切换。在本例中,我们在每次调用 slow 后都执行了此操作,但我们可以以最适合我们的方式分解工作。

但我们真的不想在这里sleep:我们希望尽可能快地取得进展。我们只需要将控制权交还给运行时。我们可以直接使用 yield_now 函数来做到这一点。在 Listing 17-25 中,我们将所有这些 sleep 调用替换为 yield_now

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { let a = async { println!("'a' started."); slow("a", 30); trpl::yield_now().await; slow("a", 10); trpl::yield_now().await; slow("a", 20); trpl::yield_now().await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); trpl::yield_now().await; slow("b", 10); trpl::yield_now().await; slow("b", 15); trpl::yield_now().await; slow("b", 35); trpl::yield_now().await; println!("'b' finished."); }; trpl::race(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
Listing 17-25: 使用 yield_now 让操作切换停止进展

此代码既更清楚地表达了实际意图,并且可能比使用 sleep 快得多,因为诸如 sleep 使用的计时器通常对其粒度有限制。例如,我们正在使用的 sleep 版本始终至少睡眠一毫秒,即使我们向其传递了一纳秒的 Duration。同样,现代计算机非常快:它们在一毫秒内可以完成很多工作!

您可以通过设置一个小基准来亲眼看看这一点,例如 Listing 17-26 中的基准。(这不是一种特别严格的性能测试方法,但足以显示这里的差异。)

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::time::{Duration, Instant}; fn main() { trpl::run(async { let one_ns = Duration::from_nanos(1); let start = Instant::now(); async { for _ in 1..1000 { trpl::sleep(one_ns).await; } } .await; let time = Instant::now() - start; println!( "'sleep' version finished after {} seconds.", time.as_secs_f32() ); let start = Instant::now(); async { for _ in 1..1000 { trpl::yield_now().await; } } .await; let time = Instant::now() - start; println!( "'yield' version finished after {} seconds.", time.as_secs_f32() ); }); }
Listing 17-26: 比较 sleepyield_now 的性能

在这里,我们跳过所有状态打印,将一纳秒的 Duration 传递给 trpl::sleep,并让每个 future 单独运行,future 之间没有切换。然后我们运行 1,000 次迭代,看看使用 trpl::sleep 的 future 与使用 trpl::yield_now 的 future 相比花费了多长时间。

使用 yield_now 的版本快得多

这意味着 async 即使对于计算密集型任务也可能很有用,具体取决于您的程序还在做什么,因为它提供了一个有用的工具来构建程序不同部分之间的关系。这是一种协作式多任务处理,其中每个 future 都有权决定何时通过 await 点交出控制权。因此,每个 future 也都有责任避免阻塞太长时间。在某些基于 Rust 的嵌入式操作系统中,这是唯一一种多任务处理方式!

当然,在实际代码中,您通常不会在每一行上交替进行函数调用和 await 点。虽然以这种方式让出控制权相对便宜,但它并非免费。在许多情况下,尝试分解计算密集型任务可能会使其速度明显变慢,因此有时为了整体性能,最好让操作短暂阻塞。始终进行测量以查看代码的实际性能瓶颈在哪里。但是,如果您确实看到大量工作按顺序发生,而您期望它们并发发生,那么底层的动态非常重要,需要牢记!

构建我们自己的 Async 抽象

我们还可以将 future 组合在一起以创建新模式。例如,我们可以使用我们已经拥有的 async 构建块构建一个 timeout 函数。完成后,结果将是另一个构建块,我们可以使用它来创建更多的 async 抽象。

Listing 17-27 显示了我们期望此 timeout 如何与慢速 future 一起工作。

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let slow = async { trpl::sleep(Duration::from_millis(100)).await; "I finished!" }; match timeout(slow, Duration::from_millis(10)).await { Ok(message) => println!("Succeeded with '{message}'"), Err(duration) => { println!("Failed after {} seconds", duration.as_secs()) } } }); }
Listing 17-27: 使用我们想象的 timeout 运行具有时间限制的慢速操作

让我们实现它!首先,让我们考虑一下 timeout 的 API

  • 它本身需要是一个 async 函数,以便我们可以 await 它。
  • 它的第一个参数应该是要运行的 future。我们可以使其通用,以使其可以与任何 future 一起使用。
  • 它的第二个参数将是最大等待时间。如果我们使用 Duration,这将使其易于传递给 trpl::sleep
  • 它应该返回一个 Result。如果 future 成功完成,则 Result 将是 Ok,其中包含 future 生成的值。如果超时首先经过,则 Result 将是 Err,其中包含超时等待的持续时间。

Listing 17-28 显示了此声明。

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::{future::Future, time::Duration}; fn main() { trpl::run(async { let slow = async { trpl::sleep(Duration::from_secs(5)).await; "Finally finished" }; match timeout(slow, Duration::from_millis(10)).await { Ok(message) => println!("Succeeded with '{message}'"), Err(duration) => { println!("Failed after {} seconds", duration.as_secs()) } } }); } async fn timeout<F: Future>( future_to_try: F, max_time: Duration, ) -> Result<F::Output, Duration> { // Here is where our implementation will go! }
Listing 17-28: 定义 timeout 的签名

这满足了我们对类型的目标。现在让我们考虑一下我们需要的行为:我们希望将传入的 future 与持续时间进行竞速。我们可以使用 trpl::sleep 从持续时间创建一个计时器 future,并使用 trpl::race 将该计时器与调用者传入的 future 一起运行。

我们还知道 race 是不公平的,它按照参数传递的顺序轮询参数。因此,我们首先将 future_to_try 传递给 race,以便即使 max_time 是非常短的持续时间,它也有机会完成。如果 future_to_try 首先完成,race 将返回 Left,其中包含来自 future_to_try 的输出。如果 timer 首先完成,race 将返回 Right,其中包含计时器的 () 输出。

在 Listing 17-29 中,我们匹配等待 trpl::race 的结果。

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::{future::Future, time::Duration}; use trpl::Either; // --snip-- fn main() { trpl::run(async { let slow = async { trpl::sleep(Duration::from_secs(5)).await; "Finally finished" }; match timeout(slow, Duration::from_secs(2)).await { Ok(message) => println!("Succeeded with '{message}'"), Err(duration) => { println!("Failed after {} seconds", duration.as_secs()) } } }); } async fn timeout<F: Future>( future_to_try: F, max_time: Duration, ) -> Result<F::Output, Duration> { match trpl::race(future_to_try, trpl::sleep(max_time)).await { Either::Left(output) => Ok(output), Either::Right(_) => Err(max_time), } }
Listing 17-29: 使用 racesleep 定义 timeout

如果 future_to_try 成功并且我们得到 Left(output),则我们返回 Ok(output)。如果睡眠计时器相反地经过并且我们得到 Right(()),则我们使用 _ 忽略 (),并返回 Err(max_time)

这样,我们就有了一个由两个其他 async 助手构建的工作 timeout。如果我们运行我们的代码,它将在超时后打印失败模式

Failed after 2 seconds

由于 future 可以与其他 future 组合,因此您可以使用较小的 async 构建块构建真正强大的工具。例如,您可以使用相同的方法将超时与重试相结合,然后将它们与网络调用等操作一起使用(本章开头的示例之一)。

在实践中,您通常会直接使用 asyncawait,其次是使用诸如 joinjoin_allrace 等函数和宏。您只需要偶尔使用 pin 即可将 future 与这些 API 一起使用。

我们现在已经看到了多种同时处理多个 future 的方法。接下来,我们将研究如何使用在一段时间内按顺序处理多个 future。以下是您可能需要首先考虑的更多事项

  • 我们使用了带有 join_allVec 来等待某个组中的所有 future 完成。您如何使用 Vec 按顺序处理一组 future 呢?这样做有什么权衡?

  • 看看来自 futures crate 的 futures::stream::FuturesUnordered 类型。使用它与使用 Vec 有什么不同?(不要担心它来自 crate 的 stream 部分;它可以与任何 future 集合完美配合。)