在flatMap {...}映射器函数内部进行阻塞操作是否安全? [英] If it safe to have blocking operation inside flatMap { ... } mapper function?

查看:65
本文介绍了在flatMap {...}映射器函数内部进行阻塞操作是否安全?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想组织一个线程屏障:给定一个锁对象,任何线程都可以获取它并进一步延伸线程的链,但是任何其他线程将保持休眠状态,直到第一个线程完成并释放该线程锁定.

I'd like to organize a thread barrier: given a single lock object, any thread can obtain it and continue thread's chain further, but any other thread will stay dormant on the same lock object until the first thread finishes and releases the lock.

让我们用代码表达我的意图(log()只是在日志中打印字符串):

Let's express my intention in code (log() simply prints string in a log):

val mutex = Semaphore(1)  // number of permits is 1

source
.subscribeOn(Schedulers.newThread())  // any unbound scheduler (io, newThread)
.flatMap {
    log("#1")
    mutex.acquireUninterruptibly()
    log("#2")
    innerSource
       .doOnSubscribe(log("#3"))
       .doFinally {
          mutex.release()
          log("#4")
       }
}
.subscribe()

它实际上运行良好,我可以看到多个线程如何显示日志#1",并且其中只有一个进一步传播,获得锁定对象 mutex ,然后释放它,我可以看到其他日志,然后下一个线程起作用.好

It actually works well, i can see how multiple threads show log "#1" and only one of them propagates further, obtaining lock object mutex, then it releases it and i can see other logs, and next threads comes into play. OK

但是有时候,当压力很高并且线程数更大时,例如4-5,我会遇到死锁:

But sometimes, when pressure is quite high and number of threads is greater, say 4-5, i experience DEADLOCK:

实际上,已获得锁的线程会打印#1"和#2",但随后从不打印#3"(因此 doOnSubscribe() >未调用),因此它实际上停止并且不执行任何操作,没有订阅 flatMap 中的 innerSource .因此,所有线程都被阻止,应用程序完全没有响应.

Actually, the thread that has acquired the lock, prints "#1" and "#2" but it then never print "#3" (so doOnSubscribe() not called), so it actually stops and does nothing, not subscribing to innerSource in flatMap. So all threads are blocked and app is not responsive at all.

我的问题-在 flatMap 内部进行屏蔽操作是否安全?我深入研究了 flatMap 源代码,然后看到了其内部订阅的位置:

My question - is it safe to have blocking operation inside flatMap? I dig into flatMap source code and i see the place where it internally subscribes:

if (!isDisposed()) {
    o.subscribe(new FlatMapSingleObserver<R>(this, downstream));
}

获取锁的线程订阅是否可能以某种方式被处置?

Is it possible that thread's subscription, that has acquired lock, was disposed somehow?

推荐答案

您可以使用flatMap第二个参数maxConcurrency并将其设置为1,这样它就可以执行您想要的操作而无需手动锁定

You can use flatMap second parameter maxConcurrency and set it to 1, so it does what you want without manually locking

这篇关于在flatMap {...}映射器函数内部进行阻塞操作是否安全?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆