RxJava 中的超时 [英] Timeout in RxJava

查看:304
本文介绍了RxJava 中的超时的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是 RxJava 的新手,我需要以异步方式使用 Observable 特性.

I'm new to RxJava, and I need to use the Observable feature in an asynchronous way.

我还需要使用超时:在我的例子中,我希望每个进程在 1 秒或更短的时间内结束.

I also need to use timeouts : in my exemple, I want every process to end in 1 second or less.

这是我现在所做的:

public static void hello(String name) throws IOException {
Observable<String> obs2 = Observable.just(name).timeout(1000, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io());
    obs2.subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            if("CCCCC".equals(s)){
                try {
                    Thread.sleep(3200);
                } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } 
        System.out.println(s + " " + new Date() +" "+Thread.currentThread().getName());
        }
    });
}

public static void main(final String[] args) throws InterruptedException, IOException {     
    hello("AAAAA");
    hello("CCCCC");
    hello("BBBBBB");
    System.in.read();
}

结果:

AAAAA Thu Oct 05 09:43:46 CEST 2017 RxIoScheduler-2
BBBBBB Thu Oct 05 09:43:46 CEST 2017 RxIoScheduler-4
CCCCC Thu Oct 05 09:43:49 CEST 2017 RxIoScheduler-3

我实际上希望从名为RxIoScheduler-3"的线程获得 TimeoutException,因为它已经休眠了 3 秒.

I was actually expecting to get a TimeoutException from the thread named "RxIoScheduler-3" since it has been sleeping for 3 seconds.

我的代码和我在 RxJava 中的超时方法有什么问题?

What's wrong with my code and my approach of timeouts in RxJava?

谢谢你帮助我.

推荐答案

根据 文档 timeout 运算符将:

镜像源 Observable,但如果经过特定时间段没有发出任何项目,则发出错误通知

mirror the source Observable, but issue an error notification if a particular period of time elapses without any emitted items

因此,如果发出事件有延迟,但您在消耗事件中延迟并且不会导致超时,则认为发生了超时.

So, a timeout is deemed to have occurred if there is a delay in emitting events but you have put a delay in consuming events and that will not cause a timeout.

如果您重新编写代码以在发射期间暂停,则会发生超时.例如:

If you rework your code to pause during emission then a timeout will occur. For example:

public static void hello(String name) throws IOException {
    Observable<String> obs2 = Observable.fromCallable(() -> {
                if ("CCCCC".equals(name)) {
                    // pause for 150ms before emitting "CCCCC"
                    try {
                        Thread.sleep(150);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                return name;
            }
    ).timeout(100, MILLISECONDS) // timeout if there is a pause in emission of more than 100ms
            .subscribeOn(Schedulers.io());

    obs2.subscribe(s -> System.out.println(s + " " + new Date() + " " + Thread.currentThread().getName()),
            throwable -> System.err.println(throwable.getClass().getSimpleName() + " " + new Date() + " " + Thread.currentThread().getName()));
}

使用上述形式的 hello(),您将获得以下输出写入控制台:

Using the above form of hello() you'll get the following output written to console:

AAAAA Thu Oct 05 10:10:33 IST 2017 RxIoScheduler-2
BBBBBB Thu Oct 05 10:10:33 IST 2017 RxIoScheduler-4
TimeoutException Thu Oct 05 10:10:33 IST 2017 RxComputationScheduler-1

这篇关于RxJava 中的超时的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆