什么是最好的/最优雅的方式来限制并行流中并发评估的数量(如使用fixedThreadPool) [英] What is the best / most elegant way to limit the number of concurrent evaluation (like with a fixedThreadPool) in parallel streams
问题描述
假设一个lambda表达式消耗一定量的资源(如内存),这是有限的,需要限制并发执行的数量(例如:如果lambda暂时消耗100 MB(本地内存),我们喜欢限制它到1GB,我们不允许更多的10个并发评估)。
Assume a lambda expression consume a certain amount of a resource (like memory) which is limited and requires to limit the number of concurrent executions (example: if the lambda temporarily consumes 100 MB (of local memory) and we like to limit it to 1GB, we do not allow for more that 10 concurrent evaluations).
什么是限制并发执行数的最好方法,例如在
What is the best way to limit the number of concurrent execution, say for example in
IntStream.range(0, numberOfJobs).parallel().foreach( i -> { /*...*/ });
?
一个显而易见的选择是执行一个嵌套像
Note: An obvious option is to perform a nesting like
double jobsPerThread = (double)numberOfJobs / numberOfThreads;
IntStream.range(0, numberOfThreads).parallel().forEach( threadIndex ->
IntStream.range((int)(threadIndex * jobsPerThread), (int)((threadIndex+1) * jobsPerThread)).sequential().forEach( i -> { /*...*/ }));
这是唯一的方法吗? Tt不是优雅的。 其实我想有一个
Is this the only way? Tt is not that elegant. Actually I would like to have a
IntStream.range(0, numberOfJobs).parallel(numberOfThreads).foreach( i -> { /*...*/ });
推荐答案
Stream
s使用 ForkJoinPool
用于并行操作。默认情况下,它们使用 ForkJoinPool.commonPool()
,这不允许随后更改并发。但是,您可以使用自己的 ForkJoinPool
实例。当您在自己的 ForkJoinPool
上下文中执行流代码时,此上下文池将用于流操作。以下示例通过使用默认行为执行相同的操作一次并且使用具有固定并发性 2
的自定义池来说明这一点:
The Stream
s use a ForkJoinPool
for parallel operations. By default they are using the ForkJoinPool.commonPool()
which does not allow changing the concurrency afterwards. However, you can use your own ForkJoinPool
instance. When you execute the stream code within the context of your own ForkJoinPool
this context pool will be used for the stream operations. The following example illustrates this by executing the same operation once using default behavior and once using a custom pool with a fixed concurrency of 2
:
import java.util.HashSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
public class InterfaceStaticMethod {
public static void main(String[] arg) throws Exception {
Runnable parallelCode=() -> {
HashSet<String> allThreads=new HashSet<>();
IntStream.range(0, 1_000_000).parallel().filter(i->{
allThreads.add(Thread.currentThread().getName()); return false;}
).min();
System.out.println("executed by "+allThreads);
};
System.out.println("default behavior: ");
parallelCode.run();
System.out.println("specialized pool:");
ForkJoinPool pool=new ForkJoinPool(2);
pool.submit(parallelCode).get();
}
}
这篇关于什么是最好的/最优雅的方式来限制并行流中并发评估的数量(如使用fixedThreadPool)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!