Java 8 并行流中的自定义线程池 [英] Custom thread pool in Java 8 parallel stream

查看:29
本文介绍了Java 8 并行流中的自定义线程池的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

是否可以为 Java 8 并行流指定自定义线程池?我在任何地方都找不到它.

Is it possible to specify a custom thread pool for Java 8 parallel stream? I can not find it anywhere.

想象一下,我有一个服务器应用程序,我想使用并行流.但是该应用程序很大并且是多线程的,所以我想对它进行划分.我不希望在另一个模块的 applicationblock 任务的一个模块中运行缓慢的任务.

Imagine that I have a server application and I would like to use parallel streams. But the application is large and multi-threaded so I want to compartmentalize it. I do not want a slow running task in one module of the applicationblock tasks from another module.

如果我不能为不同的模块使用不同的线程池,这意味着我不能在现实世界的大多数情况下安全地使用并行流.

If I can not use different thread pools for different modules, it means I can not safely use parallel streams in most of real world situations.

试试下面的例子.有一些 CPU 密集型任务在单独的线程中执行.这些任务利用并行流.第一个任务被破坏,所以每一步需要 1 秒(由线程睡眠模拟).问题是其他线程卡住并等待中断的任务完成.这是一个人为的例子,但想象一个 servlet 应用程序和某人将一个长时间运行的任务提交到共享 fork 加入池.

Try the following example. There are some CPU intensive tasks executed in separate threads. The tasks leverage parallel streams. The first task is broken, so each step takes 1 second (simulated by thread sleep). The issue is that other threads get stuck and wait for the broken task to finish. This is contrived example, but imagine a servlet app and someone submitting a long running task to the shared fork join pool.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

推荐答案

实际上有一个技巧如何在特定的 fork-join 池中执行并行操作.如果你将它作为一个 fork-join 池中的任务来执行,它会留在那里并且不使用公共的.

There actually is a trick how to execute a parallel operation in a specific fork-join pool. If you execute it as a task in a fork-join pool, it stays there and does not use the common one.

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

该技巧基于 ForkJoinTask.fork 其中指定:安排在当前任务正在运行的池中异步执行此任务,如果适用,或使用 ForkJoinPool.commonPool() 如果不是 inForkJoinPool()"

The trick is based on ForkJoinTask.fork which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()"

这篇关于Java 8 并行流中的自定义线程池的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆