将我们的单线程服务器转换为多线程服务器
目前,服务器将依次处理每个请求,这意味着在第一个请求完成处理之前,它不会处理第二个连接。如果服务器收到越来越多的请求,这种串行执行的效率会越来越低。如果服务器收到一个需要很长时间处理的请求,后续请求将不得不等待该长时间请求完成,即使新请求可以快速处理。我们需要修复这个问题,但首先,我们将看看实际操作中的问题。
在当前服务器实现中模拟慢速请求
我们将看看慢速处理请求如何影响对我们当前服务器实现的其他请求。清单 20-10 实现了处理对 * /sleep * 的请求,并模拟了一个慢速响应,该响应将导致服务器在响应之前休眠 5 秒。
文件名:src/main.rs
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; // --snip-- fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { // --snip-- let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; // --snip-- let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
清单 20-10:通过休眠 5 秒来模拟慢速请求
现在我们有了三个用例,我们从 if
切换到 match
。我们需要显式匹配 request_line
的切片,以针对字符串字面值进行模式匹配;match
不像等式方法那样进行自动引用和解引用。
第一个分支与清单 20-9 中的 if
块相同。第二个分支匹配对 * /sleep * 的请求。收到该请求后,服务器将在呈现成功的 HTML 页面之前休眠 5 秒。第三个分支与清单 20-9 中的 else
块相同。
您可以看到我们的服务器是多么原始:真正的库将以一种不太冗长的方式处理对多个请求的识别!
使用 cargo run
启动服务器。然后打开两个浏览器窗口:一个用于 * http://127.0.0.1:7878/ *,另一个用于 * http://127.0.0.1:7878/sleep *。如果您像以前一样多次输入 * / * URI,您会看到它响应很快。但是,如果您输入 * /sleep *,然后加载 * / *,您会看到 * / * 会一直等待 sleep
休眠满 5 秒后才加载。
我们可以使用多种技术来避免请求在慢速请求之后备份;我们将实现的技术是线程池。
使用线程池提高吞吐量
线程池是一组已生成并等待处理任务的线程。当程序收到新任务时,它会为该任务分配池中的一个线程,并且该线程将处理该任务。池中的其余线程可用于处理第一个线程正在处理时收到的任何其他任务。当第一个线程完成其任务处理后,它会返回到空闲线程池,准备处理新任务。线程池允许您并发处理连接,从而提高服务器的吞吐量。
我们将池中的线程数限制为少量,以保护我们免受拒绝服务 (DoS) 攻击;如果我们的程序为每个传入的请求创建一个新线程,则向我们的服务器发出 1000 万个请求的人可能会耗尽我们服务器的所有资源,并使请求处理陷入停顿,从而造成严重破坏。
因此,我们不会生成无限数量的线程,而是让固定数量的线程在池中等待。传入的请求被发送到池中进行处理。池将维护一个传入请求队列。池中的每个线程都会从该队列中弹出一个请求,处理该请求,然后向队列请求另一个请求。使用这种设计,我们可以并发处理最多 N
个请求,其中 N
是线程数。如果每个线程都在响应一个长时间运行的请求,则后续请求仍可以在队列中备份,但是我们已经增加了在达到该点之前可以处理的长时间运行请求的数量。
此技术只是提高 Web 服务器吞吐量的众多方法之一。您可以探索的其他选项是 fork/join 模型、单线程异步 I/O 模型或 多线程异步 I/O 模型。如果您对此主题感兴趣,可以阅读有关其他解决方案的更多信息并尝试实施它们;使用像 Rust 这样的低级语言,所有这些选项都是可能的。
在我们开始实现线程池之前,让我们讨论一下使用池应该是什么样子。当您尝试设计代码时,首先编写客户端接口可以帮助指导您的设计。编写代码的 API,使其按照您想要调用它的方式进行结构化;然后在该结构中实现功能,而不是先实现功能,然后再设计公共 API。
与我们在第 12 章的项目中使用测试驱动开发类似,我们将在此处使用编译器驱动开发。我们将编写调用我们想要的函数的代码,然后我们将查看来自编译器的错误,以确定接下来应该更改什么以使代码正常工作。但是,在我们这样做之前,我们将探讨一种我们不打算用作起点的技术。
为每个请求生成一个线程
首先,让我们探讨一下,如果我们的代码确实为每个连接创建一个新线程,它的外观可能会如何。如前所述,这不是我们的最终计划,因为可能会生成无限数量的线程的问题,但它是首先获得正常工作的多线程服务器的起点。然后,我们将添加线程池作为改进,并且比较这两种解决方案会更容易。清单 20-11 显示了对 main
进行的更改,以在 for
循环中生成一个新线程来处理每个流。
文件名:src/main.rs
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
清单 20-11:为每个流生成一个新线程
正如您在第 16 章中了解到的,thread::spawn
将创建一个新线程,然后在新线程中运行闭包中的代码。如果运行此代码并在浏览器中加载 * /sleep *,然后在另外两个浏览器选项卡中加载 * / *,您会发现对 * / * 的请求不必等待 * /sleep * 完成。但是,正如我们提到的,这最终会使系统不堪重负,因为您将无限制地创建新线程。
创建有限数量的线程
我们希望我们的线程池以类似、熟悉的方式工作,以便从线程切换到线程池不需要对使用我们 API 的代码进行较大的更改。清单 20-12 显示了我们想要使用的 ThreadPool
结构的假设接口,而不是 thread::spawn
。
文件名:src/main.rs
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
清单 20-12:我们理想的 ThreadPool
接口
我们使用 ThreadPool::new
创建一个具有可配置线程数的新线程池,在本例中为四个。然后,在 for
循环中,pool.execute
具有与 thread::spawn
类似的接口,它接受一个闭包,该池应为每个流运行该闭包。我们需要实现 pool.execute
,使其接受闭包并将其交给池中的一个线程来运行。此代码尚未编译,但我们将尝试这样做,以便编译器可以指导我们如何修复它。
使用编译器驱动开发构建 ThreadPool
将清单 20-12 中的更改应用于 * src/main.rs *,然后让我们使用 cargo check
中的编译器错误来驱动我们的开发。这是我们得到的第一个错误
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:11:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error
太棒了!此错误告诉我们需要 ThreadPool
类型或模块,因此我们现在将构建一个。我们的 ThreadPool
实现将独立于我们的 Web 服务器正在执行的工作类型。因此,让我们将 hello
crate 从二进制 crate 切换到库 crate,以保存我们的 ThreadPool
实现。在我们切换到库 crate 之后,我们还可以将单独的线程池库用于我们想要使用线程池进行的任何工作,而不仅仅是用于服务 Web 请求。
创建一个 * src/lib.rs *,其中包含以下内容,这是我们目前可以拥有的 ThreadPool
结构的最简单定义
文件名:src/lib.rs
pub struct ThreadPool;
然后编辑 * main.rs * 文件,通过将以下代码添加到 * src/main.rs * 的顶部,从库 crate 将 ThreadPool
引入作用域
文件名:src/main.rs
use hello::ThreadPool;
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
此代码仍然不起作用,但让我们再次检查它以获取我们需要解决的下一个错误
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
此错误表明接下来我们需要为 ThreadPool
创建一个名为 new
的关联函数。我们也知道 new
需要有一个参数,该参数可以接受 4
作为参数,并且应该返回一个 ThreadPool
实例。让我们实现最简单的 new
函数,该函数将具有这些特征
文件名:src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
我们选择 usize
作为 size
参数的类型,因为我们知道负数的线程数没有任何意义。我们也知道我们将使用此 4 作为线程集合中的元素数,这正是 usize
类型的用途,如 “整数类型”第 3 章的节中讨论。
让我们再次检查代码
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| -----^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
现在出现错误是因为 ThreadPool
上没有 execute
方法。回想一下 “创建有限数量的线程”节,我们决定我们的线程池应该具有类似于 thread::spawn
的接口。此外,我们将实现 execute
函数,使其接受给它的闭包,并将其交给池中的空闲线程来运行。
我们将定义 ThreadPool
上的 execute
方法,以将闭包作为参数。回想一下 “将捕获的值移出闭包和 Fn
特征”第 13 章的节,我们可以将闭包作为具有三个不同特征的参数:Fn
、FnMut
和 FnOnce
。我们需要决定在这里使用哪种闭包。我们知道最终会做一些类似于标准库 thread::spawn
实现的事情,因此我们可以查看 thread::spawn
的签名对其参数的约束。文档向我们展示了以下内容
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
F
类型参数是我们在这里关心的;T
类型参数与返回值有关,而我们不关心这一点。我们可以看到 spawn
使用 FnOnce
作为 F
的特征约束。这可能也是我们想要的,因为我们最终会将我们在 execute
中获得的参数传递给 spawn
。我们可以进一步确信 FnOnce
是我们想要使用的特征,因为用于运行请求的线程只会执行该请求的闭包一次,这与 FnOnce
中的 Once
匹配。
F
类型参数还具有特征约束 Send
和生命周期约束 'static
,这在我们的情况下很有用:我们需要 Send
将闭包从一个线程传输到另一个线程,并且需要 'static
,因为我们不知道该线程需要多长时间才能执行。让我们在 ThreadPool
上创建一个 execute
方法,该方法将接受具有这些约束的类型 F
的泛型参数
文件名:src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
我们仍然在 FnOnce
之后使用 ()
,因为这个 FnOnce
代表一个不接受任何参数并返回单元类型 ()
的闭包。就像函数定义一样,返回类型可以从签名中省略,但即使我们没有参数,仍然需要括号。
再次强调,这是 execute
方法最简单的实现:它什么也不做,但我们只是想让我们的代码能够编译。让我们再次检查一下。
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.24s
它可以编译了!但请注意,如果您尝试 cargo run
并在浏览器中发出请求,您将在浏览器中看到本章开头看到的错误。我们的库实际上还没有调用传递给 execute
的闭包!
注意:关于像 Haskell 和 Rust 这样具有严格编译器的语言,您可能会听到一种说法:“如果代码可以编译,它就能工作。” 但这种说法并非普遍正确。我们的项目可以编译,但它绝对什么都不做!如果我们正在构建一个真正的完整项目,现在应该开始编写单元测试来检查代码是否可以编译并且具有我们想要的行为。
验证 new
中的线程数
我们没有对 new
和 execute
的参数做任何处理。让我们用我们想要的行为来实现这些函数的主体。首先,让我们考虑一下 new
。之前我们为 size
参数选择了无符号类型,因为具有负数线程的池没有任何意义。但是,具有零线程的池也没有任何意义,而零是一个完全有效的 usize
。我们将添加代码来检查 size
是否大于零,然后再返回 ThreadPool
实例,如果它收到零,则使用 assert!
宏使程序 panic,如清单 20-13 所示。
文件名:src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
清单 20-13:实现 ThreadPool::new
以在 size
为零时 panic
我们还使用文档注释为我们的 ThreadPool
添加了一些文档。请注意,我们遵循了良好的文档实践,添加了一个部分,其中指出了我们的函数可能 panic 的情况,如第 14 章所述。尝试运行 cargo doc --open
并单击 ThreadPool
结构,以查看为 new
生成的文档是什么样的!
我们没有像这里一样添加 assert!
宏,而是可以将 new
更改为 build
并像我们在清单 12-9 中的 I/O 项目中使用 Config::build
一样返回 Result
。但我们在此案例中决定,尝试创建没有任何线程的线程池应该是一个不可恢复的错误。如果您雄心勃勃,请尝试编写一个名为 build
的函数,其签名如下,以与 new
函数进行比较。
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
创建空间来存储线程
现在我们有了一种方法来知道我们有有效的线程数要存储在池中,我们可以创建这些线程并将它们存储在 ThreadPool
结构中,然后再返回该结构。但是我们如何“存储”一个线程呢?让我们再次看看 thread::spawn
的签名。
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
spawn
函数返回一个 JoinHandle<T>
,其中 T
是闭包返回的类型。让我们也尝试使用 JoinHandle
,看看会发生什么。在我们的例子中,我们传递给线程池的闭包将处理连接并且不返回任何内容,因此 T
将是单元类型 ()
。
清单 20-14 中的代码可以编译,但尚未创建任何线程。我们已经更改了 ThreadPool
的定义,使其保存一个 thread::JoinHandle<()>
实例的向量,使用 size
容量初始化该向量,设置一个 for
循环来运行一些代码以创建线程,并返回一个包含这些线程的 ThreadPool
实例。
文件名:src/lib.rs
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool { threads }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
清单 20-14:创建一个向量,供 ThreadPool
保存线程
我们已将 std::thread
带入库 crate 的作用域,因为我们使用 thread::JoinHandle
作为 ThreadPool
中向量项的类型。
一旦收到有效的大小,我们的 ThreadPool
将创建一个可以容纳 size
个项的新向量。with_capacity
函数执行与 Vec::new
相同的任务,但有一个重要的区别:它预先分配了向量中的空间。因为我们知道我们需要在向量中存储 size
个元素,所以预先进行此分配比使用在插入元素时调整自身大小的 Vec::new
稍微有效率。
当您再次运行 cargo check
时,它应该成功。
一个负责将代码从 ThreadPool
发送到线程的 Worker
结构
我们在清单 20-14 中 for
循环中留下了关于线程创建的注释。在这里,我们将看看我们如何实际创建线程。标准库提供了 thread::spawn
作为创建线程的方法,并且 thread::spawn
期望在创建线程后立即获取线程应该运行的一些代码。但是,在我们的例子中,我们希望创建线程并让它们等待我们稍后发送的代码。标准库的线程实现不包括执行此操作的任何方式;我们必须手动实现它。
我们将通过在 ThreadPool
和将管理此新行为的线程之间引入新的数据结构来实现此行为。我们将此数据结构称为 Worker,这是池实现中的常用术语。Worker 拾取需要运行的代码并在 Worker 的线程中运行该代码。想象一下在餐厅厨房工作的人:工人在收到客户的订单之前会等待,然后他们负责接收这些订单并完成它们。
我们不会在线程池中存储 JoinHandle<()>
实例的向量,而是存储 Worker
结构的实例。每个 Worker
将存储单个 JoinHandle<()>
实例。然后,我们将实现一个 Worker
的方法,该方法将获取一个要运行的代码闭包,并将其发送到已运行的线程以执行。我们还将为每个 worker 提供一个 id
,以便我们在日志记录或调试时区分池中的不同 worker。
这是我们在创建 ThreadPool
时将发生的新流程。我们将在以这种方式设置 Worker
后实现将闭包发送到线程的代码。
- 定义一个
Worker
结构,其中包含一个id
和一个JoinHandle<()>
。 - 更改
ThreadPool
以保存Worker
实例的向量。 - 定义一个
Worker::new
函数,该函数接受一个id
数字并返回一个Worker
实例,该实例保存id
和一个使用空闭包生成的线程。 - 在
ThreadPool::new
中,使用for
循环计数器生成一个id
,使用该id
创建一个新的Worker
,并将该 worker 存储在向量中。
如果您愿意接受挑战,请在查看清单 20-15 中的代码之前尝试自己实现这些更改。
准备好了吗?这是清单 20-15,其中包含进行上述修改的一种方法。
文件名:src/lib.rs
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
清单 20-15:修改 ThreadPool
以保存 Worker
实例,而不是直接保存线程
我们已将 ThreadPool
上字段的名称从 threads
更改为 workers
,因为它现在保存 Worker
实例,而不是 JoinHandle<()>
实例。我们使用 for
循环中的计数器作为 Worker::new
的参数,并将每个新的 Worker
存储在名为 workers
的向量中。
外部代码(如 src/main.rs 中的服务器)不需要了解有关在 ThreadPool
中使用 Worker
结构的实现细节,因此我们将 Worker
结构及其 new
函数设为私有。Worker::new
函数使用我们给它的 id
并存储一个 JoinHandle<()>
实例,该实例是通过使用空闭包生成新线程来创建的。
注意:如果操作系统由于没有足够的系统资源而无法创建线程,thread::spawn
将会 panic。这将导致我们的整个服务器 panic,即使某些线程的创建可能成功。为了简单起见,此行为很好,但在生产线程池实现中,您可能希望使用std::thread::Builder
及其spawn
返回 Result
的方法。
此代码可以编译,并将存储我们指定为 ThreadPool::new
参数的 Worker
实例的数量。但是,我们仍然没有处理我们在 execute
中获取的闭包。接下来,让我们看看如何执行此操作。
通过通道将请求发送到线程
我们要解决的下一个问题是,提供给 thread::spawn
的闭包绝对什么也不做。目前,我们在 execute
方法中获取我们想要执行的闭包。但是,我们需要在创建 ThreadPool
期间,给 thread::spawn
一个在创建每个 Worker
时运行的闭包。
我们希望我们刚创建的 Worker
结构从 ThreadPool
中保存的队列中获取要运行的代码,并将该代码发送到其线程运行。
我们在第 16 章中了解到的通道——一种在两个线程之间进行通信的简单方法——非常适合此用例。我们将使用通道作为作业队列,execute
将从 ThreadPool
向 Worker
实例发送作业,后者会将作业发送到其线程。以下是计划:
ThreadPool
将创建一个通道并保存发送方。- 每个
Worker
将保存接收方。 - 我们将创建一个新的
Job
结构,其中将保存我们想要通过通道发送的闭包。 execute
方法将通过发送方发送它想要执行的作业。- 在它的线程中,
Worker
将循环遍历其接收器,并执行它接收到的任何作业的闭包。
让我们首先在 ThreadPool::new
中创建一个通道,并将发送器保存在 ThreadPool
实例中,如清单 20-16 所示。Job
结构体现在没有任何内容,但它将是我们通过通道发送的项目类型。
文件名:src/lib.rs
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
清单 20-16:修改 ThreadPool
以存储传输 Job
实例的通道的发送器
在 ThreadPool::new
中,我们创建新的通道,并让线程池持有发送器。这将成功编译。
让我们尝试在线程池创建通道时,将通道的接收器传递给每个 worker。我们知道我们想在 worker 生成的线程中使用接收器,因此我们将在闭包中引用 receiver
参数。清单 20-17 中的代码暂时还无法编译。
文件名:src/lib.rs
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
清单 20-17:将接收器传递给 worker
我们做了一些小的、直接的更改:我们将接收器传递给 Worker::new
,然后在闭包内部使用它。
当我们尝试检查此代码时,会收到此错误
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 | for id in 0..size {
| ----------------- inside of this loop
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
|
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
--> src/lib.rs:47:33
|
47 | fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
| --- in this method ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
help: consider moving the expression out of the loop so it is only moved once
|
25 ~ let mut value = Worker::new(id, receiver);
26 ~ for id in 0..size {
27 ~ workers.push(value);
|
For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error
代码正在尝试将 receiver
传递给多个 Worker
实例。这行不通,正如您在第 16 章中回忆的那样:Rust 提供的通道实现是多生产者,单消费者。这意味着我们不能简单地克隆通道的消费端来修复此代码。我们也不想多次向多个消费者发送消息;我们希望有一个包含多个 worker 的消息列表,以便每个消息只被处理一次。
此外,从通道队列中取出一个作业涉及到修改 receiver
,因此线程需要一种安全的方式来共享和修改 receiver
;否则,我们可能会遇到竞态条件(如第 16 章所述)。
回顾第 16 章中讨论的线程安全智能指针:为了在多个线程之间共享所有权并允许线程修改值,我们需要使用 Arc<Mutex<T>>
。Arc
类型将允许多个 worker 拥有接收器,而 Mutex
将确保一次只有一个 worker 从接收器获取作业。清单 20-18 显示了我们需要进行的更改。
文件名:src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
// --snip--
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
清单 20-18:使用 Arc
和 Mutex
在 worker 之间共享接收器
在 ThreadPool::new
中,我们将接收器放入 Arc
和 Mutex
中。对于每个新的 worker,我们克隆 Arc
以增加引用计数,以便 worker 可以共享接收器的所有权。
通过这些更改,代码可以编译了!我们快完成了!
实现 execute
方法
最后,让我们在 ThreadPool
上实现 execute
方法。我们还将 Job
从结构体更改为持有 execute
接收的闭包类型的特征对象的类型别名。正如 “使用类型别名创建类型同义词”第 19 章的章节所述,类型别名允许我们缩短长类型,以便于使用。请看清单 20-19。
文件名:src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
清单 20-19:为持有每个闭包的 Box
创建 Job
类型别名,然后将作业发送到通道
在使用我们在 execute
中获取的闭包创建新的 Job
实例后,我们将该作业发送到通道的发送端。我们对 send
调用 unwrap
是为了防止发送失败。例如,如果我们停止所有线程的执行,则可能会发生这种情况,这意味着接收端已停止接收新消息。目前,我们无法停止线程的执行:只要线程池存在,我们的线程就会继续执行。我们使用 unwrap
的原因是,我们知道不会发生失败的情况,但编译器不知道这一点。
但是我们还没有完全完成!在 worker 中,传递给 thread::spawn
的闭包仍然只是引用通道的接收端。相反,我们需要闭包永远循环,向通道的接收端请求作业,并在收到作业时运行该作业。让我们对 Worker::new
进行如清单 20-20 所示的更改。
文件名:src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker { id, thread }
}
}
清单 20-20:在 worker 的线程中接收和执行作业
在这里,我们首先对 receiver
调用 lock
来获取互斥锁,然后我们调用 unwrap
来在任何错误时发生 panic。如果互斥锁处于中毒状态,则获取锁可能会失败,如果其他线程在持有锁而不是释放锁时发生 panic,则可能会发生这种情况。在这种情况下,调用 unwrap
使此线程 panic 是正确的做法。您可以随意将此 unwrap
更改为带有对您有意义的错误消息的 expect
。
如果我们获取了互斥锁,我们调用 recv
来从通道接收一个 Job
。最后的 unwrap
也忽略了这里的任何错误,如果持有发送器的线程已关闭,则可能会发生这种情况,类似于如果接收器关闭,send
方法如何返回 Err
。
对 recv
的调用会阻塞,因此如果还没有作业,当前线程将等待,直到有作业可用。Mutex<T>
确保一次只有一个 Worker
线程尝试请求作业。
我们的线程池现在处于工作状态!运行 cargo run
并发出一些请求
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never read: `workers`
--> src/lib.rs:7:5
|
7 | workers: Vec<Worker>,
| ^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: field is never read: `id`
--> src/lib.rs:48:5
|
48 | id: usize,
| ^^^^^^^^^
warning: field is never read: `thread`
--> src/lib.rs:49:5
|
49 | thread: thread::JoinHandle<()>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
warning: `hello` (lib) generated 3 warnings
Finished dev [unoptimized + debuginfo] target(s) in 1.40s
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
成功!我们现在拥有一个异步执行连接的线程池。永远不会创建超过四个线程,因此如果服务器收到大量请求,我们的系统不会过载。如果我们向 /sleep 发出请求,服务器将能够通过让另一个线程运行其他请求来处理这些请求。
注意:如果您同时在多个浏览器窗口中打开 /sleep,它们可能会以 5 秒的间隔依次加载。一些 Web 浏览器出于缓存原因会按顺序执行同一请求的多个实例。此限制并非由我们的 Web 服务器引起。
在学习了第 18 章中的 while let
循环之后,您可能想知道为什么我们没有像清单 20-21 中所示编写 worker 线程代码。
文件名:src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
清单 20-21:使用 while let
的 Worker::new
的替代实现
此代码可以编译并运行,但不会产生所需的线程行为:慢速请求仍然会导致其他请求等待处理。原因有点微妙:Mutex
结构体没有公共的 unlock
方法,因为锁的所有权基于 lock
方法返回的 LockResult<MutexGuard<T>>
中的 MutexGuard<T>
的生命周期。在编译时,借用检查器可以强制执行以下规则:除非我们持有锁,否则无法访问由 Mutex
保护的资源。但是,如果我们不注意 MutexGuard<T>
的生命周期,此实现还可能导致锁的持有时间超出预期。
清单 20-20 中使用 let job = receiver.lock().unwrap().recv().unwrap();
的代码可以工作,因为使用 let
,等号右侧表达式中使用的任何临时值会在 let
语句结束时立即被丢弃。但是,while let
(以及 if let
和 match
)不会在关联的代码块结束之前丢弃临时值。在清单 20-21 中,锁在调用 job()
的持续时间内保持持有,这意味着其他 worker 无法接收作业。