RxJava Observable" Iteration"工作? [英] How does RxJava Observable "Iteration" work?

查看:138
本文介绍了RxJava Observable" Iteration"工作?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我开始玩RxJava和ReactFX,我对此非常着迷。但是当我在试验时,我有几十个问题而且我一直在研究答案。



我正在观察的一件事(没有双关语意)当然是懒惰的执行。使用下面的探索性代码,我注意到在调用 merge.subscribe(pet - > System.out.println(pet))之前没有执行任何操作。但令我着迷的是,当我订阅第二个订阅者 merge.subscribe(pet - > System.out.println(Feed+ pet))时,它解雇了迭代再次。



我想要了解的是迭代的行为。它似乎不像Java 8 那样只能使用一次。它是否逐字逐句地通过每个 String 并将其作为该时刻的值发布?任何以前被解雇的订阅者之后的任何新订阅者都会收到这些项目,就像他们是新用户一样吗?

 公共类RxTest {

public static void main(String [] args){

Observable< String> dogs = Observable.from(ImmutableList.of(Dasher,Rex))
.filter(dog - > dog.matches(D. *));

Observable< String> cats = Observable.from(ImmutableList.of(Tabby,Grumpy Cat,Meowmers,Peanut));

Observable< String> ferrets = Observable.from(CompletableFuture.supplyAsync(() - >Harvey));

Observable< String> merge = dogs.mergeWith(cats).mergeWith(ferrets);

merge.subscribe(pet - > System.out.println(pet));


merge.subscribe(pet - > System.out.println(Feed+ pet));

}
}


解决方案

Observable< T> 表示monad,链式操作,而不是操作本身的执行。它是描述性语言,而不是您习惯的命令。要执行操作,您需要 .subscribe()。每次订阅时,都会从头开始创建新的执行流。不要将流与线程混淆,因为订阅是同步执行的,除非您使用指定线程更改 .subscribeOn() .observeOn()。您将新元素链接到任何现有操作/ monad / Observable以添加新行为,如更改线程,过滤,累积,转换等。如果你的observable是一个昂贵的操作,你不想在每个订阅上重复,你可以使用 .cache()



要将任何异步/同步 Observable< T> 操作转换为同步内联的,请使用 .toBlocking()将其类型更改为 BlockingObservable< T> 。而不是 .subscribe()它包含用 .forEach()或强制执行每个结果的操作的新方法与 .first()



Observables是一个很好的工具,因为它们大部分是*确定性的(相同的输入总是产生相同的输出,除非你做错了什么),可重用(你可以将它们作为命令/策略模式的一部分发送)并且在大多数情况下忽略并发,因为它们不应该依赖共享状态(也就是说做错了)。如果你试图将一个基于可观察的库带入命令式语言,或者只是在一个Observable上执行一个100%有信心的操作,那么BlockingObservables是很好的。



<围绕这些原则构建你的应用程序是一种范式的改变,我无法真正涵盖这个答案。


*有违规行为与 Subject Observable.create()一样,需要与命令式框架集成。



I started to play around with RxJava and ReactFX, and I became pretty fascinated with it. But as I'm experimenting I have dozens of questions and I'm constantly researching for answers.

One thing I'm observing (no pun intended) is of course lazy execution. With my exploratory code below, I noticed nothing gets executed until the merge.subscribe(pet -> System.out.println(pet)) is called. But what fascinated me is when I subscribed a second subscriber merge.subscribe(pet -> System.out.println("Feed " + pet)), it fired the "iteration" again.

What I'm trying to understand is the behavior of the iteration. It does not seem to behave like a Java 8 stream that can only be used once. Is it literally going through each String one at a time and posting it as the value for that moment? And do any new subscribers following any previously fired subscribers receive those items as if they were new?

public class RxTest {

    public static void main(String[] args) {

        Observable<String> dogs = Observable.from(ImmutableList.of("Dasher", "Rex"))
                .filter(dog -> dog.matches("D.*"));

        Observable<String> cats = Observable.from(ImmutableList.of("Tabby", "Grumpy Cat", "Meowmers", "Peanut"));

        Observable<String> ferrets = Observable.from(CompletableFuture.supplyAsync(() -> "Harvey"));

        Observable<String> merge = dogs.mergeWith(cats).mergeWith(ferrets);

        merge.subscribe(pet -> System.out.println(pet));


        merge.subscribe(pet -> System.out.println("Feed " + pet));

    }
}

解决方案

Observable<T> represents a monad, a chained operation, not the execution of the operation itself. It is descriptive language, rather than the imperative you're used to. To execute an operation, you .subscribe() to it. Every time you subscribe a new execution stream is created from scratch. Do not confuse streams with threads, as subscription are executed synchronously unless you specify a thread change with .subscribeOn() or .observeOn(). You chain new elements to any existing operation/monad/Observable to add new behaviour, like changing threads, filtering, accumulation, transformation, etc. In case your observable is an expensive operation you don't want to repeat on every subscription, you can prevent recreation by using .cache().

To make any asynchronous/synchronous Observable<T> operation into a synchronous inlined one, use .toBlocking() to change its type to BlockingObservable<T>. Instead of .subscribe() it contains new methods to execute operations on each result with .forEach(), or coerce with .first()

Observables are a good tool because they're mostly* deterministic (same inputs always yield same outputs unless you're doing something wrong), reusable (you can send them around as part of a command/policy pattern) and for the most part ignore concurrence because they should not rely on shared state (a.k.a. doing something wrong). BlockingObservables are good if you're trying to bring an observable-based library into imperative language, or just executing an operation on an Observable that you have 100% confidence it's well managed.

Architecting your application around these principles is a change of paradigm that I can't really cover on this answer.

*There are breaches like Subject and Observable.create() that are needed to integrate with imperative frameworks.

这篇关于RxJava Observable&quot; Iteration&quot;工作?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆