使用消息传递在线程之间传输数据

一种越来越流行的确保安全并发的方法是_消息传递_,其中线程或参与者通过相互发送包含数据的_消息_进行通信。以下是Go 语言文档中的一句口号:“不要通过共享内存进行通信;而应该通过通信来共享内存。”

为了实现消息发送并发,Rust 的标准库提供了一个_通道_的实现。通道是一个通用的编程概念,通过它可以将数据从一个线程发送到另一个线程。

您可以将编程中的通道想象成一个有方向的水道,例如溪流或河流。如果您将橡皮鸭之类的东西放入河流中,它将顺流而下到达水道的尽头。

一个通道有两部分:发送端和接收端。发送端是您将橡皮鸭放入河流的上游位置,而接收端是橡皮鸭最终到达的下游位置。您代码的一部分使用要发送的数据调用发送端上的方法,而另一部分则检查接收端是否有消息到达。如果发送端或接收端被丢弃,则称通道已_关闭_。

在这里,我们将编写一个程序,该程序有一个线程用于生成值并将它们发送到通道,而另一个线程将接收这些值并打印出来。我们将使用通道在线程之间发送简单的值,以说明该功能。一旦您熟悉了该技术,就可以将通道用于需要相互通信的任何线程,例如聊天系统或多个线程执行计算的一部分并将这些部分发送到一个线程来汇总结果的系统。

首先,在代码清单 16-6 中,我们将创建一个通道,但不做任何事情。请注意,这还不能编译,因为 Rust 无法判断我们想通过通道发送什么类型的值。

文件名:src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

代码清单 16-6:创建一个通道并将两部分分别赋值给 txrx

我们使用 mpsc::channel 函数创建一个新通道;mpsc 代表_多生产者,单消费者_。简而言之,Rust 标准库实现通道的方式意味着一个通道可以有多个_发送_端来生成值,但只有一个_接收_端来消费这些值。想象一下多条溪流汇聚成一条大河:从任何一条溪流发送下来的所有东西最终都会汇聚到一条河流中。我们现在将从一个生产者开始,但是当我们让这个例子工作时,我们将添加多个生产者。

mpsc::channel 函数返回一个元组,其第一个元素是发送端(发送器),第二个元素是接收端(接收器)。缩写 txrx 在许多领域传统上分别用于表示_发送器_和_接收器_,因此我们将变量命名为此类名称以指示每一端。我们使用了一个带有模式的 let 语句来解构元组;我们将在第 18 章中讨论 let 语句中模式的使用和解构。现在,请记住,以这种方式使用 let 语句是提取 mpsc::channel 返回的元组的便捷方法。

让我们将发送端移动到一个派生线程中,并让它发送一个字符串,以便派生线程与主线程通信,如代码清单 16-7 所示。这就像把一只橡皮鸭放在上游的河流中,或者从一个线程向另一个线程发送聊天消息。

文件名:src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

代码清单 16-7:将 tx 移动到派生线程并发送“hi”

同样,我们使用 thread::spawn 创建一个新线程,然后使用 movetx 移动到闭包中,以便派生线程拥有 tx。派生线程需要拥有发送器才能通过通道发送消息。发送器有一个 send 方法,它接收我们要发送的值。send 方法返回一个 Result<T, E> 类型,因此如果接收器已经被丢弃并且没有地方发送值,则发送操作将返回一个错误。在本例中,我们调用 unwrap 在出错时发生 panic。但在实际应用程序中,我们会正确处理它:回到第 9 章,回顾正确处理错误的策略。

在代码清单 16-8 中,我们将从主线程中的接收器获取值。这就像在河的尽头从水中取回橡皮鸭,或者接收一条聊天消息。

文件名:src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

代码清单 16-8:在主线程中接收值“hi”并打印它

接收器有两个有用的方法:recvtry_recv。我们使用的是 recv,它是 receive 的缩写,它将阻塞主线程的执行并等待,直到一个值被发送到通道中。一旦一个值被发送,recv 将在 Result<T, E> 中返回它。当发送器关闭时,recv 将返回一个错误,表示不会再有值到来。

try_recv 方法不会阻塞,而是会立即返回一个 Result<T, E>:如果有一个消息可用,则返回一个包含消息的 Ok 值,如果这次没有任何消息,则返回一个 Err 值。如果该线程在等待消息时还有其他工作要做,那么使用 try_recv 非常有用:我们可以编写一个循环,每隔一段时间调用一次 try_recv,如果有一个消息可用就处理它,否则做其他工作一段时间后再检查。

为了简单起见,我们在本例中使用了 recv;除了等待消息之外,我们没有其他工作要让主线程做,所以阻塞主线程是合适的。

当我们运行代码清单 16-8 中的代码时,我们将看到从主线程打印的值

Got: hi

完美!

通道和所有权转移

所有权规则在消息发送中起着至关重要的作用,因为它们可以帮助您编写安全的并发代码。防止并发编程中的错误是您在整个 Rust 程序中考虑所有权的优势。让我们做一个实验来展示通道和所有权如何协同工作以防止问题:我们将尝试在将 val 值发送到通道后,在派生线程中使用它。尝试编译代码清单 16-9 中的代码,看看为什么不允许这样做

文件名:src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}

代码清单 16-9:尝试在将 val 发送到通道后使用它

在这里,我们尝试在通过 tx.sendval 发送到通道后打印它。允许这样做是一个坏主意:一旦该值被发送到另一个线程,该线程可能会在我们再次尝试使用该值之前修改或丢弃它。潜在地,由于数据不一致或不存在,其他线程的修改可能会导致错误或意外结果。但是,如果我们尝试编译代码清单 16-9 中的代码,Rust 会给我们一个错误

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:31
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                           ^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
help: consider cloning the value if the performance cost is acceptable
   |
9  |         tx.send(val.clone()).unwrap();
   |                    ++++++++

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error

我们的并发错误导致了一个编译时错误。send 函数获取其参数的所有权,当值被移动时,接收器获取其所有权。这阻止了我们在发送值后意外地再次使用它;所有权系统会检查一切是否正常。

发送多个值并查看接收器正在等待

代码清单 16-8 中的代码编译并运行了,但它没有清楚地向我们展示两个独立的线程正在通过通道相互通信。在代码清单 16-10 中,我们做了一些修改,这些修改将证明代码清单 16-8 中的代码是并发运行的:派生线程现在将发送多条消息,并在每条消息之间暂停一秒钟。

文件名:src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}

代码清单 16-10:发送多条消息并在每条消息之间暂停

这一次,派生线程有一个字符串向量,我们想把它发送给主线程。我们迭代它们,分别发送每个字符串,并通过调用 thread::sleep 函数并传入 1 秒的 Duration 值来在每个字符串之间暂停。

在主线程中,我们不再显式调用 recv 函数:相反,我们将 rx 视为一个迭代器。对于接收到的每个值,我们都将其打印出来。当通道关闭时,迭代将结束。

当运行代码清单 16-10 中的代码时,您应该会看到以下输出,每行之间暂停 1 秒

Got: hi
Got: from
Got: the
Got: thread

因为我们在主线程的 for 循环中没有任何暂停或延迟的代码,所以我们可以知道主线程正在等待接收来自派生线程的值。

通过克隆发送器创建多个生产者

之前我们提到过,mpsc多个生产者,单个消费者 的首字母缩写。让我们使用 mpsc 并扩展代码清单 16-10 中的代码,以创建多个线程,所有线程都向同一个接收器发送值。我们可以通过克隆发送器来做到这一点,如代码清单 16-11 所示

文件名:src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }

    // --snip--
}

代码清单 16-11:从多个生产者发送多条消息

这一次,在我们创建第一个派生线程之前,我们在发送器上调用 clone。这将给我们一个新的发送器,我们可以将其传递给第一个派生线程。我们将原始发送器传递给第二个派生线程。这给了我们两个线程,每个线程都向一个接收器发送不同的消息。

当您运行代码时,您的输出应该类似于这样

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

您可能会看到值的顺序不同,这取决于您的系统。这就是并发有趣且困难的原因。如果您尝试使用 thread::sleep,在不同的线程中给它不同的值,每次运行都会更加不确定,并每次都创建不同的输出。

现在我们已经了解了通道是如何工作的,让我们来看看另一种并发方法。