优雅关机和清理

如我们预期,列表 21-20 中的代码正在通过使用线程池异步响应请求。我们收到一些关于 workersidthread 字段的警告,这些字段我们没有直接使用,这提醒我们没有进行任何清理。当我们使用不太优雅的 ctrl-c 方法来停止主线程时,所有其他线程也会立即停止,即使它们正在处理请求。

接下来,我们将实现 Drop trait,以便在池中的每个线程上调用 join,这样它们就可以在关闭之前完成正在处理的请求。然后,我们将实现一种方法来告诉线程它们应该停止接受新请求并关闭。为了看到这段代码的实际效果,我们将修改我们的服务器,使其在优雅地关闭其线程池之前只接受两个请求。

在我们进行的过程中需要注意一件事:这些都不会影响处理执行闭包的代码部分,因此如果我们使用线程池用于异步运行时,那么这里的一切都将是相同的。

ThreadPool 上实现 Drop Trait

让我们从在我们的线程池上实现 Drop 开始。当池被 drop 时,我们的线程都应该 join 以确保它们完成它们的工作。列表 21-22 展示了 Drop 实现的首次尝试;这段代码还不能完全工作。

文件名: src/lib.rs
use std::{ sync::{mpsc, Arc, Mutex}, thread, }; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job(); }); Worker { id, thread } } }
列表 21-22:当线程池超出作用域时 join 每个线程

首先,我们循环遍历线程池的每个 workers。我们为此使用 &mut,因为 self 是一个可变引用,并且我们也需要能够修改 worker。对于每个 worker,我们打印一条消息,说明这个特定的 worker 正在关闭,然后我们在该 worker 的线程上调用 join。如果调用 join 失败,我们使用 unwrap 使 Rust panic 并进入不优雅的关机。

这是我们编译这段代码时得到的错误

$ cargo check Checking hello v0.1.0 (file:///projects/hello) error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference --> src/lib.rs:52:13 | 52 | worker.thread.join().unwrap(); | ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call | | | move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait | note: `JoinHandle::<T>::join` takes ownership of the receiver `self`, which moves `worker.thread` --> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/std/src/thread/mod.rs:1763:17 | 1763 | pub fn join(self) -> Result<T> { | ^^^^ For more information about this error, try `rustc --explain E0507`. error: could not compile `hello` (lib) due to 1 previous error

错误告诉我们不能调用 join,因为我们只拥有每个 worker 的可变借用,而 join 取得其参数的所有权。为了解决这个问题,我们需要将线程移出拥有 threadWorker 实例,以便 join 可以 consume 该线程。一种方法是采用我们在列表 18-15 中所做的相同方法。如果 Worker 持有一个 Option<thread::JoinHandle<()>>,我们可以调用 Option 上的 take 方法将值移出 Some 变体,并在其位置留下一个 None 变体。换句话说,正在运行的 Worker 将在 thread 中有一个 Some 变体,当我们想要清理 Worker 时,我们会将 Some 替换为 None,以便 Worker 没有线程可以运行。

然而,这种情况 *只* 会在 drop Worker 时出现。作为交换,我们必须在我们访问 worker.thread 的任何地方处理 Option<thread::JoinHandle<()>>。惯用的 Rust 大量使用 Option,但是当你发现自己将某些东西包装在 Option 中作为一种变通方法,即使你知道该项将始终存在时,寻找替代方法是一个好主意。它们可以使你的代码更简洁且不易出错。

在这种情况下,有一个更好的替代方案:Vec::drain 方法。它接受一个范围参数来指定要从 Vec 中删除哪些项,并返回这些项的迭代器。传递 .. 范围语法将从 Vec 中删除 *每个* 值。

因此我们需要像这样更新 ThreadPooldrop 实现

文件名: src/lib.rs
#![allow(unused)] fn main() { use std::{ sync::{mpsc, Arc, Mutex}, thread, }; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { for worker in self.workers.drain(..) { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job(); }); Worker { id, thread } } } }

这解决了编译器错误,并且不需要对我们的代码进行任何其他更改。

向线程发送信号以停止监听作业

通过我们所做的所有更改,我们的代码编译时没有任何警告。然而,坏消息是这段代码还不能按照我们期望的方式运行。关键在于 Worker 实例的线程运行的闭包中的逻辑:目前,我们调用 join,但这不会关闭线程,因为它们会永远 loop 寻找作业。如果我们尝试使用我们当前的 drop 实现 drop 我们的 ThreadPool,主线程将永远阻塞,等待第一个线程完成。

为了解决这个问题,我们需要更改 ThreadPooldrop 实现,然后更改 Worker 循环。

首先,我们将更改 ThreadPooldrop 实现,以在等待线程完成之前显式 drop sender。列表 21-23 显示了对 ThreadPool 的更改,以显式 drop sender。与 workers 不同,在这里我们 *确实* 需要使用 Option,以便能够使用 Option::takesender 移出 ThreadPool

文件名: src/lib.rs
use std::{ sync::{mpsc, Arc, Mutex}, thread, }; pub struct ThreadPool { workers: Vec<Worker>, sender: Option<mpsc::Sender<Job>>, } // --snip-- type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { // --snip-- assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender: Some(sender), } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.as_ref().unwrap().send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { drop(self.sender.take()); for worker in self.workers.drain(..) { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job(); }); Worker { id, thread } } }
列表 21-23:在 join worker 线程之前显式 drop sender

Drop sender 会关闭通道,这表明不会再发送消息。当这种情况发生时,worker 在无限循环中所做的所有 recv 调用都将返回一个错误。在列表 21-24 中,我们更改了 Worker 循环,以便在这种情况下优雅地退出循环,这意味着当 ThreadPooldrop 实现对它们调用 join 时,线程将完成。

文件名: src/lib.rs
use std::{ sync::{mpsc, Arc, Mutex}, thread, }; pub struct ThreadPool { workers: Vec<Worker>, sender: Option<mpsc::Sender<Job>>, } type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender: Some(sender), } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.as_ref().unwrap().send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { drop(self.sender.take()); for worker in self.workers.drain(..) { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let message = receiver.lock().unwrap().recv(); match message { Ok(job) => { println!("Worker {id} got a job; executing."); job(); } Err(_) => { println!("Worker {id} disconnected; shutting down."); break; } } }); Worker { id, thread } } }
列表 21-24:当 recv 返回错误时显式跳出循环

为了看到这段代码的实际效果,让我们修改 main 以在优雅地关闭服务器之前只接受两个请求,如列表 21-25 所示。

文件名: src/main.rs
use hello::ThreadPool; use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming().take(2) { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } println!("Shutting down."); } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
列表 21-25:通过退出循环在服务两个请求后关闭服务器

你不会希望一个真实的 Web 服务器在仅服务两个请求后就关闭。这段代码只是演示了优雅关机和清理正在正常工作。

take 方法在 Iterator trait 中定义,并将迭代限制为最多前两项。ThreadPool 将在 main 结束时超出作用域,并且 drop 实现将运行。

使用 cargo run 启动服务器,并发出三个请求。第三个请求应该出错,并且在你的终端中你应该看到类似于这样的输出

$ cargo run Compiling hello v0.1.0 (file:///projects/hello) Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.41s Running `target/debug/hello` Worker 0 got a job; executing. Shutting down. Shutting down worker 0 Worker 3 got a job; executing. Worker 1 disconnected; shutting down. Worker 2 disconnected; shutting down. Worker 3 disconnected; shutting down. Worker 0 disconnected; shutting down. Shutting down worker 1 Shutting down worker 2 Shutting down worker 3

你可能会看到 worker 和消息打印的不同顺序。我们可以从消息中看到这段代码是如何工作的:worker 0 和 3 收到了前两个请求。服务器在第二个连接后停止接受连接,并且 ThreadPool 上的 Drop 实现甚至在 worker 3 开始其作业之前就开始执行。Drop sender 断开所有 worker 的连接并告诉它们关闭。worker 在断开连接时各自打印一条消息,然后线程池调用 join 以等待每个 worker 线程完成。

注意这次特定执行的一个有趣方面:ThreadPool dropped 了 sender,并且在任何 worker 收到错误之前,我们尝试 join worker 0。Worker 0 尚未从 recv 收到错误,因此主线程阻塞等待 worker 0 完成。与此同时,worker 3 收到了一个作业,然后所有线程都收到了一个错误。当 worker 0 完成时,主线程等待其余 worker 完成。那时,它们都已退出循环并停止。

恭喜!我们现在已经完成了我们的项目;我们有一个基本的 Web 服务器,它使用线程池异步响应。我们能够执行服务器的优雅关机,这会清理池中的所有线程。

这是完整代码供参考

文件名: src/main.rs
use hello::ThreadPool; use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming().take(2) { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } println!("Shutting down."); } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
文件名: src/lib.rs
use std::{ sync::{mpsc, Arc, Mutex}, thread, }; pub struct ThreadPool { workers: Vec<Worker>, sender: Option<mpsc::Sender<Job>>, } type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender: Some(sender), } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.as_ref().unwrap().send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { drop(self.sender.take()); for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } } struct Worker { id: usize, thread: Option<thread::JoinHandle<()>>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let message = receiver.lock().unwrap().recv(); match message { Ok(job) => { println!("Worker {id} got a job; executing."); job(); } Err(_) => { println!("Worker {id} disconnected; shutting down."); break; } } }); Worker { id, thread: Some(thread), } } }

我们可以在这里做更多的事情!如果你想继续增强这个项目,这里有一些想法

  • ThreadPool 及其公共方法添加更多文档。
  • 添加库的功能测试。
  • 将对 unwrap 的调用更改为更健壮的错误处理。
  • 使用 ThreadPool 执行除服务 Web 请求以外的其他任务。
  • crates.io 上找到一个线程池 crate,并使用该 crate 实现一个类似的 Web 服务器。然后将其 API 和健壮性与我们实现的线程池进行比较。

总结

做得好!你已经到达了本书的结尾!我们要感谢你加入我们的 Rust 之旅。你现在已经准备好实现你自己的 Rust 项目并帮助其他人的项目。请记住,有一个友好的 Rustacean 社区,他们很乐意帮助你解决你在 Rust 之旅中遇到的任何挑战。