みーのぺーじ

みーが趣味でやっているPCやソフトウェアについて.Python, Javascript, Processing, Unityなど.

Rust で気軽に並列処理をする

Rust の rayon の紹介です.驚くほど簡単かつ安全に並列処理を行えます.

Cargo.toml

[package]
name = "thread-pool-sample"
version = "0.1.0"
edition = "2021"

[dependencies]
chrono = "0.4.24"
rayon = "1.7.0"
  • rustc 1.68.2

main.rs

use chrono::prelude::*;
use rayon::{prelude::*, ThreadPool, ThreadPoolBuilder};
use std::{thread, time::Duration};

fn create_thread_pool(num_threads: usize) -> ThreadPool {
    ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .build()
        .unwrap()
}

fn sub(index: u32) -> u32 {
    let thread_index = rayon::current_thread_index().unwrap();
    let now = Utc::now().format("%H:%M:%S").to_string();
    println!("{now} Task {index} start. thread_index={thread_index}");
    thread::sleep(Duration::from_secs(2));
    let now = Utc::now().format("%H:%M:%S").to_string();
    println!("{now} Task {index} end.");
    return index * 2;
}

fn run(num_tasks: u32, num_threads: usize) {
    create_thread_pool(num_threads).install(|| {
        let ps = (0..num_tasks).into_par_iter().map(|i| sub(i));
        let r: Vec<u32> = ps.collect();
        println!("{:?}", r);
    });
}

create_thread_pool 関数で,スレッドの数を指定して,スレッドプールを作成します.これを使って,タスクを作成し,並列処理を開始します.それぞれのタスクでは,2 秒待ってから引数を2倍して返します.

fn main() {
    run(4, 4);
    println!("Complete.");
}

6 コア CPU で実行したところ,本来ならば 2 秒 × 4 タスクなので 8 秒かかるはずですが,2 秒で完了していますので,並列処理がうまくできていることが確認できました.

14:54:28 Task 0 start. thread_index=3
14:54:28 Task 2 start. thread_index=0
14:54:28 Task 1 start. thread_index=2
14:54:28 Task 3 start. thread_index=1
14:54:30 Task 0 end.
14:54:30 Task 1 end.
14:54:30 Task 2 end.
14:54:30 Task 3 end.
[0, 2, 4, 6]
Complete.

スレッドの作成にはコストがかかるので,CPU コアの数に対応したスレッド数を設定して,使い回すのがよいでしょう.例えば,タスクの数が 15 個で,スレッドの数が 4 個の場合は,処理が完了したスレッドが使い回されます.

fn main() {
    run(15, 4);
    println!("Complete.");
}

6 コア CPU で実行したところ,Task は 15 個, thread_index は 4 個となりましたので,期待した通りの動作です.

14:54:54 Task 3 start. thread_index=2
14:54:54 Task 7 start. thread_index=1
14:54:54 Task 5 start. thread_index=3
14:54:54 Task 0 start. thread_index=0
14:54:56 Task 7 end.
14:54:56 Task 0 end.
14:54:56 Task 3 end.
14:54:56 Task 1 start. thread_index=0
14:54:56 Task 5 end.
14:54:56 Task 6 start. thread_index=3
14:54:56 Task 8 start. thread_index=1
14:54:56 Task 4 start. thread_index=2
14:54:58 Task 1 end.
14:54:58 Task 2 start. thread_index=0
14:54:58 Task 6 end.
14:54:58 Task 8 end.
14:54:58 Task 9 start. thread_index=1
14:54:58 Task 11 start. thread_index=3
14:54:58 Task 4 end.
14:54:58 Task 13 start. thread_index=2
14:55:00 Task 2 end.
14:55:00 Task 12 start. thread_index=0
14:55:00 Task 11 end.
14:55:00 Task 9 end.
14:55:00 Task 10 start. thread_index=3
14:55:00 Task 13 end.
14:55:00 Task 14 start. thread_index=1
14:55:02 Task 12 end.
14:55:02 Task 10 end.
14:55:02 Task 14 end.
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28]
Complete.

また,最後に "Complete." と出力されているので,ThreadPool.install 関数でメインスレッドが他のスレッドを待っていることが分かります.

参考