Is there anything wrong with using I/O + ManagedBlocker in Java 8 parallelStream()?


Question

The default parallelStream() in Java 8 uses the common ForkJoinPool, which may be a latency problem if the common pool threads are exhausted when a task is submitted. However, in many cases enough CPU power is available and the tasks are short enough that this is not a problem. If we do have some long-running tasks, this will of course need careful consideration, but for this question let's assume that this is not the problem.

However, filling the ForkJoinPool with I/O tasks that don't actually do any CPU-bound work is a way to introduce a bottleneck even though enough CPU power is available. I understand that. But that is what we have ManagedBlocker for. So if we have an I/O task, we should simply let the ForkJoinPool manage it within a ManagedBlocker. That sounds incredibly easy. To my surprise, however, ManagedBlocker is a rather complicated API for the simple thing that it does, and I think this is a common problem after all. So I built a simple utility method that makes ManagedBlocker easy to use for the common case:

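import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

public class BlockingTasks {

    // Runs the supplier inside a ManagedBlocker so that the ForkJoinPool
    // may start a compensating thread while this one is blocked.
    public static <T> T callInManagedBlock(final Supplier<T> supplier) {
        final SupplierManagedBlock<T> managedBlock = new SupplierManagedBlock<>(supplier);
        try {
            ForkJoinPool.managedBlock(managedBlock);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before giving up.
            Thread.currentThread().interrupt();
            throw new Error(e);
        }
        return managedBlock.getResult();
    }

    private static class SupplierManagedBlock<T> implements ForkJoinPool.ManagedBlocker {
        private final Supplier<T> supplier;
        private T result;
        private boolean done = false;

        private SupplierManagedBlock(final Supplier<T> supplier) {
            this.supplier = supplier;
        }

        // Performs the blocking call; returning true means no further blocking is needed.
        @Override
        public boolean block() {
            result = supplier.get();
            done = true;
            return true;
        }

        // True once the supplier has completed.
        @Override
        public boolean isReleasable() {
            return done;
        }

        public T getResult() {
            return result;
        }
    }
}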

Now if I want to download the HTML code of a couple of websites in parallel, I could do it like this without the I/O causing any trouble:

public static void main(String[] args) {
    final List<String> pagesHtml = Stream
        .of("https://google.com", "https://stackoverflow.com", "...")
        .parallel() // without this the stream runs sequentially
        .map((url) -> BlockingTasks.callInManagedBlock(() -> download(url)))
        .collect(Collectors.toList());
}

I am a little surprised that no class like the BlockingTasks above ships with Java (or did I just not find it?), but it was not that hard to build.

When I google for "java 8 parallel stream", the first four results include these articles, which claim that Fork/Join sucks in Java because of the I/O problem:

  • https://dzone.com/articles/think-twice-using-java-8
  • http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health/ (at least mentions ManagedBlocker, but also says "in a different use case you’d be able to give it a ManagedBlocker instance". It does not mention why not in this case.)

I have altered my search terms somewhat, and while there are a lot of people complaining about how horrible life is, I found nobody talking about a solution like the one above. Since I don't feel like Marvin (brain the size of a planet) and Java 8 has been available for quite a while, I suspect that there is something terribly wrong with what I am proposing up there.

I banged together a small test:

public static void main(String[] args) {
    System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Start");
    IntStream.range(0, 10).parallel().forEach((x) -> sleep());
    System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": End");
}

public static void sleep() {
    try {
        System.out.println(DateTimeFormatter.ISO_LOCAL_TIME.format(LocalTime.now()) + ": Sleeping " + Thread.currentThread().getName());
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        throw new Error(e);
    }
}

I ran it and got the following result:

18:41:29.021: Start
18:41:29.033: Sleeping main
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-2
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-5
18:41:29.034: Sleeping ForkJoinPool.commonPool-worker-4
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-6
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-3
18:41:29.035: Sleeping ForkJoinPool.commonPool-worker-7
18:41:39.034: Sleeping main
18:41:39.034: Sleeping ForkJoinPool.commonPool-worker-1
18:41:49.035: End

So on my 8-CPU computer the ForkJoinPool naturally chose 8 threads, completed the first 8 tasks, and only then the last two. That means the run took 20 seconds, and if other tasks had been queued, the pool still could not have used the clearly idle CPUs (except for 6 cores in the last 10 seconds).
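The "8 threads" in that log are seven common pool workers plus the calling main thread. A minimal sketch for checking this on a given machine follows; the class and method names are made up for illustration, and the default parallelism of availableProcessors() - 1 is an assumption that holds only when the java.util.concurrent.ForkJoinPool.common.parallelism system property is not set:

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {

    // Parallelism level of the common pool (hypothetical helper name).
    static int parallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    // Threads that can run stream tasks at once when the stream is started
    // from a non-pool thread: the pool workers plus the caller itself.
    static int threadsIncludingCaller() {
        return parallelism() + 1;
    }

    public static void main(String[] args) {
        System.out.println("Available processors:    " + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " + parallelism());
        System.out.println("Workers plus caller:     " + threadsIncludingCaller());
    }
}
```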

Then I used...

IntStream.range(0, 10).parallel().forEach((x) -> callInManagedBlock(() -> { sleep(); return null; }));

...instead of...

IntStream.range(0, 10).parallel().forEach((x) -> sleep());

...and got the following result:

18:44:10.93: Start
18:44:10.945: Sleeping main
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-7
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-1
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-6
18:44:10.953: Sleeping ForkJoinPool.commonPool-worker-3
18:44:10.955: Sleeping ForkJoinPool.commonPool-worker-2
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-4
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-5
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-0
18:44:10.956: Sleeping ForkJoinPool.commonPool-worker-11
18:44:20.957: End

It looks to me like this works: extra threads were started to compensate for my mock "blocking I/O action" (sleep). The time was cut down to 10 seconds, and I suppose that if I queued more tasks, those could still use the available CPU power.
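For anyone who wants to repeat the experiment, here is a self-contained sketch that folds the sleep directly into an anonymous ManagedBlocker and uses a shorter sleep. The class name ManagedSleepDemo and its helper methods are made up for illustration; elapsed times depend on the machine and JDK version, so no particular timing is claimed:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class ManagedSleepDemo {

    // Counts tasks that finished, so the run can be sanity-checked.
    static final AtomicInteger completed = new AtomicInteger();

    // Sleeps inside a ManagedBlocker so the pool may add a compensating thread.
    static void managedSleep(long millis) {
        try {
            ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
                private boolean done;

                @Override
                public boolean block() throws InterruptedException {
                    Thread.sleep(millis);
                    done = true;
                    return true; // no further blocking needed
                }

                @Override
                public boolean isReleasable() {
                    return done;
                }
            });
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        completed.incrementAndGet();
    }

    // Runs the given number of sleeping tasks in a parallel stream
    // and returns the elapsed wall-clock time in milliseconds.
    static long run(int tasks, long millis) {
        long start = System.nanoTime();
        IntStream.range(0, tasks).parallel().forEach(x -> managedSleep(millis));
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("Elapsed ms: " + run(10, 500));
        System.out.println("Completed:  " + completed.get());
    }
}
```

Comparing the elapsed time against a variant that calls Thread.sleep() directly shows whether compensating threads were actually started.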

Is there anything wrong with this solution, or more generally with using I/O in streams, if the I/O operation is wrapped in a ManagedBlocker?

Recommended answer

In short, yes, there are some problems with your solution. It definitely improves the use of blocking code inside a parallel stream, and some third-party libraries provide a similar solution (see, for example, the Blocking class in the jOOλ library). However, this solution does not change the internal splitting strategy used in the Stream API. The number of subtasks created by the Stream API is controlled by a predefined constant in the AbstractTask class:

/**
 * Default target factor of leaf tasks for parallel decomposition.
 * To allow load balancing, we over-partition, currently to approximately
 * four tasks per processor, which enables others to help out
 * if leaf tasks are uneven or some processors are otherwise busy.
 */
static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;

As you can see, it is four times the common pool parallelism (which by default is tied to the number of CPU cores). The real splitting algorithm is a little trickier, but roughly speaking you cannot have more than 4x-8x tasks, even if all of them are blocking.

For example, if you have 8 CPU cores, your Thread.sleep() test will work nicely up to IntStream.range(0, 32) (as 32 = 8*4). However, for IntStream.range(0, 64) you will have 32 parallel tasks, each processing two input numbers, so the whole processing would take 20 seconds, not 10.
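The arithmetic behind these numbers can be reproduced with a simplified model of the splitting. This is a sketch under assumptions, not the JDK source: it assumes the target leaf size is the input size divided by LEAF_TARGET (at least 1) and that a range is halved until it is no larger than that target. The class and method names are made up for illustration, and the parallelism is passed in explicitly rather than read from the common pool:

```java
public class LeafSplitSketch {

    // Mirrors the quoted constant: parallelism << 2.
    static int leafTarget(int parallelism) {
        return parallelism << 2;
    }

    // Size at or below which a task is no longer split (assumed formula).
    static long targetSize(long n, int parallelism) {
        long est = n / leafTarget(parallelism);
        return est > 0L ? est : 1L;
    }

    // Counts leaf tasks when a range is halved until it fits the threshold.
    static long countLeaves(long size, long threshold) {
        if (size <= threshold || size < 2) {
            return 1;
        }
        long left = size / 2;
        return countLeaves(left, threshold) + countLeaves(size - left, threshold);
    }

    static long leaves(long n, int parallelism) {
        return countLeaves(n, targetSize(n, parallelism));
    }

    public static void main(String[] args) {
        for (long n : new long[] {32, 64}) {
            System.out.println(n + " elements: " + leaves(n, 8)
                    + " leaf tasks of up to " + targetSize(n, 8) + " element(s)");
        }
    }
}
```

With a parallelism of 8, the model yields 32 leaf tasks for both 32 and 64 elements; in the second case each leaf holds two elements, which matches the doubling from 10 to 20 seconds described above.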
