在Reactive Extensions中使用Flowable时避免同池死锁 [英] Avoiding same-pool deadlocks when using Flowable in Reactive Extensions

查看:249
本文介绍了在Reactive Extensions中使用Flowable时避免同池死锁的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在订阅Reactive Extensions Flowable 流时,我注意到在128个项目出现之后,该流暂停/挂起(不再发射任何将来的项目,并且没有返回错误)。

While subscribing to a Reactive Extensions Flowable stream, I noticed the stream halts/hangs (no more future items are emitted, and no error is returned) after 128 items have been returned.

val download: Flowable<DownloadedRecord> = sensor.downloadRecords()
download
    .doOnComplete { Log.i( "TEST", "Finished!" ) }
    .subscribe(
        { record ->
            Log.i( "TEST", "Got record: ${record.record.id}; left: ${record.recordsLeft}" )
        },
        { error ->
            Log.i( "TEST", "Error while downloading records: $error" )
        } )

最有可能与Reactive Extensions有关。我发现默认缓冲区大小为 Flowable 设置为128

Most likely, this is related to Reactive Extensions. I discovered the default buffer size of Flowable is set to 128; unlikely to be a coincidence.

在试图了解正在发生的事情时,我在> Flowable.subscribeOn

While trying to understand what is happening, I ran into the following documentation on Flowable.subscribeOn.


如果存在 create(FlowableOnSubscribe,BackpressureStrategy)类型作为链中的源,建议使用 requestOn false以避免同一池死锁,因为请求可能堆积在渴望/阻止的发射器后面

If there is a create(FlowableOnSubscribe, BackpressureStrategy) type source up in the chain, it is recommended to have requestOn false to avoid same-pool deadlock because requests may pile up behind an eager/blocking emitter.

尽管我不太了解在这种情况下相同池死锁是什么,就像我的信息流中正在发生类似的事情。

Although I do not quite understand what a same-pool deadlock is in this situation, it looks like something similar is happening to my stream.

1。什么是Reactive Extensions中的同池死锁什么是在Android上重新创建它的最小代码示例?

1. What is a same-pool deadlock in Reactive Extensions? What would be a minimal code sample to recreate it (on Android)?

目前不知所措,我尝试在 .subscribe 之前应用 .subscribeOn(Schedulers.io(),false) ,虽然并没有真正理解它的作用,但是在发送128个项目之后,我的流仍然锁定。

Currently at a loss, I tried applying .subscribeOn( Schedulers.io(), false ) before .subscribe, without really understanding what this does, but my stream still locks up after 128 items have been emitted.

2。我该如何调试该问题,以及如何/在何处解决?

推荐答案



  1. 什么是Reactive Extensions中的同池死锁?


RxJava在标准调度程序中使用单线程执行程序。当阻塞或急切的源发出项目时,它占用了该单个线程,即使下游请求更多, subscribeOn 也会将这些请求安排在当前正在运行/阻塞的代码之后,然后永远不会收到有关新机会的通知。

RxJava uses single threaded executors in the standard schedulers. When a blocking or eager source is emitting items, it occupies this single thread and even though the downstream requests more, subscribeOn will schedule those requests behind the currently running/blocking code that then never gets notified about the new opportunities.


在Android上重新创建它的最小代码示例是什么?

What would be a minimal code sample to recreate it (on Android)?

为什么要让代码陷入僵局?

Why would you want code that deadlocks?


应用.subscribeOn(Schedulers.io(),false)

I tried applying .subscribeOn( Schedulers.io(), false )

您的实际流量是多少?您可能将 subscribeOn 应用于离源太远的地方,因此它没有任何作用。最可靠的方法是将其放在 create 旁边。

What is your actual flow? You likely applied subscribeOn too far from the source and thus it has no effect. The most reliable is to put it right next to create.


我怎么能去调试这个问题,以及如何/在哪里解决?

How could I go about debugging this issue, and how/where can it be resolved?

放入 doOnNext doOnRequest 在各个地方,查看信号消失的地方。

Putting doOnNext and doOnRequest at various places and see where signals disappear.

这篇关于在Reactive Extensions中使用Flowable时避免同池死锁的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆