Generate infinite parallel stream
Question
Hi, I have a function that should return an infinite parallel stream of generated results (yes, parallel is much faster in this case). So obviously (or not) I used
Stream<Something> stream = Stream.generate(this::myGenerator).parallel();
It works; however, it doesn't when I want to limit the results (everything is fine when the stream is sequential). I mean, it does create results when I do something like
stream.peek(System.out::println).limit(2).collect(Collectors.toList())
but even when the peek output shows more than 10 elements, collect still hasn't finished (generation is slow, so those 10 elements can take up to a minute)... and that is a simple example. Actually, limiting the results is only a future feature, because the main expectation is to keep getting results that are better than the most recent one until the user kills the process (the other case is to return the first result I can produce, throwing an exception if nothing else helps [findFirst didn't help either: I had more elements on the console and still no result after about 30 seconds]).
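To make the effect visible, here is a minimal sketch that counts how often the generator runs for limit(2) on a parallel Stream.generate. It assumes a trivially fast, thread-safe stand-in for myGenerator; the class name, the calls counter and generateAndLimit are hypothetical. Because limit on an unordered parallel source lets several worker threads pull elements before the limit is enforced, the counter can end up larger than 2, depending on the JDK and the number of cores. With a slow generator, collect has to wait for those in-flight extra calls to finish, even though their results are discarded.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class InfiniteParallelLimitDemo {

    // Hypothetical counter: how many times the generator has been invoked.
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical stand-in for myGenerator(): fast and thread-safe.
    static Integer myGenerator() {
        return calls.incrementAndGet();
    }

    // The pattern from the question: an infinite parallel source, limited afterwards.
    static List<Integer> generateAndLimit(int limit) {
        return Stream.generate(InfiniteParallelLimitDemo::myGenerator)
                .parallel()
                .limit(limit)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> result = generateAndLimit(2);
        System.out.println("result size: " + result.size());
        // May report more than 2: surplus elements were generated and then dropped by limit.
        System.out.println("generator calls: " + calls.get());
    }
}
```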
How can I cope with that? I also thought about using RxJava, which raises another question: how can I achieve a similar result with that tool (or another one)?
public Stream<Solution> generateSolutions() {
final Solution initialSolution = initialSolutionMaker.findSolution();
return Stream.concat(
Stream.of(initialSolution),
Stream.generate(continuousSolutionMaker::findSolution)
).parallel();
}
new Solver(instance).generateSolutions()
.map(Solution::getPurpose)
.peek(System.out::println)
.limit(5).collect(Collectors.toList());
The implementation of findSolution is not important. It has some side effects, like adding to a solutions repository (a synchronized singleton, etc.), but nothing more.
Answer
As explained in the already linked answer, the key to an efficient parallel stream is to use a stream source that already has an intrinsic size, instead of using an unsized or even infinite stream and applying a limit to it. Injecting a size doesn’t work with the current implementation at all, while ensuring that a known size doesn’t get lost is much easier. Even if the exact size can’t be retained, as when applying a filter, the size will still be carried as an estimate.
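The difference between a sized and an unsized source can be inspected through the stream's Spliterator. In this minimal sketch (class and method names are illustrative only), a range reports an exact size, Stream.generate reports no exact size and an estimate of Long.MAX_VALUE, and after a filter the exact size is gone while the original count survives as the estimate. Each helper builds a fresh stream, since spliterator() consumes it.

```java
import java.util.stream.IntStream;
import java.util.stream.Stream;

class SizeInfoDemo {

    // A range is a sized source: the exact element count is known up front.
    static long rangeExactSize() {
        return IntStream.range(0, 10).spliterator().getExactSizeIfKnown();
    }

    // Stream.generate is unsized: getExactSizeIfKnown() returns -1.
    static long generateExactSize() {
        return Stream.generate(() -> "x").spliterator().getExactSizeIfKnown();
    }

    // Its estimate is Long.MAX_VALUE, which tells the parallel splitter nothing useful.
    static long generateEstimate() {
        return Stream.generate(() -> "x").spliterator().estimateSize();
    }

    // After filter the exact size is lost...
    static long filteredExactSize() {
        return IntStream.range(0, 10).filter(i -> i % 2 == 0).spliterator().getExactSizeIfKnown();
    }

    // ...but the source size is still carried as an estimate.
    static long filteredEstimate() {
        return IntStream.range(0, 10).filter(i -> i % 2 == 0).spliterator().estimateSize();
    }

    public static void main(String[] args) {
        System.out.println("range exact size:    " + rangeExactSize());     // 10
        System.out.println("generate exact size: " + generateExactSize());  // -1
        System.out.println("generate estimate:   " + generateEstimate());   // Long.MAX_VALUE
        System.out.println("filtered exact size: " + filteredExactSize());  // -1
        System.out.println("filtered estimate:   " + filteredEstimate());   // 10
    }
}
```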
So instead of
Stream.generate(this::myGenerator).parallel()
.peek(System.out::println)
.limit(2)
.collect(Collectors.toList())
just use
IntStream.range(0, /* limit */ 2).unordered().parallel()
.mapToObj(unused -> this.myGenerator())
.peek(System.out::println)
.collect(Collectors.toList())
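A self-contained version of the bounded approach, with a hypothetical, thread-safe myGenerator that only counts its invocations (the class name and counter are illustrative). Since the range has exactly limit elements and mapToObj runs once per element, the generator is called exactly limit times, so there is no surplus work to wait for or discard.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SizedParallelGenerateDemo {

    // Hypothetical counter of generator invocations.
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical stand-in for myGenerator(); must be thread-safe, as it runs on several workers.
    static String myGenerator() {
        return "result-" + calls.incrementAndGet();
    }

    // A sized range drives the parallel work; its int values are ignored.
    static List<String> generate(int limit) {
        return IntStream.range(0, limit).unordered().parallel()
                .mapToObj(unused -> myGenerator())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> results = generate(2);
        // prints "2 results, 2 generator calls"
        System.out.println(results.size() + " results, " + calls.get() + " generator calls");
    }
}
```

The range only serves as a sized, splittable driver, which gives the parallel framework a real element count to divide among workers.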
or, closer to your example code:
public Stream<Solution> generateSolutions(int limit) {
final Solution initialSolution = initialSolutionMaker.findSolution();
return Stream.concat(
Stream.of(initialSolution),
IntStream.range(1, limit).unordered().parallel()
.mapToObj(unused -> continuousSolutionMaker.findSolution())
);
}
new Solver(instance).generateSolutions(5)
.map(Solution::getPurpose)
.peek(System.out::println)
.collect(Collectors.toList());
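The same idea in the question's structure, as a runnable sketch. Solution, initialFindSolution and continuousFindSolution are hypothetical stand-ins for the real class and makers, which the question does not show. The initial solution plus range(1, limit) yields exactly limit solutions for limit >= 1. Stream.concat returns a parallel stream if either input is parallel, and because the generated part is unordered, the order of the purposes is not guaranteed.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class SolutionStreamDemo {

    // Hypothetical minimal Solution type; the real one is not shown in the question.
    static final class Solution {
        private final int purpose;
        Solution(int purpose) { this.purpose = purpose; }
        int getPurpose() { return purpose; }
    }

    // Hypothetical counter of continuous-maker invocations.
    static final AtomicInteger continuousCalls = new AtomicInteger();

    // Hypothetical stand-in for initialSolutionMaker.findSolution().
    static Solution initialFindSolution() {
        return new Solution(0);
    }

    // Hypothetical stand-in for continuousSolutionMaker.findSolution(); thread-safe.
    static Solution continuousFindSolution() {
        return new Solution(continuousCalls.incrementAndGet());
    }

    // One initial solution plus (limit - 1) generated in parallel: limit solutions in total
    // for limit >= 1 (for limit <= 1 only the initial solution is returned).
    static Stream<Solution> generateSolutions(int limit) {
        final Solution initialSolution = initialFindSolution();
        return Stream.concat(
                Stream.of(initialSolution),
                IntStream.range(1, limit).unordered().parallel()
                        .mapToObj(unused -> continuousFindSolution()));
    }

    public static void main(String[] args) {
        List<Integer> purposes = generateSolutions(5)
                .map(Solution::getPurpose)
                .collect(Collectors.toList());
        System.out.println(purposes.size()); // prints 5
    }
}
```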