用于多线程的Scala迭代器 [英] Scala Iterator for multithreading
问题描述
我正在使用scala Iterator
在synchronized
块中等待循环:
I am using scala Iterator
for waiting loop in synchronized
block:
anObject.synchronized {
if (Try(anObject.foo()).isFailure) {
Iterator.continually {
anObject.wait()
Try(anObject.foo())
}.dropWhile(_.isFailure).next()
}
anObject.notifyAll()
}
将Iterator
与并发和多线程一起使用是否可以接受?如果没有,为什么?然后使用什么以及如何使用?
Is it acceptable to use Iterator
with concurrency and multithreading? If not, why? And then what to use and how?
有一些细节,如果重要的话. anObject
是可变队列.队列中有多个生产者和消费者.因此,上面的代码块是此类生产者或消费者的代码. anObject.foo
是一种通用的简化功能声明,可以使数据入队(对于生产者)或出队(对于消费者)到/从队列中.
There are some details, if it matters. anObject
is a mutable queue. And there are multiple producers and consumers to the queue. So the block above is a code of such producer or consumer. anObject.foo
is a common simplified declaration of function that either enqueue (for producer) or dequeue (for consumer) data to/from the queue.
推荐答案
Iterator
在内部是可变的,因此,如果在多线程环境中使用它,则必须考虑到这一点.如果您保证不会在例如以下情况下陷入困境
Iterator
is mutable internally, so you have to take that into consideration if you use it in multi-threaded environment. If you guaranteed that you won't end up in situation when e.g.
- 2个线程检查
hasNext()
- 其中一个调用
next()
-它恰好是最后一个元素 - 其他电话
next()
-NPE
- 2 threads check
hasNext()
- one of them calls
next()
- it happens to be the last element - the other calls
next()
- NPE
(或类似),那么您就可以了.在您的示例中,Iterator
甚至都没有离开范围,因此错误不应该来自Iterator
.
(or similar) then you should be ok. In your example Iterator
doesn't even leave the scope, so the errors shouldn't come from Iterator
.
但是,在您的代码中,我看到aObject.wait()
和aObject.notifyAll()
彼此相邻的问题-如果调用.wait
,则不会到达.notifyAll
,这将解除阻止.您可以在REPL中检查它是否挂起:
However, in your code I see the issue with having aObject.wait()
and aObject.notifyAll()
next to each other - if you call .wait
then you won't reach .notifyAll
which would unblock it. You can check in REPL that this hangs:
@ val anObject = new Object { def foo() = throw new Exception }
anObject: {def foo(): Nothing} = ammonite.$sess.cmd21$$anon$1@126ae0ca
@ anObject.synchronized {
if (Try(anObject.foo()).isFailure) {
Iterator.continually {
anObject.wait()
Try(anObject.foo())
}.dropWhile(_.isFailure).next()
}
anObject.notifyAll()
}
// wait indefinitelly
我建议将设计更改为不依赖wait
和notifyAll
.但是,从您的代码很难说出要实现的目标,所以我无法确定这是否更像Promise
-Future
情况,monix.Observable
,monix.Task
或其他内容.
I would suggest changing the design to NOT rely on wait
and notifyAll
. However, from your code it is hard to say what you want to achieve so I cannot tell if this is more like Promise
-Future
case, monix.Observable
, monix.Task
or something else.
如果您的用例是队列,生产者和使用者,那么这听起来像是反应流的用例-例如FS2 + Monix,但可能是FS2 + IO或Akka Streams中的某些内容
If your use case is a queue, produces and consumers, then it sound like a use case for reactive streams - e.g. FS2 + Monix, but it could be FS2+IO or something from Akka Streams
val queue: Queue[Task, Item] // depending on use case queue might need to be bounded
// in one part of the application
queue.enqueu1(item) // Task[Unit]
// in other part of the application
queue
.dequeue
.evalMap { item =>
// ...
result: Task[Result]
}
.compile
.drain
这种方法在设计应用程序时将需要进行一些更改,因为您将不再直接处理线程,而是设计流数据并声明什么是顺序的以及可以并行执行的操作,其中线程只是一个线程.实施细节.
This approach would require some change in thinking about designing an application, because you would no longer work on thread directly, but rather designed a flow data and declaring what is sequential and what can be done in parallel, where threads become just an implementation detail.
这篇关于用于多线程的Scala迭代器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!