使用 RxJava 并行调用网络服务.这是正确的方法吗? [英] Calling network services in parallel using RxJava. Is this the right way?

查看:36
本文介绍了使用 RxJava 并行调用网络服务.这是正确的方法吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

想法是并行进行 3 个网络调用.(我使用谷歌作为演示目的的服务.以下工作,但不确定这是正确的方法还是可以简化.如果我必须结合所有三个搜索的响应,我该怎么办?请指教.

Idea is to make 3 network calls in parallel. (I am using Google as the servies for demo purpose. The following works but not sure if this is the right way or it can be simplified. What should I do if I have to combine the responses of all the three searches? Please advise.

public class GoogleSearchRx
{
    public static void main(String args[])
    {
        CountDownLatch latch = new CountDownLatch(3);

        search("RxJava").subscribeOn(Schedulers.io()).subscribe(
                links -> {
                    links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
                    latch.countDown();
                },
                e -> {
                    out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
                    latch.countDown();
                }
        );

        search("Reactive Extensions").subscribeOn(Schedulers.io()).subscribe(
                links -> {
                    links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
                    latch.countDown();
                },
                e -> {
                    out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
                    latch.countDown();
                }
        );

        //run the last one on current thread
        search("Erik Meijer").subscribe(
                links -> {
                    links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
                    latch.countDown();
                },
                e -> {
                    out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
                    latch.countDown();
                }
        );

        try
        {
            latch.await();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }

    public static Observable<Elements> search(String q)
    {
        String google = "http://www.google.com/search?q=";

        String charset = "UTF-8";
        String userAgent = "ExampleBot 1.0 (+http://example.com/bot)"; // Change this to your company's name and bot homepage!

        return Observable.create(new Observable.OnSubscribe<Elements>()
        {

            @Override public void call(Subscriber<? super Elements> subscriber)
            {
                out.println(currentThreadName() + "\tOnSubscribe.call");

                try
                {
                    Elements links = Jsoup.connect(google + URLEncoder.encode(q, charset)).timeout(1000).userAgent(userAgent).get().select("li.g>h3>a");
                    subscriber.onNext(links);
                }
                catch (IOException e)
                {
                    subscriber.onError(e);
                }
                subscriber.onCompleted();
            }
        });
    }
}

推荐答案

通过您问题的组合所有三个搜索的响应"部分,您可能正在寻找 Zip.

Going by the "combine the responses of all the three searches" part of your question, you might be looking for Zip.

Observable<Elements> search1 = search("RxJava");
Observable<Elements> search2 = search("Reactive Extensions");
Observable<Elements> search3 = search("Eric Meijer");
Observable.zip(searc1, search2, search3,
            new Func3<Elements, Elements, Elements, Elements>() {
                @Override
                public Elements call(Elements result1, Elements result2, Elements result3) {
                    // Add all the results together...
                    return results;
                }
            }
    ).subscribeOn(Schedulers.io()).subscribe(
            links -> {
                links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
                latch.countDown();
            },
            e -> {
                out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
                latch.countDown();
            }
    );

这假设您想同时处理所有结果(在订阅者中,这里)并且不关心给定结果使用了哪个查询.

This assumes you want to deal with all the results at the same time (in the subscriber, here) and not care about which query was used for the given result.

注意 zip 函数的不同版本,从 1..N observables 和 Func1Func9FuncN,允许您压缩特定或任意数量的 observable.

Note there's different versions of the zip function, taking from 1..N observables, and Func1 to Func9 or FuncN, allowing you to zip a specific or arbitrarily large number of observables.

这篇关于使用 RxJava 并行调用网络服务.这是正确的方法吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆