流:序列中的 Future

在本章到目前为止,我们主要坚持使用单个 future。一个大的例外是我们使用的异步通道。回想一下在本章前面 “消息传递”部分中,我们如何使用异步通道的接收器。异步 recv 方法会随着时间的推移产生一系列项目。这是称为的更通用模式的一个实例。

我们在第 13 章中回顾了项目序列,当时我们在 迭代器特性和 next 方法部分中查看了 Iterator 特性,但迭代器和异步通道接收器之间有两个区别。第一个区别是时间:迭代器是同步的,而通道接收器是异步的。第二个是 API。当直接使用 Iterator 时,我们调用其同步的 next 方法。对于特定的 trpl::Receiver 流,我们调用了异步的 recv 方法。否则,这些 API 在其他方面感觉非常相似,而这种相似性并非巧合。流就像迭代的异步形式。然而,虽然 trpl::Receiver 专门等待接收消息,但通用的流 API 更广泛:它以 Iterator 的方式提供下一个项目,但它是异步的。

Rust 中迭代器和流之间的相似性意味着我们实际上可以从任何迭代器创建一个流。与迭代器一样,我们可以通过调用流的 next 方法然后等待输出来使用流,如清单 17-30 所示。

文件名:src/main.rs
extern crate trpl; // required for mdbook test fn main() { trpl::run(async { let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; let iter = values.iter().map(|n| n * 2); let mut stream = trpl::stream_from_iter(iter); while let Some(value) = stream.next().await { println!("The value was: {value}"); } }); }
清单 17-30:从迭代器创建流并打印其值

我们从一个数字数组开始,将其转换为迭代器,然后调用 map 将所有值加倍。然后,我们使用 trpl::stream_from_iter 函数将迭代器转换为流。接下来,我们使用 while let 循环遍历流中的项目。

不幸的是,当我们尝试运行代码时,它没有编译,而是报告没有可用的 next 方法

error[E0599]: no method named `next` found for struct `Iter` in the current scope --> src/main.rs:10:40 | 10 | while let Some(value) = stream.next().await { | ^^^^ | = note: the full type name has been written to 'file:///projects/async_await/target/debug/deps/async_await-9de943556a6001b8.long-type-1281356139287206597.txt' = note: consider using `--verbose` to print the full type name to the console = help: items from traits can only be used if the trait is in scope help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them | 1 + use crate::trpl::StreamExt; | 1 + use futures_util::stream::stream::StreamExt; | 1 + use std::iter::Iterator; | 1 + use std::str::pattern::Searcher; | help: there is a method `try_next` with a similar name | 10 | while let Some(value) = stream.try_next().await { | ~~~~~~~~

正如这个输出解释的那样,编译器错误的原因是我们需要正确的特性在作用域内才能使用 next 方法。鉴于我们目前的讨论,您可能会合理地期望该特性是 Stream,但实际上是 StreamExtExtextension(扩展)的缩写,是 Rust 社区中用于使用另一个特性扩展一个特性的常见模式。

我们将在本章末尾更详细地解释 StreamStreamExt 特性,但现在您只需要知道 Stream 特性定义了一个低级接口,该接口有效地结合了 IteratorFuture 特性。StreamExtStream 之上提供了一组更高级别的 API,包括 next 方法以及其他类似于 Iterator 特性提供的实用程序方法。StreamStreamExt 尚未成为 Rust 标准库的一部分,但大多数生态系统 crate 都使用相同的定义。

修复编译器错误的方法是为 trpl::StreamExt 添加一个 use 语句,如清单 17-31 所示。

文件名:src/main.rs
extern crate trpl; // required for mdbook test use trpl::StreamExt; fn main() { trpl::run(async { let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; let iter = values.iter().map(|n| n * 2); let mut stream = trpl::stream_from_iter(iter); while let Some(value) = stream.next().await { println!("The value was: {value}"); } }); }
清单 17-31:成功地使用迭代器作为流的基础

将所有这些部分放在一起后,这段代码就可以按我们想要的方式工作了!更重要的是,现在我们有了作用域内的 StreamExt,我们可以使用它的所有实用程序方法,就像使用迭代器一样。例如,在清单 17-32 中,我们使用 filter 方法过滤掉除三和五的倍数以外的所有内容。

文件名:src/main.rs
extern crate trpl; // required for mdbook test use trpl::StreamExt; fn main() { trpl::run(async { let values = 1..101; let iter = values.map(|n| n * 2); let stream = trpl::stream_from_iter(iter); let mut filtered = stream.filter(|value| value % 3 == 0 || value % 5 == 0); while let Some(value) = filtered.next().await { println!("The value was: {value}"); } }); }
清单 17-32:使用 StreamExt::filter 方法过滤流

当然,这没什么意思,因为我们可以使用普通的迭代器来做同样的事情,而无需任何异步操作。让我们看看我们可以做些什么流独有的。

组合流

许多概念自然地表示为流:队列中可用的项目、当完整数据集对于计算机内存来说太大时从文件系统中增量拉取的数据块,或者随着时间推移通过网络到达的数据。由于流是 future,我们可以将它们与任何其他类型的 future 一起使用,并以有趣的方式组合它们。例如,我们可以批量处理事件以避免触发过多的网络调用,为长时间运行的操作序列设置超时,或者限制用户界面事件以避免做不必要的工作。

让我们从构建一个小的消息流开始,作为我们可能从 WebSocket 或其他实时通信协议中看到的数据流的替代,如清单 17-33 所示。

文件名:src/main.rs
extern crate trpl; // required for mdbook test use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = get_messages(); while let Some(message) = messages.next().await { println!("{message}"); } }); } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for message in messages { tx.send(format!("Message: '{message}'")).unwrap(); } ReceiverStream::new(rx) }
清单 17-33:使用 rx 接收器作为 ReceiverStream

首先,我们创建一个名为 get_messages 的函数,该函数返回 impl Stream<Item = String>。对于其实现,我们创建一个异步通道,循环遍历英文字母表的前 10 个字母,并将它们发送到通道上。

我们还使用了一种新类型:ReceiverStream,它将 trpl::channel 中的 rx 接收器转换为具有 next 方法的 Stream。回到 main 中,我们使用 while let 循环打印来自流的所有消息。

当我们运行这段代码时,我们得到了完全符合我们预期的结果

Message: 'a' Message: 'b' Message: 'c' Message: 'd' Message: 'e' Message: 'f' Message: 'g' Message: 'h' Message: 'i' Message: 'j'

同样,我们可以使用常规的 Receiver API 甚至常规的 Iterator API 来做到这一点,尽管如此,所以让我们添加一个需要流的功能:为流中的每个项目添加超时,并在我们发出的项目上添加延迟,如清单 17-34 所示。

文件名:src/main.rs
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = pin!(get_messages().timeout(Duration::from_millis(200))); while let Some(result) = messages.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for message in messages { tx.send(format!("Message: '{message}'")).unwrap(); } ReceiverStream::new(rx) }
清单 17-34:使用 StreamExt::timeout 方法为流中的项目设置时间限制

我们首先使用来自 StreamExt 特性的 timeout 方法向流添加超时。然后我们更新 while let 循环的主体,因为流现在返回一个 ResultOk 变体表示消息及时到达;Err 变体表示在任何消息到达之前超时已过。我们对结果进行 match,并在成功接收到消息时打印消息,或者打印有关超时的通知。最后,请注意,我们在将超时应用于消息后固定消息,因为超时助手生成一个需要固定的流才能被轮询。

但是,由于消息之间没有延迟,因此此超时不会更改程序的行为。让我们向我们发送的消息添加可变延迟,如清单 17-35 所示。

文件名:src/main.rs
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = pin!(get_messages().timeout(Duration::from_millis(200))); while let Some(result) = messages.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) }
清单 17-35:通过 tx 发送消息,并使用异步延迟,而无需使 get_messages 成为异步函数

get_messages 中,我们将 enumerate 迭代器方法与 messages 数组一起使用,以便我们可以获取每个项目的索引以及项目本身。然后,我们对偶数索引项目应用 100 毫秒的延迟,对奇数索引项目应用 300 毫秒的延迟,以模拟我们可能从真实世界中的消息流中看到的不同延迟。由于我们的超时时间为 200 毫秒,因此这应该会影响一半的消息。

为了在 get_messages 函数中不阻塞地休眠消息之间,我们需要使用异步。但是,我们不能将 get_messages 本身变成异步函数,因为那样我们会返回 Future<Output = Stream<Item = String>> 而不是 Stream<Item = String>>。调用者必须等待 get_messages 本身才能访问流。但是请记住:给定 future 中的所有内容都是线性发生的;并发发生在 future之间。等待 get_messages 将要求它发送所有消息,包括每个消息之间的睡眠延迟,然后再返回接收器流。结果,超时将是无用的。流本身不会有延迟;它们都会在流甚至可用之前发生。

相反,我们将 get_messages 保留为一个返回流的常规函数,并且我们生成一个任务来处理异步 sleep 调用。

注意:以这种方式调用 spawn_task 可以工作,因为我们已经设置了运行时;如果我们没有这样做,它会导致 panic。其他实现选择不同的权衡:它们可能会生成一个新的运行时并避免 panic,但最终会产生一些额外的开销,或者它们可能根本不提供在没有运行时引用的情况下生成独立任务的方式。请确保您知道您的运行时选择了什么权衡,并相应地编写您的代码!

现在我们的代码有了更有趣的结果。每隔一对消息之间,都会出现 Problem: Elapsed(()) 错误。

Message: 'a' Problem: Elapsed(()) Message: 'b' Message: 'c' Problem: Elapsed(()) Message: 'd' Message: 'e' Problem: Elapsed(()) Message: 'f' Message: 'g' Problem: Elapsed(()) Message: 'h' Message: 'i' Problem: Elapsed(()) Message: 'j'

超时不会阻止消息最终到达。我们仍然收到所有原始消息,因为我们的通道是无界的:它可以容纳尽可能多的消息,只要我们能放入内存中即可。如果消息在超时之前未到达,我们的流处理程序将对此进行说明,但是当它再次轮询流时,消息可能现在已经到达。

如果需要,您可以通过使用其他类型的通道或其他类型的流来获得不同的行为。让我们通过将时间间隔流与此消息流组合在一起来实践一下。

合并流

首先,让我们创建另一个流,如果我们让它直接运行,它将每毫秒发出一个项目。为简单起见,我们可以使用 sleep 函数在延迟后发送消息,并将其与我们在 get_messages 中使用的从通道创建流的相同方法结合使用。不同之处在于,这次我们将返回已过去的时间间隔计数,因此返回类型将是 impl Stream<Item = u32>,我们可以将该函数称为 get_intervals(参见清单 17-36)。

文件名:src/main.rs
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = pin!(get_messages().timeout(Duration::from_millis(200))); while let Some(result) = messages.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; tx.send(count).unwrap(); } }); ReceiverStream::new(rx) }
清单 17-36:创建一个计数器流,该计数器将每毫秒发出一次

我们首先在任务中定义一个 count。(我们也可以在任务外部定义它,但限制任何给定变量的作用域更清晰。)然后我们创建一个无限循环。循环的每次迭代都会异步休眠一毫秒,增加计数,然后通过通道发送它。因为这一切都包含在 spawn_task 创建的任务中,所以所有这些(包括无限循环)都将与运行时一起清理。

这种无限循环,仅在整个运行时被拆除时才结束,在异步 Rust 中相当常见:许多程序需要无限期地运行。使用异步,只要每次循环迭代中至少有一个 await 点,这就不会阻塞任何其他内容。

现在,回到我们 main 函数的异步块中,我们可以尝试合并 messagesintervals 流,如清单 17-37 所示。

文件名:src/main.rs
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let messages = get_messages().timeout(Duration::from_millis(200)); let intervals = get_intervals(); let merged = messages.merge(intervals); while let Some(result) = merged.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; tx.send(count).unwrap(); } }); ReceiverStream::new(rx) }
清单 17-37:尝试合并 messagesintervals

我们首先调用 get_intervals。然后我们使用 merge 方法合并 messagesintervals 流,该方法将多个流组合成一个流,该流在项目可用后立即从任何源流生成项目,而不会施加任何特定的排序。最后,我们循环遍历组合流而不是 messages

此时,messagesintervals 都不需要固定或可变,因为它们都将被组合成单个 merged 流。但是,对 merge 的调用不会编译!(while let 循环中的 next 调用也不会编译,但我们稍后会回到这一点。)这是因为这两个流具有不同的类型。messages 流的类型为 Timeout<impl Stream<Item = String>>,其中 Timeout 是为 timeout 调用实现 Stream 的类型。intervals 流的类型为 impl Stream<Item = u32>。为了合并这两个流,我们需要转换其中一个流以匹配另一个流。我们将重做 intervals 流,因为 messages 已经处于我们想要的基本格式,并且必须处理超时错误(参见清单 17-38)。

文件名:src/main.rs
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let messages = get_messages().timeout(Duration::from_millis(200)); let intervals = get_intervals() .map(|count| format!("Interval: {count}")) .timeout(Duration::from_secs(10)); let merged = messages.merge(intervals); let mut stream = pin!(merged); while let Some(result) = stream.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; tx.send(count).unwrap(); } }); ReceiverStream::new(rx) }
清单 17-38:使 intervals 流的类型与 messages 流的类型对齐

首先,我们可以使用 map 辅助方法将 intervals 转换为字符串。其次,我们需要匹配来自 messagesTimeout。但是,因为我们实际上不想要 intervals 的超时,所以我们可以创建一个比我们正在使用的其他持续时间更长的超时。在这里,我们使用 Duration::from_secs(10) 创建一个 10 秒的超时。最后,我们需要使 stream 可变,以便 while let 循环的 next 调用可以迭代流,并固定它,以便安全地执行此操作。这使我们几乎达到了我们需要达到的目标。一切类型检查。但是,如果您运行此代码,则会出现两个问题。首先,它永远不会停止!您需要使用 ctrl-c 停止它。其次,来自英文字母表的消息将被埋没在所有间隔计数器消息中

--snip-- Interval: 38 Interval: 39 Interval: 40 Message: 'a' Interval: 41 Interval: 42 Interval: 43 --snip--

清单 17-39 显示了一种解决最后两个问题的方法。

文件名:src/main.rs
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let messages = get_messages().timeout(Duration::from_millis(200)); let intervals = get_intervals() .map(|count| format!("Interval: {count}")) .throttle(Duration::from_millis(100)) .timeout(Duration::from_secs(10)); let merged = messages.merge(intervals).take(20); let mut stream = pin!(merged); while let Some(result) = stream.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; tx.send(count).unwrap(); } }); ReceiverStream::new(rx) }
清单 17-39:使用 throttletake 管理合并的流

首先,我们在 intervals 流上使用 throttle 方法,使其不会压倒 messages 流。节流是一种限制调用函数速率的方法—或者,在这种情况下,限制流被轮询的频率。每 100 毫秒一次应该可以,因为这大致是我们消息到达的频率。

为了限制我们将从流中接受的项目数量,我们将 take 方法应用于 merged 流,因为我们想要限制最终输出,而不仅仅是一个流或另一个流。

现在,当我们运行程序时,它会在从流中拉取 20 个项目后停止,并且间隔不会压倒消息。我们也没有得到 Interval: 100Interval: 200 等,而是得到 Interval: 1Interval: 2 等—即使我们有一个源流可以每毫秒生成一个事件。这是因为 throttle 调用生成了一个新的流,该流包装了原始流,以便仅以节流速率而不是其自身的“本机”速率轮询原始流。我们没有选择忽略大量未处理的间隔消息。相反,我们首先从未生成这些间隔消息!这就是 Rust 的 future 固有的“惰性”再次发挥作用,使我们能够选择我们的性能特征。

Interval: 1 Message: 'a' Interval: 2 Interval: 3 Problem: Elapsed(()) Interval: 4 Message: 'b' Interval: 5 Message: 'c' Interval: 6 Interval: 7 Problem: Elapsed(()) Interval: 8 Message: 'd' Interval: 9 Message: 'e' Interval: 10 Interval: 11 Problem: Elapsed(()) Interval: 12

最后一件我们需要处理的事情是:错误!对于这两个基于通道的流,当通道的另一端关闭时,send 调用可能会失败—这只是运行时如何执行构成流的 future 的问题。到目前为止,我们通过调用 unwrap 忽略了这种可能性,但在行为良好的应用程序中,我们应该显式地处理错误,至少通过结束循环,这样我们就不会尝试发送更多消息。清单 17-40 显示了一个简单的错误策略:打印问题,然后从循环中 break

extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let messages = get_messages().timeout(Duration::from_millis(200)); let intervals = get_intervals() .map(|count| format!("Interval #{count}")) .throttle(Duration::from_millis(500)) .timeout(Duration::from_secs(10)); let merged = messages.merge(intervals).take(20); let mut stream = pin!(merged); while let Some(result) = stream.next().await { match result { Ok(item) => println!("{item}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }); } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; if let Err(send_error) = tx.send(format!("Message: '{message}'")) { eprintln!("Cannot send message '{message}': {send_error}"); break; } } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; if let Err(send_error) = tx.send(count) { eprintln!("Could not send interval {count}: {send_error}"); break; }; } }); ReceiverStream::new(rx) }
清单 17-40:处理错误并关闭循环

与往常一样,处理消息发送错误的正确方法会有所不同;只需确保您有一个策略即可。

现在我们已经看到了许多异步实践,让我们退后一步,深入探讨 FutureStream 以及 Rust 用于使异步工作的其他关键特性的几个细节。