Testcase: map-reduce

Rust makes it very easy to parallelize data processing, without many of the headaches traditionally associated with such an attempt.

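The standard library provides solid threading primitives out of the box. Combined with Rust's ownership model and aliasing rules, they prevent data races at compile time.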

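The aliasing rules (either one writable reference or any number of readable references, never both) automatically prevent you from manipulating state that is visible to other threads. (Where synchronization is needed, there are synchronization primitives such as Mutex or channels.)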
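As an illustration of the synchronization primitives mentioned above, here is a minimal sketch that collects per-thread partial sums through a standard-library channel (std::sync::mpsc). The helper name sum_digits_with_channel is hypothetical and not part of the original example; it is one possible arrangement, not the only one.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical helper (not from the original example): sums the digits of
// each whitespace-separated segment in its own thread and sends every
// partial sum back to the caller over a channel.
fn sum_digits_with_channel(data: &'static str) -> u32 {
    let (tx, rx) = mpsc::channel();

    for segment in data.split_whitespace() {
        // Each worker thread owns its own clone of the sending half.
        let tx = tx.clone();
        thread::spawn(move || {
            let partial: u32 = segment
                .chars()
                .map(|c| c.to_digit(10).expect("should be a digit"))
                .sum();
            tx.send(partial).expect("receiver should still be alive");
        });
    }

    // Drop the original sender so that the receiving iterator ends once
    // every worker has finished and dropped its clone.
    drop(tx);

    // Reduce: add up whatever partial sums arrive, in any order.
    rx.iter().sum()
}

fn main() {
    let total = sum_digits_with_channel("123 456 789");
    println!("total = {}", total);
}
```

No shared mutable state is involved here: each thread sends an owned value, so the aliasing rules are never under pressure.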

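In this example, we will calculate the sum of all digits in a block of numbers. We will do this by parcelling out chunks of the block to different threads. Each thread will sum the digits of its small chunk, and afterwards we will add up the intermediate sums produced by the threads.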

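Note that, although we are passing references across thread boundaries, Rust understands that we are only passing read-only references, so no unsafety or data races can occur. Also, because the references we pass have 'static lifetimes, Rust understands that our data will not be destroyed while these threads are still running. (When you need to share non-static data between threads, you can use a smart pointer such as Arc to keep the data alive and avoid non-static lifetimes.)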
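To make the remark about Arc concrete, the following sketch shares a String built at runtime (so it has no 'static lifetime) between threads. The helper name sum_digits_shared is hypothetical, and looking up each segment with nth(i) is chosen for brevity rather than efficiency.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical helper (not from the original example): the input is an
// owned String, so plain references to it could not be moved into
// spawned threads. Arc keeps the data alive for as long as any thread
// still holds a handle to it.
fn sum_digits_shared(data: String) -> u32 {
    let shared = Arc::new(data);
    let segment_count = shared.split_whitespace().count();

    let children: Vec<thread::JoinHandle<u32>> = (0..segment_count)
        .map(|i| {
            // Cloning the Arc only bumps a reference count;
            // the String itself is not copied.
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                let partial: u32 = shared
                    .split_whitespace()
                    .nth(i)
                    .expect("segment index should be in range")
                    .chars()
                    .map(|c| c.to_digit(10).expect("should be a digit"))
                    .sum();
                partial
            })
        })
        .collect();

    // Reduce: join every thread and add up the partial sums.
    children.into_iter().map(|c| c.join().unwrap()).sum()
}

fn main() {
    let input = String::from("12 34 56");
    println!("total = {}", sum_digits_shared(input));
}
```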

use std::thread;

// This is the `main` thread
fn main() {

    // This is our data to process.
    // We will calculate the sum of all digits via a threaded map-reduce algorithm.
    // Each whitespace separated chunk will be handled in a different thread.
    //
    // TODO: see what happens to the output if you insert spaces!
    let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";

    // Make a vector to hold the child-threads which we will spawn.
    let mut children = vec![];

    /*************************************************************************
     * "Map" phase
     *
     * Divide our data into segments, and apply initial processing
     ************************************************************************/

    // split our data into segments for individual calculation
    // each chunk will be a reference (&str) into the actual data
    let chunked_data = data.split_whitespace();

    // Iterate over the data segments.
    // .enumerate() adds the current loop index to whatever is iterated
    // the resulting tuple "(index, element)" is then immediately
    // "destructured" into two variables, "i" and "data_segment" with a
    // "destructuring assignment"
    for (i, data_segment) in chunked_data.enumerate() {
        println!("data segment {} is \"{}\"", i, data_segment);

        // Process each data segment in a separate thread
        //
        // spawn() returns a handle to the new thread,
        // which we MUST keep to access the returned value
        //
        // 'move || -> u32' is syntax for a closure that:
        // * takes no arguments ('||')
        // * takes ownership of its captured variables ('move') and
        // * returns an unsigned 32-bit integer ('-> u32')
        //
        // Rust is smart enough to infer the '-> u32' from
        // the closure itself so we could have left that out.
        //
        // TODO: try removing the 'move' and see what happens
        children.push(thread::spawn(move || -> u32 {
            // Calculate the intermediate sum of this segment:
            let result = data_segment
                        // iterate over the characters of our segment..
                        .chars()
                        // .. convert text-characters to their number value..
                        .map(|c| c.to_digit(10).expect("should be a digit"))
                        // .. and sum the resulting iterator of numbers
                        .sum();

            // println! locks stdout, so no text-interleaving occurs
            println!("processed segment {}, result={}", i, result);

            // "return" not needed, because Rust is an "expression language", the
            // last evaluated expression in each block is automatically its value.
            result

        }));
    }


    /*************************************************************************
     * "Reduce" phase
     *
     * Collect our intermediate results, and combine them into a final result
     ************************************************************************/

    // combine each thread's intermediate results into a single final sum.
    //
    // we use the "turbofish" ::<> to provide sum() with a type hint.
    //
    // TODO: try without the turbofish, by instead explicitly
    // specifying the type of final_result
    let final_result = children.into_iter().map(|c| c.join().unwrap()).sum::<u32>();

    println!("Final sum result: {}", final_result);
}

Assignments

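It is not wise to let the number of threads depend on user-supplied data. What if the user decides to insert a lot of spaces? Do we really want to spawn 2,000 threads? Modify the program so that the data is always split into a limited number of chunks, defined by a static constant at the beginning of the program.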
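One possible approach to this assignment, offered as a sketch rather than a reference solution: ignore the whitespace entirely and split the remaining characters into at most NTHREADS chunks. The constant name NTHREADS, its value of 4, and the helper name sum_digits_bounded are all assumptions made for illustration.

```rust
use std::thread;

// Assumed upper bound on the number of worker threads (illustrative value).
const NTHREADS: usize = 4;

// Hypothetical helper: sums all digits in `data` using at most NTHREADS
// threads, no matter how much whitespace the input contains.
fn sum_digits_bounded(data: &str) -> u32 {
    // Whitespace no longer decides the chunking, so strip it first.
    let chars: Vec<char> = data.chars().filter(|c| !c.is_whitespace()).collect();

    // chunks() panics on a chunk size of zero, so handle empty input early.
    if chars.is_empty() {
        return 0;
    }

    // Ceiling division guarantees that at most NTHREADS chunks are produced.
    let chunk_size = (chars.len() + NTHREADS - 1) / NTHREADS;

    // "Map" phase: each thread owns a copy of its chunk and sums its digits.
    let children: Vec<thread::JoinHandle<u32>> = chars
        .chunks(chunk_size)
        .map(|chunk| {
            let owned = chunk.to_vec();
            thread::spawn(move || {
                let partial: u32 = owned
                    .iter()
                    .map(|c| c.to_digit(10).expect("should be a digit"))
                    .sum();
                partial
            })
        })
        .collect();

    // "Reduce" phase: combine the intermediate sums.
    children.into_iter().map(|c| c.join().unwrap()).sum()
}

fn main() {
    println!("total = {}", sum_digits_bounded("12 34 5   6 7"));
}
```

Copying each chunk into an owned Vec keeps the sketch free of lifetime concerns at the cost of one extra allocation per thread.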

See also