Is there a way to force parallelStream() to go parallel?


Question

If the input size is too small, the library automatically serializes the execution of the maps in the stream, but this automation doesn't and can't take into account how heavy the map operation is. Is there a way to force parallelStream() to actually parallelize CPU-heavy maps?

Answer

There seems to be a fundamental misunderstanding. The linked Q&A discusses that the stream apparently doesn't work in parallel, because the OP did not see the expected speedup. The conclusion there is that there is no benefit in parallel processing if the workload is too small, not that there is an automatic fallback to sequential execution.

It's actually the opposite: if you request parallel, you get parallel, even if it actually reduces performance. The implementation does not switch to the potentially more efficient sequential execution in such cases.

So if you are confident that the per-element workload is high enough to justify parallel execution regardless of the small number of elements, you can simply request parallel execution.

This is easy to demonstrate:

Stream.of(1, 2).parallel()
      .peek(x -> System.out.println("processing "+x+" in "+Thread.currentThread()))
      .forEach(System.out::println);

On Ideone, this prints

processing 2 in Thread[main,5,main]
2
processing 1 in Thread[ForkJoinPool.commonPool-worker-1,5,main]
1

But the order of the messages, and the thread details, may vary. It may even happen that in some environments both tasks get executed by the same thread, if it can steal the second task before another thread has started up to pick it up. But of course, if the tasks are expensive enough, this won't happen. The important point is that the overall workload has been split and enqueued to be potentially picked up by other worker threads.

If, in your environment, the simple example above happens to be executed by a single thread, you can insert a simulated workload like this:

Stream.of(1, 2).parallel()
      .peek(x -> System.out.println("processing "+x+" in "+Thread.currentThread()))
      .map(x -> {
           LockSupport.parkNanos("simulated workload", TimeUnit.SECONDS.toNanos(3));
           return x;
        })
      .forEach(System.out::println);

Then you may also see that the overall execution time will be shorter than "number of elements" × "processing time per element", if the "processing time per element" is high enough.
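To make the timing claim concrete, here is a minimal, self-contained sketch; the one-second simulated workload and the `ParallelTiming` class name are illustrative choices, not from the original answer:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Stream;

public class ParallelTiming {
    public static void main(String[] args) {
        long start = System.nanoTime();
        // Two elements, each with a simulated one-second workload.
        int sum = Stream.of(1, 2).parallel()
            .mapToInt(x -> {
                LockSupport.parkNanos("simulated workload",
                        TimeUnit.SECONDS.toNanos(1));
                return x;
            })
            .sum();
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        // Sequential execution would need about 2 x 1000 ms; with a second
        // worker thread available, the elapsed time stays close to 1000 ms,
        // though the exact figure depends on the environment.
        System.out.println("sum=" + sum + ", elapsed=" + elapsed + " ms");
    }
}
```

Whether you actually see the ~1 s figure depends on the common pool's parallelism in your environment, but the split itself happens either way.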

Update: the misunderstanding might be caused by Brian Goetz's misleading statement: "In your case, your input set is simply too small to be decomposed".

It must be emphasized that this is not a general property of the Stream API, but of the Map that has been used. A HashMap has a backing array, and the entries are distributed within that array depending on their hash codes. Splitting the array into n ranges may not lead to a balanced split of the contained elements, especially if there are only two. The implementors of HashMap's Spliterator considered searching the array for elements to get a perfectly balanced split too expensive, not that splitting two elements is not worth it.
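The imbalance can be observed directly on the spliterators. The following sketch (the `SplitDemo` class name and the `count` helper are mine, not from the answer) splits the key set of a two-element HashMap once and counts the elements on each side, then does the same for an ArrayList copy:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;

public class SplitDemo {
    // Exhausts a spliterator and returns how many elements it held.
    static long count(Spliterator<?> s) {
        long[] c = {0};
        if (s != null) s.forEachRemaining(x -> c[0]++);
        return c[0];
    }

    public static void main(String[] args) {
        Map<String, String> map = new HashMap<>(); // default capacity 16
        map.put("1", "a");
        map.put("2", "b");

        // HashMap's spliterator splits the backing array by index ranges,
        // so with only two entries one half can easily end up with both
        // entries and the other with none (for these keys it typically does).
        Spliterator<String> mapRest = map.keySet().spliterator();
        Spliterator<String> mapSplit = mapRest.trySplit();
        long a = count(mapSplit), b = count(mapRest);
        System.out.println("HashMap key split: " + a + " / " + b);

        // An ArrayList knows its exact size and always splits evenly.
        List<String> list = new ArrayList<>(map.keySet());
        Spliterator<String> listRest = list.spliterator();
        Spliterator<String> listSplit = listRest.trySplit();
        long la = count(listSplit), lb = count(listRest);
        System.out.println("ArrayList split: " + la + " / " + lb);
    }
}
```

The exact HashMap distribution depends on the keys' hash codes, but the ArrayList side is always a clean half-and-half split.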

Since HashMap's default capacity is 16 and the example had only two elements, we can say that the map was oversized. Simply fixing that would also fix the example:

long start = System.nanoTime();

Map<String, Supplier<String>> input = new HashMap<>(2);
input.put("1", () -> {
    System.out.println(Thread.currentThread());
    LockSupport.parkNanos("simulated workload", TimeUnit.SECONDS.toNanos(2));
    return "a";
});
input.put("2", () -> {
    System.out.println(Thread.currentThread());
    LockSupport.parkNanos("simulated workload", TimeUnit.SECONDS.toNanos(2));
    return "b";
});
Map<String, String> results = input.keySet()
        .parallelStream().collect(Collectors.toConcurrentMap(
    key -> key,
    key -> input.get(key).get()));

System.out.println("Time: " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime()- start));

On my machine, it prints

Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Time: 2058

The conclusion is that the Stream implementation always tries to use parallel execution if you request it, regardless of the input size. But how well the workload can be distributed to the worker threads depends on the input's structure. Things could be even worse, e.g. if you stream lines from a file.

If you think that the benefit of a balanced split is worth the cost of a copying step, you can also use new ArrayList<>(input.keySet()).parallelStream() instead of input.keySet().parallelStream(), as the distribution of elements within an ArrayList always allows a perfectly balanced split.
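Applied to the earlier example, the copy-based variant might look like this; the trivial suppliers are stand-ins for the real per-element workload:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class BalancedSplitExample {
    public static void main(String[] args) {
        Map<String, Supplier<String>> input = new HashMap<>();
        input.put("1", () -> "a");
        input.put("2", () -> "b");

        // Copying the key set into an ArrayList costs one extra pass over
        // the keys, but its spliterator always divides the elements evenly,
        // so both suppliers can run on different worker threads.
        Map<String, String> results = new ArrayList<>(input.keySet())
            .parallelStream()
            .collect(Collectors.toConcurrentMap(
                key -> key,
                key -> input.get(key).get()));

        System.out.println(results);
    }
}
```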

