使用RxJava生成无限序列的自然数 [英] Generate infinite sequence of Natural numbers using RxJava

查看:172
本文介绍了使用RxJava生成无限序列的自然数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用RxJava编写一个简单的程序来生成无限的自然数序列。所以,我发现了两种使用 Observable.timer() Observable.interval()生成数字序列的方法。我不确定这些功能是否是解决此问题的正确方法。我期待像Java 8中的一个简单的函数来生成无限的自然数。

I am trying to write a simple program using RxJava to generate an infinite sequence of natural numbers. So, far I have found two ways to generate sequence of numbers using Observable.timer() and Observable.interval(). I am not sure if these functions are the right way to approach this problem. I was expecting a simple function like one we have in Java 8 to generate infinite natural numbers.


IntStream.iterate(1,value - > value +1)。forEach(System.out :: println);

IntStream.iterate(1, value -> value +1).forEach(System.out::println);

我尝试使用带有Observable的IntStream,但这不能正常工作。它只向第一个用户发送无限的数字流。如何正确生成无限自然数序列?

I tried using IntStream with Observable but that does not work correctly. It sends infinite stream of numbers only to first subscriber. How can I correctly generate infinite natural number sequence?

import rx.Observable;
import rx.functions.Action1;

import java.util.stream.IntStream;

public class NaturalNumbers {

    public static void main(String[] args) {
        Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> {
            IntStream stream = IntStream.iterate(1, val -> val + 1);
            stream.forEach(naturalNumber -> subscriber.onNext(naturalNumber));
        });

        Action1<Integer> first = naturalNumber -> System.out.println("First got " + naturalNumber);
        Action1<Integer> second = naturalNumber -> System.out.println("Second got " + naturalNumber);
        Action1<Integer> third = naturalNumber -> System.out.println("Third got " + naturalNumber);
        naturalNumbers.subscribe(first);
        naturalNumbers.subscribe(second);
        naturalNumbers.subscribe(third);

    }
}


推荐答案

问题是你实现的on naturalNumbers.subscribe(first); OnSubscribe 被调用并且你在无限流中执行 forEach ,这就是为什么你的程序永远不会终止。

The problem is that the on naturalNumbers.subscribe(first);, the OnSubscribe you implemented is being called and you are doing a forEach over an infinite stream, hence why your program never terminates.

你可以用一种方法处理它是在不同的线程上异步订阅它们。为了轻松查看结果,我必须在流处理中引入睡眠:

One way you could deal with it is to asynchronously subscribe them on a different thread. To easily see the results I had to introduce a sleep into the Stream processing:

Observable<Integer> naturalNumbers = Observable.<Integer>create(subscriber -> {
    IntStream stream = IntStream.iterate(1, i -> i + 1);
    stream.peek(i -> {
        try {
            // Added to visibly see printing
            Thread.sleep(50);
        } catch (InterruptedException e) {
        }
    }).forEach(subscriber::onNext);
});

final Subscription subscribe1 = naturalNumbers
    .subscribeOn(Schedulers.newThread())
    .subscribe(first);
final Subscription subscribe2 = naturalNumbers
    .subscribeOn(Schedulers.newThread())
    .subscribe(second);
final Subscription subscribe3 = naturalNumbers
    .subscribeOn(Schedulers.newThread())
    .subscribe(third);

Thread.sleep(1000);

System.out.println("Unsubscribing");
subscribe1.unsubscribe();
subscribe2.unsubscribe();
subscribe3.unsubscribe();
Thread.sleep(1000);
System.out.println("Stopping");

这篇关于使用RxJava生成无限序列的自然数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆