处理任意数量的 Future
在前一节中,当我们从使用两个 future 切换到三个时,我们也必须从使用 join
切换到使用 join3
。每次我们更改想要 join 的 future 数量时都必须调用不同的函数,这会很烦人。幸运的是,我们有一个宏形式的 join
,我们可以向其传递任意数量的参数。它还处理 future 本身的等待。因此,我们可以重写 Listing 17-13 中的代码,以使用 join!
而不是 join3
,如 Listing 17-14 所示。
join!
等待多个 future与在 join
、join3
和 join4
等之间切换相比,这绝对是一个进步!然而,即使是这种宏形式也仅在我们预先知道 future 的数量时才有效。但在现实世界的 Rust 中,将 future 推入集合,然后等待其中一些或所有 future 完成是一种常见的模式。
要检查集合中的所有 future,我们需要迭代并 join 所有 future。 trpl::join_all
函数接受任何实现 Iterator
trait 的类型,您在 Iterator trait 和 next
方法第 13 章中学习过,因此它似乎正是我们需要的。让我们尝试将 future 放入 vector 中,并将 join!
替换为 join_all
,如 Listing 17-15 所示。
join_all
不幸的是,此代码无法编译。相反,我们得到这个错误
error[E0308]: mismatched types
--> src/main.rs:45:37
|
10 | let tx1_fut = async move {
| ---------- the expected `async` block
...
24 | let rx_fut = async {
| ----- the found `async` block
...
45 | let futures = vec![tx1_fut, rx_fut, tx_fut];
| ^^^^^^ expected `async` block, found a
different `async` block
|
= note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
found `async` block `{async block@src/main.rs:24:22: 24:27}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
这可能令人惊讶。毕竟,没有一个 async 代码块返回任何内容,因此每个代码块都生成一个 Future<Output = ()>
。请记住,Future
是一个 trait,但编译器会为每个 async 代码块创建一个唯一的枚举。您不能将两个不同的手写结构体放入 Vec
中,并且相同的规则适用于编译器生成的不同枚举。
为了使这项工作正常进行,我们需要使用trait 对象,就像我们在 “从 run 函数返回错误”第 12 章中所做的那样。(我们将在第 18 章中详细介绍 trait 对象。)使用 trait 对象使我们能够将这些类型生成的每个匿名 future 视为相同的类型,因为它们都实现了 Future
trait。
注意:在第 8 章的 使用枚举存储多个值部分中,我们讨论了另一种将多种类型包含在 Vec
中的方法:使用枚举来表示 vector 中可能出现的每种类型。但是,我们在这里不能这样做。首先,我们无法命名不同的类型,因为它们是匿名的。其次,我们首先使用 vector 和 join_all
的原因是为了能够处理 future 的动态集合,我们只关心它们具有相同的输出类型。
我们首先将 vec!
中的每个 future 包装在 Box::new
中,如 Listing 17-16 所示。
Box::new
对齐 Vec
中 future 的类型不幸的是,此代码仍然无法编译。实际上,对于第二个和第三个 Box::new
调用,以及引用 Unpin
trait 的新错误,我们都得到了与之前相同的基本错误。我们稍后会回到 Unpin
错误。首先,让我们通过显式注释 futures
变量的类型来修复 Box::new
调用中的类型错误(参见 Listing 17-17)。
此类型声明有点复杂,因此让我们逐步了解它
- 最内层的类型是 future 本身。我们通过编写
Future<Output = ()>
显式地指出 future 的输出是 unit 类型()
。 - 然后,我们用
dyn
注释 trait,将其标记为动态。 - 整个 trait 引用都包装在
Box
中。 - 最后,我们显式声明
futures
是一个包含这些项的Vec
。
这已经产生了很大的不同。现在,当我们运行编译器时,我们只得到提到 Unpin
的错误。虽然有三个错误,但它们的内容非常相似。
error[E0308]: mismatched types
--> src/main.rs:46:46
|
10 | let tx1_fut = async move {
| ---------- the expected `async` block
...
24 | let rx_fut = async {
| ----- the found `async` block
...
46 | vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
| -------- ^^^^^^ expected `async` block, found a different `async` block
| |
| arguments to this function are incorrect
|
= note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
found `async` block `{async block@src/main.rs:24:22: 24:27}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
note: associated function defined here
--> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/alloc/src/boxed.rs:255:12
|
255 | pub fn new(x: T) -> Self {
| ^^^
error[E0308]: mismatched types
--> src/main.rs:46:64
|
10 | let tx1_fut = async move {
| ---------- the expected `async` block
...
30 | let tx_fut = async move {
| ---------- the found `async` block
...
46 | vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
| -------- ^^^^^^ expected `async` block, found a different `async` block
| |
| arguments to this function are incorrect
|
= note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
found `async` block `{async block@src/main.rs:30:22: 30:32}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
note: associated function defined here
--> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/alloc/src/boxed.rs:255:12
|
255 | pub fn new(x: T) -> Self {
| ^^^
error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
--> src/main.rs:48:24
|
48 | trpl::join_all(futures).await;
| -------------- ^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
| |
| required by a bound introduced by this call
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `join_all`
--> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:105:14
|
102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
| -------- required by a bound in this function
...
105 | I::Item: Future,
| ^^^^^^ required by this bound in `join_all`
error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
--> src/main.rs:48:9
|
48 | trpl::join_all(futures).await;
| ^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
--> src/main.rs:48:33
|
48 | trpl::join_all(futures).await;
| ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
这很多内容需要消化,所以让我们分解一下。消息的第一部分告诉我们,第一个 async 代码块 (src/main.rs:8:23: 20:10
) 未实现 Unpin
trait,并建议使用 pin!
或 Box::pin
来解决它。在本章的后面部分,我们将深入探讨有关 Pin
和 Unpin
的更多细节。但目前,我们可以按照编译器的建议来摆脱困境。在 Listing 17-18 中,我们首先使用 Pin
包装每个 Box
来更新 futures
的类型注释。其次,我们使用 Box::pin
来 pin future 本身。
Pin
和 Box::pin
使 Vec
类型检查通过如果我们编译并运行此代码,我们最终会得到我们希望的输出
received 'hi'
received 'more'
received 'from'
received 'messages'
received 'the'
received 'for'
received 'future'
received 'you'
呼!
这里还有更多内容需要探索。首先,使用 Pin<Box<T>>
会增加少量开销,因为使用 Box
将这些 future 放在堆上——而我们这样做只是为了使类型对齐。毕竟,我们实际上并不需要堆分配:这些 future 是此特定函数的局部变量。如前所述,Pin
本身是一种包装类型,因此我们可以获得 Vec
中具有单一类型的好处——我们最初使用 Box
的原因——而无需进行堆分配。我们可以直接将 Pin
与每个 future 一起使用,使用 std::pin::pin
宏。
但是,我们仍然必须显式声明 pinned 引用的类型;否则,Rust 仍然不知道将这些解释为动态 trait 对象,而这正是我们在 Vec
中需要的。因此,我们在定义每个 future 时使用 pin!
,并将 futures
定义为包含 pinned 的可变引用到动态 future 类型的 Vec
,如 Listing 17-19 所示。
Pin
与 pin!
宏一起使用,以避免不必要的堆分配到目前为止,我们忽略了我们可能具有不同的 Output
类型这一事实。例如,在 Listing 17-20 中,a
的匿名 future 实现了 Future<Output = u32>
,b
的匿名 future 实现了 Future<Output = &str>
,而 c
的匿名 future 实现了 Future<Output = bool>
。
我们可以使用 trpl::join!
来等待它们,因为它允许我们传入多种 future 类型并生成这些类型的元组。我们不能使用 trpl::join_all
,因为它要求传入的所有 future 都具有相同的类型。请记住,正是这个错误开始了我们关于 Pin
的冒险之旅!
这是一个基本的权衡:我们可以处理动态数量的 future 和 join_all
,只要它们都具有相同的类型,或者我们可以处理固定数量的 future 和 join
函数或 join!
宏,即使它们具有不同的类型。这与我们在 Rust 中处理任何其他类型时面临的情况相同。Future 并不特殊,即使我们有一些不错的语法可以处理它们,这是一件好事。
竞速 Future
当我们使用 join
系列函数和宏 “join” future 时,我们需要所有 future 都完成才能继续。但是,有时我们只需要一组 future 中的一些 future 完成才能继续——有点类似于将一个 future 与另一个 future 竞速。
在 Listing 17-21 中,我们再次使用 trpl::race
来运行两个 future,slow
和 fast
,彼此竞争。
race
获取先完成的 future 的结果每个 future 在开始运行时都会打印一条消息,通过调用并等待 sleep
暂停一段时间,然后在完成时打印另一条消息。然后,我们将 slow
和 fast
都传递给 trpl::race
,并等待其中一个完成。(这里的结果并不令人意外:fast
获胜。)与我们在 “我们的第一个 Async 程序”中使用的 race
不同,我们在这里只是忽略了它返回的 Either
实例,因为所有有趣的行文都发生在 async 代码块的主体中。
请注意,如果您翻转 race
的参数顺序,即使 fast
future 始终首先完成,“started” 消息的顺序也会发生变化。这是因为此特定 race
函数的实现是不公平的。它始终按照参数传递的顺序运行 future。其他实现是公平的,并且会随机选择首先轮询哪个 future。无论我们使用的 race 实现是否公平,其中一个 future 都将在另一个任务可以启动之前运行到其主体中的第一个 await
。
回想一下 我们的第一个 Async 程序在每个 await 点,如果正在等待的 future 尚未准备好,Rust 都会给运行时一个暂停任务并切换到另一个任务的机会。反之亦然:Rust 仅在 await 点暂停 async 代码块并将控制权交还给运行时。await 点之间的所有内容都是同步的。
这意味着如果您在没有 await 点的 async 代码块中执行大量工作,则该 future 将阻止任何其他 future 取得进展。您有时可能会听到这被称为一个 future 饿死其他 future。在某些情况下,这可能没什么大不了的。但是,如果您正在进行某种昂贵的设置或长时间运行的工作,或者如果您有一个 future 将无限期地执行某个特定任务,则需要考虑何时何地将控制权交还给运行时。
同样,如果您有长时间运行的阻塞操作,async 可以成为一种有用的工具,用于提供程序的不同部分相互关联的方式。
但是,在这些情况下,您如何将控制权交还给运行时呢?
将控制权让给运行时
让我们模拟一个长时间运行的操作。Listing 17-22 引入了一个 slow
函数。
thread::sleep
模拟慢速操作此代码使用 std::thread::sleep
而不是 trpl::sleep
,以便调用 slow
会阻塞当前线程一段时间(以毫秒为单位)。我们可以使用 slow
来代表真实世界中既长时间运行又阻塞的操作。
在 Listing 17-23 中,我们使用 slow
来模拟在一对 future 中执行此类 CPU 密集型工作。
thread::sleep
模拟慢速操作首先,每个 future 仅在执行完大量慢速操作后才将控制权交还给运行时。如果您运行此代码,您将看到此输出
'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.
与我们之前的示例一样,race
仍然在 a
完成后立即完成。但是,两个 future 之间没有交错。 a
future 执行其所有工作,直到 trpl::sleep
调用被等待,然后 b
future 执行其所有工作,直到其自身的 trpl::sleep
调用被等待,最后 a
future 完成。为了允许两个 future 在它们的慢速任务之间取得进展,我们需要 await 点,以便我们可以将控制权交还给运行时。这意味着我们需要一些可以 await 的东西!
我们已经可以在 Listing 17-23 中看到这种交接正在发生:如果我们删除 a
future 末尾的 trpl::sleep
,它将在 b
future 完全不运行的情况下完成。让我们尝试使用 sleep
函数作为起点,让操作切换停止进展,如 Listing 17-24 所示。
sleep
让操作切换停止进展在 Listing 17-24 中,我们在每次调用 slow
之间添加了带有 await 点的 trpl::sleep
调用。现在,两个 future 的工作是交错的
'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.
a
future 仍然会运行一段时间,然后将控制权交给 b
,因为它在调用 trpl::sleep
之前调用了 slow
,但在此之后,future 会在其中一个 future 命中 await 点时来回切换。在本例中,我们在每次调用 slow
后都执行了此操作,但我们可以以最适合我们的方式分解工作。
但我们真的不想在这里sleep:我们希望尽可能快地取得进展。我们只需要将控制权交还给运行时。我们可以直接使用 yield_now
函数来做到这一点。在 Listing 17-25 中,我们将所有这些 sleep
调用替换为 yield_now
。
yield_now
让操作切换停止进展此代码既更清楚地表达了实际意图,并且可能比使用 sleep
快得多,因为诸如 sleep
使用的计时器通常对其粒度有限制。例如,我们正在使用的 sleep
版本始终至少睡眠一毫秒,即使我们向其传递了一纳秒的 Duration
。同样,现代计算机非常快:它们在一毫秒内可以完成很多工作!
您可以通过设置一个小基准来亲眼看看这一点,例如 Listing 17-26 中的基准。(这不是一种特别严格的性能测试方法,但足以显示这里的差异。)
sleep
和 yield_now
的性能在这里,我们跳过所有状态打印,将一纳秒的 Duration
传递给 trpl::sleep
,并让每个 future 单独运行,future 之间没有切换。然后我们运行 1,000 次迭代,看看使用 trpl::sleep
的 future 与使用 trpl::yield_now
的 future 相比花费了多长时间。
使用 yield_now
的版本快得多!
这意味着 async 即使对于计算密集型任务也可能很有用,具体取决于您的程序还在做什么,因为它提供了一个有用的工具来构建程序不同部分之间的关系。这是一种协作式多任务处理,其中每个 future 都有权决定何时通过 await 点交出控制权。因此,每个 future 也都有责任避免阻塞太长时间。在某些基于 Rust 的嵌入式操作系统中,这是唯一一种多任务处理方式!
当然,在实际代码中,您通常不会在每一行上交替进行函数调用和 await 点。虽然以这种方式让出控制权相对便宜,但它并非免费。在许多情况下,尝试分解计算密集型任务可能会使其速度明显变慢,因此有时为了整体性能,最好让操作短暂阻塞。始终进行测量以查看代码的实际性能瓶颈在哪里。但是,如果您确实看到大量工作按顺序发生,而您期望它们并发发生,那么底层的动态非常重要,需要牢记!
构建我们自己的 Async 抽象
我们还可以将 future 组合在一起以创建新模式。例如,我们可以使用我们已经拥有的 async 构建块构建一个 timeout
函数。完成后,结果将是另一个构建块,我们可以使用它来创建更多的 async 抽象。
Listing 17-27 显示了我们期望此 timeout
如何与慢速 future 一起工作。
timeout
运行具有时间限制的慢速操作让我们实现它!首先,让我们考虑一下 timeout
的 API
- 它本身需要是一个 async 函数,以便我们可以 await 它。
- 它的第一个参数应该是要运行的 future。我们可以使其通用,以使其可以与任何 future 一起使用。
- 它的第二个参数将是最大等待时间。如果我们使用
Duration
,这将使其易于传递给trpl::sleep
。 - 它应该返回一个
Result
。如果 future 成功完成,则Result
将是Ok
,其中包含 future 生成的值。如果超时首先经过,则Result
将是Err
,其中包含超时等待的持续时间。
Listing 17-28 显示了此声明。
timeout
的签名这满足了我们对类型的目标。现在让我们考虑一下我们需要的行为:我们希望将传入的 future 与持续时间进行竞速。我们可以使用 trpl::sleep
从持续时间创建一个计时器 future,并使用 trpl::race
将该计时器与调用者传入的 future 一起运行。
我们还知道 race
是不公平的,它按照参数传递的顺序轮询参数。因此,我们首先将 future_to_try
传递给 race
,以便即使 max_time
是非常短的持续时间,它也有机会完成。如果 future_to_try
首先完成,race
将返回 Left
,其中包含来自 future_to_try
的输出。如果 timer
首先完成,race
将返回 Right
,其中包含计时器的 ()
输出。
在 Listing 17-29 中,我们匹配等待 trpl::race
的结果。
race
和 sleep
定义 timeout
如果 future_to_try
成功并且我们得到 Left(output)
,则我们返回 Ok(output)
。如果睡眠计时器相反地经过并且我们得到 Right(())
,则我们使用 _
忽略 ()
,并返回 Err(max_time)
。
这样,我们就有了一个由两个其他 async 助手构建的工作 timeout
。如果我们运行我们的代码,它将在超时后打印失败模式
Failed after 2 seconds
由于 future 可以与其他 future 组合,因此您可以使用较小的 async 构建块构建真正强大的工具。例如,您可以使用相同的方法将超时与重试相结合,然后将它们与网络调用等操作一起使用(本章开头的示例之一)。
在实践中,您通常会直接使用 async
和 await
,其次是使用诸如 join
、join_all
、race
等函数和宏。您只需要偶尔使用 pin
即可将 future 与这些 API 一起使用。
我们现在已经看到了多种同时处理多个 future 的方法。接下来,我们将研究如何使用流在一段时间内按顺序处理多个 future。以下是您可能需要首先考虑的更多事项
-
我们使用了带有
join_all
的Vec
来等待某个组中的所有 future 完成。您如何使用Vec
按顺序处理一组 future 呢?这样做有什么权衡? -
看看来自
futures
crate 的futures::stream::FuturesUnordered
类型。使用它与使用Vec
有什么不同?(不要担心它来自 crate 的stream
部分;它可以与任何 future 集合完美配合。)