如何在分区数组上运行并行计算线程? [英] How do I run parallel threads of computation on a partitioned array?
问题描述
我正在尝试在多个线程之间分配一个数组,并让线程并行地汇总该数组的各个部分.我希望线程0将元素0 1 2求和,线程1将元素3 4 5求和.线程2将6和7求和.线程3将8和9求和.
I'm trying to distribute an array across threads and have the threads sum up portions of the array in parallel. I want thread 0 to sum elements 0 1 2 and Thread 1 sum elements 3 4 5. Thread 2 to sum 6 and 7. and Thread 3 to sum 8 and 9.
我是Rust的新手,但是之前已经使用C/C ++/Java进行了编码.我从字面上将所有内容和垃圾槽都扔到了该程序中,希望可以得到一些指导.
I'm new to Rust but have coded with C/C++/Java before. I've literally thrown everything and the garbage sink at this program and I was hoping I could receive some guidance.
对不起,我的代码很草率,但是当它制成成品时,我会清理它.请忽略所有名称不正确的变量/间距不一致/等.
Sorry my code is sloppy but I will clean it up when it is a finished product. Please ignore all poorly named variables/inconsistent spacing/etc.
use std::io;
use std::rand;
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread::Thread;
static NTHREADS: usize = 4;
static NPROCS: usize = 10;
fn main() {
let mut a = [0; 10]; // a: [i32; 10]
let mut endpoint = a.len() / NTHREADS;
let mut remElements = a.len() % NTHREADS;
for x in 0..a.len() {
let secret_number = (rand::random::<i32>() % 100) + 1;
a[x] = secret_number;
println!("{}", a[x]);
}
let mut b = a;
let mut x = 0;
check_sum(&mut a);
// serial_sum(&mut b);
// Channels have two endpoints: the `Sender<T>` and the `Receiver<T>`,
// where `T` is the type of the message to be transferred
// (type annotation is superfluous)
let (tx, rx): (Sender<i32>, Receiver<i32>) = mpsc::channel();
let mut scale: usize = 0;
for id in 0..NTHREADS {
// The sender endpoint can be copied
let thread_tx = tx.clone();
// Each thread will send its id via the channel
Thread::spawn(move || {
// The thread takes ownership over `thread_tx`
// Each thread queues a message in the channel
let numTougherThreads: usize = NPROCS % NTHREADS;
let numTasksPerThread: usize = NPROCS / NTHREADS;
let mut lsum = 0;
if id < numTougherThreads {
let mut q = numTasksPerThread+1;
lsum = 0;
while q > 0 {
lsum = lsum + a[scale];
scale+=1;
q = q-1;
}
println!("Less than numToughThreads lsum: {}", lsum);
}
if id >= numTougherThreads {
let mut z = numTasksPerThread;
lsum = 0;
while z > 0 {
lsum = lsum + a[scale];
scale +=1;
z = z-1;
}
println!("Greater than numToughthreads lsum: {}", lsum);
}
// Sending is a non-blocking operation, the thread will continue
// immediately after sending its message
println!("thread {} finished", id);
thread_tx.send(lsum).unwrap();
});
}
// Here, all the messages are collected
let mut globalSum = 0;
let mut ids = Vec::with_capacity(NTHREADS);
for _ in 0..NTHREADS {
// The `recv` method picks a message from the channel
// `recv` will block the current thread if there no messages available
ids.push(rx.recv());
}
println!("Global Sum: {}", globalSum);
// Show the order in which the messages were sent
println!("ids: {:?}", ids);
}
fn check_sum (arr: &mut [i32]) {
let mut sum = 0;
let mut i = 0;
let mut size = arr.len();
loop {
sum += arr[i];
i+=1;
if i == size { break; }
}
println!("CheckSum is {}", sum);
}
到目前为止,我已经做到了这一点.无法弄清楚为什么线程0和1具有相同的总和以及线程2和3具有相同的作用:
So far I've gotten it to do this much. Can't figure out why threads 0 and 1 have the same sum as well as 2 and 3 doing the same thing:
-5
-49
-32
99
45
-65
-64
-29
-56
65
CheckSum is -91
Greater than numTough lsum: -54
thread 2 finished
Less than numTough lsum: -86
thread 1 finished
Less than numTough lsum: -86
thread 0 finished
Greater than numTough lsum: -54
thread 3 finished
Global Sum: 0
ids: [Ok(-86), Ok(-86), Ok(-54), Ok(-54)]
通过使用以下代码,我设法将其重写为使用偶数.
I managed to rewrite it to work with even numbers by using the below code.
while q > 0 {
if id*s+scale == a.len() { break; }
lsum = lsum + a[id*s+scale];
scale +=1;
q = q-1;
}
println!("Less than numToughThreads lsum: {}", lsum);
}
if id >= numTougherThreads {
let mut z = numTasksPerThread;
lsum = 0;
let mut scale = 0;
while z > 0 {
if id*numTasksPerThread+scale == a.len() { break; }
lsum = lsum + a[id*numTasksPerThread+scale];
scale = scale + 1;
z = z-1;
}
推荐答案
欢迎使用Rust! :)
Welcome to Rust! :)
是的,起初我没有意识到每个线程都得到它自己的scale副本
Yeah at first I didn't realize each thread gets it's own copy of scale
不仅如此!它还获得了自己的a
!
Not only that! It also gets its own copy of a
!
您要尝试执行的操作可能类似于以下代码.我想您似乎更容易看到一个完整的工作示例,因为您似乎是Rust的初学者并要求提供指导.我故意将[i32; 10]
替换为Vec
,因为Vec
不能 隐式地Copy
可用.它需要一个显式的clone()
;我们不能无意中复制它.请注意所有较大和较小的差异.该代码还具有更多功能(mut
少了).我评论了大多数值得注意的事情:
What you are trying to do could look like the following code. I guess it's easier for you to see a complete working example since you seem to be a Rust beginner and asked for guidance. I deliberately replaced [i32; 10]
with a Vec
since a Vec
is not implicitly Copy
able. It requires an explicit clone()
; we cannot copy it by accident. Please note all the larger and smaller differences. The code also got a little more functional (less mut
). I commented most of the noteworthy things:
extern crate rand;
use std::sync::Arc;
use std::sync::mpsc;
use std::thread;
const NTHREADS: usize = 4; // I replaced `static` by `const`
// gets used for *all* the summing :)
fn sum<I: Iterator<Item=i32>>(iter: I) -> i32 {
let mut s = 0;
for x in iter {
s += x;
}
s
}
fn main() {
// We don't want to clone the whole vector into every closure.
// So we wrap it in an `Arc`. This allows sharing it.
// I also got rid of `mut` here by moving the computations into
// the initialization.
let a: Arc<Vec<_>> =
Arc::new(
(0..10)
.map(|_| {
(rand::random::<i32>() % 100) + 1
})
.collect()
);
let (tx, rx) = mpsc::channel(); // types will be inferred
{ // local scope, we don't need the following variables outside
let num_tasks_per_thread = a.len() / NTHREADS; // same here
let num_tougher_threads = a.len() % NTHREADS; // same here
let mut offset = 0;
for id in 0..NTHREADS {
let chunksize =
if id < num_tougher_threads {
num_tasks_per_thread + 1
} else {
num_tasks_per_thread
};
let my_a = a.clone(); // refers to the *same* `Vec`
let my_tx = tx.clone();
thread::spawn(move || {
let end = offset + chunksize;
let partial_sum =
sum( (&my_a[offset..end]).iter().cloned() );
my_tx.send(partial_sum).unwrap();
});
offset += chunksize;
}
}
// We can close this Sender
drop(tx);
// Iterator magic! Yay! global_sum does not need to be mutable
let global_sum = sum(rx.iter());
println!("global sum via threads : {}", global_sum);
println!("global sum single-threaded: {}", sum(a.iter().cloned()));
}
这篇关于如何在分区数组上运行并行计算线程?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!