如何在RxJava 2中发生错误后继续处理? [英] How to continue processing after an error happens in RxJava 2?

查看:711
本文介绍了如何在RxJava 2中发生错误后继续处理?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 PublishSubject 和一个 Subscriber 我用来处理(可能)无限的预处理数据流。问题是某些元素可能包含一些错误。我想忽略它们并继续处理。我怎么能这样做?我尝试过这样的事情:

I have a PublishSubject and a Subscriber which I use to process a (possibly) infinite stream of preprocessed data. The problem is that some of the elements might contain some error. I'd like to ignore them and continue processing. How can I do so? I've tried something like this:

    val subject = PublishSubject.create<String>()
    subject.retry().subscribe({
        println("next: $it")
    }, {
        println("error")
    }, {
        println("complete")
    })

    subject.onNext("foo")
    subject.onNext("bar")
    subject.onError(RuntimeException())
    subject.onNext("wom")
    subject.onComplete()

My问题是没有任何错误处理方法可以帮助我:

My problem is that none of the error handling methods help me out here:


onErrorResumeNext() - 指示Observable在遇到错误时发出一系列
项目

onErrorResumeNext() — instructs an Observable to emit a sequence of items if it encounters an error

onErrorReturn() - 指示
Observable在遇到错误时发出特定项目

onErrorReturn( ) — instructs an Observable to emit a particular item when it encounters an error

onExceptionResumeNext() - 指示Observable继续
发射物品af它遇到异常(但不是另一种
各种抛出)

onExceptionResumeNext( ) — instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)

retry() - 如果一个来源Observable发出
错误,重新订阅它,希望它能在没有
错误的情况下完成

retry( ) — if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error

retryWhen( ) - 如果源Observable发出错误,将该
错误传递给另一个Observable以确定是否重新订阅

retryWhen( ) — if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source

我尝试了 retry(),但它会在错误无限期后挂起我的进程。

I tried retry() for example but it hangs my process after the error indefinitely.

我也试过 onErrorResumeNext()但它没有按预期工作:

I also tried onErrorResumeNext() but it does not work as expected:

    val backupSubject = PublishSubject.create<String>()
    val subject = PublishSubject.create<String>()
    var currentSubject = subject
    subject.onErrorResumeNext(backupSubject).subscribe({
        println("next: $it")
    }, {
        println("error")
        currentSubject = backupSubject
    }, {
        println("complete")
    })

    backupSubject.subscribe({
        println("backup")
    }, {
        println("backup error")
    })

    currentSubject.onNext("foo")
    currentSubject.onNext("bar")
    currentSubject.onError(RuntimeException())
    currentSubject.onNext("wom")
    currentSubject.onComplete()

这只打印 foo bar

推荐答案

如果你想在错误后继续处理,这意味着你的错误就像你的 String s,应该通过 onNext 。为确保在这种情况下的类型安全,您应该使用某种形式的包装器,它可以采用常规值或错误;例如, io.reactivex.Notification< T> 在RxJava 2中可用:

If you want to continue processing after an error, it means your error is a value just like your Strings and should go through onNext. To ensure type safety in this case, you should use some form of wrapper that can either take a regular value or an error; for example, the io.reactivex.Notification<T> is available in RxJava 2:

PublishSubject<Notification<String>> subject = PublishSubject.create();

subject.subscribe(System.out::println);

subject.onNext(Notification.createOnNext("Hello"));
subject.onNext(Notification.<String>createOnError(new RuntimeException("oops")));
subject.onNext(Notification.createOnNext("World"));

这篇关于如何在RxJava 2中发生错误后继续处理?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆