RxJava 出错后如何继续流式传输项目? [英] How to continue streaming items after error in RxJava?

查看:31
本文介绍了RxJava 出错后如何继续流式传输项目?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是 RxJava 新手,我遇到了以下问题.假设我有项目序列并且有项目传播错误,我想忽略它并继续处理其他项目.

I'm RxJava newbie, and I've got following problem. Say I have sequence of items and on of items propagates error, I want to ignore it and to continue processing other items.

我有以下片段:

    Observable.from(Arrays.asList("1", "2", "3"))
            .map(x -> {
                if (x.equals("2")) {
                    throw new NullPointerException();
                }
                return x + "-";
            })
            .onExceptionResumeNext(Observable.empty())
            .subscribe(System.out::println);

我得到:1-

但我想得到: 1- , 3-

But I want to get: 1- , 3-

我该怎么做?

推荐答案

诀窍是将值包装起来,以某种方式将其转换为一个新的 observable 和平面图,如下例所示.flatMap 中的每个值现在都可以抛出异常并逐个值地处理它.因为 flatMap 中的子流只包含一个元素,所以在 onError 之后是否关闭 observable 并不重要.我使用 RxJava2 作为测试环境.

the trick is to wrap the value, which would be transformed somehow, into a new observable and flatmap over it as in the following example. Each value in the flatMap can now throw a exception and handle it value by value. Becuase the substream in flatMap consists only of one element, it does not matter if the observable will be closed after onError. I use RxJava2 as test-environment.

@Test
public void name() throws Exception {
    Observable<String> stringObservable = Observable.fromArray("1", "2", "3")
            .flatMap(x -> {
                return Observable.defer(() -> {
                    try {
                        if (x.equals("2")) {
                            throw new NullPointerException();
                        }
                        return Observable.just(x + "-");
                    } catch (Exception ex) {
                        return Observable.error(ex);
                    }
                }).map(s -> {
                    if (s.equals("3-")) {
                        throw new IllegalArgumentException();
                    }
                    return s + s;
                }).take(1)
                        .zipWith(Observable.just("X"), (s, s2) -> s + s2)
                        .onErrorResumeNext(Observable.empty());
            });

    TestObserver<String> test = stringObservable.test();

    test.assertResult("1-1-X");
}

这篇关于RxJava 出错后如何继续流式传输项目?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆