How collectors are used when turning the stream in parallel

Question

I was actually trying to answer the question "How to skip even lines of a Stream<String> obtained from Files.lines", so I thought this collector wouldn't work well in parallel:

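private static Collector<String, ?, List<String>> oddLines() {
    // Single counter captured by the accumulator lambda; every container shares it.
    int[] counter = {1};
    return Collector.of(ArrayList::new,
            // accumulator: keep the line if the counter is odd, then advance the counter
            (l, line) -> {
                if (counter[0] % 2 == 1) l.add(line);
                counter[0]++;
            },
            // combiner: append the right-hand partial list to the left-hand one
            (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            });
}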

But it works.

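EDIT: It actually didn't work. I was fooled by the fact that my input set was too small to trigger any parallelism; see the discussion in the comments.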

I thought it wouldn't work because of the following two execution plans that came to my mind.

Thread t1 reads the first element of the Stream, so the if condition is satisfied. It adds the first element to its list. Then its execution is suspended before it has time to update the array value.

Thread t2, which started, say, at the 4th element of the stream, adds that element to its list. So we end up with an unwanted element.

Of course, since this collector seems to work, I guess it doesn't work like that. And the updates are not atomic anyway.

If instead each thread had its own counter, there would be no more problems with the updates, but nothing guarantees that thread t2 won't start at the 4th element of the stream. So it doesn't work like that either.
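The scenario in which t2 starts at the 4th element can be reproduced deterministically, without any threads. The sketch below is only a model: it assumes the source is split exactly once with `Spliterator.trySplit()` and that each half is processed sequentially with its own counter starting at 1, whereas a real parallel stream may split many times and run the chunks on the common ForkJoin pool. The names `PerChunkCounterDemo`, `keepOddPositions` and `simulate` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class PerChunkCounterDemo {

    // Keeps elements whose chunk-local position (1-based) is odd, using a counter
    // that belongs to this chunk only, as if each thread had its own counter.
    static void keepOddPositions(Spliterator<String> chunk, List<String> out) {
        int[] counter = {1};
        chunk.forEachRemaining(line -> {
            if (counter[0] % 2 == 1) out.add(line);
            counter[0]++;
        });
    }

    static List<String> simulate(List<String> lines) {
        // For an ArrayList, trySplit() hands back roughly the first half
        // and leaves the second half in the original spliterator.
        Spliterator<String> suffix = new ArrayList<>(lines).spliterator();
        Spliterator<String> prefix = suffix.trySplit();
        List<String> result = new ArrayList<>();
        if (prefix != null) keepOddPositions(prefix, result);
        keepOddPositions(suffix, result);
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("1", "2", "3", "4", "5", "6", "7");
        // prefix [1, 2, 3] keeps 1 and 3; suffix [4, 5, 6, 7] keeps 4 and 6
        System.out.println(simulate(lines)); // [1, 3, 4, 6] instead of [1, 3, 5, 7]
    }
}
```

Here the prefix has an odd length (3), so the suffix's local counter is out of step with the global line number and the 4th element slips in; with an even-length prefix the result would happen to be correct, which is one reason small experiments can be misleading.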



So it seems that it doesn't work like that at all, which brings me to the question: how is the collector used in parallel?

Can someone explain to me, basically, how it works and why my collector works when run in parallel?

Many thanks!

Recommended answer

Passing a parallel() source stream into your collector is enough to break the logic, because your shared state (counter) may be incremented from different tasks. You can verify this, because it does not return the correct result for a sufficiently large finite input:

    Stream<String> lines = IntStream.range(1, 20000).mapToObj(i -> i + "");
    System.out.println(lines.isParallel());
    lines = lines.parallel();
    System.out.println(lines.isParallel());

    List<String> collected = lines.collect(oddLines());

    System.out.println(collected.size());

Note that for streams whose size is not known up front (e.g. when reading from Files.lines()) you need to generate a significant amount of data in the stream, so that it actually forks tasks that process some chunks concurrently.

My output is:

false
true
12386

Which is clearly wrong: keeping every other element of the 19999 values from 1 to 19999 should give 10000.
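One way to avoid the problem, sketched here as an illustration rather than anything from the original answer, is to keep all mutable state inside the container so that each task works on its own copy. The hypothetical `Acc` container below sorts a chunk's elements by their chunk-local position, and the combiner fixes up the parity of the right-hand chunk using the size of the left-hand one. This relies on the collector not being UNORDERED, so that for an ordered stream the combiner receives the chunks in encounter order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public class OddLinesCollector {

    // Mutable container: one instance per task, nothing captured from outside.
    // "kept" holds elements at chunk-local positions 1, 3, 5, ... (1-based),
    // "skipped" holds those at positions 2, 4, 6, ...
    static final class Acc {
        final List<String> kept = new ArrayList<>();
        final List<String> skipped = new ArrayList<>();

        void add(String line) {
            // The next position is odd exactly when both lists have the same size.
            if (kept.size() == skipped.size()) kept.add(line);
            else skipped.add(line);
        }

        Acc combine(Acc right) {
            // If the left chunk holds an odd number of elements, the right chunk's
            // local parity is flipped relative to the whole stream.
            boolean leftSizeOdd = (kept.size() + skipped.size()) % 2 == 1;
            if (leftSizeOdd) {
                kept.addAll(right.skipped);
                skipped.addAll(right.kept);
            } else {
                kept.addAll(right.kept);
                skipped.addAll(right.skipped);
            }
            return this;
        }
    }

    static Collector<String, ?, List<String>> oddLines() {
        // finisher: only the elements at odd global positions are returned
        return Collector.of(Acc::new, Acc::add, Acc::combine, acc -> acc.kept);
    }

    public static void main(String[] args) {
        List<String> collected = IntStream.range(1, 20000)
                .mapToObj(Integer::toString)
                .parallel()
                .collect(oddLines());
        System.out.println(collected.size()); // 10000
    }
}
```

Because `Acc` is neither shared nor captured, the collector gives the same result sequentially and in parallel; it deliberately declares neither CONCURRENT nor UNORDERED.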

As @Holger correctly pointed out in the comments, a different race can happen when your collector specifies CONCURRENT and UNORDERED. In that case the stream operates on a single collection shared across all tasks (ArrayList::new is called once per stream), whereas with only parallel() it runs the accumulator on a separate collection per task and later combines the results using the combiner you defined.
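The difference in container creation can be made visible by counting supplier calls. This is a sketch under stated assumptions: the class name `SupplierCalls` is hypothetical, and the container is a `Collections.synchronizedList` so that the shared-container case does not itself crash. In the OpenJDK implementation, a CONCURRENT and UNORDERED collector on a parallel stream gets exactly one container; without those characteristics there is one container per leaf task, and how many leaf tasks there are depends on the input size and the common pool's parallelism.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;
import java.util.stream.IntStream;

public class SupplierCalls {

    // Counts how often the stream asks the supplier for a fresh container.
    static int countSupplierCalls(Collector.Characteristics... characteristics) {
        // AtomicInteger because the supplier may be called from several worker threads.
        AtomicInteger calls = new AtomicInteger();
        Collector<String, List<String>, List<String>> collector = Collector.of(
                () -> {
                    calls.incrementAndGet();
                    // Synchronized so a single shared container tolerates concurrent adds.
                    return Collections.synchronizedList(new ArrayList<>());
                },
                List::add,
                (l1, l2) -> { l1.addAll(l2); return l1; },
                characteristics);
        IntStream.range(1, 20000).mapToObj(Integer::toString).parallel().collect(collector);
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("per-task containers: " + countSupplierCalls());
        System.out.println("shared container:    " + countSupplierCalls(
                Collector.Characteristics.CONCURRENT, Collector.Characteristics.UNORDERED));
    }
}
```

With a plain `ArrayList` as the single shared container, as in the question's collector, concurrent `add` calls have no such protection.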

If you added these characteristics to the collector, you might run into the following result, caused by the shared state in that single collection (ArrayList is not thread-safe):

false
true
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 73
    at java.util.ArrayList.add(ArrayList.java:459)
    at de.jungblut.stuff.StreamPallel.lambda$0(StreamPallel.java:18)
    at de.jungblut.stuff.StreamPallel$$Lambda$3/1044036744.accept(Unknown Source)
    at java.util.stream.ReferencePipeline.lambda$collect$207(ReferencePipeline.java:496)
    at java.util.stream.ReferencePipeline$$Lambda$6/2003749087.accept(Unknown Source)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:496)
    at de.jungblut.stuff.StreamPallel.main(StreamPallel.java:32)
