How to batch a long process serially using RxJava?

Question

I have a big list of strings that needs to be checked against a remote API.

Observable.from(strings) // `strings` is a List<String>; let's say it has > 5000 items
   .buffer(50) // split the strings into 50-item chunks; returns an Observable<List<String>> (fast)
   .flatMap(batch -> {
       // checkPhoneNumbers is a network call using Retrofit's RxJava support (slow)
       return mSyncApi.checkPhoneNumbers(batch);
   })
   .reduce( ... ) // aggregate all checking results
   .subscribe( ... );

The problem is that buffer() seems to emit List<String> so fast that all of the .checkPhoneNumbers() calls get executed at almost the same time. What I would like to achieve is to enqueue the .checkPhoneNumbers() calls, to better support devices with slow connections.
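To make "enqueue" concrete, here is a minimal sketch in plain, synchronous Java (no RxJava), assuming each check is a blocking call. The class `SerialBatchCheck`, its helpers `chunk` and `checkSerially`, and the fake checker in `main` are hypothetical; the function passed to `checkSerially` stands in for `mSyncApi.checkPhoneNumbers`. Each batch call starts only after the previous one has returned, and the results are aggregated at the end, like the `reduce` step above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class SerialBatchCheck {

    // Split the input into consecutive chunks of at most `size` items,
    // mirroring what buffer(50) emits.
    static <T> List<List<T>> chunk(List<T> items, int size) {
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += size) {
            chunks.add(new ArrayList<>(items.subList(start, Math.min(start + size, items.size()))));
        }
        return chunks;
    }

    // Call `check` on one chunk at a time: the next call starts only after the
    // previous one has returned. All results are collected into one list.
    static <T, R> List<R> checkSerially(List<T> items, int size, Function<List<T>, List<R>> check) {
        List<R> all = new ArrayList<>();
        for (List<T> batch : chunk(items, size)) {
            all.addAll(check.apply(batch));
        }
        return all;
    }

    public static void main(String[] args) {
        List<String> numbers = new ArrayList<>();
        for (int i = 0; i < 120; i++) {
            numbers.add("555-" + i);
        }
        // Hypothetical stand-in for mSyncApi.checkPhoneNumbers: marks every number "ok".
        List<String> results = checkSerially(numbers, 50, batch -> {
            List<String> out = new ArrayList<>();
            for (String n : batch) {
                out.add(n + ":ok");
            }
            return out;
        });
        // prints "3 batches, 120 results"
        System.out.println(chunk(numbers, 50).size() + " batches, " + results.size() + " results");
    }
}
```

In RxJava terms, this corresponds to subscribing to one inner Observable at a time instead of to all of them at once.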

Throttling the emitted List<String> by a predefined time interval doesn't make sense, since it would penalize devices with lightning-fast connections. I have tried RxJava's serialize() right after the flatMap(), but it doesn't seem to make any difference (although I don't know if that's the right use of serialize()).

Any alternative approaches appreciated! Thanks.

Answer

As @zsxwing suggested, I think the maxConcurrent overload is what you're looking for if you're trying to limit the concurrency occurring inside flatMap.

For example: https://gist.github.com/benjchristensen/a0350776a595fd6e3810#file-parallelexecution-java-L78

// RxJava 1.x
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.schedulers.Schedulers;

private static void flatMapBufferedExampleAsync() {
    final AtomicInteger total = new AtomicInteger();
    Observable.range(0, 500000000)
            .doOnNext(i -> total.incrementAndGet())
            .buffer(100) // emits List<Integer> chunks of 100 items
            .doOnNext(i -> System.out.println("emit " + i))
            .flatMap(i -> {
                return Observable.from(i).subscribeOn(Schedulers.computation()).map(item -> {
                    // simulate computational work
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // restore the interrupt flag
                    }
                    return item + " processed " + Thread.currentThread();
                });
            }, 2 /* limit concurrency to 2 */) // <--- note argument here
            .toBlocking().forEach(System.out::println);

    System.out.println("total emitted: " + total.get());
}
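The maxConcurrent argument caps how many inner Observables flatMap is subscribed to at once; passing 1 processes one batch at a time, which behaves like concatMap and gives the serial behavior the question asks for. For readers who want the same idea without RxJava, the sketch below is a plain-Java analogue (not RxJava's API), assuming blocking work: a fixed thread pool with `maxConcurrent` threads bounds how many batches run simultaneously. `BoundedBatchRunner`, `run` and `simulatedWork` are hypothetical names, and the 10 ms sleep stands in for a slow network call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class BoundedBatchRunner {

    // Tracks how many batches are running right now and the highest value seen.
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger peak = new AtomicInteger();

    // Process every batch on a pool of `maxConcurrent` threads, so at most that
    // many batches run at once; maxConcurrent = 1 processes them one by one.
    static <T, R> List<R> run(List<List<T>> batches, int maxConcurrent, Function<List<T>, R> work) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrent);
        try {
            List<Future<R>> futures = new ArrayList<>();
            for (List<T> batch : batches) {
                futures.add(pool.submit(() -> work.apply(batch)));
            }
            List<R> results = new ArrayList<>();
            for (Future<R> f : futures) {
                results.add(f.get()); // waits on each future, so results keep submission order
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    // Hypothetical slow task: records concurrency, sleeps 10 ms, returns the batch size.
    static int simulatedWork(List<Integer> batch) {
        int now = inFlight.incrementAndGet();
        peak.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            inFlight.decrementAndGet();
        }
        return batch.size();
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = new ArrayList<>();
        for (int b = 0; b < 10; b++) {
            batches.add(List.of(b, b, b));
        }
        List<Integer> sizes = run(batches, 2, BoundedBatchRunner::simulatedWork);
        System.out.println(sizes.size() + " batches processed, peak concurrency " + peak.get());
    }
}
```

Unlike flatMap, which emits inner results as they arrive (so with maxConcurrent greater than 1 they may interleave), this version returns results in submission order because it waits on each Future in turn.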
