Parallel Stream non-concurrent unordered collector


Question

Suppose I have this custom collector:

import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

public class CustomToListCollector<T> implements Collector<T, List<T>, List<T>> {

    @Override
    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    @Override
    public BiConsumer<List<T>, T> accumulator() {
        return List::add;
    }

    @Override
    public BinaryOperator<List<T>> combiner() {
        return (l1, l2) -> {
            l1.addAll(l2);
            return l1;
        };
    }

    @Override
    public Function<List<T>, List<T>> finisher() {
        return Function.identity();
    }

    @Override
    public Set<Collector.Characteristics> characteristics() {
        return EnumSet.of(Characteristics.IDENTITY_FINISH, Characteristics.UNORDERED);
    }
}

This is exactly the Collectors#toList implementation, with one minor difference: the UNORDERED characteristic is also added.

I would assume that running this code:

    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

    for (int i = 0; i < 100_000; i++) {
        List<Integer> result = list.parallelStream().collect(new CustomToListCollector<>());
        if (!result.equals(list)) {
            System.out.println(result);
            break;
        }
    }

should actually produce some result. But it does not.

I've looked under the hood a bit. ReferencePipeline#collect first checks whether the stream is parallel, whether the collector is concurrent, and whether the collector is unordered. CONCURRENT is missing, so it delegates to the method evaluate by creating a TerminalOp out of this collector. Under the hood this is a ReducingSink, which actually does care whether the collector is unordered or not:

    return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
        @Override
        public ReducingSink makeSink() {
            return new ReducingSink();
        }

        @Override
        public int getOpFlags() {
            return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                   ? StreamOpFlag.NOT_ORDERED
                   : 0;
        }
    };

I have not debugged further since it gets pretty complicated fast.

Thus, maybe there is a shortcut here and someone could explain what I am missing. It is a parallel stream that collects elements with a non-concurrent unordered collector. Shouldn't there be no order in how the threads combine their results? If not, how is the order imposed here (and by whom)?

Answer

Note that the result is the same when using list.parallelStream().unordered().collect(Collectors.toList()); in either case, the unordered property is not used within the current implementation.
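As a quick empirical check of that claim, here is a minimal sketch (the class and method names are made up for illustration) that runs the same kind of stress test with unordered() and the standard Collectors.toList(); on current JDK implementations the merge still happens in encounter order:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class UnorderedToListCheck {

    // Returns true if every parallel, unordered collect kept encounter order.
    static boolean encounterOrderKept(int rounds) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        for (int i = 0; i < rounds; i++) {
            // unordered() on the stream instead of an UNORDERED collector:
            // the current implementation still merges the per-chunk lists
            // of a non-concurrent collector in encounter order.
            List<Integer> result = list.parallelStream()
                    .unordered()
                    .collect(Collectors.toList());
            if (!result.equals(list)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(encounterOrderKept(100_000)
                ? "order preserved" : "order broken");
    }
}
```

Keep in mind that this only demonstrates current behavior; an unordered stream gives no guarantee that the encounter order is preserved.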

But let’s change the setup a little bit:

List<Integer> list = Collections.nCopies(10, null).stream()
    .flatMap(ig -> IntStream.range(0, 100).boxed())
    .collect(Collectors.toList());
List<Integer> reference = new ArrayList<>(new LinkedHashSet<>(list));

for (int i = 0; i < 100_000; i++) {
    List<Integer> result = list.parallelStream()
      .distinct()
      .collect(characteristics(Collectors.toList(), Collector.Characteristics.UNORDERED));
    if (!result.equals(reference)) {
        System.out.println(result);
        break;
    }
}

using the characteristics collector factory of this answer.

The interesting outcome is that in Java 8 versions prior to 1.8.0_60, this has a different outcome. If we use objects with distinct identities instead of canonical Integer instances, we can detect that in these earlier versions not only does the order of the list differ, but the objects in the result list are not the first encountered instances.
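The characteristics collector factory itself is not reproduced in this article. Following the idea of the linked answer, a minimal sketch could look like this (the holder class Collectors2 is a made-up name; the method simply forwards the collector's functions while widening its characteristics set):

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Set;
import java.util.stream.Collector;

public final class Collectors2 {

    // Wraps an existing collector, adding the given characteristics to
    // the ones it already reports; the collector's behavior is unchanged.
    static <T, A, R> Collector<T, A, R> characteristics(
            Collector<T, A, R> collector, Collector.Characteristics... toAdd) {
        Set<Collector.Characteristics> set =
                EnumSet.noneOf(Collector.Characteristics.class);
        set.addAll(collector.characteristics());
        set.addAll(Arrays.asList(toAdd));
        return Collector.of(collector.supplier(), collector.accumulator(),
                collector.combiner(), collector.finisher(),
                set.toArray(new Collector.Characteristics[0]));
    }
}
```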

So the unordered characteristic of a terminal operation was propagated to the stream, affecting the behavior of distinct(), similar to that of skip and limit, as discussed here and here.
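The ordering guarantee at stake can be seen with distinct() alone: for ordered streams the selection of distinct elements is stable, i.e. the first occurrence in encounter order survives even in parallel, which is exactly what a back-propagated unordered flag would silently give up (the class name below is made up for illustration):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DistinctStability {

    // distinct() on an ordered stream keeps the first occurrence of each
    // element, in encounter order, even when running in parallel.
    static List<String> firstOccurrences(List<String> words) {
        return words.parallelStream()
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstOccurrences(
                Arrays.asList("b", "a", "b", "c", "a"))); // [b, a, c]
    }
}
```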

As discussed in the second linked thread, the back-propagation has been removed completely, which is reasonable when thinking about it a second time. For distinct, skip and limit, the order of the source is relevant and ignoring it just because the order will be ignored in subsequent stages is not right. So the only remaining stateful intermediate operation that could benefit from back-propagation would be sorted, which would be rendered obsolete when the order is being ignored afterwards. But combining sorted with an unordered sink is more like a programming error anyway…

For stateless intermediate operations the order is irrelevant anyway. Stream processing works by splitting the source into chunks, applying all stateless intermediate operations to their elements independently, and collecting into a local container, before merging into the result container. So the merging step is the only place where respecting or ignoring the order (of the chunks) will have an impact on the result and perhaps on the performance.

But the impact isn’t very big. When you implement such an operation, e.g. via ForkJoinTasks, you simply split a task into two, wait for their completion, and merge them. Alternatively, a task may split off a chunk into a sub-task, process its remaining chunk in place, wait for the sub-task, and merge. In either case, merging the results in order comes naturally, because the initiating task already holds references to the adjacent tasks. To merge with different chunks instead, the associated sub-tasks would first have to be found somehow.
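To illustrate why in-order merging comes naturally, here is a minimal fork/join sketch (RangeCollectTask is a made-up name, not how the stream implementation is actually structured): the splitting task forks its left half, computes the right half in place, and already holds the reference needed to merge left-before-right:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical sketch: collect the range [from, to) into a list via fork/join.
public class RangeCollectTask extends RecursiveTask<List<Integer>> {
    private static final int THRESHOLD = 4;
    private final int from, to;

    public RangeCollectTask(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected List<Integer> compute() {
        if (to - from <= THRESHOLD) {           // small chunk: accumulate locally
            List<Integer> local = new ArrayList<>();
            for (int i = from; i < to; i++) local.add(i);
            return local;
        }
        int mid = (from + to) >>> 1;
        RangeCollectTask left = new RangeCollectTask(from, mid);
        left.fork();                            // process the left half asynchronously
        List<Integer> right = new RangeCollectTask(mid, to).compute();
        List<Integer> merged = left.join();     // this task already holds its sibling,
        merged.addAll(right);                   // so merging in encounter order is the
        return merged;                          // natural (and cheapest) choice
    }

    public static void main(String[] args) {
        System.out.println(ForkJoinPool.commonPool()
                .invoke(new RangeCollectTask(0, 16)));
    }
}
```

Merging with whichever sibling finishes first would require extra bookkeeping to locate that sibling, which is the point made above.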

The only benefit from merging with a different task would be that you can merge with the first completed task, if the tasks need different time to complete. But when waiting for a sub-task in the Fork/Join framework, the thread won’t be idle, the framework will use the thread for working on other pending tasks in-between. So as long as the main task has been split into enough sub-tasks, there will be full CPU utilization. Also, the spliterators attempt to split into even chunks to reduce the differences between the computing times. It’s very likely that the benefit of an alternative unordered merging implementation doesn’t justify the code duplication, at least with the current implementation.

Still, reporting an unordered characteristic allows the implementation to utilize it when beneficial, and implementations can change.
