Testcase: map-reduce
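Rust makes it very easy to parallelise data processing, without many of the headaches traditionally associated with such an attempt.
The standard library provides great threading primitives out of the box. Combined with Rust's ownership and aliasing rules, these primitives automatically prevent data races.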
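The aliasing rules (one writable reference XOR many readable references) automatically prevent you from manipulating state that is visible to other threads. (Where synchronisation is needed, there are synchronisation primitives like Mutex or Channel.)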
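As a minimal sketch of the channel approach mentioned above (not part of the original example), each worker thread below sends its partial result over a `std::sync::mpsc` channel instead of writing to shared state. The function name `sum_via_channel` is hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: sums the digits of each whitespace-separated
/// segment in its own thread and gathers the partial sums over a channel.
fn sum_via_channel(data: &'static str) -> u32 {
    // `tx` is the sending half, `rx` the receiving half.
    let (tx, rx) = mpsc::channel();

    for segment in data.split_whitespace() {
        // Each worker gets its own clone of the sender.
        let tx = tx.clone();
        thread::spawn(move || {
            let partial: u32 = segment
                .chars()
                .map(|c| c.to_digit(10).expect("should be a digit"))
                .sum();
            // Sending moves the value to the receiver; nothing is shared.
            tx.send(partial).expect("receiver should still be alive");
        });
    }

    // Drop the original sender so the channel closes once every
    // worker has finished and dropped its clone.
    drop(tx);

    // `rx.iter()` blocks for each value and ends when all senders are gone.
    rx.iter().sum()
}

fn main() {
    let total = sum_via_channel("123 456 789");
    println!("total = {}", total); // prints "total = 45"
}
```

Because values are moved through the channel, no thread ever holds a reference to another thread's state, which is exactly what the aliasing rules require.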
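In this example, we will calculate the sum of all digits in a block of numbers. We will do this by parcelling out chunks of the block to different threads. Each thread will sum its tiny chunk of digits, and afterwards we will sum the intermediate results produced by each thread.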
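Note that, although we're passing references across thread boundaries, Rust understands that we're only passing read-only references, so no unsafety or data races can occur. Also, because the references we're passing have 'static lifetimes, Rust understands that our data won't be destroyed while these threads are still running. (When you need to share non-static data between threads, you can use a smart pointer like Arc to keep the data alive and avoid non-static lifetimes.)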
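To illustrate the Arc remark, here is a minimal sketch assuming the data arrives at runtime as an owned `String`, so it cannot be borrowed as `&'static str`. Wrapping it in `Arc` gives every thread its own reference-counted handle that keeps the text alive. The function name `sum_owned` and the segment-by-index lookup are hypothetical choices made for brevity.

```rust
use std::sync::Arc;
use std::thread;

/// Hypothetical helper: the same map-reduce, but over an owned `String`
/// built at runtime instead of a `&'static str` literal.
fn sum_owned(data: String) -> u32 {
    // Move the String into a reference-counted, immutable allocation.
    let shared: Arc<String> = Arc::new(data);
    let segment_count = shared.split_whitespace().count();

    let mut children = Vec::new();
    for i in 0..segment_count {
        // Cloning an Arc only bumps the reference count; the text is not copied.
        let shared = Arc::clone(&shared);
        children.push(thread::spawn(move || -> u32 {
            // For simplicity each thread re-scans the shared text to find
            // its own segment.
            let segment = shared
                .split_whitespace()
                .nth(i)
                .expect("segment index is in range");
            segment
                .chars()
                .map(|c| c.to_digit(10).expect("should be a digit"))
                .sum()
        }));
    }

    // The String is freed once the last Arc handle (here or in a thread) is dropped.
    children.into_iter().map(|c| c.join().unwrap()).sum()
}

fn main() {
    // Built at runtime, so it is not 'static.
    let data = String::from("111 222 333");
    println!("total = {}", sum_owned(data)); // prints "total = 18"
}
```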
use std::thread;

// This is the `main` thread
fn main() {

    // This is our data to process.
    // We will calculate the sum of all digits via a threaded map-reduce algorithm.
    // Each whitespace separated chunk will be handled in a different thread.
    //
    // TODO: see what happens to the output if you insert spaces!
    let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";

    // Make a vector to hold the child-threads which we will spawn.
    let mut children = vec![];

    /*************************************************************************
     * "Map" phase
     *
     * Divide our data into segments, and apply initial processing
     ************************************************************************/

    // split our data into segments for individual calculation
    // each chunk will be a reference (&str) into the actual data
    let chunked_data = data.split_whitespace();

    // Iterate over the data segments.
    // .enumerate() adds the current loop index to whatever is iterated
    // the resulting tuple "(index, element)" is then immediately
    // "destructured" into two variables, "i" and "data_segment" with a
    // "destructuring assignment"
    for (i, data_segment) in chunked_data.enumerate() {
        println!("data segment {} is \"{}\"", i, data_segment);

        // Process each data segment in a separate thread
        //
        // spawn() returns a handle to the new thread,
        // which we MUST keep to access the returned value
        //
        // 'move || -> u32' is syntax for a closure that:
        // * takes no arguments ('||')
        // * takes ownership of its captured variables ('move') and
        // * returns an unsigned 32-bit integer ('-> u32')
        //
        // Rust is smart enough to infer the '-> u32' from
        // the closure itself so we could have left that out.
        //
        // TODO: try removing the 'move' and see what happens
        children.push(thread::spawn(move || -> u32 {
            // Calculate the intermediate sum of this segment:
            let result = data_segment
                        // iterate over the characters of our segment..
                        .chars()
                        // .. convert text-characters to their number value..
                        .map(|c| c.to_digit(10).expect("should be a digit"))
                        // .. and sum the resulting iterator of numbers
                        .sum();

            // println! locks stdout, so no text-interleaving occurs
            println!("processed segment {}, result={}", i, result);

            // "return" not needed, because Rust is an "expression language", the
            // last evaluated expression in each block is automatically its value.
            result

        }));
    }

    /*************************************************************************
     * "Reduce" phase
     *
     * Collect our intermediate results, and combine them into a final result
     ************************************************************************/

    // combine each thread's intermediate results into a single final sum.
    //
    // we use the "turbofish" ::<> to provide sum() with a type hint.
    //
    // TODO: try without the turbofish, by instead explicitly
    // specifying the type of final_result
    let final_result = children.into_iter().map(|c| c.join().unwrap()).sum::<u32>();

    println!("Final sum result: {}", final_result);
}
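Exercise
It is not wise to let the number of threads depend on user-supplied data. What if the user decides to insert a lot of spaces? Do we really want to spawn 2,000 threads? Modify the program so that the data is always chunked into a limited number of chunks, defined by a static constant at the beginning of the program.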
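One possible approach to the exercise, sketched under assumptions and not an official solution: `NUM_THREADS` and `chunked_digit_sum` are hypothetical names, and all whitespace is discarded before the digits are split into at most `NUM_THREADS` roughly equal contiguous chunks, so extra spaces can no longer change the thread count.

```rust
use std::thread;

// Hypothetical constant: the maximum number of worker threads.
const NUM_THREADS: usize = 4;

/// Sums every digit in `data` using at most NUM_THREADS threads,
/// no matter how the input is spaced.
fn chunked_digit_sum(data: &str) -> u32 {
    // Drop all whitespace up front so it cannot influence the thread count.
    let digits: Vec<u32> = data
        .chars()
        .filter(|c| !c.is_whitespace())
        .map(|c| c.to_digit(10).expect("should be a digit"))
        .collect();
    if digits.is_empty() {
        return 0; // nothing to sum; also avoids a chunk size of zero
    }

    // Ceiling division: the smallest chunk size that yields
    // at most NUM_THREADS chunks.
    let chunk_size = (digits.len() + NUM_THREADS - 1) / NUM_THREADS;

    // "Map" phase: one thread per chunk. Each chunk is copied into an
    // owned Vec so the `move` closure satisfies spawn's 'static bound.
    let children: Vec<thread::JoinHandle<u32>> = digits
        .chunks(chunk_size)
        .map(|chunk| {
            let owned = chunk.to_vec();
            thread::spawn(move || owned.iter().sum::<u32>())
        })
        .collect();

    // "Reduce" phase: join every thread and add up the partial sums.
    children.into_iter().map(|c| c.join().unwrap()).sum()
}

fn main() {
    // Many spaces, yet still at most NUM_THREADS threads.
    let data = "1 2 3 4 5 6 7 8 9 1 2 3 4 5 6 7 8 9";
    println!("Final sum result: {}", chunked_digit_sum(data)); // prints "Final sum result: 90"
}
```

Copying each chunk keeps the sketch within the tools used earlier (plain `thread::spawn` with `move` closures); sharing the digit vector through an Arc would avoid the copy at the cost of a little more bookkeeping.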