将我们的单线程服务器转换为多线程服务器
目前,服务器将依次处理每个请求,这意味着在第一个请求处理完成之前,它不会处理第二个连接。如果服务器收到越来越多的请求,这种串行执行将变得越来越不理想。如果服务器收到一个需要很长时间才能处理的请求,则后续请求将不得不等到长请求完成,即使新请求可以快速处理。我们需要解决这个问题,但首先,我们将看看实际情况中的问题。
在当前服务器实现中模拟慢速请求
我们将看看慢速处理请求如何影响对我们当前服务器实现的其他请求。清单 20-10 实现了处理对 /sleep 的请求,并模拟了缓慢的响应,这将导致服务器在响应之前休眠 5 秒。
文件名:src/main.rs
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; // --snip-- fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); handle_connection(stream); } } fn handle_connection(mut stream: TcpStream) { // --snip-- let buf_reader = BufReader::new(&mut stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; // --snip-- let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
我们现在从 if 切换到 match,因为我们有三种情况。我们需要显式匹配 request_line 的切片,以便对字符串字面量值进行模式匹配;match 不会像相等方法那样自动进行引用和解引用。
第一个分支与清单 20-9 中的 if 块相同。第二个分支匹配对 /sleep 的请求。收到该请求后,服务器将休眠 5 秒,然后呈现成功的 HTML 页面。第三个分支与清单 20-9 中的 else 块相同。
您可以看到我们的服务器是多么原始:真正的库将以一种更简洁的方式处理多个请求的识别!
使用 cargo run 启动服务器。然后打开两个浏览器窗口:一个用于 http://127.0.0.1:7878/,另一个用于 http://127.0.0.1:7878/sleep。如果您像以前一样多次输入 / URI,您会看到它响应很快。但是,如果您输入 /sleep 然后加载 /,您会看到 / 会等到 sleep 休眠了整整 5 秒才会加载。
我们可以使用多种技术来避免请求在慢速请求后积压;我们将实现的一种技术是线程池。
使用线程池提高吞吐量
线程池是一组已生成并准备好处理任务的线程。当程序收到新任务时,它会将池中的一个线程分配给该任务,该线程将处理该任务。池中剩余的线程可用于处理第一个线程正在处理时传入的任何其他任务。当第一个线程完成其任务的处理后,它将返回到空闲线程池中,准备处理新任务。线程池允许您并发处理连接,从而提高服务器的吞吐量。
我们会将池中的线程数限制在一个较小的数字,以防止拒绝服务 (DoS) 攻击;如果我们的程序在每个请求传入时都创建一个新线程,那么向我们的服务器发出 1000 万个请求的人可能会耗尽我们服务器的所有资源并导致请求处理陷入停顿,从而造成严重破坏。
因此,我们不会生成无限的线程,而是在池中设置固定数量的线程等待。传入的请求将发送到池中进行处理。池将维护一个传入请求队列。池中的每个线程都将从此队列中弹出一个请求,处理该请求,然后向队列请求另一个请求。通过这种设计,我们可以并发处理多达 N
个请求,其中 N
是线程数。如果每个线程都在响应一个长时间运行的请求,后续请求仍然可以在队列中备份,但在达到该点之前,我们已经增加了可以处理的长时间运行请求的数量。
这种技术只是提高 Web 服务器吞吐量的众多方法之一。您可以探索的其他选项包括*fork/join 模型*、*单线程异步 I/O 模型*或*多线程异步 I/O 模型*。如果您对本主题感兴趣,可以阅读有关其他解决方案的更多信息并尝试实现它们;使用 Rust 等低级语言,所有这些选项都是可能的。
在开始实现线程池之前,让我们先讨论一下使用线程池应该是什么样子。当您尝试设计代码时,首先编写客户端接口可以帮助指导您的设计。编写代码的 API,使其以您希望调用的方式构建;然后在该结构中实现功能,而不是实现功能然后设计公共 API。
类似于我们在第 12 章的项目中使用测试驱动开发的方式,我们将在这里使用编译器驱动开发。我们将编写调用我们想要的函数的代码,然后查看编译器的错误,以确定接下来应该更改什么才能使代码正常工作。但是,在我们这样做之前,我们将探索我们不会使用的技术作为起点。
为每个请求生成一个线程
首先,让我们探讨一下如果我们的代码确实为每个连接创建一个新线程,那么它会是什么样子。如前所述,由于可能会产生无限数量的线程,因此这不是我们的最终计划,但它是首先获得一个正常工作的多线程服务器的起点。然后,我们将添加线程池作为改进,并且更容易对比这两种解决方案。清单 20-11 显示了对 main
所做的更改,以生成一个新线程来处理 for
循环内的每个流。
文件名:src/main.rs
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } } fn handle_connection(mut stream: TcpStream) { let buf_reader = BufReader::new(&mut stream); let request_line = buf_reader.lines().next().unwrap().unwrap(); let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; let contents = fs::read_to_string(filename).unwrap(); let length = contents.len(); let response = format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}"); stream.write_all(response.as_bytes()).unwrap(); }
正如您在第 16 章中所学到的,thread::spawn
将创建一个新线程,然后在新线程中运行闭包中的代码。如果您运行此代码并在浏览器中加载 /sleep,然后在另外两个浏览器选项卡中加载 /,您确实会看到对 / 的请求不必等待 /sleep 完成。但是,正如我们所提到的,这最终会使系统不堪重负,因为您将在没有任何限制的情况下创建新线程。
创建有限数量的线程
我们希望我们的线程池以类似的、熟悉的方式工作,因此从线程切换到线程池不需要对使用我们 API 的代码进行大的更改。清单 20-12 显示了我们希望使用的 ThreadPool
结构的假设接口,而不是 thread::spawn
。
文件名:src/main.rs
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
我们使用 ThreadPool::new
创建一个具有可配置线程数的新线程池,在本例中为四个。然后,在 for
循环中,pool.execute
具有与 thread::spawn
类似的接口,因为它接受池应该为每个流运行的闭包。我们需要实现 pool.execute
,以便它接收闭包并将其提供给池中的线程以运行。此代码尚无法编译,但我们将尝试,以便编译器可以指导我们如何修复它。
使用编译器驱动开发构建 ThreadPool
对 src/main.rs 进行清单 20-12 中的更改,然后让我们使用 cargo check
中的编译器错误来驱动我们的开发。这是我们遇到的第一个错误
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
--> src/main.rs:11:16
|
11 | let pool = ThreadPool::new(4);
| ^^^^^^^^^^ use of undeclared type `ThreadPool`
For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` (bin "hello") due to 1 previous error
太棒了!此错误告诉我们需要一个 ThreadPool
类型或模块,因此我们现在就构建一个。我们的 ThreadPool
实现将独立于我们的 Web 服务器正在执行的工作类型。因此,让我们将 hello
crate 从二进制 crate 切换到库 crate 以保存我们的 ThreadPool
实现。在我们更改为库 crate 之后,我们还可以将单独的线程池库用于我们想要使用线程池完成的任何工作,而不仅仅是用于服务 Web 请求。
创建一个包含以下内容的 src/lib.rs,这是我们现在可以拥有的最简单的 ThreadPool
结构定义
文件名:src/lib.rs
pub struct ThreadPool;
然后编辑 main.rs 文件,通过将以下代码添加到 src/main.rs 的顶部,将 ThreadPool
从库 crate 引入作用域
文件名:src/main.rs
use hello::ThreadPool;
use std::{
fs,
io::{prelude::*, BufReader},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let buf_reader = BufReader::new(&mut stream);
let request_line = buf_reader.lines().next().unwrap().unwrap();
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
let contents = fs::read_to_string(filename).unwrap();
let length = contents.len();
let response =
format!("{status_line}\r\nContent-Length: {length}\r\n\r\n{contents}");
stream.write_all(response.as_bytes()).unwrap();
}
此代码仍然无法工作,但让我们再次检查它以获取我们需要解决的下一个错误
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
--> src/main.rs:12:28
|
12 | let pool = ThreadPool::new(4);
| ^^^ function or associated item not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
此错误表明接下来我们需要为 ThreadPool
创建一个名为 new
的关联函数。我们还知道 new
需要有一个参数,该参数可以接受 4
作为参数,并且应该返回一个 ThreadPool
实例。让我们实现最简单的 new
函数,它将具有这些特征
文件名:src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
我们选择 usize
作为 size
参数的类型,因为我们知道负数线程没有任何意义。我们还知道我们将使用这 4 作为线程集合中的元素数量,这就是 usize
类型的用途,如第 3 章“整数类型”部分所述。
让我们再次检查代码
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
--> src/main.rs:17:14
|
17 | pool.execute(|| {
| -----^^^^^^^ method not found in `ThreadPool`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` (bin "hello") due to 1 previous error
现在出现错误是因为我们在 ThreadPool
上没有 execute
方法。回想一下“创建有限数量的线程”部分,我们决定我们的线程池应该有一个类似于 thread::spawn
的接口。此外,我们将实现 execute
函数,以便它接收它收到的闭包并将其提供给池中的空闲线程以运行。
我们将在 ThreadPool
上定义 execute
方法以将闭包作为参数。回想一下第 13 章“将捕获的值移出闭包和 Fn
特征”部分,我们可以使用三种不同的特征将闭包作为参数:Fn
、FnMut
和 FnOnce
。我们需要在这里决定使用哪种闭包。我们知道我们最终会做一些类似于标准库 thread::spawn
实现的事情,因此我们可以查看 thread::spawn
的签名在其参数上有何界限。文档向我们展示了以下内容
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
F
类型参数是我们在这里关心的参数;T
类型参数与返回值有关,我们不关心它。我们可以看到 spawn
使用 FnOnce
作为 F
上的特征界限。这可能也是我们想要的,因为我们最终会将我们在 execute
中获得的参数传递给 spawn
。我们可以更加确信 FnOnce
是我们想要使用的特征,因为用于运行请求的线程只会执行该请求的闭包一次,这与 FnOnce
中的 Once
相匹配。
F
类型参数还具有特征界限 Send
和生命周期界限 'static
,这在我们的情况下很有用:我们需要 Send
将闭包从一个线程转移到另一个线程,而 'static
因为我们不知道线程执行需要多长时间。让我们在 ThreadPool
上创建一个 execute
方法,该方法将采用具有这些界限的类型为 F
的泛型参数
文件名:src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
我们仍然在 FnOnce
之后使用 ()
,因为此 FnOnce
表示一个不带参数并返回单元类型 ()
的闭包。就像函数定义一样,可以从签名中省略返回类型,但即使我们没有参数,我们仍然需要括号。
同样,这是 execute
方法的最简单实现:它什么也不做,但我们只是尝试使我们的代码编译。让我们再次检查它
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
Finished dev [unoptimized + debuginfo] target(s) in 0.24s
它编译了!但请注意,如果您尝试 cargo run
并在浏览器中发出请求,您将在浏览器中看到我们在本章开头看到的错误。我们的库实际上还没有调用传递给 execute
的闭包!
注意:您可能会听到关于具有严格编译器的语言(例如 Haskell 和 Rust)的说法是“如果代码编译,它就可以工作”。但这句谚语并非普遍适用。我们的项目编译了,但它什么也没做!如果我们正在构建一个真实的、完整的项目,那么现在是开始编写单元测试以检查代码是否编译*并且*具有我们想要的行為的好时机。
验证 new
中的线程数
我们没有对 new
和 execute
的参数做任何事情。让我们使用我们想要的行為来实现这些函数的主体。首先,让我们考虑一下 new
。之前我们为 size
参数选择了一个无符号类型,因为具有负数线程的池没有任何意义。但是,具有零线程的池也没有意义,但零是一个完全有效的 usize
。我们将添加代码以在返回 ThreadPool
实例之前检查 size
是否大于零,并在收到零时使用 assert!
宏使程序 panic,如清单 20-13 所示。
文件名:src/lib.rs
pub struct ThreadPool;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
我们还使用文档注释为我们的 ThreadPool
添加了一些文档。请注意,我们遵循了良好的文档实践,添加了一个部分来指出我们的函数可能出现 panic 的情况,如第 14 章所述。尝试运行 cargo doc --open
并单击 ThreadPool
结构以查看生成的 new
文档是什么样子!
除了像我们这里做的那样添加 assert!
宏之外,我们还可以将 new
改为 build
并返回一个 Result
,就像我们在清单 12-9 中对 I/O 项目中的 Config::build
所做的那样。但是,在这种情况下,我们决定尝试创建一个没有任何线程的线程池应该是一个不可恢复的错误。如果您有雄心壮志,请尝试编写一个名为 build
的函数,其签名如下,以便与 new
函数进行比较
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
创建存储线程的空间
现在我们有了一种方法来知道我们有有效数量的线程存储在线程池中,我们可以在返回 ThreadPool
结构之前创建这些线程并将它们存储在其中。但是我们如何“存储”线程呢?让我们再看一下 thread::spawn
签名
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
spawn
函数返回一个 JoinHandle<T>
,其中 T
是闭包返回的类型。让我们也尝试使用 JoinHandle
,看看会发生什么。在我们的例子中,我们传递给线程池的闭包将处理连接并且不返回任何内容,因此 T
将是单元类型 ()
。
清单 20-14 中的代码可以编译,但还没有创建任何线程。我们已经将 ThreadPool
的定义更改为保存一个 thread::JoinHandle<()>
实例的向量,用容量 size
初始化向量,设置一个 for
循环,该循环将运行一些代码来创建线程,并返回一个包含它们的 ThreadPool
实例。
文件名:src/lib.rs
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool { threads }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
我们在库 crate 中将 std::thread
引入作用域,因为我们在 ThreadPool
中使用 thread::JoinHandle
作为向量中项目的类型。
一旦接收到有效的大小,我们的 ThreadPool
就会创建一个可以容纳 size
个项目的新向量。with_capacity
函数执行与 Vec::new
相同的任务,但有一个重要的区别:它在向量中预先分配空间。因为我们知道我们需要在向量中存储 size
个元素,所以预先进行分配比使用 Vec::new
(它会在插入元素时调整自身大小)稍微有效一些。
当您再次运行 cargo check
时,它应该会成功。
一个负责将代码从 ThreadPool
发送到线程的 Worker
结构体
我们在清单 20-14 中关于线程创建的 for
循环中留下了一条注释。在这里,我们将看看我们是如何实际创建线程的。标准库提供了 thread::spawn
作为创建线程的一种方式,并且 thread::spawn
期望在线程创建后立即获得一些线程应该运行的代码。但是,在我们的例子中,我们希望创建线程并让它们*等待*我们稍后发送的代码。标准库的线程实现不包括任何实现此目的的方法;我们必须手动实现它。
我们将通过在 ThreadPool
和将管理此新行为的线程之间引入新的数据结构来实现此行为。我们将此数据结构称为*Worker*,这是池实现中的常用术语。Worker 获取需要运行的代码并在 Worker 的线程中运行代码。想想在餐馆厨房工作的人:工人们一直等到顾客点餐,然后他们负责接收这些订单并完成它们。
我们将在线程池中存储 Worker
结构体的实例,而不是存储 JoinHandle<()>
实例的向量。每个 Worker
将存储一个 JoinHandle<()>
实例。然后,我们将在 Worker
上实现一个方法,该方法将获取要运行的代码闭包并将其发送到已运行的线程以供执行。我们还将为每个 worker 提供一个 id
,以便我们可以在记录或调试时区分池中的不同 worker。
以下是我们在创建 ThreadPool
时将发生的新流程。在我们以这种方式设置好 Worker
之后,我们将实现将闭包发送到线程的代码
- 定义一个包含
id
和JoinHandle<()>
的Worker
结构体。 - 将
ThreadPool
更改为保存Worker
实例的向量。 - 定义一个
Worker::new
函数,该函数接受一个id
号并返回一个Worker
实例,该实例包含id
和一个使用空闭包生成的线程。 - 在
ThreadPool::new
中,使用for
循环计数器生成一个id
,使用该id
创建一个新的Worker
,并将该 worker 存储在向量中。
如果您想挑战一下,请尝试在查看清单 20-15 中的代码之前自己实现这些更改。
准备好了吗?以下是清单 20-15,其中包含进行上述修改的一种方法。
文件名:src/lib.rs
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
我们已将 ThreadPool
上的字段名称从 threads
更改为 workers
,因为它现在保存的是 Worker
实例而不是 JoinHandle<()>
实例。我们将 for
循环中的计数器用作 Worker::new
的参数,并将每个新的 Worker
存储在名为 workers
的向量中。
外部代码(如我们在 src/main.rs 中的服务器)不需要知道关于在 ThreadPool
中使用 Worker
结构体的实现细节,因此我们将 Worker
结构体及其 new
函数设为私有。Worker::new
函数使用我们给它的 id
并存储一个 JoinHandle<()>
实例,该实例是通过使用空闭包生成新线程来创建的。
注意:如果操作系统由于系统资源不足而无法创建线程,则
thread::spawn
将会 panic。这将导致我们的整个服务器 panic,即使某些线程的创建可能成功。为了简单起见,此行为是可以接受的,但在生产线程池实现中,您可能希望使用std::thread::Builder
及其spawn
方法,该方法返回Result
。
此代码将编译并将存储我们指定为 ThreadPool::new
的参数的 Worker
实例数。但是我们*仍然*没有处理我们在 execute
中获得的闭包。接下来让我们看看如何做到这一点。
通过通道将请求发送到线程
我们将要解决的下一个问题是,提供给 thread::spawn
的闭包什么也不做。目前,我们在 execute
方法中获取了我们想要执行的闭包。但是,当我们在创建 ThreadPool
期间创建每个 Worker
时,我们需要为 thread::spawn
提供一个要运行的闭包。
我们希望我们刚刚创建的 Worker
结构体从 ThreadPool
中保存的队列中获取要运行的代码,并将该代码发送到其线程以供运行。
我们在第 16 章中了解到的通道(一种在线程之间进行通信的简单方法)非常适合这种用例。我们将使用一个通道作为作业队列,execute
将从 ThreadPool
向 Worker
实例发送作业,后者将作业发送到其线程。以下是计划
ThreadPool
将创建一个通道并持有发送端。- 每个
Worker
将持有接收端。 - 我们将创建一个新的
Job
结构体,它将保存我们想要通过通道发送的闭包。 execute
方法将通过发送端发送它想要执行的作业。- 在其线程中,
Worker
将循环遍历其接收端并执行它收到的任何作业的闭包。
让我们首先在 ThreadPool::new
中创建一个通道,并将发送端保存在 ThreadPool
实例中,如清单 20-16 所示。Job
结构体现在不保存任何内容,但将是我们通过通道发送的项目的类型。
文件名:src/lib.rs
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
在 ThreadPool::new
中,我们创建了新的通道,并让线程池持有发送端。这将成功编译。
让我们尝试在创建通道时将通道的接收端传递给每个 worker。我们知道我们想在 worker 生成的线程中使用接收端,因此我们将在闭包中引用 receiver
参数。清单 20-17 中的代码还不能完全编译。
文件名:src/lib.rs
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
我们做了一些小的、直接的更改:我们将接收端传递给 Worker::new
,然后在闭包中使用它。
当我们尝试检查此代码时,会收到以下错误
$ cargo check
Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
--> src/lib.rs:26:42
|
21 | let (sender, receiver) = mpsc::channel();
| -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
25 | for id in 0..size {
| ----------------- inside of this loop
26 | workers.push(Worker::new(id, receiver));
| ^^^^^^^^ value moved here, in previous iteration of loop
|
note: consider changing this parameter type in method `new` to borrow instead if owning the value isn't necessary
--> src/lib.rs:47:33
|
47 | fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
| --- in this method ^^^^^^^^^^^^^^^^^^^ this parameter takes ownership of the value
For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` (lib) due to 1 previous error
该代码试图将 receiver
传递给多个 Worker
实例。这将不起作用,正如您在第 16 章中所记得的那样:Rust 提供的通道实现是多*生产者*,单*消费者*。这意味着我们不能仅仅克隆通道的消费端来修复此代码。我们也不希望将一条消息多次发送给多个消费者;我们希望有一个包含多个 worker 的消息列表,以便每条消息只被处理一次。
此外,从通道队列中取出作业涉及到对 receiver
进行修改,因此线程需要一种安全的方式来共享和修改 receiver
;否则,我们可能会遇到竞争条件(如第 16 章所述)。
回想一下第 16 章中讨论的线程安全智能指针:为了在多个线程之间共享所有权并允许线程修改值,我们需要使用 Arc<Mutex<T>>
。Arc
类型将允许多个 worker 拥有接收端,而 Mutex
将确保一次只有一个 worker 从接收端获得作业。清单 20-18 显示了我们需要进行的更改。
文件名:src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
// --snip--
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
在 ThreadPool::new
中,我们将接收端放在 Arc
和 Mutex
中。对于每个新的 worker,我们克隆 Arc
以增加引用计数,以便 worker 可以共享接收端的所有权。
通过这些更改,代码就可以编译了!我们快成功了!
实现 execute
方法
最后让我们在 ThreadPool
上实现 execute
方法。我们还将 Job
从结构体更改为持有 execute
接收的闭包类型的特征对象的类型别名。正如第 19 章的 “使用类型别名创建类型同义词”部分所述,类型别名允许我们缩短长类型以方便使用。请看清单 20-19。
文件名:src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
// --snip--
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
// --snip--
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
在使用我们在 execute
中获得的闭包创建新的 Job
实例之后,我们将该作业发送到通道的发送端。如果发送失败,我们会调用 send
上的 unwrap
。例如,如果我们停止所有线程的执行,这意味着接收端已停止接收新消息,则可能会发生这种情况。目前,我们无法阻止线程执行:只要线程池存在,我们的线程就会继续执行。我们使用 unwrap
的原因是我们知道失败的情况不会发生,但编译器不知道这一点。
但我们还没有完全完成!在 worker 中,我们传递给 thread::spawn
的闭包仍然只*引用*通道的接收端。相反,我们需要闭包永远循环,询问通道的接收端是否有作业,并在获得作业时运行作业。让我们对 Worker::new
进行清单 20-20 中所示的更改。
文件名:src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
});
Worker { id, thread }
}
}
在这里,我们首先在 receiver
上调用 lock
来获取互斥锁,然后调用 unwrap
来处理任何错误并引发 panic。如果互斥锁处于*中毒*状态,获取锁可能会失败,这种情况可能发生在另一个线程持有锁时发生 panic 而不是释放锁。在这种情况下,调用 unwrap
使该线程 panic 是正确的操作。您可以随意将此 unwrap
更改为带有对您有意义的错误消息的 expect
。
如果我们获得了互斥锁,则调用 recv
从通道接收 Job
。最后的 unwrap
也会处理这里的任何错误,如果持有发送者的线程已关闭,则可能会发生这些错误,类似于如果接收者关闭,send
方法将返回 Err
。
对 recv
的调用会阻塞,因此如果没有作业,当前线程将一直等待,直到有作业可用。Mutex<T>
确保一次只有一个 Worker
线程尝试请求作业。
我们的线程池现在处于工作状态!运行 cargo run
并发出一些请求。
$ cargo run
Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never read: `workers`
--> src/lib.rs:7:5
|
7 | workers: Vec<Worker>,
| ^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(dead_code)]` on by default
warning: field is never read: `id`
--> src/lib.rs:48:5
|
48 | id: usize,
| ^^^^^^^^^
warning: field is never read: `thread`
--> src/lib.rs:49:5
|
49 | thread: thread::JoinHandle<()>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
warning: `hello` (lib) generated 3 warnings
Finished dev [unoptimized + debuginfo] target(s) in 1.40s
Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
成功!我们现在有了一个异步执行连接的线程池。创建的线程永远不会超过四个,因此即使服务器收到大量请求,我们的系统也不会过载。如果我们向 /sleep 发出请求,服务器将能够通过让另一个线程运行它们来处理其他请求。
注意:如果您同时在多个浏览器窗口中打开 /sleep,它们可能会以 5 秒的间隔逐个加载。出于缓存原因,某些 Web 浏览器会顺序执行同一请求的多个实例。此限制不是由我们的 Web 服务器引起的。
在学习了第 18 章中的 while let
循环之后,您可能想知道为什么我们没有像代码清单 20-21 中那样编写工作线程代码。
文件名:src/lib.rs
use std::{
sync::{mpsc, Arc, Mutex},
thread,
};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
这段代码可以编译和运行,但不会产生所需的线程行为:缓慢的请求仍然会导致其他请求等待处理。原因有些微妙:Mutex
结构体没有公共的 unlock
方法,因为锁的所有权基于 lock
方法返回的 LockResult<MutexGuard<T>>
中 MutexGuard<T>
的生命周期。在编译时,借用检查器可以强制执行以下规则:除非我们持有锁,否则不能访问由 Mutex
保护的资源。但是,如果我们不注意 MutexGuard<T>
的生命周期,此实现也可能导致锁的持有时间超过预期。
使用 let job = receiver.lock().unwrap().recv().unwrap();
的代码清单 20-20 中的代码之所以有效,是因为使用 let
时,等号右侧表达式中使用的任何临时值都会在 let
语句结束时立即被丢弃。但是,while let
(以及 if let
和 match
)不会丢弃临时值,直到关联的代码块结束。在代码清单 20-21 中,锁在调用 job()
期间一直被持有,这意味着其他工作线程无法接收作业。