Parallel flatMap always sequential

Question

Suppose I have this code:

import java.util.Collections;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Collections.singletonList(10)
        .parallelStream() // .stream() - nothing changes
        .flatMap(x -> Stream.iterate(0, i -> i + 1)
                .limit(x)
                .parallel()
                .peek(m -> {
                    System.out.println(Thread.currentThread().getName());
                }))
        .collect(Collectors.toSet());

The output is always the same thread name, so parallel brings no benefit here: a single thread does all the work.
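
A checkable variant of the snippet above, offered as a sketch: instead of printing, it records the thread names in a concurrent set. The names `InnerParallelDemo` and `innerThreads` are hypothetical, and the single-thread result reflects current OpenJDK behavior, where a single-element outer source is not split and flatMap consumes the inner stream sequentially.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class InnerParallelDemo {
    // Runs the question's pipeline and returns the names of all threads
    // that processed an element of the inner stream.
    static Set<String> innerThreads() {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        Set<Integer> result = Collections.singletonList(10)
                .parallelStream()
                .flatMap(x -> Stream.iterate(0, i -> i + 1)
                        .limit(x)
                        .parallel() // has no effect: flatMap turns the inner stream sequential
                        .peek(m -> threads.add(Thread.currentThread().getName())))
                .collect(Collectors.toSet());
        if (result.size() != 10) {
            throw new AssertionError("expected 10 distinct elements, got " + result);
        }
        return threads;
    }

    public static void main(String[] args) {
        // Expected: a one-element set (the calling thread).
        System.out.println(innerThreads());
    }
}
```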

Inside flatMap there is this code:

result.sequential().forEach(downstream);

I understand forcing the sequential property if the "outer" stream is parallel: they could probably block, since the "outer" stream would have to wait for "flatMap" to finish and vice versa (both use the same common pool). But why always force it?

Is that one of those things that could change in a later version?

Recommended answer

There are two different aspects.

First, there is only a single pipeline which is either sequential or parallel. The choice of sequential or parallel at the inner stream is irrelevant. Note that the downstream consumer you see in the cited code snippet represents the entire subsequent stream pipeline, so in your code, ending with .collect(Collectors.toSet());, this consumer will eventually add the resulting elements to a single Set instance which is not thread safe. So processing the inner stream in parallel with that single consumer would break the entire operation.
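
To make the "single downstream consumer" point concrete, here is a simplified sketch of consumer composition. It is not the OpenJDK code (which chains internal `Sink` objects), and `flatMapStage` and `run` are hypothetical names. The whole pipeline collapses into one `Consumer` that the source spliterator feeds through `forEachRemaining`, and that consumer ends in a plain `HashSet`. Pushing inner elements into it from several threads at once would therefore be a data race.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

public class ConsumerCompositionSketch {
    // Hypothetical flatMap stage built by consumer composition: each outer element
    // is mapped to an inner stream whose elements are pushed, one by one and on the
    // current thread, into the single downstream consumer.
    static <T, R> Consumer<T> flatMapStage(Function<? super T, ? extends Stream<? extends R>> mapper,
                                           Consumer<? super R> downstream) {
        return t -> {
            try (Stream<? extends R> inner = mapper.apply(t)) {
                if (inner != null) {
                    inner.sequential().forEach(downstream); // mirrors the line cited in the question
                }
            }
        };
    }

    static Set<Integer> run(List<Integer> source) {
        Set<Integer> sink = new HashSet<>(); // not thread-safe, like the Set behind Collectors.toSet()
        Consumer<Integer> pipeline = flatMapStage(
                x -> Stream.iterate(0, i -> i + 1).limit(x).parallel(),
                sink::add); // the downstream consumer stands for "the rest of the pipeline"
        Spliterator<Integer> spliterator = source.spliterator();
        spliterator.forEachRemaining(pipeline); // the source receives one composed Consumer
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(3, 2)));
    }
}
```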

If an outer stream gets split, that cited code might get invoked concurrently with different consumers adding to different sets. Each of these calls would process a different element of the outer stream mapping to a different inner stream instance. Since your outer stream consists of a single element only, it can’t be split.
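
With many outer elements the outer stream can be split. The sketch below (hypothetical names `SplitOuterDemo` and `threadsPerOuterElement`) records, for each outer element, which threads processed its inner elements. Each inner stream is still consumed by exactly one thread. How many distinct threads appear overall depends on the machine and the common pool, so that part is only printed, not asserted.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SplitOuterDemo {
    // Maps each outer element to the names of the threads that processed its inner elements.
    static Map<Integer, Set<String>> threadsPerOuterElement(int outerSize, int innerSize) {
        Map<Integer, Set<String>> threads = new ConcurrentHashMap<>();
        Set<Integer> result = IntStream.range(0, outerSize).boxed()
                .parallel()
                .flatMap(x -> IntStream.range(0, innerSize)
                        .mapToObj(i -> x * innerSize + i) // distinct value per (outer, inner) pair
                        .parallel() // still ignored inside flatMap
                        .peek(v -> threads.computeIfAbsent(x, k -> ConcurrentHashMap.newKeySet())
                                .add(Thread.currentThread().getName())))
                .collect(Collectors.toSet());
        if (result.size() != outerSize * innerSize) {
            throw new AssertionError("lost elements: " + result.size());
        }
        return threads;
    }

    public static void main(String[] args) {
        Map<Integer, Set<String>> threads = threadsPerOuterElement(64, 1_000);
        Set<String> all = threads.values().stream()
                .flatMap(Set::stream)
                .collect(Collectors.toSet());
        System.out.println("threads used overall: " + all);
    }
}
```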

The way this has been implemented is also the reason for the issue discussed in 'Why filter() after flatMap() is "not completely" lazy in Java streams?', as forEach is called on the inner stream, which passes all of its elements to the downstream consumer. As demonstrated by this answer, an alternative implementation that supports laziness and substream splitting is possible, but it is a fundamentally different way of implementing it. The current design of the Stream implementation mostly works by consumer composition, so in the end the source spliterator (and those split off from it) receives a Consumer representing the entire stream pipeline in either tryAdvance or forEachRemaining. In contrast, the solution in the linked answer does spliterator composition, producing a new Spliterator that delegates to the source spliterators. I suppose both approaches have advantages, and I’m not sure how much the OpenJDK implementation would lose when working the other way round.
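
The spliterator-composition alternative can be sketched as follows. `FlatMapSpliterator` and `flatMapLazy` are hypothetical names, not the code of the linked answer or of OpenJDK, and the sketch cuts corners: inner streams are never closed, the size is reported as unknown, and splitting happens only between outer elements. Because `tryAdvance` pulls one inner element at a time, an infinite inner stream combined with `limit` terminates, and `trySplit` delegates to the outer source spliterator so parallel work is divided along the outer elements.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class FlatMapSpliteratorSketch {

    // Hypothetical flatMap built by spliterator composition: it wraps the outer
    // source spliterator instead of pushing into a downstream consumer.
    static final class FlatMapSpliterator<T, R> implements Spliterator<R> {
        private final Spliterator<T> outer;
        private final Function<? super T, ? extends Stream<? extends R>> mapper;
        private Spliterator<? extends R> current; // inner stream currently being drained

        FlatMapSpliterator(Spliterator<T> outer,
                           Function<? super T, ? extends Stream<? extends R>> mapper) {
            this.outer = outer;
            this.mapper = mapper;
        }

        @Override
        public boolean tryAdvance(Consumer<? super R> action) {
            while (true) {
                // Pull exactly one element from the current inner stream, if one is left.
                if (current != null && current.tryAdvance(action)) {
                    return true;
                }
                current = null;
                // Move on to the next outer element (simplification: inner streams are not closed).
                boolean advanced = outer.tryAdvance(t -> {
                    Stream<? extends R> inner = mapper.apply(t);
                    current = (inner == null) ? null : inner.spliterator();
                });
                if (!advanced) {
                    return false;
                }
            }
        }

        @Override
        public Spliterator<R> trySplit() {
            // Split only between outer elements, so the returned part is a true prefix.
            if (current != null) {
                return null;
            }
            Spliterator<T> prefix = outer.trySplit();
            return (prefix == null) ? null : new FlatMapSpliterator<>(prefix, mapper);
        }

        @Override
        public long estimateSize() {
            return Long.MAX_VALUE; // unknown: each outer element may expand to any number of elements
        }

        @Override
        public int characteristics() {
            return outer.characteristics() & ORDERED;
        }
    }

    static <T, R> Stream<R> flatMapLazy(List<T> source,
                                        Function<? super T, ? extends Stream<? extends R>> mapper,
                                        boolean parallel) {
        return StreamSupport.stream(new FlatMapSpliterator<>(source.spliterator(), mapper), parallel);
    }

    public static void main(String[] args) {
        // Lazy: an infinite inner stream is fine because elements are pulled one at a time.
        List<Integer> firstThree = flatMapLazy(List.of(1, 2), x -> Stream.iterate(x, i -> i + 1), false)
                .limit(3)
                .collect(Collectors.toList());
        System.out.println(firstThree); // [1, 2, 3]

        // Splittable: parallel work is divided along the outer list.
        List<Integer> outer = IntStream.range(0, 100).boxed().collect(Collectors.toList());
        long sum = flatMapLazy(outer, x -> Stream.of(x, x), true).mapToLong(Integer::longValue).sum();
        System.out.println(sum); // 9900
    }
}
```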
