自定义线程池中的Java 8并行流 [英] Custom thread pool in Java 8 parallel stream

查看:593
本文介绍了自定义线程池中的Java 8并行流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

是否可以为Java 8 并行流指定自定义线程池?我找不到它在任何地方。

Is it possible to specify a custom thread pool for Java 8 parallel stream? I can not find it anywhere.

想象一下,我有一个服务器应用程序,我想使用并行流。但应用程序是大和多线程的,所以我想划分它。我不想在另一个模块的应用程序块任务的一个模块中运行缓慢的任务。

Imagine that I have a server application and I would like to use parallel streams. But the application is large and multi-threaded so I want to compartmentalize it. I do not want a slow running task in one module of the applicationblock tasks from another module.

如果我不能为不同的模块使用不同的线程池,这意味着我可以在大多数现实情况下不能安全地使用并行流。

If I can not use different thread pools for different modules, it means I can not safely use parallel streams in most of real world situations.

尝试以下示例。有一些CPU密集型任务在单独的线程中执行。
任务利用并行流。第一个任务被打破,因此每个步骤需要1秒(由线程睡眠模拟)。问题是,其他线程卡住,等待破碎的任务完成。这是一个假设的例子,但想象一个servlet应用程序和某人提交一个长时间运行的任务到共享fork连接池。

Try the following example. There are some CPU intensive tasks executed in separate threads. The tasks leverage parallel streams. The first task is broken, so each step takes 1 second (simulated by thread sleep). The issue is that other threads get stuck and wait for the broken task to finish. This is contrived example, but imagine a servlet app and someone submitting a long running task to the shared fork join pool.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}


推荐答案

实际上是一个技巧如何在特定的fork-join池中执行并行操作。如果在fork-join池中将其作为任务执行,它将保留在那里,不使用普通的。

There actually is a trick how to execute a parallel operation in a specific fork-join pool. If you execute it as a task in a fork-join pool, it stays there and does not use the common one.

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.submit(() ->
    //parallel task here, for example
    IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList())
).get();

诀窍是基于 ForkJoinTask.fork ,其中指定:排列在当前任务正在运行的池中异步执行此任务,如果适用或使用ForkJoinPool.commonPool()如果不在ForkJoinPool()

The trick is based on ForkJoinTask.fork which specifies: "Arranges to asynchronously execute this task in the pool the current task is running in, if applicable, or using the ForkJoinPool.commonPool() if not inForkJoinPool()"

这篇关于自定义线程池中的Java 8并行流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆