parallel flatMap always sequential

Problem description

Suppose I have this code:

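```java
import java.util.Collections;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Collections.singletonList(10)
        .parallelStream() // .stream() - nothing changes
        .flatMap(x -> Stream.iterate(0, i -> i + 1)
                .limit(x)
                .parallel()
                // print the name of the thread that processes each inner element
                .peek(m -> System.out.println(Thread.currentThread().getName())))
        .collect(Collectors.toSet());
```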

The output shows the same thread name for every element, so there is no benefit from parallel here: a single thread does all the work.

Internally, flatMap has this code:

```java
result.sequential().forEach(downstream);
```

I understand forcing the sequential property if the "outer" stream is parallel (they could probably block): "outer" would have to wait for "flatMap" to finish and the other way around, since the same common pool is used. But why always force it?

Is that one of those things that could change in a later version?

Recommended answer

There are two different aspects.

First, there is only a single pipeline, which is either sequential or parallel. The choice of sequential or parallel at the inner stream is irrelevant. Note that the downstream consumer you see in the cited code snippet represents the entire subsequent stream pipeline. In your code, which ends with .collect(Collectors.toSet()), this consumer will eventually add the resulting elements to a single Set instance, which is not thread-safe. So processing the inner stream in parallel with that single consumer would break the entire operation.
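To make the point about the downstream consumer concrete, here is a minimal sketch of consumer composition in plain Java. It is not the JDK's internal Sink machinery; the class FlatMapSinkSketch and its helpers mapping and flatMapping are hypothetical names used only for illustration. Each stage wraps the consumer of the next stage, the flat-mapping stage drains each inner stream sequentially into that one downstream consumer (mirroring the quoted result.sequential().forEach(downstream) line), and the source spliterator receives the composed pipeline as a single Consumer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

// Hypothetical illustration of consumer composition; not the JDK's internal Sink classes.
public class FlatMapSinkSketch {

    // Wraps the downstream consumer with a mapping step.
    static <T, R> Consumer<T> mapping(Function<? super T, ? extends R> f, Consumer<? super R> downstream) {
        return t -> downstream.accept(f.apply(t));
    }

    // Wraps the downstream consumer with a flat-mapping step: every inner element goes to
    // the one downstream consumer, so each inner stream is drained sequentially.
    static <T, R> Consumer<T> flatMapping(Function<? super T, ? extends Stream<? extends R>> f,
                                         Consumer<? super R> downstream) {
        return t -> {
            try (Stream<? extends R> inner = f.apply(t)) {
                if (inner != null) {
                    inner.sequential().forEach(downstream);
                }
            }
        };
    }

    static List<Integer> run() {
        List<Integer> result = new ArrayList<>();   // single, non-thread-safe container
        Consumer<Integer> terminal = result::add;
        // Compose from the end: flatMap(x -> 0..x-1), then map(i -> i * 2), then collect.
        Consumer<Integer> doubled = mapping(i -> i * 2, terminal);
        Consumer<Integer> pipeline = flatMapping(x -> Stream.iterate(0, i -> i + 1).limit(x), doubled);
        // The source spliterator receives the entire pipeline as one Consumer.
        Spliterator<Integer> source = Collections.singletonList(3).spliterator();
        source.forEachRemaining(pipeline);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [0, 2, 4]
    }
}
```

Because the terminal consumer here is one ArrayList, calling pipeline from several threads at once would race on it, which is the same reason the real flatMap cannot feed one downstream consumer from a parallel inner stream.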

If an outer stream gets split, that cited code might get invoked concurrently with different consumers adding to different sets. Each of these calls would process a different element of the outer stream mapping to a different inner stream instance. Since your outer stream consists of a single element only, it can’t be split.
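A minimal sketch of the splitting case, assuming several outer elements and the default common ForkJoinPool: the outer stream can now be split, so different inner streams may run on different threads, while each individual inner stream is still drained by a single thread. The class OuterSplitDemo and the THREADS_PER_OUTER map are hypothetical names. Which threads appear, and how many, depends on the machine and the pool, so only the result size and the "one thread per outer element" property (with the current OpenJDK flatMap) are stable.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class OuterSplitDemo {
    // Hypothetical bookkeeping: outer element -> names of the threads that processed its inner elements.
    static final Map<Integer, Set<String>> THREADS_PER_OUTER = new ConcurrentHashMap<>();

    static Set<Integer> run() {
        THREADS_PER_OUTER.clear();
        return IntStream.rangeClosed(1, 8).boxed()  // several outer elements, so the outer stream can split
                .parallel()
                .flatMap(x -> Stream.iterate(0, i -> i + 1)
                        .limit(1000)
                        .parallel()  // overridden: flatMap calls sequential() on the inner stream
                        .peek(m -> THREADS_PER_OUTER
                                .computeIfAbsent(x, k -> ConcurrentHashMap.newKeySet())
                                .add(Thread.currentThread().getName()))
                        .map(m -> x * 1000 + m))  // unique value per (outer, inner) pair
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println("elements: " + run().size()); // elements: 8000
        // Thread names vary from run to run; each outer element should list exactly one.
        THREADS_PER_OUTER.forEach((x, names) -> System.out.println(x + " -> " + names));
    }
}
```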

The way this has been implemented is also the reason behind the issue in "Why filter() after flatMap() is "not completely" lazy in Java streams?": forEach is called on the inner stream, which passes all of its elements to the downstream consumer. As demonstrated by this answer, an alternative implementation that supports laziness and substream splitting is possible. But it is a fundamentally different way of implementing it. The current design of the Stream implementation mostly works by consumer composition, so in the end, the source spliterator (and those split off from it) receives a Consumer representing the entire stream pipeline in either tryAdvance or forEachRemaining. In contrast, the solution in the linked answer does spliterator composition, producing a new Spliterator that delegates to the source spliterators. I suppose both approaches have advantages, and I'm not sure how much the OpenJDK implementation would lose by working the other way round.
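The spliterator-composition alternative can be sketched roughly as follows. This is not the code from the linked answer and not part of the JDK; the class FlatMapSpliterator and its lazyFlatMap factory are hypothetical, and the sketch assumes inner streams need not be closed and that the total size may be reported as unknown. tryAdvance hands out one inner element per call, so short-circuiting operations stop opening further inner streams, and trySplit splits the outer spliterator when it can and otherwise splits the inner stream currently being consumed (substream splitting).

```java
import java.util.Collections;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical sketch of flatMap by spliterator composition (not the JDK's implementation).
// Simplifications: inner streams are never closed and the size is always reported as unknown.
public class FlatMapSpliterator<T, R> implements Spliterator<R> {
    private final Spliterator<T> outer;  // remaining outer elements
    private final Function<? super T, ? extends Stream<? extends R>> mapper;
    private final int characteristics;
    private Spliterator<? extends R> current;  // inner stream being consumed, if any

    private FlatMapSpliterator(Spliterator<T> outer, Function<? super T, ? extends Stream<? extends R>> mapper,
                               int characteristics, Spliterator<? extends R> current) {
        this.outer = outer;
        this.mapper = mapper;
        this.characteristics = characteristics;
        this.current = current;
    }

    // Hypothetical factory: a lazily flat-mapped stream over `source`.
    public static <T, R> Stream<R> lazyFlatMap(Stream<T> source,
                                               Function<? super T, ? extends Stream<? extends R>> mapper) {
        boolean parallel = source.isParallel();
        Spliterator<T> s = source.spliterator();
        return StreamSupport.stream(
                new FlatMapSpliterator<T, R>(s, mapper, s.characteristics() & ORDERED, null), parallel);
    }

    // Pulls one outer element and opens its inner stream; false if the outer side is exhausted.
    private boolean openNextInner() {
        return outer.tryAdvance(t -> {
            Stream<? extends R> inner = mapper.apply(t);
            current = (inner == null) ? null : inner.spliterator();
        });
    }

    @Override
    public boolean tryAdvance(Consumer<? super R> action) {
        do {  // one element per call, so a short-circuiting terminal op stops further pulls
            if (current != null && current.tryAdvance(action)) {
                return true;
            }
            current = null;
        } while (openNextInner());
        return false;
    }

    @Override
    public Spliterator<R> trySplit() {
        Spliterator<T> outerPrefix = outer.trySplit();
        if (outerPrefix != null) {  // prefix = inner stream in progress + outer prefix
            Spliterator<R> prefix = new FlatMapSpliterator<T, R>(outerPrefix, mapper, characteristics, current);
            current = null;
            return prefix;
        }
        // Outer cannot be split: split the inner stream itself (substream splitting).
        if (current == null && !openNextInner()) {
            return null;
        }
        Spliterator<? extends R> innerPrefix = (current == null) ? null : current.trySplit();
        return (innerPrefix == null) ? null : new FlatMapSpliterator<T, R>(
                Spliterators.<T>emptySpliterator(), mapper, characteristics, innerPrefix);
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;  // unknown
    }

    @Override
    public int characteristics() {
        return characteristics;
    }

    public static void main(String[] args) {
        AtomicInteger opened = new AtomicInteger();
        Integer first = lazyFlatMap(Stream.of(1, 2, 3), x -> {
            opened.incrementAndGet();
            return Stream.of(x, x * 10);
        }).filter(v -> v > 5).findFirst().orElse(-1);
        System.out.println(first + ", inner streams opened: " + opened.get()); // 10, inner streams opened: 1

        // One outer element, as in the question; its inner list can still be split.
        int size = lazyFlatMap(Collections.singletonList(1000).parallelStream(),
                x -> IntStream.range(0, x).boxed().collect(Collectors.toList()).stream())
                .collect(Collectors.toSet()).size();
        System.out.println(size); // 1000
    }
}
```

In main, findFirst opens only the first inner stream, and the single-element outer stream from the question can still be processed in parallel because the inner list's spliterator is split. How much parallelism this yields in practice depends on how well the inner spliterators split.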
