RxJava 中的超时 [英] Timeout in RxJava
问题描述
我是 RxJava 的新手,我需要以异步方式使用 Observable 特性.
I'm new to RxJava, and I need to use the Observable feature in an asynchronous way.
我还需要使用超时:在我的例子中,我希望每个进程在 1 秒或更短的时间内结束.
I also need to use timeouts : in my exemple, I want every process to end in 1 second or less.
这是我现在所做的:
public static void hello(String name) throws IOException {
Observable<String> obs2 = Observable.just(name).timeout(1000, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io());
obs2.subscribe(new Action1<String>() {
@Override
public void call(String s) {
if("CCCCC".equals(s)){
try {
Thread.sleep(3200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(s + " " + new Date() +" "+Thread.currentThread().getName());
}
});
}
public static void main(final String[] args) throws InterruptedException, IOException {
hello("AAAAA");
hello("CCCCC");
hello("BBBBBB");
System.in.read();
}
结果:
AAAAA Thu Oct 05 09:43:46 CEST 2017 RxIoScheduler-2
BBBBBB Thu Oct 05 09:43:46 CEST 2017 RxIoScheduler-4
CCCCC Thu Oct 05 09:43:49 CEST 2017 RxIoScheduler-3
我实际上希望从名为RxIoScheduler-3"的线程获得 TimeoutException,因为它已经休眠了 3 秒.
I was actually expecting to get a TimeoutException from the thread named "RxIoScheduler-3" since it has been sleeping for 3 seconds.
我的代码和我在 RxJava 中的超时方法有什么问题?
What's wrong with my code and my approach of timeouts in RxJava?
谢谢你帮助我.
推荐答案
根据 文档 timeout
运算符将:
镜像源 Observable,但如果经过特定时间段没有发出任何项目,则发出错误通知
mirror the source Observable, but issue an error notification if a particular period of time elapses without any emitted items
因此,如果发出事件有延迟,但您在消耗事件中延迟并且不会导致超时,则认为发生了超时.
So, a timeout is deemed to have occurred if there is a delay in emitting events but you have put a delay in consuming events and that will not cause a timeout.
如果您重新编写代码以在发射期间暂停,则会发生超时.例如:
If you rework your code to pause during emission then a timeout will occur. For example:
public static void hello(String name) throws IOException {
Observable<String> obs2 = Observable.fromCallable(() -> {
if ("CCCCC".equals(name)) {
// pause for 150ms before emitting "CCCCC"
try {
Thread.sleep(150);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return name;
}
).timeout(100, MILLISECONDS) // timeout if there is a pause in emission of more than 100ms
.subscribeOn(Schedulers.io());
obs2.subscribe(s -> System.out.println(s + " " + new Date() + " " + Thread.currentThread().getName()),
throwable -> System.err.println(throwable.getClass().getSimpleName() + " " + new Date() + " " + Thread.currentThread().getName()));
}
使用上述形式的 hello()
,您将获得以下输出写入控制台:
Using the above form of hello()
you'll get the following output written to console:
AAAAA Thu Oct 05 10:10:33 IST 2017 RxIoScheduler-2
BBBBBB Thu Oct 05 10:10:33 IST 2017 RxIoScheduler-4
TimeoutException Thu Oct 05 10:10:33 IST 2017 RxComputationScheduler-1
这篇关于RxJava 中的超时的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!