Why is filtering by primality in an infinite stream of numbers taking forever if processed in parallel?


Question



I'm creating an infinite stream of Integers starting at 200 million, filtering it with a naive primality test implementation to generate load, and limiting the result to 10.

Predicate<Integer> isPrime = new Predicate<Integer>() {
    @Override
    public boolean test(Integer n) {
        for (int i = 2; i < n; i++) {
            if (n % i == 0) return false;   
        }
        return true;
    }
};

Stream.iterate(200_000_000, n -> ++n)
    .filter(isPrime)
    .limit(10)
    .forEach(i -> System.out.print(i + " "));

This works as expected.

Now, if I add a call to parallel() before filtering, nothing is produced and the processing does not complete.

Stream.iterate(200_000_000, n -> ++n)
    .parallel()
    .filter(isPrime)
    .limit(10)
    .forEach(i -> System.out.print(i + " "));

Can someone point me in the right direction of what's happening here?

EDIT: I am not looking for better primality-test implementations (it is intended to be long-running) but for an explanation of the negative impact of using a parallel stream.

Solution

Processing actually completes, though it may take quite a long time depending on the number of hardware threads on your machine. The API documentation for limit warns that it might be slow for parallel streams.
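One way to see this concretely is to count how many elements each pipeline examines before limit(10) is satisfied. The sketch below is my own illustration (not code from the question): it swaps the expensive primality test for a cheap stand-in predicate so both runs finish instantly, while the counters show the extra work the parallel pipeline does.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

public class ExaminedElementsDemo {
    public static void main(String[] args) {
        // Count every element pulled through the pipeline via peek().
        AtomicLong seq = new AtomicLong();
        Stream.iterate(200_000_000, n -> n + 1)
              .peek(n -> seq.incrementAndGet())
              .filter(n -> n % 1000 == 0)   // cheap stand-in for the expensive test
              .limit(10)
              .forEach(n -> {});

        AtomicLong par = new AtomicLong();
        Stream.iterate(200_000_000, n -> n + 1)
              .parallel()
              .peek(n -> par.incrementAndGet())
              .filter(n -> n % 1000 == 0)
              .limit(10)
              .forEach(n -> {});

        // Sequential examines exactly 9001 elements (200000000 through 200009000);
        // the parallel run examines whole chunks, so typically many more.
        System.out.println("sequential examined: " + seq.get());
        System.out.println("parallel examined:   " + par.get());
    }
}
```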

Actually, a parallel stream first splits the computation into several parts according to the available parallelism level, performs the computation for every part, then joins the results together. How many parts does your task have? One per common FJP thread (= Runtime.getRuntime().availableProcessors()) plus (sometimes?) one for the current thread if it's not in the FJP. You can control this by adding

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");

In practice, for your task, the lower the number you set, the faster it will compute.
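Besides the global system property, a commonly used workaround is to submit the terminal operation from inside a custom ForkJoinPool, whose parallelism then bounds the stream's splitting. Note this relies on undocumented behavior of the Stream implementation, and the example below is my own sketch, not code from the answer:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Stream;

public class CustomPoolDemo {
    public static void main(String[] args) throws Exception {
        // Parallelism of 2 regardless of the common pool's default.
        ForkJoinPool pool = new ForkJoinPool(2);
        // The parallel stream runs on the pool whose thread invokes
        // the terminal operation (undocumented but widely relied upon).
        long count = pool.submit(() ->
            Stream.iterate(1, n -> n + 1).parallel().limit(100).count()
        ).get();
        System.out.println(count); // prints 100
        pool.shutdown();
    }
}
```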

How is the unlimited task split? Your particular task is handled by IteratorSpliterator, whose trySplit method creates chunks of ever-increasing size, starting from 1024. You can try it yourself:

Spliterator<Integer> spliterator = Stream.iterate(200_000_000, n -> ++n).spliterator();
Spliterator[] spliterators = new Spliterator[10];
for(int i=0; i<spliterators.length; i++) {
    spliterators[i] = spliterator.trySplit();
}
for(int i=0; i<spliterators.length; i++) {
    System.out.print((i+1)+": ");
    spliterators[i].tryAdvance(System.out::println);
}       

So the first chunk handles the number range 200000000-200001023, the second handles 200001024-200003071, and so on. If you have only one hardware thread, your task will be split into two chunks, so 3072 numbers will be checked. If you have 8 hardware threads, your task will be split into 9 chunks and 46080 numbers will be checked. Only after all the chunks are processed does the parallel computation stop. The heuristic of splitting the task into such big chunks doesn't work well in your case, but you would see a performance boost if prime numbers in that region appeared once every few thousand numbers.
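The 46080 figure follows from the batch sizes alone: the k-th split hands out k × 1024 elements, so nine chunks cover 1024 × (1 + 2 + … + 9) numbers. A quick arithmetic check:

```java
public class ChunkMath {
    public static void main(String[] args) {
        long total = 0;
        for (int k = 1; k <= 9; k++) {
            total += 1024L * k;  // k-th chunk produced by IteratorSpliterator.trySplit
        }
        System.out.println(total);  // prints 46080, matching the figure above
    }
}
```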

Probably your particular scenario could be optimized internally (i.e., stop the computation once the first thread finds that the limit condition has already been reached). Feel free to report a bug to the Java bug tracker.


Update: after digging deeper into the Stream API, I concluded that the current behavior is a bug, raised an issue, and posted a patch. It's likely that the patch will be accepted for JDK 9 and possibly even backported to the JDK 8u branch. With my patch, the parallel version still does not improve performance, but at least its running time is comparable to that of the sequential stream.

