共享状态并发
消息传递是处理并发的一种好方法,但它不是唯一的方法。另一种方法是多个线程访问相同的共享数据。再次考虑Go语言文档中的这部分口号:“不要通过共享内存来通信。”
通过共享内存进行通信是什么样的?此外,为什么消息传递的爱好者会告诫不要使用内存共享?
从某种程度上说,任何编程语言中的通道都类似于单一所有权,因为一旦你通过通道转移了一个值,就不应该再使用那个值了。共享内存并发则类似于多重所有权:多个线程可以同时访问同一个内存位置。正如你在第15章看到的,智能指针使得多重所有权成为可能,多重所有权会增加复杂性,因为需要管理这些不同的所有者。Rust的类型系统和所有权规则极大地帮助我们正确地进行这种管理。例如,我们来看看互斥锁(mutexes),它是共享内存中最常见的并发原语之一。
使用互斥锁一次只允许一个线程访问数据
Mutex 是 mutual exclusion(互斥)的缩写,意味着互斥锁在任何给定时间只允许一个线程访问某些数据。要访问互斥锁中的数据,线程必须首先通过请求获取互斥锁的锁来表明它想要访问。锁是互斥锁的一部分,它是一个数据结构,用于跟踪当前谁拥有对数据的排他访问权。因此,互斥锁被描述为通过锁定系统来守护它所持有的数据。
互斥锁因其难以使用而闻名,因为你必须记住两条规则:
- 在使用数据之前,必须尝试获取锁。
- 使用完互斥锁守护的数据后,必须解锁数据,以便其他线程可以获取锁。
对于互斥锁的一个现实世界中的比喻,想象一下会议中的小组讨论,只有一个麦克风。在小组成员发言之前,他们必须请求或示意他们想要使用麦克风。当他们拿到麦克风后,他们可以想说多久就说多久,然后把麦克风递给下一个请求发言的小组成员。如果一个小组成员用完麦克风后忘记递出去,就没有其他人能说话了。如果共享麦克风的管理出了问题,小组讨论就不会按计划进行!
互斥锁的管理可能非常棘手,这也是为什么很多人对通道充满热情的原因。然而,感谢Rust的类型系统和所有权规则,你不会搞错锁定和解锁。
Mutex<T> 的 API
为了说明如何使用互斥锁,我们首先在单线程上下文中使用互斥锁,如列表 16-12 所示。
use std::sync::Mutex; fn main() { let m = Mutex::new(5); { let mut num = m.lock().unwrap(); *num = 6; } println!("m = {m:?}"); }
Mutex<T> 的 API,以简化说明与许多类型一样,我们使用关联函数 new 创建一个 Mutex<T>。要访问互斥锁内部的数据,我们使用 lock 方法获取锁。这个调用会阻塞当前线程,使其无法执行任何工作,直到轮到我们拥有锁。
如果另一个持有锁的线程发生了 panic,lock 调用会失败。在这种情况下,没有人能够再次获取锁,因此我们选择使用 unwrap,如果发生这种情况,当前线程会 panic。
获取锁后,我们可以将返回值(在此例中名为 num)视为内部数据的可变引用。类型系统确保我们在使用 m 中的值之前获取锁。m 的类型是 Mutex<i32>,而不是 i32,所以我们必须调用 lock 才能使用 i32 值。我们不能忘记;否则类型系统不会让我们访问内部的 i32。
正如你可能猜到的,Mutex<T> 是一个智能指针。更准确地说,lock 调用返回一个名为 MutexGuard 的智能指针,它被封装在一个 LockResult 中,我们通过调用 unwrap 来处理它。MutexGuard 智能指针实现了 Deref trait,指向我们的内部数据;这个智能指针还实现了 Drop trait,当 MutexGuard 超出作用域时(这发生在内部作用域结束时),它会自动释放锁。因此,我们不会忘记释放锁而导致互斥锁被其他线程阻塞,因为锁的释放是自动发生的。
释放锁后,我们可以打印互斥锁的值,并看到我们成功地将内部的 i32 更改为 6。
在多个线程之间共享 Mutex<T>
现在我们尝试使用 Mutex<T> 在多个线程之间共享一个值。我们将启动 10 个线程,让它们每个都将计数器值加 1,这样计数器就会从 0 增加到 10。列表 16-13 中的示例会产生编译器错误,我们将利用这个错误来学习更多关于使用 Mutex<T> 以及 Rust 如何帮助我们正确使用它的知识。
use std::sync::Mutex;
use std::thread;
fn main() {
let counter = Mutex::new(0);
let mut handles = vec![];
for _ in 0..10 {
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
Mutex<T> 守护的计数器我们创建一个 counter 变量,用于在 Mutex<T> 中存放一个 i32 值,就像我们在列表 16-12 中做的那样。接下来,我们通过遍历一个数字范围来创建 10 个线程。我们使用 thread::spawn 并给所有线程同一个闭包:这个闭包将计数器移动到线程中,通过调用 lock 方法获取 Mutex<T> 的锁,然后将互斥锁中的值加 1。当一个线程完成运行其闭包后,num 将超出作用域并释放锁,这样另一个线程就可以获取它。
在主线程中,我们收集所有的 join handle。然后,正如我们在列表 16-2 中所做的那样,我们对每个 handle 调用 join 以确保所有线程完成。此时,主线程将获取锁并打印程序的运行结果。
我们曾暗示这个例子不会编译。现在我们来找出原因!
$ cargo run
Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0382]: borrow of moved value: `counter`
--> src/main.rs:21:29
|
5 | let counter = Mutex::new(0);
| ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait
...
8 | for _ in 0..10 {
| -------------- inside of this loop
9 | let handle = thread::spawn(move || {
| ------- value moved into closure here, in previous iteration of loop
...
21 | println!("Result: {}", *counter.lock().unwrap());
| ^^^^^^^ value borrowed here after move
|
help: consider moving the expression out of the loop so it is only moved once
|
8 ~ let mut value = counter.lock();
9 ~ for _ in 0..10 {
10 | let handle = thread::spawn(move || {
11 ~ let mut num = value.unwrap();
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error
错误消息指出,在循环的上一次迭代中,counter 值被移动了。Rust 告诉我们不能将互斥锁 counter 的所有权移动到多个线程中。让我们使用第 15 章讨论过的多重所有权方法来修复这个编译器错误。
多线程下的多重所有权
在第 15 章中,我们通过使用智能指针 Rc<T> 创建一个引用计数的值,从而将一个值赋予多个所有者。让我们在这里也这样做,看看会发生什么。我们将在列表 16-14 中将 Mutex<T> 封装在 Rc<T> 中,并在将所有权移动到线程之前克隆 Rc<T>。
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;
fn main() {
let counter = Rc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Rc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
Rc<T> 允许多个线程拥有 Mutex<T>再次编译,我们得到……不同的错误!编译器正在教我们很多东西。
$ cargo run
Compiling shared-state v0.1.0 (file:///projects/shared-state)
error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely
--> src/main.rs:11:36
|
11 | let handle = thread::spawn(move || {
| ------------- ^------
| | |
| ______________________|_____________within this `{closure@src/main.rs:11:36: 11:43}`
| | |
| | required by a bound introduced by this call
12 | | let mut num = counter.lock().unwrap();
13 | |
14 | | *num += 1;
15 | | });
| |_________^ `Rc<Mutex<i32>>` cannot be sent between threads safely
|
= help: within `{closure@src/main.rs:11:36: 11:43}`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`
note: required because it's used within this closure
--> src/main.rs:11:36
|
11 | let handle = thread::spawn(move || {
| ^^^^^^^
note: required by a bound in `spawn`
--> file:///home/.rustup/toolchains/1.85/lib/rustlib/src/rust/library/std/src/thread/mod.rs:731:8
|
728 | pub fn spawn<F, T>(f: F) -> JoinHandle<T>
| ----- required by a bound in this function
...
731 | F: Send + 'static,
| ^^^^ required by this bound in `spawn`
For more information about this error, try `rustc --explain E0277`.
error: could not compile `shared-state` (bin "shared-state") due to 1 previous error
哇,这个错误消息非常冗长!这里是需要关注的重要部分:`Rc<Mutex<i32>>` cannot be sent between threads safely(`Rcthe trait `Send` is not implemented for `Rc<Mutex<i32>>`(trait `Send` 没有为 `RcSend trait:它是确保我们在并发情境下使用的类型适合用于线程的 trait 之一。
不幸的是,Rc<T> 不安全地在线程之间共享。当 Rc<T> 管理引用计数时,每次调用 clone 时增加计数,每次 clone 被 drop 时减少计数。但它没有使用任何并发原语来确保对计数的更改不会被另一个线程中断。这可能导致错误的计数——微妙的 bug,进而可能导致内存泄漏或在值使用完毕之前就被 drop。我们需要的是一个与 Rc<T> 完全相似,但以线程安全的方式更改引用计数的类型。
使用 Arc<T> 进行原子引用计数
幸运的是,Arc<T> 是一种像 Rc<T> 一样在并发情境下安全使用的类型。字母 a 代表 atomic,意味着它是一种原子引用计数类型。原子类型是另一种并发原语,我们在这里不会详细介绍:请参阅标准库文档 std::sync::atomic获取更多详细信息。目前,你只需要知道原子类型的工作方式类似于基本类型,但可以在线程之间安全共享。
你可能想知道为什么不是所有的基本类型都是原子类型,以及为什么标准库类型默认不使用 Arc<T> 实现。原因是线程安全会带来性能损失,只有当你真正需要时才想承担这个代价。如果你只是在单个线程内对值进行操作,代码如果不需要强制执行原子类型提供的保证,运行速度会更快。
回到我们的例子:Arc<T> 和 Rc<T> 具有相同的 API,因此我们通过更改 use 行、调用 new 和调用 clone 来修复程序。列表 16-15 中的代码最终将编译并运行。
use std::sync::{Arc, Mutex}; use std::thread; fn main() { let counter = Arc::new(Mutex::new(0)); let mut handles = vec![]; for _ in 0..10 { let counter = Arc::clone(&counter); let handle = thread::spawn(move || { let mut num = counter.lock().unwrap(); *num += 1; }); handles.push(handle); } for handle in handles { handle.join().unwrap(); } println!("Result: {}", *counter.lock().unwrap()); }
Arc<T> 包装 Mutex<T>,使其能够在多个线程之间共享所有权这段代码将打印如下内容:
Result: 10
我们做到了!我们从 0 数到了 10,这可能看起来不那么令人印象深刻,但它确实教会了我们很多关于 Mutex<T> 和线程安全的知识。你也可以使用这个程序的结构来执行比简单递增计数器更复杂的操作。使用这种策略,你可以将计算分解为独立的各个部分,将这些部分分配给不同的线程,然后使用 Mutex<T> 让每个线程用自己的部分来更新最终结果。
注意,如果你正在进行简单的数值操作,标准库的 std::sync::atomic 模块提供了比 Mutex<T> 类型更简单的类型。这些类型提供了对基本类型的安全、并发、原子访问。在这个例子中,我们选择将 Mutex<T> 与一个基本类型一起使用,以便我们可以专注于 Mutex<T> 的工作方式。
RefCell<T>/Rc<T> 与 Mutex<T>/Arc<T> 之间的相似性
你可能已经注意到 counter 是不可变的,但我们可以获得其内部值的可变引用;这意味着 Mutex<T> 提供了内部可变性(interior mutability),就像 Cell 系列那样。正如我们在第 15 章中使用 RefCell<T> 允许我们修改 Rc<T> 内部的内容一样,我们使用 Mutex<T> 来修改 Arc<T> 内部的内容。
另一个需要注意的细节是,当你使用 Mutex<T> 时,Rust 不能保护你免受所有类型的逻辑错误。回顾第 15 章,使用 Rc<T> 存在创建引用循环的风险,即两个 Rc<T> 值相互引用,导致内存泄漏。类似地,Mutex<T> 存在创建死锁的风险。死锁发生在一个操作需要锁定两个资源,而两个线程各自已经获取了其中一个锁,导致它们永远互相等待。如果你对死锁感兴趣,可以尝试创建一个会发生死锁的 Rust 程序;然后研究任何语言中针对互斥锁的死锁缓解策略,并尝试在 Rust 中实现它们。Mutex<T> 和 MutexGuard 的标准库 API 文档提供了有用的信息。
我们将在本章的最后讨论 Send 和 Sync trait,以及如何将它们与自定义类型一起使用。