RxJava 并行获取 Observable [英] RxJava Fetching Observables In Parallel

查看:40
本文介绍了RxJava 并行获取 Observable的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要一些帮助来在 RxJava 中实现并行异步调用.我选择了一个简单的用例,其中 FIRST 调用获取(而不是搜索)要显示的产品列表(Tile).随后的调用出去并获取 (A) 评论和 (B) 产品图像

I need some help in implementing parallel asynchronous calls in RxJava. I have picked up a simple use case wherein the FIRST call fetches (rather searches) a list of products (Tile) to be displayed. The subsequent calls go out and fetch (A) REVIEWS and (B) PRODUCT IMAGES

经过多次尝试,我到达了这个地方.

After several attempts I got to this place.

 1    Observable<Tile> searchTile = searchServiceClient.getSearchResults(searchTerm);
 2    List<Tile> allTiles = new ArrayList<Tile>();
 3    ClientResponse response = new ClientResponse();

 4    searchTile.parallel(oTile -> {
 5      return oTile.flatMap(t -> {
 6        Observable<Reviews> reviews = reviewsServiceClient.getSellerReviews(t.getSellerId());
 7        Observable<String> imageUrl = reviewsServiceClient.getProductImage(t.getProductId());

 8        return Observable.zip(reviews, imageUrl, (r, u) -> {
 9          t.setReviews(r);
10          t.setImageUrl(u);

11          return t;
12        });

13      });
14    }).subscribe(e -> {
15      allTiles.add((Tile) e);
16    });

第1行:出去取要展示的产品(Tile)

Line 1: goes out and fetches the product (Tile) to be displayed

第 4 行:我们获取 Observable 的列表并对其进行 SHARD 以获取评论和 imageUrls

Line 4: We take the list of the Observable and SHARD it to fetch reviews and imageUrls

谎言 6,7:获取 Observable 评论和 Observable url

Lie 6,7: Fetch the Observable review and Observable url

第 8 行:最后将 2 个 observable 压缩以返回更新的 Observable

Line 8: Finally the 2 observables are zipped up to return an updated Observable

第 15 行:最后第 15 行整理所有要显示在集合中的单个产品,该集合可以返回到调用层

Line 15: finally line 15 collates all the individual products to be displayed in a collection which can be returned back to the calling layer

虽然 Observable 已被分片,并且在我们的测试中运行了 4 个不同的线程;获取评论和图像似乎是一个接一个.我怀疑第 8 行的 zip 步骤基本上导致了 2 个 observables(评论和 url)的顺序调用.

While the Observable has been sharded and in our tests run over 4 different threads; fetching of reviews and images seems to be one after another. I suspect that the zip step on line 8 is basically causing the sequential invocation of the the 2 observables (reviews and url).

这个小组有什么建议可以并行获取 reiews 和图像 url.本质上,上面附加的瀑布图应该看起来更垂直堆叠.对评论和图片的调用应该并行

Does this group have any suggestion to parallely fetch reiews and image urls. In essence the waterfall chart attached above should look more vertically stacked. The calls to reviews and images should be in parallel

谢谢阿南拉曼

推荐答案

并行运算符被证明是几乎所有用例的问题,并且不能满足大多数人的期望,因此它在 1.0.0 中被删除.rc.4 版本:https://github.com/ReactiveX/RxJava/pull/1716

The parallel operator proved to be a problem for almost all use cases and does not do what most expect from it, so it was removed in the 1.0.0.rc.4 release: https://github.com/ReactiveX/RxJava/pull/1716

可以看到如何执行此类行为并获得并行执行的一个很好的示例 这里.

A good example of how to do this type of behavior and get parallel execution can be seen here.

在您的示例代码中,不清楚 searchServiceClient 是同步还是异步.它会稍微影响如何解决问题,就好像它已经是异步的,不需要额外的调度.如果需要同步额外调度.

In your example code it is unclear if searchServiceClient is synchronous or asynchronous. It affects how to solve the problem slightly as if it is already async no extra scheduling is needed. If synchronous extra scheduling is needed.

首先是一些显示同步和异步行为的简单示例:

First here are some simple examples showing synchronous and asynchronous behavior:

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class ParallelExecution {

    public static void main(String[] args) {
        System.out.println("------------ mergingAsync");
        mergingAsync();
        System.out.println("------------ mergingSync");
        mergingSync();
        System.out.println("------------ mergingSyncMadeAsync");
        mergingSyncMadeAsync();
        System.out.println("------------ flatMapExampleSync");
        flatMapExampleSync();
        System.out.println("------------ flatMapExampleAsync");
        flatMapExampleAsync();
        System.out.println("------------");
    }

    private static void mergingAsync() {
        Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println);
    }

    private static void mergingSync() {
        // here you'll see the delay as each is executed synchronously
        Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println);
    }

    private static void mergingSyncMadeAsync() {
        // if you have something synchronous and want to make it async, you can schedule it like this
        // so here we see both executed concurrently
        Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println);
    }

    private static void flatMapExampleAsync() {
        Observable.range(0, 5).flatMap(i -> {
            return getDataAsync(i);
        }).toBlocking().forEach(System.out::println);
    }

    private static void flatMapExampleSync() {
        Observable.range(0, 5).flatMap(i -> {
            return getDataSync(i);
        }).toBlocking().forEach(System.out::println);
    }

    // artificial representations of IO work
    static Observable<Integer> getDataAsync(int i) {
        return getDataSync(i).subscribeOn(Schedulers.io());
    }

    static Observable<Integer> getDataSync(int i) {
        return Observable.create((Subscriber<? super Integer> s) -> {
            // simulate latency
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                s.onNext(i);
                s.onCompleted();
            });
    }
}

以下是尝试提供与您的代码更匹配的示例:

Following is an attempt at providing an example that more closely matches your code:

import java.util.List;

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class ParallelExecutionExample {

    public static void main(String[] args) {
        final long startTime = System.currentTimeMillis();

        Observable<Tile> searchTile = getSearchResults("search term")
                .doOnSubscribe(() -> logTime("Search started ", startTime))
                .doOnCompleted(() -> logTime("Search completed ", startTime));

        Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
            Observable<Reviews> reviews = getSellerReviews(t.getSellerId())
                    .doOnCompleted(() -> logTime("getSellerReviews[" + t.id + "] completed ", startTime));
            Observable<String> imageUrl = getProductImage(t.getProductId())
                    .doOnCompleted(() -> logTime("getProductImage[" + t.id + "] completed ", startTime));

            return Observable.zip(reviews, imageUrl, (r, u) -> {
                return new TileResponse(t, r, u);
            }).doOnCompleted(() -> logTime("zip[" + t.id + "] completed ", startTime));
        });

        List<TileResponse> allTiles = populatedTiles.toList()
                .doOnCompleted(() -> logTime("All Tiles Completed ", startTime))
                .toBlocking().single();
    }

    private static Observable<Tile> getSearchResults(String string) {
        return mockClient(new Tile(1), new Tile(2), new Tile(3));
    }

    private static Observable<Reviews> getSellerReviews(int id) {
        return mockClient(new Reviews());
    }

    private static Observable<String> getProductImage(int id) {
        return mockClient("image_" + id);
    }

    private static void logTime(String message, long startTime) {
        System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms");
    }

    private static <T> Observable<T> mockClient(T... ts) {
        return Observable.create((Subscriber<? super T> s) -> {
            // simulate latency
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                }
                for (T t : ts) {
                    s.onNext(t);
                }
                s.onCompleted();
            }).subscribeOn(Schedulers.io());
        // note the use of subscribeOn to make an otherwise synchronous Observable async
    }

    public static class TileResponse {

        public TileResponse(Tile t, Reviews r, String u) {
            // store the values
        }

    }

    public static class Tile {

        private final int id;

        public Tile(int i) {
            this.id = i;
        }

        public int getSellerId() {
            return id;
        }

        public int getProductId() {
            return id;
        }

    }

    public static class Reviews {

    }
}

输出:

Search started  => 65ms
Search completed  => 1094ms
getProductImage[1] completed  => 2095ms
getSellerReviews[2] completed  => 2095ms
getProductImage[3] completed  => 2095ms
zip[1] completed  => 2096ms
zip[2] completed  => 2096ms
getProductImage[2] completed  => 2096ms
getSellerReviews[1] completed  => 2096ms
zip[3] completed  => 2096ms
All Tiles Completed  => 2097ms
getSellerReviews[3] completed  => 2097ms

我已将每个 IO 调用模拟为花费 1000 毫秒,因此很明显延迟在哪里并且它是并行发生的.它会打印出在经过的毫秒内完成的进度.

I have made each IO call be simulated to take 1000ms so it is obvious where the latency is and that it is happening in parallel. It prints out the progress is makes in elapsed milliseconds.

这里的技巧是 flatMap 合并异步调用,所以只要被合并的 Observables 是异步的,它们都会并发执行.

The trick here is that flatMap merges async calls, so as long as the Observables being merged are async, they will all be executed concurrently.

如果像 getProductImage(t.getProductId()) 这样的调用是同步的,它可以像这样变成异步的:getProductImage(t.getProductId()).subscribeOn(Schedulers.io).

If a call like getProductImage(t.getProductId()) was synchronous, it can be made asynchronous like this: getProductImage(t.getProductId()).subscribeOn(Schedulers.io).

这是上面示例的重要部分,没有所有日志记录和样板类型:

Here is the important part of the above example without all the logging and boilerplate types:

    Observable<Tile> searchTile = getSearchResults("search term");;

    Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
        Observable<Reviews> reviews = getSellerReviews(t.getSellerId());
        Observable<String> imageUrl = getProductImage(t.getProductId());

        return Observable.zip(reviews, imageUrl, (r, u) -> {
            return new TileResponse(t, r, u);
        });
    });

    List<TileResponse> allTiles = populatedTiles.toList()
            .toBlocking().single();

我希望这会有所帮助.

这篇关于RxJava 并行获取 Observable的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆