用于多线程的Scala迭代器 [英] Scala Iterator for multithreading

查看:264
本文介绍了用于多线程的Scala迭代器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用scala Iteratorsynchronized块中等待循环:

I am using scala Iterator for waiting loop in synchronized block:

anObject.synchronized {
    if (Try(anObject.foo()).isFailure) {
        Iterator.continually { 
            anObject.wait()
            Try(anObject.foo()) 
        }.dropWhile(_.isFailure).next()
    }
    anObject.notifyAll()
}

Iterator与并发和多线程一起使用是否可以接受?如果没有,为什么?然后使用什么以及如何使用?

Is it acceptable to use Iterator with concurrency and multithreading? If not, why? And then what to use and how?

有一些细节,如果重要的话. anObject是可变队列.队列中有多个生产者和消费者.因此,上面的代码块是此类生产者或消费者的代码. anObject.foo是一种通用的简化功能声明,可以使数据入队(对于生产者)或出队(对于消费者)到/从队列中.

There are some details, if it matters. anObject is a mutable queue. And there are multiple producers and consumers to the queue. So the block above is a code of such producer or consumer. anObject.foo is a common simplified declaration of function that either enqueue (for producer) or dequeue (for consumer) data to/from the queue.

推荐答案

Iterator在内部是可变的,因此,如果在多线程环境中使用它,则必须考虑到这一点.如果您保证不会在例如以下情况下陷入困境

Iterator is mutable internally, so you have to take that into consideration if you use it in multi-threaded environment. If you guaranteed that you won't end up in situation when e.g.

  • 2个线程检查hasNext()
  • 其中一个调用next()-它恰好是最后一个元素
  • 其他电话next()-NPE
  • 2 threads check hasNext()
  • one of them calls next() - it happens to be the last element
  • the other calls next() - NPE

(或类似),那么您就可以了.在您的示例中,Iterator甚至都没有离开范围,因此错误不应该来自Iterator.

(or similar) then you should be ok. In your example Iterator doesn't even leave the scope, so the errors shouldn't come from Iterator.

但是,在您的代码中,我看到aObject.wait()aObject.notifyAll()彼此相邻的问题-如果调用.wait,则不会到达.notifyAll,这将解除阻止.您可以在REPL中检查它是否挂起:

However, in your code I see the issue with having aObject.wait() and aObject.notifyAll() next to each other - if you call .wait then you won't reach .notifyAll which would unblock it. You can check in REPL that this hangs:

@ val anObject = new Object { def foo() = throw new Exception }
anObject: {def foo(): Nothing} = ammonite.$sess.cmd21$$anon$1@126ae0ca

@ anObject.synchronized {
      if (Try(anObject.foo()).isFailure) {
          Iterator.continually {
              anObject.wait()
              Try(anObject.foo())
          }.dropWhile(_.isFailure).next()
      }
      anObject.notifyAll()
  }
// wait indefinitelly

我建议将设计更改为不依赖waitnotifyAll.但是,从您的代码很难说出要实现的目标,所以我无法确定这是否更像Promise-Future情况,monix.Observablemonix.Task或其他内容.

I would suggest changing the design to NOT rely on wait and notifyAll. However, from your code it is hard to say what you want to achieve so I cannot tell if this is more like Promise-Future case, monix.Observable, monix.Task or something else.

如果您的用例是队列,生产者和使用者,那么这听起来像是反应流的用例-例如FS2 + Monix,但可能是FS2 + IO或Akka Streams中的某些内容

If your use case is a queue, produces and consumers, then it sound like a use case for reactive streams - e.g. FS2 + Monix, but it could be FS2+IO or something from Akka Streams

val queue: Queue[Task, Item] // depending on use case queue might need to be bounded

// in one part of the application
queue.enqueu1(item) // Task[Unit]

// in other part of the application
queue
  .dequeue
  .evalMap { item =>
    // ...
    result: Task[Result]
  }
  .compile
  .drain

这种方法在设计应用程序时将需要进行一些更改,因为您将不再直接处理线程,而是设计流数据并声明什么是顺序的以及可以并行执行的操作,其中线程只是一个线程.实施细节.

This approach would require some change in thinking about designing an application, because you would no longer work on thread directly, but rather designed a flow data and declaring what is sequential and what can be done in parallel, where threads become just an implementation detail.

这篇关于用于多线程的Scala迭代器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆