Java 8 流和 RxJava 可观察对象之间的区别 [英] Difference between Java 8 streams and RxJava observables

查看:28
本文介绍了Java 8 流和 RxJava 可观察对象之间的区别的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Java 8 流是否类似于 RxJava 可观察对象?

Are Java 8 streams similar to RxJava observables?

Java 8 流定义:

Java 8 stream definition:

java.util.stream 包中的类提供了 Stream API支持对元素流进行函数式操作.

Classes in the new java.util.stream package provide a Stream API to support functional-style operations on streams of elements.

推荐答案

简答

所有序列/流处理库都为管道构建提供了非常相似的 API.不同之处在于用于处理多线程和管道组合的 API.

Short answer

All sequence/stream processing libs are offering very similar API for pipeline building. The differences are in API for handling multi-threading and composition of pipelines.

RxJava 与 Stream 完全不同.在所有 JDK 中,最接近 rx.Observable 的可能是 java.util.stream.Collector Stream +CompletableFuture 组合(以处理额外的 monad 层为代价,即必须处理 Stream<CompletableFuture<T>>CompletableFuture<Stream<T>>).

RxJava is quite different from Stream. Of all JDK things, the closest to rx.Observable is perhaps java.util.stream.Collector Stream + CompletableFuture combo (which comes at a cost of dealing with extra monad layer, i. e. having to handle conversion between Stream<CompletableFuture<T>> and CompletableFuture<Stream<T>>).

Observable 和 Stream 有很大区别:

There are significant differences between Observable and Stream:

  • 流是基于拉的,Observable 是基于推的.这听起来可能过于抽象,但它会产生非常具体的重大后果.
  • Stream只能使用一次,Observable可以订阅多次.
  • Stream#parallel() 将序列拆分为分区,Observable#subscribeOn()Observable#observeOn() 不会;用 Observable 模拟 Stream#parallel() 行为是很棘手的,它曾经有 .parallel() 方法,但是这种方法引起了很多混乱,以至于 .parallel() 支持已移至单独的存储库:ReactiveX/RxJavaParallel: Experimental Parallel Extensions for RxJava.更多详细信息在另一个答案中.
  • Stream#parallel() 不允许指定要使用的线程池,这与大多数接受可选调度程序的 RxJava 方法不同.由于 JVM 中的所有流实例使用相同的 fork-join 池,因此添加 .parallel() 可能会意外影响程序另一个模块中的行为.
  • 流缺少与时间相关的操作,例如 Observable#interval()Observable#window() 等;这主要是因为 Streams 是基于拉取的,上游无法控制何时向下游发出下一个元素.
  • 与 RxJava 相比,Streams 提供了一组受限的操作.例如.流缺少截止操作(takeWhile()takeUntil());使用 Stream#anyMatch() 的解决方法是有限的:它是终端操作,因此每个流不能多次使用它
  • 从 JDK 8 开始,没有 Stream#zip() 操作,这有时非常有用.
  • Streams 很难自己构建,Observable 可以通过多种方式构建 如评论中所述,有多种方式可以构建 Stream.但是,由于没有非终端短路,你不能 e.G.在文件中轻松生成行流(JDK 提供了 Files#lines()BufferedReader#lines() 开箱即用,其他类似的场景可以通过构造来自迭代器的流).
  • Observable 提供资源管理工具 (Observable#using());您可以用它包装 IO 流或互斥锁,并确保用户不会忘记释放资源 - 它将在订阅终止时自动释放;Stream 具有 onClose(Runnable) 方法,但您必须手动调用它或通过 try-with-resources 调用它.例如.您必须记住 Files#lines() 必须 包含在 try-with-resources 块中.
  • Observables 一直是同步的(我实际上并没有检查 Streams 是否同样如此).这让您无需考虑基本操作是否是线程安全的(答案始终是是",除非存在错误),但无论您的代码是否需要,都会存在与并发相关的开销.
  • Streams are pull-based, Observables are push-based. This may sound too abstract, but it has significant consequences that are very concrete.
  • Stream can only be used once, Observable can be subscribed to many times.
  • Stream#parallel() splits sequence into partitions, Observable#subscribeOn() and Observable#observeOn() do not; it is tricky to emulate Stream#parallel() behavior with Observable, it once had .parallel() method but this method caused so much confusion that .parallel() support was moved to separate repository: ReactiveX/RxJavaParallel: Experimental Parallel Extensions for RxJava. More details are in another answer.
  • Stream#parallel() does not allow to specify a thread pool to use, unlike most of RxJava methods accepting optional Scheduler. Since all stream instances in a JVM use the same fork-join pool, adding .parallel() can accidentally affect the behaviour in another module of your program.
  • Streams are lacking time-related operations like Observable#interval(), Observable#window() and many others; this is mostly because Streams are pull-based, and upstream has no control on when to emit next element downstream.
  • Streams offer restricted set of operations in comparison with RxJava. E.g. Streams are lacking cut-off operations (takeWhile(), takeUntil()); workaround using Stream#anyMatch() is limited: it is terminal operation, so you can't use it more than once per stream
  • As of JDK 8, there's no Stream#zip() operation, which is quite useful sometimes.
  • Streams are hard to construct by yourself, Observable can be constructed by many ways As noted in comments, there are ways to construct Stream. However, since there's no non-terminal short-circuiting, you can't e. g. easily generate Stream of lines in file (JDK provides Files#lines() and BufferedReader#lines() out of the box though, and other similar scenarios can be managed by constructing Stream from Iterator).
  • Observable offers resource management facility (Observable#using()); you can wrap IO stream or mutex with it and be sure that the user will not forget to free the resource - it will be disposed automatically on subscription termination; Stream has onClose(Runnable) method, but you have to call it manually or via try-with-resources. E. g. you have to keep in mind that Files#lines() must be enclosed in try-with-resources block.
  • Observables are synchronized all the way through (I didn't actually check whether the same is true for Streams). This spares you from thinking whether basic operations are thread-safe (the answer is always 'yes', unless there's a bug), but the concurrency-related overhead will be there, no matter if your code need it or not.

RxJava 与 Streams 显着不同.真正的 RxJava 替代品是 ReactiveStreams 的其他实现,例如.G.Akka 的相关部分.

RxJava differs from Streams significantly. Real RxJava alternatives are other implementations of ReactiveStreams, e. g. relevant part of Akka.

Stream#parallel 有使用非默认分叉连接池的技巧,请参阅 Java 8 并行流中的自定义线程池.

There's trick to use non-default fork-join pool for Stream#parallel, see Custom thread pool in Java 8 parallel stream.

以上所有内容均基于使用 RxJava 1.x 的经验.现在 RxJava 2.x 在这里,这个答案可能已经过时了.

All of the above is based on the experience with RxJava 1.x. Now that RxJava 2.x is here, this answer may be out-of-date.

这篇关于Java 8 流和 RxJava 可观察对象之间的区别的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆