RxJava抓取观测量在并行 [英] RxJava Fetching Observables In Parallel

查看:187
本文介绍了RxJava抓取观测量在并行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要在落实RxJava并行异步调用一些帮助。我拿起一个简单的用例,其中要显示的第一个电话取(而搜索)的产品清单(瓦)。随后调用出去取(A)审查和(B)产品图片

I need some help in implementing parallel asynchronous calls in RxJava. I have picked up a simple use case wherein the FIRST call fetches (rather searches) a list of products (Tile) to be displayed. The subsequent calls go out and fetch (A) REVIEWS and (B) PRODUCT IMAGES

经过多次尝试之后我得到了这个地方。

After several attempts I got to this place.

 1    Observable<Tile> searchTile = searchServiceClient.getSearchResults(searchTerm);
 2    List<Tile> allTiles = new ArrayList<Tile>();
 3    ClientResponse response = new ClientResponse();

 4    searchTile.parallel(oTile -> {
 5      return oTile.flatMap(t -> {
 6        Observable<Reviews> reviews = reviewsServiceClient.getSellerReviews(t.getSellerId());
 7        Observable<String> imageUrl = reviewsServiceClient.getProductImage(t.getProductId());

 8        return Observable.zip(reviews, imageUrl, (r, u) -> {
 9          t.setReviews(r);
10          t.setImageUrl(u);

11          return t;
12        });

13      });
14    }).subscribe(e -> {
15      allTiles.add((Tile) e);
16    });

1号线:熄灭,并获取产品(瓦),以显示

Line 1: goes out and fetches the product (Tile) to be displayed

4号线:我们把观测的列表,并碎片它来获取评论和imageUrls

Line 4: We take the list of the Observable and SHARD it to fetch reviews and imageUrls

烈6,7:抓取观测审查和观测网址

Lie 6,7: Fetch the Observable review and Observable url

8号线:最后2个观测值都压缩,返回一个更新的观测

Line 8: Finally the 2 observables are zipped up to return an updated Observable

15号线:线路终于整理15要显示它可以返回给调用层的集合中的所有单个产品

Line 15: finally line 15 collates all the individual products to be displayed in a collection which can be returned back to the calling layer

虽然观测已经分片,并在我们的测试运行在4个不同的线程;评论和图像的获取似乎是一个接一个。我猜想,第8行压缩步骤基本上引起的2个观测值(审查和URL)顺序调用。

While the Observable has been sharded and in our tests run over 4 different threads; fetching of reviews and images seems to be one after another. I suspect that the zip step on line 8 is basically causing the sequential invocation of the the 2 observables (reviews and url).

请问这组有任何建议,平行取reiews和图像的URL。在本质上面附着的瀑布图应该看起来更加垂直堆叠。以评论和图像的调用应该是并行

Does this group have any suggestion to parallely fetch reiews and image urls. In essence the waterfall chart attached above should look more vertically stacked. The calls to reviews and images should be in parallel

谢谢
阿南德拉曼

thanks anand raman

推荐答案

并行运营商被证明是一个问题,几乎所有的使用案例和没有做什么最期待它,所以它是在1.0.0中删除。 rc.4发布<一个href=\"https://github.com/ReactiveX/RxJava/pull/1716\">https://github.com/ReactiveX/RxJava/pull/1716

The parallel operator proved to be a problem for almost all use cases and does not do what most expect from it, so it was removed in the 1.0.0.rc.4 release: https://github.com/ReactiveX/RxJava/pull/1716

如何做这种类型的行为,并得到并行执行的一个很好的例子可以看出<一个href=\"https://github.com/Netflix/ReactiveLab/blob/master/reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/routes/RouteForDeviceHome.java\">here.

A good example of how to do this type of behavior and get parallel execution can be seen here.

在您的例子code不清楚是否 searchServiceClient 是同步或异步的。它影响如何稍微解决问题,就好像它是已经异步不需要额外的调度。如果需要额外的同步调度。

In your example code it is unclear if searchServiceClient is synchronous or asynchronous. It affects how to solve the problem slightly as if it is already async no extra scheduling is needed. If synchronous extra scheduling is needed.

首先这里有展示同步和异步行为的一些简单的例子:

First here are some simple examples showing synchronous and asynchronous behavior:

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class ParallelExecution {

    public static void main(String[] args) {
        System.out.println("------------ mergingAsync");
        mergingAsync();
        System.out.println("------------ mergingSync");
        mergingSync();
        System.out.println("------------ mergingSyncMadeAsync");
        mergingSyncMadeAsync();
        System.out.println("------------ flatMapExampleSync");
        flatMapExampleSync();
        System.out.println("------------ flatMapExampleAsync");
        flatMapExampleAsync();
        System.out.println("------------");
    }

    private static void mergingAsync() {
        Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println);
    }

    private static void mergingSync() {
        // here you'll see the delay as each is executed synchronously
        Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println);
    }

    private static void mergingSyncMadeAsync() {
        // if you have something synchronous and want to make it async, you can schedule it like this
        // so here we see both executed concurrently
        Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println);
    }

    private static void flatMapExampleAsync() {
        Observable.range(0, 5).flatMap(i -> {
            return getDataAsync(i);
        }).toBlocking().forEach(System.out::println);
    }

    private static void flatMapExampleSync() {
        Observable.range(0, 5).flatMap(i -> {
            return getDataSync(i);
        }).toBlocking().forEach(System.out::println);
    }

    // artificial representations of IO work
    static Observable<Integer> getDataAsync(int i) {
        return getDataSync(i).subscribeOn(Schedulers.io());
    }

    static Observable<Integer> getDataSync(int i) {
        return Observable.create((Subscriber<? super Integer> s) -> {
            // simulate latency
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                s.onNext(i);
                s.onCompleted();
            });
    }
}

以下是在提供更符合您的code为例尝试:

Following is an attempt at providing an example that more closely matches your code:

import java.util.List;

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class ParallelExecutionExample {

    public static void main(String[] args) {
        final long startTime = System.currentTimeMillis();

        Observable<Tile> searchTile = getSearchResults("search term")
                .doOnSubscribe(() -> logTime("Search started ", startTime))
                .doOnCompleted(() -> logTime("Search completed ", startTime));

        Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
            Observable<Reviews> reviews = getSellerReviews(t.getSellerId())
                    .doOnCompleted(() -> logTime("getSellerReviews[" + t.id + "] completed ", startTime));
            Observable<String> imageUrl = getProductImage(t.getProductId())
                    .doOnCompleted(() -> logTime("getProductImage[" + t.id + "] completed ", startTime));

            return Observable.zip(reviews, imageUrl, (r, u) -> {
                return new TileResponse(t, r, u);
            }).doOnCompleted(() -> logTime("zip[" + t.id + "] completed ", startTime));
        });

        List<TileResponse> allTiles = populatedTiles.toList()
                .doOnCompleted(() -> logTime("All Tiles Completed ", startTime))
                .toBlocking().single();
    }

    private static Observable<Tile> getSearchResults(String string) {
        return mockClient(new Tile(1), new Tile(2), new Tile(3));
    }

    private static Observable<Reviews> getSellerReviews(int id) {
        return mockClient(new Reviews());
    }

    private static Observable<String> getProductImage(int id) {
        return mockClient("image_" + id);
    }

    private static void logTime(String message, long startTime) {
        System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms");
    }

    private static <T> Observable<T> mockClient(T... ts) {
        return Observable.create((Subscriber<? super T> s) -> {
            // simulate latency
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                }
                for (T t : ts) {
                    s.onNext(t);
                }
                s.onCompleted();
            }).subscribeOn(Schedulers.io());
        // note the use of subscribeOn to make an otherwise synchronous Observable async
    }

    public static class TileResponse {

        public TileResponse(Tile t, Reviews r, String u) {
            // store the values
        }

    }

    public static class Tile {

        private final int id;

        public Tile(int i) {
            this.id = i;
        }

        public int getSellerId() {
            return id;
        }

        public int getProductId() {
            return id;
        }

    }

    public static class Reviews {

    }
}

此输出:

Search started  => 65ms
Search completed  => 1094ms
getProductImage[1] completed  => 2095ms
getSellerReviews[2] completed  => 2095ms
getProductImage[3] completed  => 2095ms
zip[1] completed  => 2096ms
zip[2] completed  => 2096ms
getProductImage[2] completed  => 2096ms
getSellerReviews[1] completed  => 2096ms
zip[3] completed  => 2096ms
All Tiles Completed  => 2097ms
getSellerReviews[3] completed  => 2097ms

我已每个IO呼叫模拟取1000毫秒所以很明显,其中延迟是,它是并行发生的。它打印出的进步使得经过的毫秒。

I have made each IO call be simulated to take 1000ms so it is obvious where the latency is and that it is happening in parallel. It prints out the progress is makes in elapsed milliseconds.

这里的窍门是,flatMap合并异步调用,因此只要观测量被合并是异步,他们都将同时进行。

The trick here is that flatMap merges async calls, so as long as the Observables being merged are async, they will all be executed concurrently.

如果像 getProductImage(t.getProductId())是同步的,它可以异步了这样一个电话:getProductImage(t.getProductId())subscribeOn( Schedulers.io)。

If a call like getProductImage(t.getProductId()) was synchronous, it can be made asynchronous like this: getProductImage(t.getProductId()).subscribeOn(Schedulers.io).

下面是上述实施例的没有所有的记录和样板类型的重要部分:

Here is the important part of the above example without all the logging and boilerplate types:

    Observable<Tile> searchTile = getSearchResults("search term");;

    Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
        Observable<Reviews> reviews = getSellerReviews(t.getSellerId());
        Observable<String> imageUrl = getProductImage(t.getProductId());

        return Observable.zip(reviews, imageUrl, (r, u) -> {
            return new TileResponse(t, r, u);
        });
    });

    List<TileResponse> allTiles = populatedTiles.toList()
            .toBlocking().single();

我希望这有助于。

I hope this helps.

这篇关于RxJava抓取观测量在并行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆