什么是最好的/最优雅的方式来限制并行流中并发评估的数量(如使用fixedThreadPool) [英] What is the best / most elegant way to limit the number of concurrent evaluation (like with a fixedThreadPool) in parallel streams

查看:223
本文介绍了什么是最好的/最优雅的方式来限制并行流中并发评估的数量(如使用fixedThreadPool)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设一个lambda表达式消耗一定量的资源(如内存),这是有限的,需要限制并发执行的数量(例如:如果lambda暂时消耗100 MB(本地内存),我们喜欢限制它到1GB,我们不允许更多的10个并发评估)。

Assume a lambda expression consume a certain amount of a resource (like memory) which is limited and requires to limit the number of concurrent executions (example: if the lambda temporarily consumes 100 MB (of local memory) and we like to limit it to 1GB, we do not allow for more that 10 concurrent evaluations).

什么是限制并发执行数的最好方法,例如在



What is the best way to limit the number of concurrent execution, say for example in

IntStream.range(0, numberOfJobs).parallel().foreach( i -> { /*...*/ });

一个显而易见的选择是执行一个嵌套像

Note: An obvious option is to perform a nesting like

    double jobsPerThread = (double)numberOfJobs / numberOfThreads;
    IntStream.range(0, numberOfThreads).parallel().forEach( threadIndex ->
        IntStream.range((int)(threadIndex * jobsPerThread), (int)((threadIndex+1) * jobsPerThread)).sequential().forEach( i -> { /*...*/ }));

这是唯一的方法吗? Tt不是优雅的。 其实我想有一个

Is this the only way? Tt is not that elegant. Actually I would like to have a

IntStream.range(0, numberOfJobs).parallel(numberOfThreads).foreach( i -> { /*...*/ });


推荐答案

Stream s使用 ForkJoinPool 用于并行操作。默认情况下,它们使用 ForkJoinPool.commonPool(),这不允许随后更改并发。但是,您可以使用自己的 ForkJoinPool 实例。当您在自己的 ForkJoinPool 上下文中执行流代码时,此上下文池将用于流操作。以下示例通过使用默认行为执行相同的操作一次并且使用具有固定并发性 2 的自定义池来说明这一点:

The Streams use a ForkJoinPool for parallel operations. By default they are using the ForkJoinPool.commonPool() which does not allow changing the concurrency afterwards. However, you can use your own ForkJoinPool instance. When you execute the stream code within the context of your own ForkJoinPool this context pool will be used for the stream operations. The following example illustrates this by executing the same operation once using default behavior and once using a custom pool with a fixed concurrency of 2:

import java.util.HashSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class InterfaceStaticMethod {
    public static void main(String[] arg) throws Exception {
      Runnable parallelCode=() -> {
        HashSet<String> allThreads=new HashSet<>();
        IntStream.range(0, 1_000_000).parallel().filter(i->{
          allThreads.add(Thread.currentThread().getName()); return false;}
        ).min();
        System.out.println("executed by "+allThreads);
      };
      System.out.println("default behavior: ");
      parallelCode.run();
      System.out.println("specialized pool:");
      ForkJoinPool pool=new ForkJoinPool(2);
      pool.submit(parallelCode).get();
    }
}

这篇关于什么是最好的/最优雅的方式来限制并行流中并发评估的数量(如使用fixedThreadPool)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆