使用RxJava生成无限序列的自然数 [英] Generate infinite sequence of Natural numbers using RxJava
问题描述
我正在尝试使用RxJava编写一个简单的程序来生成无限的自然数序列。所以,我发现了两种使用 Observable.timer()和 Observable.interval()生成数字序列的方法。我不确定这些功能是否是解决此问题的正确方法。我期待像Java 8中的一个简单的函数来生成无限的自然数。
I am trying to write a simple program using RxJava to generate an infinite sequence of natural numbers. So, far I have found two ways to generate sequence of numbers using Observable.timer() and Observable.interval(). I am not sure if these functions are the right way to approach this problem. I was expecting a simple function like one we have in Java 8 to generate infinite natural numbers.
IntStream.iterate(1,value - > value +1)。forEach(System.out :: println);
IntStream.iterate(1, value -> value +1).forEach(System.out::println);
我尝试使用带有Observable的IntStream,但这不能正常工作。它只向第一个用户发送无限的数字流。如何正确生成无限自然数序列?
I tried using IntStream with Observable but that does not work correctly. It sends infinite stream of numbers only to first subscriber. How can I correctly generate infinite natural number sequence?
import rx.Observable;
import rx.functions.Action1;
import java.util.stream.IntStream;
public class NaturalNumbers {
public static void main(String[] args) {
Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> {
IntStream stream = IntStream.iterate(1, val -> val + 1);
stream.forEach(naturalNumber -> subscriber.onNext(naturalNumber));
});
Action1<Integer> first = naturalNumber -> System.out.println("First got " + naturalNumber);
Action1<Integer> second = naturalNumber -> System.out.println("Second got " + naturalNumber);
Action1<Integer> third = naturalNumber -> System.out.println("Third got " + naturalNumber);
naturalNumbers.subscribe(first);
naturalNumbers.subscribe(second);
naturalNumbers.subscribe(third);
}
}
推荐答案
问题是你实现的on naturalNumbers.subscribe(first);
, OnSubscribe
被调用并且你在无限流中执行 forEach
,这就是为什么你的程序永远不会终止。
The problem is that the on naturalNumbers.subscribe(first);
, the OnSubscribe
you implemented is being called and you are doing a forEach
over an infinite stream, hence why your program never terminates.
你可以用一种方法处理它是在不同的线程上异步订阅它们。为了轻松查看结果,我必须在流处理中引入睡眠:
One way you could deal with it is to asynchronously subscribe them on a different thread. To easily see the results I had to introduce a sleep into the Stream processing:
Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> {
IntStream stream = IntStream.iterate(1, i -> i + 1);
stream.peek(i -> {
try {
// Added to visibly see printing
Thread.sleep(50);
} catch (InterruptedException e) {
}
}).forEach(subscriber::onNext);
});
final Subscription subscribe1 = naturalNumbers
.subscribeOn(Schedulers.newThread())
.subscribe(first);
final Subscription subscribe2 = naturalNumbers
.subscribeOn(Schedulers.newThread())
.subscribe(second);
final Subscription subscribe3 = naturalNumbers
.subscribeOn(Schedulers.newThread())
.subscribe(third);
Thread.sleep(1000);
System.out.println("Unsubscribing");
subscribe1.unsubscribe();
subscribe2.unsubscribe();
subscribe3.unsubscribe();
Thread.sleep(1000);
System.out.println("Stopping");
这篇关于使用RxJava生成无限序列的自然数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!