Will parallel stream work fine with distinct operation?

Question

I was reading about statelessness and came across this in the docs:


Stream pipeline results may be nondeterministic or incorrect if the behavioral parameters to the stream operations are stateful. A stateful lambda (or other object implementing the appropriate functional interface) is one whose result depends on any state which might change during the execution of the stream pipeline.

Now suppose I have a list of strings (say strList) and try to remove duplicate strings from it using a parallel stream, as follows:

List<String> resultOne = strList.parallelStream().distinct().collect(Collectors.toList());

or, if we want it to be case-insensitive:

List<String> result2 = strList.parallelStream().map(String::toLowerCase)
                       .distinct().collect(Collectors.toList());

Can this code have any problem, given that a parallel stream splits the input, and elements that are distinct within one chunk are not necessarily distinct across the whole input?

distinct is a stateful operation, and for stateful intermediate operations, parallel streams may require multiple passes or substantial buffering overhead. Also, distinct can be implemented more efficiently if the ordering of elements is not relevant. And, as per the doc:


For ordered streams, the selection of distinct elements is stable (for duplicated elements, the element appearing first in the encounter order is preserved.) For unordered streams, no stability guarantees are made.
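According to the API note on Stream.distinct(), this stability guarantee also holds for ordered streams in parallel pipelines. It is simply more expensive there, because the operation has to act as a full barrier. Here is a small sketch that checks this. It assumes that object identity (==) is a fair way to tell apart two equal strings, and the class and method names are illustrative only:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

class DistinctStability {
    // Puts two equal-but-not-identical "dup" strings far apart in an ordered
    // source, runs a parallel distinct(), and reports whether the surviving
    // "dup" is the instance that came first in encounter order.
    static boolean keepsFirstInstance() {
        String first = new String("dup");   // separate object, same content
        String second = new String("dup");

        List<String> src = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            if (i == 2_000) src.add(first);
            if (i == 8_000) src.add(second);
            src.add("x" + i);
        }

        List<String> out = src.parallelStream()   // ArrayList source is ORDERED
                              .distinct()
                              .collect(Collectors.toList());

        String kept = out.stream().filter("dup"::equals).findFirst().orElseThrow();
        // Identity comparison on purpose: which instance survived?
        return kept == first && out.size() == 10_001;
    }

    public static void main(String[] args) {
        System.out.println("first instance kept: " + keepsFirstInstance());
    }
}
```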

But when an ordered stream runs in parallel, distinct may be unstable, meaning it keeps an arbitrary element among duplicates and not necessarily the first one, as distinct otherwise would.

From the link:


Internally, the distinct() operation keeps a Set that contains elements that have been seen previously, but it’s buried inside the operation and we can’t get to it from application code.

So in the case of parallel streams, it would probably consume the entire stream, or it may use a CHM (something like ConcurrentHashMap.newKeySet()). And for ordered streams it would most likely use a LinkedHashSet or a similar construct.
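The CHM idea can be written out by hand. The sketch below uses a hypothetical helper, concurrentDistinct. It is not the JDK's actual distinct() implementation, which may work differently. It shares one ConcurrentHashMap.newKeySet() across the worker threads and filters with Set.add. This is exactly the kind of stateful lambda the quoted doc warns about. It only gives correct de-duplication because it treats the output order, and which duplicate instance survives, as irrelevant. ConcurrentHashMap rejects null keys, so a null element would throw a NullPointerException.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ConcurrentDistinctSketch {
    // Hypothetical helper: worker threads race to insert into one shared
    // concurrent key set. Set.add returns true only for the thread that
    // inserted a value first, so each distinct value passes the filter once.
    // Output order and which duplicate instance wins are not deterministic.
    static List<String> concurrentDistinct(List<String> input) {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        return input.parallelStream()
                    .unordered()
                    .filter(seen::add)   // deliberately stateful predicate
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> input = IntStream.range(0, 100_000)
                                      .mapToObj(i -> "k" + (i % 1_000))
                                      .collect(Collectors.toList());
        System.out.println(concurrentDistinct(input).size()); // prints 1000
    }
}
```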

Recommended answer

Roughly pointing out the relevant parts of the doc (emphasis mine):


Intermediate operations are further divided into stateless and stateful operations. Stateless operations, such as filter and map, retain no state from previously seen element when processing a new element -- each element can be processed independently of operations on other elements. Stateful operations, such as distinct and sorted, may incorporate state from previously seen elements when processing new elements

Stateful operations may need to process the entire input before producing a result. For example, one cannot produce any results from sorting a stream until one has seen all elements of the stream. As a result, under parallel computation, some pipelines containing stateful intermediate operations may require multiple passes on the data or may need to buffer significant data. Pipelines containing exclusively stateless intermediate operations can be processed in a single pass, whether sequential or parallel, with minimal data buffering
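You can see the buffering described above on a small sequential stream by logging with peek(). The sketch below uses illustrative names (BarrierDemo, trace). The exact interleaving reflects current OpenJDK behavior rather than a documented contract. Without sorted(), each element passes all the way through the pipeline before the next one enters. With sorted(), every element is taken in before anything is emitted downstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class BarrierDemo {
    // Logs when each element enters the pipeline ("in") and when it reaches
    // the terminal operation ("out"), optionally with a sorted() step between.
    static List<String> trace(boolean withSort) {
        List<String> log = new ArrayList<>();
        Stream<Integer> s = Stream.of(3, 1, 2)
                                  .peek(x -> log.add("in " + x));
        if (withSort) {
            s = s.sorted();   // stateful: must see all elements first
        }
        s.map(x -> x * 10)    // stateless: handles one element at a time
         .forEach(x -> log.add("out " + x));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace(false)); // [in 3, out 30, in 1, out 10, in 2, out 20]
        System.out.println(trace(true));  // [in 3, in 1, in 2, out 10, out 20, out 30]
    }
}
```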

If you read further down (section on ordering):


Streams may or may not have a defined encounter order. Whether or not a stream has an encounter order depends on the source and the intermediate operations. Certain stream sources (such as List or arrays) are intrinsically ordered, whereas others (such as HashSet) are not. Some intermediate operations, such as sorted(), may impose an encounter order on an otherwise unordered stream, and others may render an ordered stream unordered, such as BaseStream.unordered(). Further, some terminal operations may ignore encounter order, such as forEach().
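Whether a source has an encounter order is visible through the ORDERED characteristic of its spliterator. Here is a minimal sketch that checks the sources named above. The class and helper names are illustrative only:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;

class EncounterOrderCheck {
    // A stream starts out with the ordering reported by its source spliterator.
    static boolean isOrdered(Spliterator<?> sp) {
        return sp.hasCharacteristics(Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        List<String> list = new ArrayList<>(Arrays.asList("b", "a", "c"));
        Set<String> set = new HashSet<>(list);
        String[] array = {"b", "a", "c"};

        System.out.println("ArrayList: " + isOrdered(list.spliterator()));        // true
        System.out.println("array:     " + isOrdered(Arrays.spliterator(array))); // true
        System.out.println("HashSet:   " + isOrdered(set.spliterator()));         // false

        // sorted() imposes an encounter order on an otherwise unordered source,
        // and unordered() removes the ordering constraint from an ordered one.
        System.out.println("HashSet.sorted():  " + isOrdered(set.stream().sorted().spliterator()));
        System.out.println("List.unordered():  " + isOrdered(list.stream().unordered().spliterator()));
    }
}
```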

...


For parallel streams, relaxing the ordering constraint can sometimes enable more efficient execution. Certain aggregate operations, such as filtering duplicates (distinct()) or grouped reductions (Collectors.groupingBy()) can be implemented more efficiently if ordering of elements is not relevant. Similarly, operations that are intrinsically tied to encounter order, such as limit(), may require buffering to ensure proper ordering, undermining the benefit of parallelism. In cases where the stream has an encounter order, but the user does not particularly care about that encounter order, explicitly de-ordering the stream with unordered() may improve parallel performance for some stateful or terminal operations. However, most stream pipelines, such as the "sum of weight of blocks" example above, still parallelize efficiently even under ordering constraints.
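For the grouped-reduction case mentioned above, the JDK provides Collectors.groupingByConcurrent, a concurrent and UNORDERED collector, alongside the order-respecting Collectors.groupingBy. The sketch below compares the two on a tiny, made-up word list (GroupingComparison and WORDS are hypothetical names). Both should produce the same counts. The concurrent version simply gives up encounter order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

class GroupingComparison {
    static final List<String> WORDS =
        Arrays.asList("a", "bb", "cc", "ddd", "e", "fff", "gg", "h");

    // Order-respecting grouping: in parallel, sub-results are built
    // separately and then merged.
    static Map<Integer, Long> grouped() {
        return WORDS.parallelStream()
                    .collect(Collectors.groupingBy(String::length, Collectors.counting()));
    }

    // Concurrent grouping: all threads accumulate into one ConcurrentMap.
    // The collector is already UNORDERED; unordered() just makes the intent explicit.
    static ConcurrentMap<Integer, Long> groupedConcurrent() {
        return WORDS.parallelStream()
                    .unordered()
                    .collect(Collectors.groupingByConcurrent(String::length, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(grouped());
        System.out.println(groupedConcurrent());
    }
}
```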

In summary:


  • distinct will work fine with parallel streams, but as you may already know, for an ordered parallel stream it has to consume the entire stream before continuing, and this may use a lot of memory.
  • If the source of the items is an unordered collection (such as a HashSet) or the stream is unordered(), then distinct does not have to worry about ordering the output and can therefore be more efficient.
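You can check the chunking concern from the question empirically. Spread the duplicates across a large list so that equal elements land in different chunks, then compare parallel and sequential distinct(). Here is a sketch with made-up data (ParallelDistinctCheck is an illustrative name). For an ordered source, both should return the same list:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelDistinctCheck {
    // Value i % 997 repeats every 997 positions, so duplicates are spread
    // across the whole list and end up in different chunks when it is split.
    static List<Integer> input() {
        return IntStream.range(0, 200_000)
                        .map(i -> i % 997)
                        .boxed()
                        .collect(Collectors.toList());
    }

    static List<Integer> sequentialDistinct(List<Integer> in) {
        return in.stream().distinct().collect(Collectors.toList());
    }

    static List<Integer> parallelDistinct(List<Integer> in) {
        return in.parallelStream().distinct().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> in = input();
        List<Integer> seq = sequentialDistinct(in);
        List<Integer> par = parallelDistinct(in);
        System.out.println(seq.size() + " " + par.size() + " " + seq.equals(par)); // 997 997 true
    }
}
```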

The solution is to add .unordered() to the stream pipeline if you don't care about order and want better performance.

List<String> result2 = strList.parallelStream()
                              .unordered()
                              .map(String::toLowerCase)
                              .distinct()
                              .collect(Collectors.toList());
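Here is a runnable version of both case-insensitive pipelines, using a made-up strList. Both should yield the same three distinct lowercase strings. Only the ordered version promises them in first-occurrence order. The class and method names are illustrative only:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;

class UnorderedDistinct {
    // Ordered: output follows the first occurrence of each lowercase string.
    static List<String> ordered(List<String> strList) {
        return strList.parallelStream()
                      .map(String::toLowerCase)
                      .distinct()
                      .collect(Collectors.toList());
    }

    // Unordered: same elements, but no promise about their order.
    static List<String> unordered(List<String> strList) {
        return strList.parallelStream()
                      .unordered()
                      .map(String::toLowerCase)
                      .distinct()
                      .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> strList = Arrays.asList("Apple", "banana", "APPLE", "Cherry", "BANANA", "apple");
        System.out.println(ordered(strList)); // [apple, banana, cherry]
        List<String> u = unordered(strList);
        boolean sameElements = new HashSet<>(u).equals(new HashSet<>(ordered(strList)));
        System.out.println(u.size() + " " + sameElements); // 3 true
    }
}
```

String::toLowerCase uses the default locale. If the data may contain locale-sensitive letters, toLowerCase(Locale.ROOT) gives locale-independent results.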






Alas, there is no (available built-in) concurrent hash set in Java (unless they got clever with ConcurrentHashMap), so I can only leave you with the unfortunate possibility that distinct is implemented in a blocking fashion using a regular Java set, in which case I don't see any benefit in doing a parallel distinct.

Edit: I spoke too soon. There might be some benefit to using parallel streams with distinct. It looks like distinct is implemented more cleverly than I initially thought. See @Eugene's answer.
