Async 特征的深入探讨
在本章中,我们以各种方式使用了 Future
、Pin
、Unpin
、Stream
和 StreamExt
特征。然而到目前为止,我们一直避免深入探讨它们的工作原理或它们如何协同工作的细节,这对于您的日常 Rust 工作来说在大多数时候都是可以的。但是,有时您会遇到需要了解更多这些细节的情况。在本节中,我们将进行足够的深入,以帮助您应对这些情况,仍然将真正深入的探讨留给其他文档。
Future
特征
让我们首先仔细看看 Future
特征是如何工作的。以下是 Rust 对它的定义
该特征定义包含一堆新类型,以及一些我们以前没有见过的语法,所以让我们逐个部分地了解这个定义。
首先,Future
的关联类型 Output
说明了 future 解析为哪个值。这类似于 Iterator
特征的 Item
关联类型。其次,Future
也具有 poll
方法,该方法为其 self
参数采用特殊的 Pin
引用,并为 Context
类型采用可变引用,并返回 Poll<Self::Output>
。我们稍后将详细讨论 Pin
和 Context
。现在,让我们关注该方法返回的内容,即 Poll
类型
Poll
类型类似于 Option
。它有一个包含值的变体 Ready(T)
,以及一个不包含值的变体 Pending
。但是,Poll
的含义与 Option
完全不同!Pending
变体表示 future 仍有工作要做,因此调用者需要稍后再次检查。Ready
变体表示 future 已完成其工作,并且 T
值可用。
注意:对于大多数 future,在 future 返回 Ready
后,调用者不应再次调用 poll
。许多 future 在变为 ready 后再次轮询会 panic。可以安全地再次轮询的 Future 会在其文档中明确说明。这类似于 Iterator::next
的行为方式。
当您看到使用 await
的代码时,Rust 会在底层将其编译为调用 poll
的代码。如果您回顾清单 17-4,我们在其中打印了单个 URL 的页面标题,一旦它解析完成,Rust 会将其编译成类似于(虽然不完全是)这样的东西
match page_title(url).poll() {
Ready(page_title) => match page_title {
Some(title) => println!("The title for {url} was {title}"),
None => println!("{url} had no title"),
}
Pending => {
// But what goes here?
}
}
当 future 仍然是 Pending
时,我们应该怎么办?我们需要某种方法来一遍又一遍地尝试,直到 future 最终 ready。换句话说,我们需要一个循环
let mut page_title_fut = page_title(url);
loop {
match page_title_fut.poll() {
Ready(value) => match page_title {
Some(title) => println!("The title for {url} was {title}"),
None => println!("{url} had no title"),
}
Pending => {
// continue
}
}
}
但是,如果 Rust 将其编译成完全相同的代码,则每个 await
都会是阻塞的——这与我们想要达到的目标完全相反!相反,Rust 确保循环可以将控制权移交给可以暂停此 future 上的工作以处理其他 future,然后再稍后再次检查此 future 的东西。正如我们所见,这个东西是一个 async 运行时,而这种调度和协调工作是其主要工作之一。
在本章前面,我们描述了等待 rx.recv
。recv
调用返回一个 future,而等待 future 会轮询它。我们注意到,运行时将暂停 future,直到它准备好 Some(message)
或通道关闭时的 None
。凭借我们对 Future
特征,特别是 Future::poll
的更深入理解,我们可以看到它是如何工作的。运行时知道 future 在返回 Poll::Pending
时尚未准备就绪。相反,当 poll
返回 Poll::Ready(Some(message))
或 Poll::Ready(None)
时,运行时知道 future 已准备就绪并推进它。
运行时如何做到这一点的确切细节超出了本书的范围,但关键是要了解 future 的基本机制:运行时轮询它负责的每个 future,并在 future 尚未准备就绪时将其放回睡眠状态。
Pin
和 Unpin
特征
当我们在清单 17-16 中介绍 pinning 的概念时,我们遇到了一个非常棘手的错误消息。以下是再次相关的部分
error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
--> src/main.rs:48:33
|
48 | trpl::join_all(futures).await;
| ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
此错误消息不仅告诉我们需要 pin 这些值,还告诉我们为什么需要 pinning。trpl::join_all
函数返回一个名为 JoinAll
的 struct。该 struct 是泛型,类型为 F
,F
被约束为实现 Future
特征。直接使用 await
等待 future 会隐式地 pin future。这就是为什么我们不需要在我们想要等待 future 的任何地方都使用 pin!
。
但是,我们在这里没有直接等待 future。相反,我们通过将 future 的集合传递给 join_all
函数来构造一个新的 future JoinAll
。join_all
的签名要求集合中项的类型都实现 Future
特征,并且 Box<T>
仅当它包装的 T
是实现 Unpin
特征的 future 时才实现 Future
。
这需要吸收很多内容!为了真正理解它,让我们更深入地了解 Future
特征的实际工作原理,特别是围绕pinning。
再次查看 Future
特征的定义
cx
参数及其 Context
类型是运行时实际知道何时检查任何给定 future,同时仍然保持惰性的关键。同样,其工作原理的细节超出了本章的范围,您通常只需要在编写自定义 Future
实现时才需要考虑这一点。我们将重点关注 self
的类型,因为这是我们第一次看到 self
具有类型注释的方法。self
的类型注释的工作方式类似于其他函数参数的类型注释,但有两个关键区别
-
它告诉 Rust 方法被调用时
self
必须是什么类型。 -
它不能只是任何类型。它被限制为方法实现的类型,指向该类型的引用或智能指针,或者包装指向该类型的引用的
Pin
。
我们将在 第 18 章 中看到更多关于此语法的内容。现在,知道如果我们想要轮询 future 以检查它是 Pending
还是 Ready(Output)
,我们需要一个 Pin
包装的可变引用到该类型就足够了。
Pin
是指针类类型的包装器,例如 &
、&mut
、Box
和 Rc
。(从技术上讲,Pin
适用于实现 Deref
或 DerefMut
特征的类型,但这实际上等同于仅使用指针。)Pin
本身不是指针,也没有任何自己的行为,例如 Rc
和 Arc
在引用计数方面所做的那样;它纯粹是编译器可以用来强制执行指针使用约束的工具。
回想一下,await
是根据对 poll
的调用来实现的,这开始解释了我们之前看到的错误消息,但那是关于 Unpin
而不是 Pin
的。那么 Pin
与 Unpin
究竟有什么关系?为什么 Future
需要 self
在 Pin
类型中才能调用 poll
呢?
回想一下本章前面内容,future 中的一系列 await 点被编译成一个状态机,编译器确保状态机遵循 Rust 关于安全性的所有常规规则,包括借用和所有权。为了使其工作,Rust 会查看一个 await 点和下一个 await 点或 async 块的末尾之间需要哪些数据。然后,它在编译的状态机中创建一个相应的变体。每个变体都获得访问权限,以访问将在源代码的该部分中使用的数据,无论是通过获取该数据的所有权,还是通过获取对该数据的可变或不可变引用。
到目前为止,一切都很好:如果我们在给定的 async 块中的所有权或引用方面做错了任何事情,借用检查器会告诉我们。当我们想要移动与该块对应的 future 时——例如将其移动到 Vec
中以传递给 join_all
——事情变得棘手起来。
当我们移动一个 future 时——无论是通过将其推送到数据结构中以用作 join_all
的迭代器,还是通过从函数返回它——这实际上意味着移动 Rust 为我们创建的状态机。与 Rust 中的大多数其他类型不同,Rust 为 async 块创建的 future 最终可能会在任何给定变体的字段中包含对自身的引用,如图 17-4 中的简化图所示。
但是,默认情况下,任何具有自身引用的对象移动都是不安全的,因为引用始终指向它们所引用的任何内容的实际内存地址(参见图 17-5)。如果您移动数据结构本身,这些内部引用将指向旧位置。但是,该内存位置现在无效。首先,当您更改数据结构时,其值将不会更新。更重要的一点是,计算机现在可以自由地将该内存重新用于其他目的!您最终可能会稍后读取完全不相关的数据。
从理论上讲,Rust 编译器可以尝试在每次移动对象时更新对对象的所有引用,但这可能会增加很多性能开销,尤其是在需要更新整个引用网络的情况下。如果我们可以确保所讨论的数据结构在内存中不移动,我们就不必更新任何引用。这正是 Rust 的借用检查器要求的:在安全代码中,它会阻止您移动任何具有活动引用的项。
Pin
基于此为我们提供了我们需要的确切保证。当我们通过将指向该值的指针包装在 Pin
中来pin一个值时,它就不能再移动了。因此,如果您有 Pin<Box<SomeType>>
,您实际上 pin 了 SomeType
值,而不是 Box
指针。图 17-6 说明了这个过程。
事实上,Box
指针仍然可以自由移动。请记住:我们关心的是确保最终被引用的数据保持在原位。如果指针移动,但它指向的数据在同一位置,如图 17-7 所示,则没有潜在的问题。作为一个独立的练习,查看类型以及 std::pin
模块的文档,并尝试找出如何使用包装 Box
的 Pin
来做到这一点。)关键是自引用类型本身不能移动,因为它仍然被 pin 了。
但是,即使大多数类型碰巧在 Pin
指针后面,移动它们也是完全安全的。我们只需要在项目具有内部引用时才考虑 pinning。诸如数字和布尔值之类的原始值是安全的,因为它们显然没有任何内部引用,因此它们显然是安全的。您通常在 Rust 中使用的大多数类型也不是。例如,您可以移动 Vec
而无需担心。仅根据我们目前所见,如果您有 Pin<Vec<String>>
,即使 Vec<String>
在没有其他对其的引用时始终可以安全移动,您也必须通过 Pin
提供的安全但限制性的 API 来完成所有操作。我们需要一种方法来告诉编译器,在这种情况下移动项目是可以的——这就是 Unpin
发挥作用的地方。
Unpin
是一个标记特征,类似于我们在第 16 章中看到的 Send
和 Sync
特征,因此它本身没有功能。标记特征的存在只是为了告诉编译器,在特定上下文中,使用实现给定特征的类型是安全的。Unpin
通知编译器,给定类型不需要维护关于所讨论的值是否可以安全移动的任何保证。
就像 Send
和 Sync
一样,编译器会自动为所有它可以证明安全的类型实现 Unpin
。一个特殊情况,再次类似于 Send
和 Sync
,是 Unpin
未为类型实现的情况。这种情况的表示法是 impl !Unpin for SomeType
,其中 SomeType
是一个类型的名称,该类型确实需要维护这些保证,以便在 Pin
中使用指向该类型的指针时安全。
换句话说,关于 Pin
和 Unpin
之间的关系,需要记住两件事。首先,Unpin
是“正常”情况,而 !Unpin
是特殊情况。其次,类型是否实现 Unpin
或 !Unpin
仅在您使用指向该类型的 pin 指针(如 Pin<&mut SomeType>
)时才重要。
为了具体化这一点,考虑一个 String
:它具有长度和组成它的 Unicode 字符。我们可以将 String
包装在 Pin
中,如图 17-8 所示。但是,String
会自动实现 Unpin
,Rust 中的大多数其他类型也是如此。
因此,我们可以做一些如果 String
实现 !Unpin
而不是 Unpin
则会是非法的事情,例如在与图 17-9 中完全相同的位置用另一个字符串替换一个字符串。这不会违反 Pin
契约,因为 String
没有内部引用使其移动不安全!这正是它实现 Unpin
而不是 !Unpin
的原因。
现在我们已经了解了足够多的知识来理解清单 17-17 中 join_all
调用的报告错误。我们最初尝试将 async 块生成的 future 移动到 Vec<Box<dyn Future<Output = ()>>>
中,但是正如我们所见,这些 future 可能具有内部引用,因此它们不实现 Unpin
。它们需要被 pin,然后我们可以将 Pin
类型传递到 Vec
中,确信 future 中的底层数据不会被移动。
Pin
和 Unpin
主要对于构建较低级别的库,或者当您自己构建运行时时才重要,而不是对于日常 Rust 代码。但是,当您在错误消息中看到这些特征时,现在您将更好地了解如何修复代码!
注意:Pin
和 Unpin
的这种组合使得在 Rust 中安全地实现一整类复杂的类型成为可能,否则这些类型将因为它们是自引用的而证明具有挑战性。需要 Pin
的类型在当今的 async Rust 中最常见,但您偶尔也可能会在其他上下文中看到它们。
Pin
和 Unpin
的工作原理以及它们需要维护的规则的具体细节在 std::pin
的 API 文档中得到了广泛的介绍,因此如果您有兴趣了解更多信息,这是一个很好的起点。
Stream
特征
现在您对 Future
、Pin
和 Unpin
特征有了更深入的了解,我们可以将注意力转向 Stream
特征。正如您在本章前面所了解的那样,stream 类似于异步迭代器。但是,与 Iterator
和 Future
不同,截至本文撰写之时,Stream
在标准库中没有定义,但是来自整个生态系统中使用的 futures
crate 的一个非常常见的定义。
在查看 Stream
特征如何将它们合并在一起之前,让我们回顾一下 Iterator
和 Future
特征的定义。从 Iterator
中,我们有了序列的概念:它的 next
方法提供了一个 Option<Self::Item>
。从 Future
中,我们有了随时间推移的 readiness 的概念:它的 poll
方法提供了一个 Poll<Self::Output>
。为了表示随时间推移变为 ready 的项目序列,我们定义了一个 Stream
特征,该特征将这些功能放在一起
Stream
特征定义了一个名为 Item
的关联类型,用于 stream 生成的项目的类型。这类似于 Iterator
,其中可能有零到多个项目,但与 Future
不同,Future
始终只有一个 Output
,即使它是 unit 类型 ()
。
Stream
还定义了一个获取这些项目的方法。我们称其为 poll_next
,以明确表示它以与 Future::poll
相同的方式轮询,并以与 Iterator::next
相同的方式生成项目序列。其返回类型将 Poll
与 Option
结合在一起。外部类型是 Poll
,因为它必须检查 readiness,就像 future 一样。内部类型是 Option
,因为它需要发出信号,指示是否还有更多消息,就像迭代器一样。
非常类似于此定义的定义很可能最终成为 Rust 标准库的一部分。同时,它是大多数运行时的工具包的一部分,因此您可以依赖它,并且我们接下来涵盖的所有内容通常都应适用!
但是,在我们之前关于 streaming 的部分中看到的示例中,我们没有使用 poll_next
或 Stream
,而是使用了 next
和 StreamExt
。我们可以当然,直接根据 poll_next
API 手动编写我们自己的 Stream
状态机,就像我们可以通过它们的 poll
方法直接使用 future 一样。但是,使用 await
要好得多,并且 StreamExt
特征提供了 next
方法,因此我们可以做到这一点
注意:我们在本章前面使用的实际定义看起来与此略有不同,因为它支持尚不支持在特征中使用 async 函数的 Rust 版本。因此,它看起来像这样
fn next(&mut self) -> Next<'_, Self> where Self: Unpin;
Next
类型是一个实现 Future
的 struct
,并允许我们使用 Next<'_, Self>
命名对 self
的引用的生命周期,以便 await
可以与此方法一起使用。
StreamExt
特征也是所有可用于 stream 的有趣方法的家。StreamExt
会自动为每个实现 Stream
的类型实现,但是这些特征是分开定义的,以便社区可以在不影响基础特征的情况下迭代便利的 API。
在 trpl
crate 中使用的 StreamExt
版本中,该特征不仅定义了 next
方法,还提供了 next
的默认实现,该实现正确处理调用 Stream::poll_next
的细节。这意味着即使您需要编写自己的 streaming 数据类型,您只需实现 Stream
,然后任何使用您的数据类型的人都可以自动使用 StreamExt
及其方法。
这就是我们将要介绍的关于这些特征的较低级别细节的全部内容。为了总结,让我们考虑一下 future(包括 stream)、task 和线程如何协同工作!