使用消息传递在线程间传输数据
一种日益流行的确保安全并发的方法是消息传递,其中线程或 Actor 通过相互发送包含数据的消息进行通信。下面是来自 Go语言文档 的一个口号:“不要通过共享内存来通信;而是通过通信来共享内存。”
为了实现消息传递并发,Rust 的标准库提供了通道的实现。通道是一个通用的编程概念,数据可以通过它从一个线程发送到另一个线程。
你可以将编程中的通道想象成一个有方向的水道,比如溪流或河流。如果你将一个橡皮鸭放入河流,它会顺流而下到达水道的尽头。
通道有两个部分:一个发送端(transmitter)和一个接收端(receiver)。发送端就像你在上游将橡皮鸭放入河流的位置,接收端就像橡皮鸭最终到达下游的位置。你代码的一部分调用发送端的方法并附带你要发送的数据,另一部分检查接收端是否有到达的消息。如果发送端或接收端被丢弃,则称该通道被关闭。
在这里,我们将构建一个程序,该程序有一个线程生成值并将它们沿通道发送,另一个线程接收这些值并打印出来。我们将使用通道在线程之间发送简单的值来演示此功能。一旦你熟悉了这个技巧,你就可以将通道用于任何需要相互通信的线程,例如聊天系统,或者一个多线程执行计算的一部分并将结果发送给一个线程来汇总结果的系统。
首先,在列表 16-6 中,我们将创建一个通道,但不对其执行任何操作。请注意,这还不会编译通过,因为 Rust 无法知道我们希望通过通道发送什么类型的值。
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
tx 和 rx我们使用 mpsc::channel 函数创建一个新通道;mpsc 代表 multiple producer, single consumer(多发送者、单接收者)。简而言之,Rust 标准库实现通道的方式意味着一个通道可以有多个生成值的发送端,但只有一个消费这些值的接收端。想象一下多条溪流汇入一条大河:从任何一条溪流送出的所有东西最终都会汇集到大河的末端。我们暂时从一个发送者开始,但在使这个示例工作起来后,我们将添加多个发送者。
mpsc::channel 函数返回一个元组,第一个元素是发送端——即发送者(transmitter),第二个元素是接收端——即接收者(receiver)。缩写 tx 和 rx 在许多领域传统上分别用于表示 transmitter(发送端)和 receiver(接收端),因此我们将变量命名为 tx 和 rx 来表示每一端。我们正在使用带有模式匹配的 let 语句来解构元组;我们将在第 19 章讨论在 let 语句中使用模式匹配和解构。现在只需知道,以这种方式使用 let 语句是一种方便的方法,可以提取 mpsc::channel 返回的元组的各个部分。
让我们将发送端移动到一个派生线程中,并让它发送一个字符串,这样派生线程就可以与主线程通信了,如列表 16-7 所示。这就像在上游将橡皮鸭放入河流,或者从一个线程向另一个线程发送聊天消息。
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); }
tx 移动到派生线程中并发送 "hi"同样,我们使用 thread::spawn 创建一个新线程,然后使用 move 将 tx 移动到闭包中,这样派生线程就拥有了 tx。派生线程需要拥有发送端才能通过通道发送消息。
发送端有一个 send 方法,它接收我们要发送的值。send 方法返回一个 Result<T, E> 类型,因此如果接收端已经被丢弃而无法发送值,发送操作将返回一个错误。在这个例子中,我们在发生错误时调用 unwrap 来引起 panic。但在实际应用中,我们应该妥善处理:返回第 9 章复习正确的错误处理策略。
在列表 16-8 中,我们将在主线程中从接收端获取值。这就像在河流末端从水中捞出橡皮鸭,或者接收一条聊天消息。
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); let received = rx.recv().unwrap(); println!("Got: {received}"); }
"hi" 并打印接收端有两个有用的方法:recv 和 try_recv。我们使用的是 recv,它是 receive(接收)的缩写,它会阻塞主线程的执行,直到一个值被发送到通道。一旦一个值被发送,recv 将以 Result<T, E> 的形式返回它。当发送端关闭时,recv 将返回一个错误,表示不会再有值到来。
try_recv 方法不会阻塞,而是立即返回一个 Result<T, E>:如果消息可用,则返回包含消息的 Ok 值;如果当前没有消息,则返回 Err 值。如果线程在等待消息时还有其他工作要做,使用 try_recv 会很有用:我们可以编写一个循环,定期调用 try_recv,如果有消息可用就处理它,否则就先做其他工作一会儿,然后再检查。
在这个例子中,我们使用 recv 是为了简单起见;主线程除了等待消息之外没有其他工作要做,因此阻塞主线程是合适的。
当我们运行列表 16-8 中的代码时,将看到从主线程打印出的值
Got: hi
完美!
通道与所有权转移
所有权规则在消息发送中起着至关重要的作用,因为它们有助于你编写安全的并发代码。在整个 Rust 程序中考虑所有权是防止并发编程中错误的一个优势。让我们做一个实验来展示通道和所有权如何协同工作以防止问题:我们将尝试在派生线程中,在我们通过通道发送 val 值之后,再次使用它。尝试编译列表 16-9 中的代码,看看为什么不允许这样做。
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {val}");
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
val 发送到通道后使用它这里,我们尝试在通过 tx.send 将 val 发送到通道后打印它。允许这样做是个糟糕的主意:一旦值被发送到另一个线程,该线程可能会在我们再次尝试使用它之前修改或丢弃它。潜在地,由于数据不一致或不存在,另一个线程的修改可能会导致错误或意外结果。然而,如果我们尝试编译列表 16-9 中的代码,Rust 会给出错误:
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
--> src/main.rs:10:26
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {val}");
| ^^^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error
我们的并发错误导致了编译时错误。send 函数取得了其参数的所有权,并且当值被移动时,接收端取得了它的所有权。这阻止了我们在发送值后意外地再次使用它;所有权系统检查一切是否正常。
发送多个值并观察接收端等待
列表 16-8 中的代码编译并运行了,但它没有清楚地向我们展示两个独立的线程正在通过通道相互通信。在列表 16-10 中,我们做了一些修改,以证明列表 16-8 中的代码是并发运行的:派生线程现在将发送多条消息,并在每条消息之间暂停一秒钟。
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
}
这次,派生线程有一个字符串 vector,我们想将其发送到主线程。我们迭代这些字符串,逐个发送,并在每个发送之间调用 thread::sleep 函数暂停一秒钟,使用一个 Duration 值。
在主线程中,我们不再显式调用 recv 函数:相反,我们将 rx 视为一个迭代器。对于接收到的每个值,我们都将其打印出来。当通道关闭时,迭代将结束。
运行列表 16-10 中的代码时,你应该看到以下输出,每行之间暂停一秒钟
Got: hi
Got: from
Got: the
Got: thread
因为主线程中的 for 循环里没有任何暂停或延迟代码,我们可以知道主线程正在等待从派生线程接收值。
通过克隆发送端创建多个发送者
前面我们提到 mpsc 是 multiple producer, single consumer(多发送者、单接收者)的首字母缩写。让我们将 mpsc 付诸实践,扩展列表 16-10 中的代码,创建多个线程,它们都将值发送到同一个接收端。我们可以通过克隆发送端来实现这一点,如列表 16-11 所示。
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
// --snip--
}
这次,在创建第一个派生线程之前,我们对发送端调用了 clone。这将为我们提供一个新的发送端,我们可以将其传递给第一个派生线程。我们将原始发送端传递给第二个派生线程。这样我们就有两个线程,每个线程都向同一个接收端发送不同的消息。
运行代码时,你的输出应该类似于这样
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
根据你的系统,你可能会看到值以不同的顺序出现。这就是并发既有趣又困难的地方。如果你尝试在不同的线程中使用 thread::sleep 并赋予它不同的值,每次运行将更加不确定,并且每次都会创建不同的输出。
既然我们已经了解了通道的工作原理,接下来让我们看看另一种实现并发的方法。