Java 8流和RxJava可观察量之间的区别 [英] Difference between Java 8 streams and RxJava observables

查看:269
本文介绍了Java 8流和RxJava可观察量之间的区别的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Java 8流是否类似于RxJava observables?

Are Java 8 streams similar to RxJava observables?

Java 8流定义:

Java 8 stream definition:


新的 java.util.stream 包中的类为
提供了一个Stream API,支持对元素流进行功能样式的操作。

Classes in the new java.util.stream package provide a Stream API to support functional-style operations on streams of elements.


推荐答案

TL; DR :所有序列/流处理库提供的非常相似管道建​​设的API。不同之处在于用于处理多线程和管道组合的API。

TL;DR: All sequence/stream processing libs are offering very similar API for pipeline building. The differences are in API for handling multi-threading and composition of pipelines.

RxJava与Stream完全不同。在所有JDK事物中,最接近rx.Observable的可能是 java.util.stream.Collector Stream + CompletableFuture组合(其代价是处理额外的monad层,即必须处理之间的转换) Stream< CompletableFuture< T>> CompletableFuture< Stream< T>> )。

RxJava is quite different from Stream. Of all JDK things, the closest to rx.Observable is perhaps java.util.stream.Collector Stream + CompletableFuture combo (which comes at a cost of dealing with extra monad layer, i. e. having to handle conversion between Stream<CompletableFuture<T>> and CompletableFuture<Stream<T>>).

Observable和Stream之间存在显着差异:

There are significant differences between Observable and Stream:


  • Streams是基于拉式的,Observables是推的基。这可能听起来过于抽象,但它具有非常具体的重要后果。

  • Stream只能使用一次,Observable可以订阅多次

  • Stream#parallel()将序列拆分为分区, Observable#subscribeOn()可观察#observeOn()不要;使用Observable模拟 Stream#parallel()行为很棘手,它曾经有 .parallel()方法但是这个方法造成了很多混乱, .parallel()支持被移动到github上的独立存储库RxJavaParallel。有关详细信息,请参阅其他答案

  • Stream# parallel()不允许指定要使用的线程池,这与大多数接受可选Scheduler的RxJava方法不同。由于 JVM中的所有流实例使用相同的fork-join池,因此添加 .parallel()可能会意外地影响您的另一个模块中的行为程序

  • Streams缺少与时间相关的操作,如 Observable #interset() Observable#window() 和许多其他人;这主要是因为Streams是基于拉式的

  • 与RxJava相比,Streams提供了一组受限制的操作。例如。 Streams缺少截止操作( takeWhile() takeUntil());解决方法使用 Stream#anyMatch()是有限的:它是终端操作,因此每个流不能多次使用它

  • 从JDK 8开始,没有Stream#zip操作,这有时非常有用

  • Streams很难自己构造,Observable可以通过多种方式构建 编辑:如评论中所述,有一些方法可以构建Stream。但是,由于没有非终端短路,你不能e。 G。容易生成文件中的行流(JDK提供开箱即用的文件#行和BufferedReader#行,其他类似的方案可以通过从Iterator构造Stream来管理)。

  • 可观察的优惠资源管理设施( Observable#using());你可以用它包装IO流或互斥,并确保用户不会忘记释放资源 - 它将在订阅终止时自动处理; Stream有 onClose(Runnable)方法,但您必须手动或通过try-with-resources调用它。 E. g。你必须记住,文件#lines()必须包含在try-with-resources块中。

  • Observable一直同步(I实际上没有检查Streams是否也是如此)。这使您不必考虑基本操作是否是线程安全的(答案总是是,除非有错误),但无论您的代码是否需要,都会出现与并发相关的开销。

  • Streams are pull-based, Observables are push-based. This may sound too abstract, but it has significant consequences that are very concrete.
  • Stream can only be used once, Observable can be subscribed to many times
  • Stream#parallel() splits sequence into partitions, Observable#subscribeOn() and Observable#observeOn() do not; it is tricky to emulate Stream#parallel() behavior with Observable, it once had .parallel() method but this method caused so much confusion that .parallel() support was moved to separate repository on github, RxJavaParallel. More details are in another answer.
  • Stream#parallel() does not allow to specify a thread pool to use, unlike most of RxJava methods accepting optional Scheduler. Since all stream instances in a JVM use the same fork-join pool, adding .parallel() can accidentally affect the behaviour in another module of your program
  • Streams are lacking time-related operations like Observable#interval(), Observable#window() and many others; this is mostly because Streams are pull-based
  • Streams offer restricted set of operations in comparison with RxJava. E.g. Streams are lacking cut-off operations (takeWhile(), takeUntil()); workaround using Stream#anyMatch() is limited: it is terminal operation, so you can't use it more than once per stream
  • As of JDK 8, there's no Stream#zip operation, which is quite useful sometimes
  • Streams are hard to construct by yourself, Observable can be constructed by many ways As noted in comments, there are ways to construct Stream. However, since there's no non-terminal short-circuiting, you can't e. g. easily generate Stream of lines in file (JDK provides Files#lines and BufferedReader#lines out of the box though, and other similar scenarios can be managed by constructing Stream from Iterator).
  • Observable offers resource management facility (Observable#using()); you can wrap IO stream or mutex with it and be sure that the user will not forget to free the resource - it will be disposed automatically on subscription termination; Stream has onClose(Runnable) method, but you have to call it manually or via try-with-resources. E. g. you have to keep in mind that Files#lines() must be enclosed in try-with-resources block.
  • Observables are synchronized all the way through (I didn't actually check whether the same is true for Streams). This spares you from thinking whether basic operations are thread-safe (the answer is always 'yes', unless there's a bug), but the concurrency-related overhead will be there, no matter if your code need it or not.

综述:RxJava与Streams显着不同。 Real RxJava替代品是 ReactiveStreams 的其他实现,例如。 G。 Akka的相关部分。

Round-up: RxJava differs from Streams significantly. Real RxJava alternatives are other implementations of ReactiveStreams, e. g. relevant part of Akka.

更新。对于 Stream#parallel 使用非默认的fork-join池有诀窍,请参阅 Java 8并行流中的自定义线程池

Update. There's trick to use non-default fork-join pool for Stream#parallel, see Custom thread pool in Java 8 parallel stream

更新。以上所有内容均基于RxJava 1.x的经验。现在 RxJava 2.x在这里,这个答案可能是已过期。

Update. All of the above is based on the experience with RxJava 1.x. Now that RxJava 2.x is here, this answer may be out-of-date.

这篇关于Java 8流和RxJava可观察量之间的区别的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆