Parallel processing with an infinite stream in Java


Question

Why does the code below print no output, whereas it prints 0 and 1 once parallel() is removed?

IntStream.iterate(0, i -> ( i + 1 ) % 2)
         .parallel()
         .distinct()
         .limit(10)
         .forEach(System.out::println);

I know that limit should ideally be placed before distinct, but my question is about the difference caused by adding parallel processing.

Recommended answer

The real cause is that an ordered parallel .distinct() is a full barrier operation, as described in the documentation:

Preserving stability for distinct() in parallel pipelines is relatively expensive (requires that the operation act as a full barrier, with substantial buffering overhead), and stability is often not needed.

A "full barrier operation" is one where all upstream operations must complete before anything downstream can start. There are only two full barrier operations in the Stream API: .sorted() (always) and .distinct() (in the ordered parallel case). Because .distinct() here is fed by an infinite stream, with no short-circuiting operation upstream of it, the barrier never finishes and you end up in an infinite loop. By contract, an ordered .distinct() cannot emit elements downstream in arbitrary order: among equal elements it must always emit the one that appears first in encounter order. It is theoretically possible to implement parallel ordered .distinct() better, but the implementation would be much more complex.
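To make the barrier behaviour concrete, here is a small sketch that uses .sorted() on a finite sequential stream, since it is a barrier in every case and its effect is easy to observe. The class name BarrierDemo and the helper trace are made up for this illustration; the two peek calls only record when an element passes each stage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class: records the order in which elements pass
// the stage before and the stage after an optional sorted() barrier.
class BarrierDemo {
    static List<String> trace(boolean withBarrier) {
        List<String> events = new ArrayList<>();
        Stream<Integer> s = Stream.of(3, 1, 2)
                                  .peek(x -> events.add("up " + x));
        if (withBarrier) {
            s = s.sorted(); // buffers every upstream element before emitting
        }
        s.peek(x -> events.add("down " + x))
         .forEach(x -> { });
        return events;
    }

    public static void main(String[] args) {
        // Without the barrier, each element flows through the whole pipeline in turn.
        System.out.println(trace(false)); // [up 3, down 3, up 1, down 1, up 2, down 2]
        // With the barrier, all "up" events happen before the first "down" event.
        System.out.println(trace(true));  // [up 3, up 1, up 2, down 1, down 2, down 3]
    }
}
```

With an infinite source the "up" phase never ends, which is why nothing ever reaches the stage after an ordered parallel .distinct().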

As for the solution, @user140547 is right: add .unordered() before .distinct(). This switches distinct() to its unordered algorithm, which simply uses a shared ConcurrentHashMap to record every element seen so far and passes each new element downstream. Note that adding .unordered() after .distinct() will not help.
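A minimal sketch of that fix follows. The class name UnorderedDistinctDemo and the helper firstTwoDistinct are invented for illustration. One deliberate change from the question's code: it uses limit(2) instead of limit(10). The source only ever produces two distinct values, so limit(10) can never be satisfied and, as far as I can tell, the pipeline would keep running after printing 0 and 1 even with .unordered() in place.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.IntStream;

// Hypothetical demo class: the question's pipeline with unordered()
// placed before distinct(), collecting results instead of printing them.
class UnorderedDistinctDemo {
    static Set<Integer> firstTwoDistinct() {
        // Thread-safe sorted set, because forEach runs on several threads
        // and an unordered stream may deliver 0 and 1 in either order.
        Set<Integer> seen = new ConcurrentSkipListSet<>();
        IntStream.iterate(0, i -> (i + 1) % 2)
                 .parallel()
                 .unordered()   // must come before distinct() to drop the barrier
                 .distinct()
                 .limit(2)      // assumption: 2, not 10, so the pipeline can finish
                 .forEach(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(firstTwoDistinct());
    }
}
```

Moving the .unordered() call below .distinct() in this sketch would bring back the ordered, barrier-based algorithm and the hang described in the question.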
