RxJava flatMap 和背压奇怪的行为 [英] RxJava flatMap and backpressure strange behavior

查看:55
本文介绍了RxJava flatMap 和背压奇怪的行为的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在使用 RxJava 编写数据同步作业时,我发现了一个我无法解释的奇怪行为.我是 RxJava 的新手,希望得到帮助.

While writing a data synchronization job with RxJava I discovered a strange behavior that I cannot explain. I'm quite novice with RxJava and would appreciate help.

简而言之,我的工作非常简单,我有一个元素 ID 列表,我调用 Web 服务以通过 ID 获取每个元素,进行一些处理并进行多次调用以将数据推送到数据库.数据加载比数据存储快,所以我遇到了 OutOfMemory 错误.

Briefely my job is quite simple I have a list of element IDs, I call a webservice to get each element by ID, do some processing and do multiple call to push data to DB. Data loading is faster than data storing so I encounted OutOfMemory errors.

我的代码看起来很像失败"测试,但后来做了一些测试,我意识到删除了该行:

My code pretty much look like "failing" test but then doning some test I realized that removing the line :

flatMap(dt -> Observable.just(dt))

让它发挥作用.失败的测试输出清楚地表明未消耗的项目堆积起来,这导致 OutOfMemory.工作测试输出显示生产者将始终等待消费者,因此这永远不会导致 OutOfMemory.

Make it work. Failing test output shows clearly that unconsumed items stack up and this lead to OutOfMemory. Working test output shows that producer will always wait consumer so this never lead to OutOfMemory.

public static class DataStore {
    public Integer myVal;
    public byte[] myBigData;

    public DataStore(Integer myVal) {
        this.myVal = myVal;
        this.myBigData = new byte[1000000];
    }
}

@Test
public void working() {
    int MAX_CONCURRENT_LOAD = 1;
    int MAX_CONCURRENT_STORE = 2;

    AtomicInteger nbUnconsumed = new AtomicInteger(0);

    List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
    Observable.from(ids)
        .flatMap(this::produce, MAX_CONCURRENT_LOAD)
        .doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
        .flatMap(this::consume, MAX_CONCURRENT_STORE)
        .doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
        .toBlocking().forEach(s -> {});

    logger.info("Finished");
}

@Test
public void failing() {
    int MAX_CONCURRENT_LOAD = 1;
    int MAX_CONCURRENT_STORE = 2;

    AtomicInteger nbUnconsumed = new AtomicInteger(0);

    List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
    Observable.from(ids)
        .flatMap(this::produce, MAX_CONCURRENT_LOAD)
        .doOnNext(s -> logger.info("+1 Total unconsumed values: " + nbUnconsumed.incrementAndGet()))
        .flatMap(dt -> Observable.just(dt))
        .flatMap(this::consume, MAX_CONCURRENT_STORE)
        .doOnNext(s -> logger.info("-1 Total unconsumed values: " + nbUnconsumed.decrementAndGet()))
        .toBlocking().forEach(s -> {});

    logger.info("Finished");
}

private Observable<DataStore> produce(final int value) {
    return Observable.<DataStore>create(s -> {
        try {
            if (!s.isUnsubscribed()) {
                Thread.sleep(200); //Here I synchronous call WS to retrieve data
                s.onNext(new DataStore(value));
                s.onCompleted();
            }
        } catch (Exception e) {
            s.onError(e);
        }
    }).subscribeOn(Schedulers.io());
}

private Observable<Boolean> consume(DataStore value) {
    return Observable.<Boolean>create(s -> {
        try {
            if (!s.isUnsubscribed()) {
                Thread.sleep(1000); //Here I synchronous call DB to store data
                s.onNext(true);
                s.onCompleted();
            }
        } catch (Exception e) {
            s.onNext(false);
            s.onCompleted();
        }
    }).subscribeOn(Schedulers.io());
}

这种行为背后的解释是什么?如何在不删除 Observable.just(dt)) 的情况下解决我失败的测试,在我的真实情况下它是 Observable.from(someListOfItme)

What is explaination behind this behavior? How could I solve my failing test without removing the Observable.just(dt)) which in my real case is a Observable.from(someListOfItme)

推荐答案

flatMap 默认情况下合并无限数量的源,并且通过应用没有 maxConcurrent 参数的特定 lambda,您基本上没有限制上游,现在可以全速运行,压倒其他算子的内部缓冲区.

flatMap by default merges an unlimited amount of sources and by applying that specific lambda without maxConcurrent parameter, you essentially unbounded the upstream which now can run at full speed, overwhelming the internal buffers of the other operators.

这篇关于RxJava flatMap 和背压奇怪的行为的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆