Testcase: map-reduce
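Rust makes it very easy to parallelize data processing, without many of the headaches traditionally associated with such an attempt.
The standard library provides great threading primitives out of the box. Combined with Rust's concepts of ownership and its aliasing rules, these primitives automatically prevent data races.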
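The aliasing rules (either one writable reference or any number of read-only references, never both at once) automatically prevent you from manipulating state that is visible to other threads. (Where synchronization is needed, there are synchronization primitives such as Mutex or channels.)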
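As a minimal sketch of the channel approach (not part of the original example), each worker below sends its partial digit sum to the main thread over a std::sync::mpsc channel instead of returning it through join(). The function name sum_with_channel is hypothetical, and it assumes the input contains only digits and whitespace.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper: sums the digits of each whitespace-separated segment
// in its own thread, sending each partial sum back over an mpsc channel.
fn sum_with_channel(data: &'static str) -> u32 {
    let (tx, rx) = mpsc::channel::<u32>();
    for segment in data.split_whitespace() {
        // Each worker gets its own clone of the sending half.
        let tx = tx.clone();
        thread::spawn(move || {
            let partial: u32 = segment
                .chars()
                .map(|c| c.to_digit(10).expect("should be a digit"))
                .sum();
            tx.send(partial).expect("receiver should still be alive");
        });
    }
    // Drop the original sender so that rx.iter() ends once every
    // worker has finished and dropped its clone.
    drop(tx);
    rx.iter().sum()
}

fn main() {
    let total = sum_with_channel("123 456 789");
    println!("total = {}", total); // prints "total = 45"
}
```

Because the receiver simply waits until every sender is gone, no JoinHandle bookkeeping is needed here; the trade-off is that the order in which partial sums arrive is not deterministic, which does not matter for a sum.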
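In this example, we will calculate the sum of all digits in a block of numbers. We do this by parceling out chunks of the block to different threads. Each thread sums its own small chunk of digits, and we then sum the intermediate results produced by all the threads.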
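Note that, although we pass references across thread boundaries, Rust understands that we are only passing read-only references, so no unsafety or data races can occur. Also, because the references we pass have 'static lifetimes, Rust knows that our data will not be destroyed while these threads are still running. (When you need to share non-'static data between threads, you can use a smart pointer such as Arc to keep the data alive and avoid non-'static lifetimes.)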
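To illustrate the Arc remark, here is a small sketch, assuming the input is an owned String built at runtime (so it is not 'static and cannot be borrowed by thread::spawn). Wrapping it in an Arc lets every thread hold a reference-counted handle that keeps the string alive. The function name sum_shared is hypothetical.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical helper: `data` is an owned, runtime-created String, so a plain
// &str borrowed from it would not satisfy thread::spawn's 'static bound.
fn sum_shared(data: String) -> u32 {
    let shared: Arc<String> = Arc::new(data);
    let segment_count = shared.split_whitespace().count();

    let mut handles = Vec::new();
    for i in 0..segment_count {
        // Cloning the Arc only increments a reference count;
        // the String itself is not copied.
        let shared = Arc::clone(&shared);
        handles.push(thread::spawn(move || -> u32 {
            let segment = shared.split_whitespace().nth(i).expect("index in range");
            segment
                .chars()
                .map(|c| c.to_digit(10).expect("should be a digit"))
                .sum()
        }));
    }
    // The String is freed when the last Arc clone is dropped.
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}

fn main() {
    let input = format!("{} {}", 11, 22); // built at runtime, not 'static
    println!("sum = {}", sum_shared(input)); // prints "sum = 6"
}
```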
use std::thread;

// This is the `main` thread
fn main() {

    // This is our data to process.
    // We will calculate the sum of all digits via a threaded map-reduce algorithm.
    // Each whitespace separated chunk will be handled in a different thread.
    //
    // TODO: see what happens to the output if you insert spaces!
    let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";

    // Make a vector to hold the child-threads which we will spawn.
    let mut children = vec![];

    /*************************************************************************
     * "Map" phase
     *
     * Divide our data into segments, and apply initial processing
     ************************************************************************/

    // split our data into segments for individual calculation
    // each chunk will be a reference (&str) into the actual data
    let chunked_data = data.split_whitespace();

    // Iterate over the data segments.
    // .enumerate() adds the current loop index to whatever is iterated
    // the resulting tuple "(index, element)" is then immediately
    // "destructured" into two variables, "i" and "data_segment" with a
    // "destructuring assignment"
    for (i, data_segment) in chunked_data.enumerate() {
        println!("data segment {} is \"{}\"", i, data_segment);

        // Process each data segment in a separate thread
        //
        // spawn() returns a handle to the new thread,
        // which we MUST keep to access the returned value
        //
        // 'move || -> u32' is syntax for a closure that:
        // * takes no arguments ('||')
        // * takes ownership of its captured variables ('move') and
        // * returns an unsigned 32-bit integer ('-> u32')
        //
        // Rust is smart enough to infer the '-> u32' from
        // the closure itself so we could have left that out.
        //
        // TODO: try removing the 'move' and see what happens
        children.push(thread::spawn(move || -> u32 {
            // Calculate the intermediate sum of this segment:
            let result = data_segment
                        // iterate over the characters of our segment..
                        .chars()
                        // .. convert text-characters to their number value..
                        .map(|c| c.to_digit(10).expect("should be a digit"))
                        // .. and sum the resulting iterator of numbers
                        .sum();

            // println! locks stdout, so no text-interleaving occurs
            println!("processed segment {}, result={}", i, result);

            // "return" not needed, because Rust is an "expression language", the
            // last evaluated expression in each block is automatically its value.
            result

        }));
    }


    /*************************************************************************
     * "Reduce" phase
     *
     * Collect our intermediate results, and combine them into a final result
     ************************************************************************/

    // combine each thread's intermediate results into a single final sum.
    //
    // we use the "turbofish" ::<> to provide sum() with a type hint.
    //
    // TODO: try without the turbofish, by instead explicitly
    // specifying the type of final_result
    let final_result = children.into_iter().map(|c| c.join().unwrap()).sum::<u32>();

    println!("Final sum result: {}", final_result);
}
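The reduce phase relies on the turbofish because nothing else tells sum() which type to produce. The sketch below, which uses a hypothetical helper spawn_children that spawns one thread per value, shows that an annotation on the binding is an equivalent way to supply that type hint.

```rust
use std::thread::{self, JoinHandle};

// Hypothetical helper: spawns one thread per value; each thread returns
// its value doubled.
fn spawn_children(values: &[u32]) -> Vec<JoinHandle<u32>> {
    values
        .iter()
        .map(|&v| thread::spawn(move || v * 2))
        .collect()
}

fn main() {
    // Option 1: the turbofish gives sum() its output type directly.
    let with_turbofish = spawn_children(&[1, 2, 3])
        .into_iter()
        .map(|c| c.join().unwrap())
        .sum::<u32>();

    // Option 2: no turbofish; the annotation on the binding drives inference.
    let with_annotation: u32 = spawn_children(&[1, 2, 3])
        .into_iter()
        .map(|c| c.join().unwrap())
        .sum();

    assert_eq!(with_turbofish, with_annotation);
    println!("{} {}", with_turbofish, with_annotation); // prints "12 12"
}
```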
Assignments
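It is not wise to let the number of threads depend on user-supplied data. What if the user decides to insert a lot of spaces? Do we really want to spawn 2,000 threads? Modify the program so that the data is always split into a limited number of chunks, defined by a static constant at the beginning of the program.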
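One possible approach, sketched here under our own assumptions rather than as the official solution: ignore whitespace entirely, collect the digits, and split them into at most NUM_THREADS chunks with slice::chunks. The constant name NUM_THREADS and the function bounded_digit_sum are hypothetical; a static item would work as well as the const used here.

```rust
use std::thread;

// Hypothetical upper bound on the number of worker threads.
const NUM_THREADS: usize = 4;

fn bounded_digit_sum(data: &str) -> u32 {
    // Collect every digit, discarding whitespace, so the user's spacing
    // no longer influences how many threads are spawned.
    let digits: Vec<u32> = data
        .chars()
        .filter(|c| !c.is_whitespace())
        .map(|c| c.to_digit(10).expect("should be a digit"))
        .collect();

    // chunks(0) would panic, so handle empty input up front.
    if digits.is_empty() {
        return 0;
    }

    // Ceiling division guarantees at most NUM_THREADS chunks.
    let chunk_size = (digits.len() + NUM_THREADS - 1) / NUM_THREADS;

    let mut children = Vec::new();
    for chunk in digits.chunks(chunk_size) {
        // Copy the chunk into an owned Vec so the thread owns 'static data.
        let chunk = chunk.to_vec();
        children.push(thread::spawn(move || chunk.iter().sum::<u32>()));
    }

    children.into_iter().map(|c| c.join().unwrap()).sum()
}

fn main() {
    println!("{}", bounded_digit_sum("1 2 3 4 5 6 7 8 9 0")); // prints "45"
}
```

Copying each chunk with to_vec() keeps the sketch within the 'static requirement of thread::spawn; for large inputs, sharing the digit vector through an Arc would avoid those copies.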
See also
- Threads
- vectors and iterators
- closures, move semantics and move closures
- destructuring assignments
- turbofish notation to help type inference
- unwrap vs. expect
- Enumerate