压缩 observable 的错误处理 [英] Error handling for zipped observables

查看:16
本文介绍了压缩 observable 的错误处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的用例是:我得到了一个永久链接列表,并且需要为每个永久链接发出两个 REST 请求以分部分获取它们的数据.当两个请求都返回时,我想将它们的信息合并在一起并对其进行处理(在这里 - 打印出来).我想使用 zip 运算符使用代码来完成它.这是我当前的代码(以及我正在使用的库的模拟):

My use case is: I get a list of permalinks, and need to issue two REST requests per permalink to get their data in parts. When both requests are back, I want to merge their info together and do something with it (here - print it out). I want to do it with code using the zip operator. Here is my current code (together with mocks for the library I'm using):

public class Main {

    public static void main(String[] args) {
        ContentManager cm = new ContentManager();

        Observable
                .from(cm.getPermalinks(10))
                .flatMap(permalink -> Observable.zip(
                        Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                        Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                        (dataContent, streamUrlContent) -> {
                            if (dataContent == null || streamUrlContent == null) {
                                System.err.println("not zipping " + dataContent + " and " + streamUrlContent);
                                return Observable.empty();
                            }

                            return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
                        }))
                .subscribe(System.out::println);
    }
}

class SubscribingRestCallback implements RestCallback {

    private final Subscriber<? super Content> subscriber;

    public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void onSuccess(Content content) {
        subscriber.onNext(content);
        subscriber.onCompleted();
    }

    @Override
    public void onFailure(int code, String message) {
        System.err.println(message);
        subscriber.onNext(null);
        subscriber.onCompleted();
    }
}

public class Content {

    public final String permalink;

    public final String logoUrl;

    public final String streamUrl;

    public Content(String permalink, String logoUrl, String streamUrl) {
        this.permalink = permalink;
        this.logoUrl = logoUrl;
        this.streamUrl = streamUrl;
    }

    @Override
    public String toString() {
        return String.format("Content [%s, %s, %s]", permalink, logoUrl, streamUrl);
    }
}

public interface RestCallback {

    void onSuccess(Content content);

    void onFailure(int code, String message);
}

class ContentManager {

    private final Random random = new Random();

    public List<String> getPermalinks(int n) {
        List<String> permalinks = new ArrayList<>(n);
        for (int i = 1; i <= n; ++i) {
            permalinks.add("perma_" + i);
        }

        return permalinks;
    }

    public void getDataByPermalink(String permalink, RestCallback callback) {
        getByPermalink(permalink, callback, false);
    }

    public void getStreamByPermalink(String permalink, RestCallback callback) {
        getByPermalink(permalink, callback, true);
    }

    private void getByPermalink(String permalink, RestCallback callback, boolean stream) {
        // simulate network latency and unordered results
        new Thread(() -> {
            try {
                Thread.sleep(random.nextInt(1000) + 200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (random.nextInt(100) < 95) {
                String logoUrl;
                String streamUrl;
                if (stream) {
                    logoUrl = null;
                    streamUrl = "http://" + permalink + "/stream";
                } else {
                    logoUrl = "http://" + permalink + "/logo.png";
                    streamUrl = null;
                }
                callback.onSuccess(new Content(permalink, logoUrl, streamUrl));
            } else {
                callback.onFailure(-1, permalink + " data failure");
            }
        }).start();
    }
}

总的来说,它有效,但我不喜欢这个实现中的错误处理.基本上,REST 请求可能会失败,在这种情况下 onFailure 方法调用 subscriber.onNext(null) 以便 zip 方法总是有一些东西一起工作(一个请求可能失败了,但另一个可能没有,我不知道哪个失败了).然后,在 zip 函数中,我需要一个 if 来检查两者都不是 null(如果任何部分 Contents 是 null).

In general, it works, but I don't like the error handling in this implementation. Basically, the REST requests may fail, in which case the onFailure method calls subscriber.onNext(null) so that the zip method always has something to work with (one request may have failed, but the other one may have not, and I don't know which failed). Then, in the zip function I need an if which checks that both are not null (my code will crash if any of the partial Contents is null).

如果可能的话,我希望能够在某处使用 filter 运算符过滤掉 null.或者也许有比为失败情况发出 null 值更好的方法,但它仍然可以与 zip 功能一起使用?

I would like to be able to filter out the null using the filter operator somewhere, if possible. Or maybe there is a better way than emitting null values for the failure case but so that it still works with the zip function?

推荐答案

首先,通知 Subscriber 错误的正确方法是调用 subscriber.onError 方法:

First of all, the right way to notify a Subscriber about an error is to call subscriber.onError method:

class SubscribingRestCallback implements RestCallback {
    private final Subscriber<? super Content> subscriber;

    public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void onSuccess(Content content) {
        subscriber.onNext(content);
        subscriber.onCompleted();
    }

    @Override
    public void onFailure(int code, String message) {
        subscriber.onError(new Exception(message));
    }
}

即使您不希望整个流失败,您仍然需要调用 subscriber.onError() 方法.还有一些其他方法可以减少错误.其中之一是 onErrorResumeNext 运算符:

Even if you don't want the whole stream to fail, you still need to call a subscriber.onError() method. There are some other ways to shallow the errors. One of them is an onErrorResumeNext operator:

Observable
        .from(cm.getPermalinks(10))
        .flatMap(permalink -> Observable.zip(
                Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                (dataContent, streamUrlContent) -> {
                    return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
                }).onErrorResumeNext(Observable.empty()))
        .subscribe(System.out::println);

编辑

我还有最后一个问题:如果您注意到我的拉链功能,我会返回Observable.empty() 如果两个对象不能被压缩,并且一旦我返回内容.这似乎是错误的.我应该如何处理这样的错误zipper 函数中的条件?

I have one last question: if you notice my zipper functions, I return Observable.empty() if the two objects cannot be zipped, and once I return Content. This seems wrong. How should I handle such error conditions in the zipper function?

是的,返回 Observable.empty() 是完全错误的.从 zip 函数抛出异常似乎是最好的解决方案:

Yes, returning Observable.empty() is totally wrong. Throwing an exception from zip function seems like the best solution:

Observable
        .from(cm.getPermalinks(10))
        .flatMap(permalink -> Observable.zip(
                Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
                (dataContent, streamUrlContent) -> {
                    if (!isDataValid(dataContent, streamUrlContent)) {
                        throw new RuntimeException("Something went wrong.");
                    }
                    return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
                }).onErrorResumeNext(Observable.empty()))
        .subscribe(System.out::println);

这篇关于压缩 observable 的错误处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆