Generate infinite parallel stream

Question

Hi, I have a function that is going to return an infinite parallel stream of generated results (yes, parallel is much faster in this case). So, obviously (or not), I used

Stream<Something> stream = Stream.generate(this::myGenerator).parallel();

It works; however, it doesn't when I want to limit the result (everything is fine when the stream is sequential). I mean, it does create results when I do something like

stream.peek(System.out::println).limit(2).collect(Collectors.toList())

but even when the peek output has produced more than 10 elements, collect still does not finish (generation is slow, so those 10 elements can take up to a minute)... and that is a simple example. Actually, limiting the results is only a future concern, because the main expectation is to keep getting results that are better than the most recent one until the user kills the process. The other case is to return the first result I can produce, throwing an exception if nothing else helps [findFirst didn't help either, even when I had more elements on the console and no further results for about 30 seconds].

How do I cope with that? My idea was also to use RxJava, which raises another question: how to achieve a similar result with that tool (or another one)?

public Stream<Solution> generateSolutions() {
    final Solution initialSolution = initialSolutionMaker.findSolution();
    return Stream.concat(
            Stream.of(initialSolution),
            Stream.generate(continuousSolutionMaker::findSolution)
    ).parallel();
}

new Solver(instance).generateSolutions()
    .map(Solution::getPurpose)
    .peek(System.out::println)
    .limit(5).collect(Collectors.toList());

The implementation of findSolution is not important. It has some side effects, such as adding to a solutions repository (a singleton, synchronized, etc.), but nothing more.

Recommended answer

As explained in the already linked answer, the key to an efficient parallel stream is to use a stream source that already has an intrinsic size, instead of using an unsized or even infinite stream and applying a limit to it. Injecting a size doesn't work with the current Stream implementation at all, while ensuring that a known size doesn't get lost is much easier. Even if the exact size can't be retained, as when applying a filter, the size will still be carried along as an estimate.
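The difference between a sized and an unsized source can be observed directly through the source's Spliterator. The following is a minimal sketch (Java 8+); the class name SizedSourceDemo and its describe helper are hypothetical, chosen only for illustration. It compares IntStream.range, which reports the SIZED characteristic and an exact size, with Stream.generate, which reports no size and an estimate of Long.MAX_VALUE.

```java
import java.util.Spliterator;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class SizedSourceDemo {
    // Prints whether a spliterator knows its size up front (hypothetical helper).
    static void describe(String label, Spliterator<?> s) {
        System.out.println(label
                + ": SIZED=" + s.hasCharacteristics(Spliterator.SIZED)
                + ", estimateSize=" + s.estimateSize()
                // getExactSizeIfKnown() returns -1 when the source is not SIZED
                + ", exactSize=" + s.getExactSizeIfKnown());
    }

    public static void main(String[] args) {
        // A range knows exactly how many elements it has, so it can be split evenly.
        describe("IntStream.range(0, 2)", IntStream.range(0, 2).spliterator());
        // prints "IntStream.range(0, 2): SIZED=true, estimateSize=2, exactSize=2"

        // An infinite supplier-based source has no intrinsic size.
        describe("Stream.generate(...)", Stream.generate(() -> "x").spliterator());
        // prints "Stream.generate(...): SIZED=false, estimateSize=9223372036854775807, exactSize=-1"
    }
}
```

This is why the answer starts from a range: the parallel framework can divide a known number of elements among worker threads, whereas an unsized source gives it nothing to divide up front.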

So instead of

Stream.generate(this::myGenerator).parallel()
      .peek(System.out::println)
      .limit(2)
      .collect(Collectors.toList())

just use

IntStream.range(0, /* limit */ 2).unordered().parallel()
         .mapToObj(unused -> this.myGenerator())
         .peek(System.out::println)
         .collect(Collectors.toList())

or, closer to your example code,

public Stream<Solution> generateSolutions(int limit) {
    final Solution initialSolution = initialSolutionMaker.findSolution();
    return Stream.concat(
            Stream.of(initialSolution),
            IntStream.range(1, limit).unordered().parallel()
                    .mapToObj(unused -> continuousSolutionMaker.findSolution())
    );
}

new Solver(instance).generateSolutions(5)
    .map(Solution::getPurpose)
    .peek(System.out::println)
    .collect(Collectors.toList());
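A self-contained sketch of this bounded approach follows, so it can be run and observed in isolation. It requires Java 16+ for the record. The Solution record, the static findSolution method with its 50 ms sleep, and the calls counter are hypothetical stand-ins for the question's Solution, initialSolutionMaker and continuousSolutionMaker; they are not the question author's actual solver. The counter shows that the generator runs exactly limit times and the pipeline then terminates.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class BoundedParallelGeneration {
    // Hypothetical stand-in for the question's Solution type.
    record Solution(int purpose) {
        int getPurpose() { return purpose; }
    }

    // Counts how often the (hypothetical) expensive generator runs.
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical slow generator standing in for findSolution() in the question.
    static Solution findSolution() {
        calls.incrementAndGet();
        try {
            Thread.sleep(50); // simulate expensive work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Solution(ThreadLocalRandom.current().nextInt(100));
    }

    // Same shape as the answer's generateSolutions(int limit): one initial
    // solution, plus (limit - 1) more generated in parallel from a sized range.
    static Stream<Solution> generateSolutions(int limit) {
        Solution initial = findSolution();
        return Stream.concat(
                Stream.of(initial),
                IntStream.range(1, limit).unordered().parallel()
                        .mapToObj(unused -> findSolution()));
    }

    public static void main(String[] args) {
        List<Integer> purposes = generateSolutions(5)
                .map(Solution::getPurpose)
                .peek(System.out::println) // order varies between runs
                .collect(Collectors.toList());
        System.out.println("collected " + purposes.size() + ", generator calls " + calls.get());
        // last line prints "collected 5, generator calls 5"
    }
}
```

The trade-off of this design is that the number of results has to be known before the stream starts; in exchange, no surplus elements are generated and collect completes as soon as the last one is produced. As in the answer's version, a limit below 1 still yields the initial solution, because only the range part is bounded by limit.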
