如何在RxJava 2中发生错误后继续处理? [英] How to continue processing after an error happens in RxJava 2?
问题描述
我有一个 PublishSubject
和一个 Subscriber
我用来处理(可能)无限的预处理数据流。问题是某些元素可能包含一些错误。我想忽略它们并继续处理。我怎么能这样做?我尝试过这样的事情:
I have a PublishSubject
and a Subscriber
which I use to process a (possibly) infinite stream of preprocessed data. The problem is that some of the elements might contain some error. I'd like to ignore them and continue processing. How can I do so? I've tried something like this:
val subject = PublishSubject.create<String>()
subject.retry().subscribe({
println("next: $it")
}, {
println("error")
}, {
println("complete")
})
subject.onNext("foo")
subject.onNext("bar")
subject.onError(RuntimeException())
subject.onNext("wom")
subject.onComplete()
My问题是没有任何错误处理方法可以帮助我:
My problem is that none of the error handling methods help me out here:
onErrorResumeNext()
- 指示Observable在遇到错误时发出一系列
项目
onErrorResumeNext()
— instructs an Observable to emit a sequence of items if it encounters an error
onErrorReturn()
- 指示
Observable在遇到错误时发出特定项目
onErrorReturn( )
— instructs an
Observable to emit a particular item when it encounters an error
onExceptionResumeNext()
- 指示Observable继续
发射物品af它遇到异常(但不是另一种
各种抛出)
onExceptionResumeNext( )
— instructs an Observable to continue
emitting items after it encounters an exception (but not another
variety of throwable)
retry()
- 如果一个来源Observable发出
错误,重新订阅它,希望它能在没有
错误的情况下完成
retry( )
— if a source Observable emits an
error, resubscribe to it in the hopes that it will complete without
error
retryWhen( )
- 如果源Observable发出错误,将该
错误传递给另一个Observable以确定是否重新订阅
源
retryWhen( )
— if a source Observable emits an error, pass that
error to another Observable to determine whether to resubscribe to the
source
我尝试了 retry()
,但它会在错误无限期后挂起我的进程。
I tried retry()
for example but it hangs my process after the error indefinitely.
我也试过 onErrorResumeNext()
但它没有按预期工作:
I also tried onErrorResumeNext()
but it does not work as expected:
val backupSubject = PublishSubject.create<String>()
val subject = PublishSubject.create<String>()
var currentSubject = subject
subject.onErrorResumeNext(backupSubject).subscribe({
println("next: $it")
}, {
println("error")
currentSubject = backupSubject
}, {
println("complete")
})
backupSubject.subscribe({
println("backup")
}, {
println("backup error")
})
currentSubject.onNext("foo")
currentSubject.onNext("bar")
currentSubject.onError(RuntimeException())
currentSubject.onNext("wom")
currentSubject.onComplete()
这只打印 foo
和 bar
。
推荐答案
如果你想在错误后继续处理,这意味着你的错误就像你的 String
s,应该通过 onNext
。为确保在这种情况下的类型安全,您应该使用某种形式的包装器,它可以采用常规值或错误;例如, io.reactivex.Notification< T>
在RxJava 2中可用:
If you want to continue processing after an error, it means your error is a value just like your String
s and should go through onNext
. To ensure type safety in this case, you should use some form of wrapper that can either take a regular value or an error; for example, the io.reactivex.Notification<T>
is available in RxJava 2:
PublishSubject<Notification<String>> subject = PublishSubject.create();
subject.subscribe(System.out::println);
subject.onNext(Notification.createOnNext("Hello"));
subject.onNext(Notification.<String>createOnError(new RuntimeException("oops")));
subject.onNext(Notification.createOnNext("World"));
这篇关于如何在RxJava 2中发生错误后继续处理?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!