Async 特征的深入探讨

在本章中,我们以各种方式使用了 FuturePinUnpinStreamStreamExt 特征。然而到目前为止,我们一直避免深入探讨它们的工作原理或它们如何协同工作的细节,这对于您的日常 Rust 工作来说在大多数时候都是可以的。但是,有时您会遇到需要了解更多这些细节的情况。在本节中,我们将进行足够的深入,以帮助您应对这些情况,仍然将真正深入的探讨留给其他文档。

Future 特征

让我们首先仔细看看 Future 特征是如何工作的。以下是 Rust 对它的定义

#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } }

该特征定义包含一堆新类型,以及一些我们以前没有见过的语法,所以让我们逐个部分地了解这个定义。

首先,Future 的关联类型 Output 说明了 future 解析为哪个值。这类似于 Iterator 特征的 Item 关联类型。其次,Future 也具有 poll 方法,该方法为其 self 参数采用特殊的 Pin 引用,并为 Context 类型采用可变引用,并返回 Poll<Self::Output>。我们稍后将详细讨论 PinContext。现在,让我们关注该方法返回的内容,即 Poll 类型

#![allow(unused)] fn main() { enum Poll<T> { Ready(T), Pending, } }

Poll 类型类似于 Option。它有一个包含值的变体 Ready(T),以及一个不包含值的变体 Pending。但是,Poll 的含义与 Option 完全不同!Pending 变体表示 future 仍有工作要做,因此调用者需要稍后再次检查。Ready 变体表示 future 已完成其工作,并且 T 值可用。

注意:对于大多数 future,在 future 返回 Ready 后,调用者不应再次调用 poll。许多 future 在变为 ready 后再次轮询会 panic。可以安全地再次轮询的 Future 会在其文档中明确说明。这类似于 Iterator::next 的行为方式。

当您看到使用 await 的代码时,Rust 会在底层将其编译为调用 poll 的代码。如果您回顾清单 17-4,我们在其中打印了单个 URL 的页面标题,一旦它解析完成,Rust 会将其编译成类似于(虽然不完全是)这样的东西

match page_title(url).poll() { Ready(page_title) => match page_title { Some(title) => println!("The title for {url} was {title}"), None => println!("{url} had no title"), } Pending => { // But what goes here? } }

当 future 仍然是 Pending 时,我们应该怎么办?我们需要某种方法来一遍又一遍地尝试,直到 future 最终 ready。换句话说,我们需要一个循环

let mut page_title_fut = page_title(url); loop { match page_title_fut.poll() { Ready(value) => match page_title { Some(title) => println!("The title for {url} was {title}"), None => println!("{url} had no title"), } Pending => { // continue } } }

但是,如果 Rust 将其编译成完全相同的代码,则每个 await 都会是阻塞的——这与我们想要达到的目标完全相反!相反,Rust 确保循环可以将控制权移交给可以暂停此 future 上的工作以处理其他 future,然后再稍后再次检查此 future 的东西。正如我们所见,这个东西是一个 async 运行时,而这种调度和协调工作是其主要工作之一。

在本章前面,我们描述了等待 rx.recvrecv 调用返回一个 future,而等待 future 会轮询它。我们注意到,运行时将暂停 future,直到它准备好 Some(message) 或通道关闭时的 None。凭借我们对 Future 特征,特别是 Future::poll 的更深入理解,我们可以看到它是如何工作的。运行时知道 future 在返回 Poll::Pending 时尚未准备就绪。相反,当 poll 返回 Poll::Ready(Some(message))Poll::Ready(None) 时,运行时知道 future 准备就绪并推进它。

运行时如何做到这一点的确切细节超出了本书的范围,但关键是要了解 future 的基本机制:运行时轮询它负责的每个 future,并在 future 尚未准备就绪时将其放回睡眠状态。

PinUnpin 特征

当我们在清单 17-16 中介绍 pinning 的概念时,我们遇到了一个非常棘手的错误消息。以下是再次相关的部分

error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned --> src/main.rs:48:33 | 48 | trpl::join_all(futures).await; | ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future` | = note: consider using the `pin!` macro consider using `Box::pin` if you need to access the pinned value outside of the current scope = note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future` note: required by a bound in `futures_util::future::join_all::JoinAll` --> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8 | 27 | pub struct JoinAll<F> | ------- required by a bound in this struct 28 | where 29 | F: Future, | ^^^^^^ required by this bound in `JoinAll`

此错误消息不仅告诉我们需要 pin 这些值,还告诉我们为什么需要 pinning。trpl::join_all 函数返回一个名为 JoinAll 的 struct。该 struct 是泛型,类型为 FF 被约束为实现 Future 特征。直接使用 await 等待 future 会隐式地 pin future。这就是为什么我们不需要在我们想要等待 future 的任何地方都使用 pin!

但是,我们在这里没有直接等待 future。相反,我们通过将 future 的集合传递给 join_all 函数来构造一个新的 future JoinAlljoin_all 的签名要求集合中项的类型都实现 Future 特征,并且 Box<T> 仅当它包装的 T 是实现 Unpin 特征的 future 时才实现 Future

这需要吸收很多内容!为了真正理解它,让我们更深入地了解 Future 特征的实际工作原理,特别是围绕pinning

再次查看 Future 特征的定义

#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; pub trait Future { type Output; // Required method fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; } }

cx 参数及其 Context 类型是运行时实际知道何时检查任何给定 future,同时仍然保持惰性的关键。同样,其工作原理的细节超出了本章的范围,您通常只需要在编写自定义 Future 实现时才需要考虑这一点。我们将重点关注 self 的类型,因为这是我们第一次看到 self 具有类型注释的方法。self 的类型注释的工作方式类似于其他函数参数的类型注释,但有两个关键区别

  • 它告诉 Rust 方法被调用时 self 必须是什么类型。

  • 它不能只是任何类型。它被限制为方法实现的类型,指向该类型的引用或智能指针,或者包装指向该类型的引用的 Pin

我们将在 第 18 章 中看到更多关于此语法的内容。现在,知道如果我们想要轮询 future 以检查它是 Pending 还是 Ready(Output),我们需要一个 Pin 包装的可变引用到该类型就足够了。

Pin 是指针类类型的包装器,例如 &&mutBoxRc。(从技术上讲,Pin 适用于实现 DerefDerefMut 特征的类型,但这实际上等同于仅使用指针。)Pin 本身不是指针,也没有任何自己的行为,例如 RcArc 在引用计数方面所做的那样;它纯粹是编译器可以用来强制执行指针使用约束的工具。

回想一下,await 是根据对 poll 的调用来实现的,这开始解释了我们之前看到的错误消息,但那是关于 Unpin 而不是 Pin 的。那么 PinUnpin 究竟有什么关系?为什么 Future 需要 selfPin 类型中才能调用 poll 呢?

回想一下本章前面内容,future 中的一系列 await 点被编译成一个状态机,编译器确保状态机遵循 Rust 关于安全性的所有常规规则,包括借用和所有权。为了使其工作,Rust 会查看一个 await 点和下一个 await 点或 async 块的末尾之间需要哪些数据。然后,它在编译的状态机中创建一个相应的变体。每个变体都获得访问权限,以访问将在源代码的该部分中使用的数据,无论是通过获取该数据的所有权,还是通过获取对该数据的可变或不可变引用。

到目前为止,一切都很好:如果我们在给定的 async 块中的所有权或引用方面做错了任何事情,借用检查器会告诉我们。当我们想要移动与该块对应的 future 时——例如将其移动到 Vec 中以传递给 join_all——事情变得棘手起来。

当我们移动一个 future 时——无论是通过将其推送到数据结构中以用作 join_all 的迭代器,还是通过从函数返回它——这实际上意味着移动 Rust 为我们创建的状态机。与 Rust 中的大多数其他类型不同,Rust 为 async 块创建的 future 最终可能会在任何给定变体的字段中包含对自身的引用,如图 17-4 中的简化图所示。

A single-column, three-row table representing a future, fut1, which has data values 0 and 1 in the first two rows and an arrow pointing from the third row back to the second row, representing an internal reference within the future.
图 17-4:自引用数据类型。

但是,默认情况下,任何具有自身引用的对象移动都是不安全的,因为引用始终指向它们所引用的任何内容的实际内存地址(参见图 17-5)。如果您移动数据结构本身,这些内部引用将指向旧位置。但是,该内存位置现在无效。首先,当您更改数据结构时,其值将不会更新。更重要的一点是,计算机现在可以自由地将该内存重新用于其他目的!您最终可能会稍后读取完全不相关的数据。

Two tables, depicting two futures, fut1 and fut2, each of which has one column and three rows, representing the result of having moved a future out of fut1 into fut2. The first, fut1, is grayed out, with a question mark in each index, representing unknown memory. The second, fut2, has 0 and 1 in the first and second rows and an arrow pointing from its third row back to the second row of fut1, representing a pointer that is referencing the old location in memory of the future before it was moved.
图 17-5:移动自引用数据类型的不安全结果

从理论上讲,Rust 编译器可以尝试在每次移动对象时更新对对象的所有引用,但这可能会增加很多性能开销,尤其是在需要更新整个引用网络的情况下。如果我们可以确保所讨论的数据结构在内存中不移动,我们就不必更新任何引用。这正是 Rust 的借用检查器要求的:在安全代码中,它会阻止您移动任何具有活动引用的项。

Pin 基于此为我们提供了我们需要的确切保证。当我们通过将指向该值的指针包装在 Pin 中来pin一个值时,它就不能再移动了。因此,如果您有 Pin<Box<SomeType>>,您实际上 pin 了 SomeType 值,而不是 Box 指针。图 17-6 说明了这个过程。

Three boxes laid out side by side. The first is labeled “Pin”, the second “b1”, and the third “pinned”. Within “pinned” is a table labeled “fut”, with a single column; it represents a future with cells for each part of the data structure. Its first cell has the value “0”, its second cell has an arrow coming out of it and pointing to the fourth and final cell, which has the value “1” in it, and the third cell has dashed lines and an ellipsis to indicate there may be other parts to the data structure. All together, the “fut” table represents a future which is self-referential. An arrow leaves the box labeled “Pin”, goes through the box labeled “b1” and has terminates inside the “pinned” box at the “fut” table.
图 17-6:Pin 指向自引用 future 类型的 `Box`。

事实上,Box 指针仍然可以自由移动。请记住:我们关心的是确保最终被引用的数据保持在原位。如果指针移动,但它指向的数据在同一位置,如图 17-7 所示,则没有潜在的问题。作为一个独立的练习,查看类型以及 std::pin 模块的文档,并尝试找出如何使用包装 BoxPin 来做到这一点。)关键是自引用类型本身不能移动,因为它仍然被 pin 了。

Four boxes laid out in three rough columns, identical to the previous diagram with a change to the second column. Now there are two boxes in the second column, labeled “b1” and “b2”, “b1” is grayed out, and the arrow from “Pin” goes through “b2” instead of “b1”, indicating that the pointer has moved from “b1” to “b2”, but the data in “pinned” has not moved.
图 17-7:移动指向自引用 future 类型的 `Box`。

但是,即使大多数类型碰巧在 Pin 指针后面,移动它们也是完全安全的。我们只需要在项目具有内部引用时才考虑 pinning。诸如数字和布尔值之类的原始值是安全的,因为它们显然没有任何内部引用,因此它们显然是安全的。您通常在 Rust 中使用的大多数类型也不是。例如,您可以移动 Vec 而无需担心。仅根据我们目前所见,如果您有 Pin<Vec<String>>,即使 Vec<String> 在没有其他对其的引用时始终可以安全移动,您也必须通过 Pin 提供的安全但限制性的 API 来完成所有操作。我们需要一种方法来告诉编译器,在这种情况下移动项目是可以的——这就是 Unpin 发挥作用的地方。

Unpin 是一个标记特征,类似于我们在第 16 章中看到的 SendSync 特征,因此它本身没有功能。标记特征的存在只是为了告诉编译器,在特定上下文中,使用实现给定特征的类型是安全的。Unpin 通知编译器,给定类型需要维护关于所讨论的值是否可以安全移动的任何保证。

就像 SendSync 一样,编译器会自动为所有它可以证明安全的类型实现 Unpin。一个特殊情况,再次类似于 SendSync,是 Unpin 为类型实现的情况。这种情况的表示法是 impl !Unpin for SomeType,其中 SomeType 是一个类型的名称,该类型确实需要维护这些保证,以便在 Pin 中使用指向该类型的指针时安全。

换句话说,关于 PinUnpin 之间的关系,需要记住两件事。首先,Unpin 是“正常”情况,而 !Unpin 是特殊情况。其次,类型是否实现 Unpin!Unpin 在您使用指向该类型的 pin 指针(如 Pin<&mut SomeType>)时才重要。

为了具体化这一点,考虑一个 String:它具有长度和组成它的 Unicode 字符。我们可以将 String 包装在 Pin 中,如图 17-8 所示。但是,String 会自动实现 Unpin,Rust 中的大多数其他类型也是如此。

Concurrent work flow
图 17-8:Pin 一个 `String`;虚线表示 `String` 实现了 `Unpin` 特征,因此未被 pin。

因此,我们可以做一些如果 String 实现 !Unpin 而不是 Unpin 则会是非法的事情,例如在与图 17-9 中完全相同的位置用另一个字符串替换一个字符串。这不会违反 Pin 契约,因为 String 没有内部引用使其移动不安全!这正是它实现 Unpin 而不是 !Unpin 的原因。

Concurrent work flow
图 17-9:在内存中用完全不同的 `String` 替换 `String`。

现在我们已经了解了足够多的知识来理解清单 17-17 中 join_all 调用的报告错误。我们最初尝试将 async 块生成的 future 移动到 Vec<Box<dyn Future<Output = ()>>> 中,但是正如我们所见,这些 future 可能具有内部引用,因此它们不实现 Unpin。它们需要被 pin,然后我们可以将 Pin 类型传递到 Vec 中,确信 future 中的底层数据不会被移动。

PinUnpin 主要对于构建较低级别的库,或者当您自己构建运行时时才重要,而不是对于日常 Rust 代码。但是,当您在错误消息中看到这些特征时,现在您将更好地了解如何修复代码!

注意:PinUnpin 的这种组合使得在 Rust 中安全地实现一整类复杂的类型成为可能,否则这些类型将因为它们是自引用的而证明具有挑战性。需要 Pin 的类型在当今的 async Rust 中最常见,但您偶尔也可能会在其他上下文中看到它们。

PinUnpin 的工作原理以及它们需要维护的规则的具体细节在 std::pin 的 API 文档中得到了广泛的介绍,因此如果您有兴趣了解更多信息,这是一个很好的起点。

如果您想更详细地了解底层的工作原理,请参阅 24 章 的 Rust 异步编程

Stream 特征

现在您对 FuturePinUnpin 特征有了更深入的了解,我们可以将注意力转向 Stream 特征。正如您在本章前面所了解的那样,stream 类似于异步迭代器。但是,与 IteratorFuture 不同,截至本文撰写之时,Stream 在标准库中没有定义,但是来自整个生态系统中使用的 futures crate 的一个非常常见的定义。

在查看 Stream 特征如何将它们合并在一起之前,让我们回顾一下 IteratorFuture 特征的定义。从 Iterator 中,我们有了序列的概念:它的 next 方法提供了一个 Option<Self::Item>。从 Future 中,我们有了随时间推移的 readiness 的概念:它的 poll 方法提供了一个 Poll<Self::Output>。为了表示随时间推移变为 ready 的项目序列,我们定义了一个 Stream 特征,该特征将这些功能放在一起

#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; trait Stream { type Item; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Option<Self::Item>>; } }

Stream 特征定义了一个名为 Item 的关联类型,用于 stream 生成的项目的类型。这类似于 Iterator,其中可能有零到多个项目,但与 Future 不同,Future 始终只有一个 Output,即使它是 unit 类型 ()

Stream 还定义了一个获取这些项目的方法。我们称其为 poll_next,以明确表示它以与 Future::poll 相同的方式轮询,并以与 Iterator::next 相同的方式生成项目序列。其返回类型将 PollOption 结合在一起。外部类型是 Poll,因为它必须检查 readiness,就像 future 一样。内部类型是 Option,因为它需要发出信号,指示是否还有更多消息,就像迭代器一样。

非常类似于此定义的定义很可能最终成为 Rust 标准库的一部分。同时,它是大多数运行时的工具包的一部分,因此您可以依赖它,并且我们接下来涵盖的所有内容通常都应适用!

但是,在我们之前关于 streaming 的部分中看到的示例中,我们没有使用 poll_next Stream,而是使用了 nextStreamExt。我们可以当然,直接根据 poll_next API 手动编写我们自己的 Stream 状态机,就像我们可以通过它们的 poll 方法直接使用 future 一样。但是,使用 await 要好得多,并且 StreamExt 特征提供了 next 方法,因此我们可以做到这一点

#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; trait Stream { type Item; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>; } trait StreamExt: Stream { async fn next(&mut self) -> Option<Self::Item> where Self: Unpin; // other methods... } }

注意:我们在本章前面使用的实际定义看起来与此略有不同,因为它支持尚不支持在特征中使用 async 函数的 Rust 版本。因此,它看起来像这样

fn next(&mut self) -> Next<'_, Self> where Self: Unpin;

Next 类型是一个实现 Futurestruct,并允许我们使用 Next<'_, Self> 命名对 self 的引用的生命周期,以便 await 可以与此方法一起使用。

StreamExt 特征也是所有可用于 stream 的有趣方法的家。StreamExt 会自动为每个实现 Stream 的类型实现,但是这些特征是分开定义的,以便社区可以在不影响基础特征的情况下迭代便利的 API。

trpl crate 中使用的 StreamExt 版本中,该特征不仅定义了 next 方法,还提供了 next 的默认实现,该实现正确处理调用 Stream::poll_next 的细节。这意味着即使您需要编写自己的 streaming 数据类型,您只需实现 Stream,然后任何使用您的数据类型的人都可以自动使用 StreamExt 及其方法。

这就是我们将要介绍的关于这些特征的较低级别细节的全部内容。为了总结,让我们考虑一下 future(包括 stream)、task 和线程如何协同工作!