使用 Async 应用并发

在本节中,我们将把 async 应用到第 16 章中使用线程处理的一些相同的并发挑战中。因为我们已经在那里讨论了很多关键思想,所以在本节中,我们将重点关注线程和 future 之间的不同之处。

在许多情况下,使用 async 进行并发的 API 与使用线程的 API 非常相似。在其他情况下,它们最终会变得非常不同。即使线程和 async 之间的 API 看起来 相似,它们通常也具有不同的行为——并且它们几乎总是具有不同的性能特征。

使用 spawn_task 创建新任务

我们在 使用 Spawn 创建新线程中处理的第一个操作是在两个单独的线程上计数。让我们使用 async 做同样的事情。trpl crate 提供了一个 spawn_task 函数,它看起来非常类似于 thread::spawn API,以及一个 sleep 函数,它是 thread::sleep API 的 async 版本。我们可以将它们一起使用来实现计数示例,如清单 17-6 所示。

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { trpl::spawn_task(async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }); for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } }); }
清单 17-6: 创建一个新任务来打印一件事,而主任务打印另一件事

作为我们的起点,我们使用 trpl::run 设置我们的 main 函数,以便我们的顶层函数可以是 async 的。

注意:从本章的这一点开始,每个示例都将包含与 maintrpl::run 完全相同的包装代码,因此我们将经常像对 main 一样跳过它。不要忘记将其包含在您的代码中!

然后我们在该代码块中编写两个循环,每个循环都包含一个 trpl::sleep 调用,它等待半秒(500 毫秒)然后发送下一条消息。我们将一个循环放在 trpl::spawn_task 的主体中,另一个循环放在顶层 for 循环中。我们还在 sleep 调用后添加一个 await

此代码的行为类似于基于线程的实现——包括您在运行时可能会在自己的终端中看到消息以不同的顺序出现

hi number 1 from the second task! hi number 1 from the first task! hi number 2 from the first task! hi number 2 from the second task! hi number 3 from the first task! hi number 3 from the second task! hi number 4 from the first task! hi number 4 from the second task! hi number 5 from the first task!

此版本在主 async 代码块主体中的 for 循环完成后立即停止,因为 spawn_task 生成的任务在 main 函数结束时被关闭。如果您希望它一直运行到任务完成,您将需要使用 join handle 来等待第一个任务完成。对于线程,我们使用 join 方法“阻塞”直到线程完成运行。在清单 17-7 中,我们可以使用 await 来做同样的事情,因为任务句柄本身就是一个 future。它的 Output 类型是 Result,所以我们在等待它之后也对其进行解包。

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let handle = trpl::spawn_task(async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }); for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } handle.await.unwrap(); }); }
清单 17-7: 使用 await 和 join handle 来运行任务直到完成

这个更新后的版本运行直到两个循环都完成。

hi number 1 from the second task! hi number 1 from the first task! hi number 2 from the first task! hi number 2 from the second task! hi number 3 from the first task! hi number 3 from the second task! hi number 4 from the first task! hi number 4 from the second task! hi number 5 from the first task! hi number 6 from the first task! hi number 7 from the first task! hi number 8 from the first task! hi number 9 from the first task!

到目前为止,async 和线程看起来给出了相同的基本结果,只是语法不同:使用 await 而不是在 join handle 上调用 join,以及等待 sleep 调用。

更大的区别是我们不需要生成另一个操作系统线程来执行此操作。事实上,我们甚至不需要在这里生成一个任务。由于 async 代码块编译为匿名 future,我们可以将每个循环放在一个 async 代码块中,并让运行时使用 trpl::join 函数运行它们直到完成。

使用 join Handles 等待所有线程完成部分中,我们展示了如何在调用 std::thread::spawn 时返回的 JoinHandle 类型上使用 join 方法。trpl::join 函数类似,但用于 future。当您给它两个 future 时,它会生成一个新的 future,其输出是一个元组,其中包含您传入的每个 future 的输出,一旦它们完成。因此,在清单 17-8 中,我们使用 trpl::join 来等待 fut1fut2 完成。我们等待 fut1fut2,而是等待 trpl::join 生成的新 future。我们忽略输出,因为它只是一个包含两个单元值的元组。

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let fut1 = async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }; let fut2 = async { for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } }; trpl::join(fut1, fut2).await; }); }
清单 17-8: 使用 trpl::join 来等待两个匿名 future

当我们运行它时,我们看到两个 future 都运行完成

hi number 1 from the first task! hi number 1 from the second task! hi number 2 from the first task! hi number 2 from the second task! hi number 3 from the first task! hi number 3 from the second task! hi number 4 from the first task! hi number 4 from the second task! hi number 5 from the first task! hi number 6 from the first task! hi number 7 from the first task! hi number 8 from the first task! hi number 9 from the first task!

现在,您每次都会看到完全相同的顺序,这与我们在线程中看到的非常不同。这是因为 trpl::join 函数是公平的,这意味着它会平等地检查每个 future,在它们之间交替,并且永远不会让一个 future 在另一个 future 准备好时抢先。对于线程,操作系统决定检查哪个线程以及让它运行多长时间。对于 async Rust,运行时决定检查哪个任务。(在实践中,细节变得复杂,因为 async 运行时可能会在底层使用操作系统线程作为其管理并发的一部分,因此保证公平性对于运行时来说可能需要更多的工作——但这仍然是可能的!)运行时不必保证任何给定操作的公平性,并且它们通常提供不同的 API,让您选择是否需要公平性。

尝试一些等待 future 的变体,看看它们会做什么

  • 从任一或两个循环周围删除 async 代码块。
  • 在定义每个 async 代码块后立即等待它。
  • 仅将第一个循环包装在 async 代码块中,并在第二个循环的主体之后等待生成的 future。

对于额外的挑战,看看您是否可以在运行代码之前弄清楚每种情况下的输出将是什么!

使用消息传递在两个任务上计数

future 之间共享数据也将是熟悉的:我们将再次使用消息传递,但这次使用类型和函数的 async 版本。我们将采取与 使用消息传递在线程之间传输数据中不同的路径,以说明基于线程和基于 future 的并发之间的一些关键差异。在清单 17-9 中,我们将从一个单独的 async 代码块开始——而不是生成一个单独的任务,就像我们生成一个单独的线程一样。

文件名: src/main.rs
extern crate trpl; // required for mdbook test fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let val = String::from("hi"); tx.send(val).unwrap(); let received = rx.recv().await.unwrap(); println!("Got: {received}"); }); }
清单 17-9: 创建一个 async 通道并将两个半部分分配给 txrx

在这里,我们使用 trpl::channel,它是第 16 章中我们用于线程的多生产者、单消费者通道 API 的 async 版本。API 的 async 版本与基于线程的版本只有一点不同:它使用可变的而不是不可变的接收器 rx,并且它的 recv 方法生成一个我们需要等待的 future,而不是直接生成值。现在我们可以从发送者向接收者发送消息。请注意,我们不必生成单独的线程甚至任务;我们只需要等待 rx.recv 调用。

std::mpsc::channel 中的同步 Receiver::recv 方法会阻塞,直到它收到消息。trpl::Receiver::recv 方法不会,因为它是 async 的。它不会阻塞,而是将控制权交还给运行时,直到收到消息或通道的发送端关闭。相比之下,我们不等待 send 调用,因为它不会阻塞。它不需要阻塞,因为我们发送到的通道是无界的。

注意:由于所有这些 async 代码都在 trpl::run 调用中的 async 代码块中运行,因此其中的所有内容都可以避免阻塞。但是,外部的代码将阻塞 run 函数返回。这就是 trpl::run 函数的全部意义:它让您选择在某组 async 代码上阻塞的位置,从而选择在同步代码和 async 代码之间转换的位置。在大多数 async 运行时中,run 实际上被命名为 block_on,正是出于这个原因。

请注意此示例中的两件事。首先,消息将立即到达。其次,虽然我们在这里使用了 future,但还没有并发。清单中的所有内容都按顺序发生,就像没有涉及 future 一样。

让我们通过发送一系列消息并在它们之间休眠来解决第一部分,如清单 17-10 所示。

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } while let Some(value) = rx.recv().await { println!("received '{value}'"); } }); }
清单 17-10: 通过 async 通道发送和接收多条消息,并在每条消息之间使用 await 休眠

除了发送消息之外,我们还需要接收它们。在这种情况下,因为我们知道有多少消息进来,我们可以通过手动调用 rx.recv().await 四次来手动完成。然而,在现实世界中,我们通常会等待一些未知数量的消息,因此我们需要继续等待,直到我们确定没有更多消息为止。

在清单 16-10 中,我们使用 for 循环来处理从同步通道接收的所有项目。然而,Rust 尚未提供一种方法来编写遍历异步项目系列的 for 循环,因此我们需要使用我们以前没有见过的循环:while let 条件循环。这是我们在 使用 if letlet else 的简洁控制流部分中看到的 if let 结构的循环版本。只要它指定的模式继续匹配该值,循环就会继续执行。

rx.recv 调用生成一个 future,我们等待它。运行时将暂停 future,直到它准备好。一旦消息到达,future 将解析为 Some(message),次数与消息到达的次数一样多。当通道关闭时,无论是否有任何消息到达,future 都会解析为 None,以指示没有更多值,因此我们应该停止轮询——也就是说,停止等待。

while let 循环将所有这些组合在一起。如果调用 rx.recv().await 的结果是 Some(message),我们可以访问该消息,并且可以在循环体中使用它,就像我们可以使用 if let 一样。如果结果是 None,则循环结束。每次循环完成时,它都会再次命中 await 点,因此运行时会再次暂停它,直到另一条消息到达。

代码现在成功发送和接收所有消息。不幸的是,仍然存在一些问题。首先,消息不是以半秒的间隔到达的。它们在程序启动后 2 秒(2000 毫秒)后一次性到达。其次,此程序也永远不会退出!相反,它永远等待新消息。您需要使用 ctrl-c 关闭它。

让我们首先检查为什么消息在完整延迟后一次性到达,而不是在每条消息之间都有延迟地到达。在给定的 async 代码块中,await 关键字在代码中出现的顺序也是程序运行时执行它们的顺序。

清单 17-10 中只有一个 async 代码块,因此其中的所有内容都线性运行。仍然没有并发。所有 tx.send 调用都会发生,穿插着所有 trpl::sleep 调用及其关联的 await 点。只有这样,while let 循环才会遍历 recv 调用上的任何 await 点。

为了获得我们想要的行为,即休眠延迟发生在每条消息之间,我们需要将 txrx 操作放在它们自己的 async 代码块中,如清单 17-11 所示。然后运行时可以使用 trpl::join 分别执行它们,就像在计数示例中一样。再次强调,我们等待调用 trpl::join 的结果,而不是单独的 future。如果我们按顺序等待单独的 future,我们将最终回到顺序流中——这正是我们试图要做的事情。

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx_fut = async { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; trpl::join(tx_fut, rx_fut).await; }); }
清单 17-11: 将 sendrecv 分隔到它们自己的 async 代码块中并等待这些代码块的 future

使用清单 17-11 中的更新代码,消息以 500 毫秒的间隔打印,而不是在 2 秒后一次性冲出来。

但是,程序仍然永远不会退出,因为 while let 循环与 trpl::join 交互的方式

  • trpl::join 返回的 future 仅在传递给它的两个 future 都完成后才会完成。
  • tx future 在发送 vals 中的最后一条消息后完成休眠后完成。
  • rx future 在 while let 循环结束之前不会完成。
  • 只有等待 rx.recv 产生 None 时,while let 循环才会结束。
  • 只有在通道的另一端关闭后,等待 rx.recv 才会返回 None
  • 只有当我们调用 rx.close 或当发送者端 tx 被丢弃时,通道才会关闭。
  • 我们没有在任何地方调用 rx.close,并且 tx 在传递给 trpl::run 的最外层 async 代码块结束之前不会被丢弃。
  • 代码块无法结束,因为它被阻塞在 trpl::join 完成上,这会将我们带回到此列表的顶部。

我们可以通过在某处调用 rx.close 来手动关闭 rx,但这没有多大意义。在处理任意数量的消息后停止将使程序关闭,但我们可能会遗漏消息。我们需要一些其他方法来确保 tx 在函数结束之前被丢弃。

现在,我们发送消息的 async 代码块仅借用 tx,因为发送消息不需要所有权,但如果我们可以将 tx 移动到该 async 代码块中,它将在该代码块结束后被丢弃。在第 13 章 捕获引用或移动所有权部分中,您学习了如何在闭包中使用 move 关键字,并且,正如第 16 章 move 闭包与线程一起使用部分中所讨论的,我们在使用线程时经常需要将数据移动到闭包中。相同的基本动态适用于 async 代码块,因此 move 关键字在 async 代码块中的工作方式与在闭包中的工作方式相同。

在清单 17-12 中,我们将用于发送消息的代码块从 async 更改为 async move。当我们运行版本的代码时,它会在发送和接收最后一条消息后优雅地关闭。

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; trpl::join(tx_fut, rx_fut).await; }); }
清单 17-12: 清单 17-11 中代码的修订版,在完成时正确关闭

此 async 通道也是一个多生产者通道,因此如果我们想从多个 future 发送消息,我们可以调用 tx 上的 clone,如清单 17-13 所示。

文件名: src/main.rs
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; let tx_fut = async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(1500)).await; } }; trpl::join3(tx1_fut, tx_fut, rx_fut).await; }); }
清单 17-13: 将多个生产者与 async 代码块一起使用

首先,我们克隆 tx,在第一个 async 代码块外部创建 tx1。我们将 tx1 移动到该代码块中,就像我们之前对 tx 所做的那样。然后,稍后,我们将原始 tx 移动到一个新的 async 代码块中,我们在其中以稍慢的延迟发送更多消息。我们碰巧将这个新的 async 代码块放在接收消息的 async 代码块之后,但它也可以放在它之前。关键是 future 被等待的顺序,而不是它们被创建的顺序。

用于发送消息的两个 async 代码块都需要是 async move 代码块,以便 txtx1 都在这些代码块完成时被丢弃。否则,我们将最终回到我们开始时的相同无限循环中。最后,我们从 trpl::join 切换到 trpl::join3 以处理额外的 future。

现在我们看到了来自两个发送 future 的所有消息,并且由于发送 future 在发送后使用略有不同的延迟,因此消息也以这些不同的间隔接收。

received 'hi' received 'more' received 'from' received 'the' received 'messages' received 'future' received 'for' received 'you'

这是一个好的开始,但这将我们限制为仅处理少量 future:两个使用 join,或三个使用 join3。让我们看看我们如何处理更多的 future。