Scala中有FIFO流吗? [英] Is there a FIFO stream in Scala?

查看:75
本文介绍了Scala中有FIFO流吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在Scala中寻找FIFO流,即提供以下功能的东西

I'm looking for a FIFO stream in Scala, i.e., something that provides the functionality of

  • immutable.Stream (a stream that can be finite and memorizes the elements that have already been read)
  • mutable.Queue (which allows for added elements to the FIFO)

该流应该是可关闭的,并且应该阻止对下一个元素的访问,直到添加了该元素或关闭了该流为止.

The stream should be closable and should block access to the next element until the element has been added or the stream has been closed.

实际上,我对集合库不(似乎)包含这样的数据结构感到有些惊讶,因为它是IMO的一种非常经典的结构.

Actually I'm a bit surprised that the collection library does not (seem to) include such a data structure, since it is IMO a quite classical one.

我的问题:

  • 1)我是否忽略了某些内容?已经有提供这种功能的类了吗?

  • 1) Did I overlook something? Is there already a class providing this functionality?

2)好吧,如果它不包含在集合库中,那么它可能只是现有集合类的简单组合.但是,我试图找到这种简单的代码,但是对于这样一个简单的问题,我的实现仍然相当复杂.这样的FifoStream是否有更简单的解决方案?

2) OK, if it's not included in the collection library then it might by just a trivial combination of existing collection classes. However, I tried to find this trivial code but my implementation looks still quite complex for such a simple problem. Is there a simpler solution for such a FifoStream?

class FifoStream[T] extends Closeable {

val queue = new Queue[Option[T]]

lazy val stream = nextStreamElem

private def nextStreamElem: Stream[T] = next() match {
    case Some(elem) => Stream.cons(elem, nextStreamElem)
    case None       => Stream.empty
}

/** Returns next element in the queue (may wait for it to be inserted). */
private def next() = {
    queue.synchronized {
        if (queue.isEmpty) queue.wait()
        queue.dequeue()
    }
}

/** Adds new elements to this stream. */
def enqueue(elems: T*) {
    queue.synchronized {
        queue.enqueue(elems.map{Some(_)}: _*)
        queue.notify()
    }
}

/** Closes this stream. */
def close() {
    queue.synchronized {
        queue.enqueue(None)
        queue.notify()
    }
}
}

感谢您的建议.我稍微修改了范式的解决方案,以便toStream返回一个不可变的流(允许重复读取),使其适合我的需求.为了完整起见,下面是代码:

Thanks for your suggestions. I slightly modified paradigmatic's solution so that toStream returns an immutable stream (allows for repeatable reads) so that it fits my needs. Just for completeness, here is the code:

import collection.JavaConversions._
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}

class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] = new LinkedBlockingQueue[Option[A]]() ) {
  lazy val toStream: Stream[A] = queue2stream
  private def queue2stream: Stream[A] = queue take match {
    case Some(a) => Stream cons ( a, queue2stream )
    case None    => Stream empty
  }
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}

推荐答案

在Scala中,流是功能迭代器".人们期望它们是纯净的(无副作用)且一成不变.在这种情况下,每次在流上进行迭代时,都会修改队列(因此​​,它不是纯粹的).这可能会产生很多误解,因为对同一流进行两次迭代将产生两个不同的结果.

In Scala, streams are "functional iterators". People expect them to be pure (no side effects) and immutable. In you case, everytime you iterate on the stream you modify the queue (so it's no pure). This can create a lot of misunderstandings, because iterating twice the same stream, will have two different results.

话虽这么说,您应该使用Java BlockingQueues,而不是滚动自己的实现.在安全性和性能方面,它们被认为实施得很好.这是我能想到的最干净的代码(使用您的方法):

That being said, you should rather use Java BlockingQueues, rather than rolling your own implementation. They are considered well implemented in term of safety and performances. Here is the cleanest code I can think of (using your approach):

import java.util.concurrent.BlockingQueue
import scala.collection.JavaConversions._

class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] ) {
  def toStream: Stream[A] = queue take match {
    case Some(a) => Stream cons ( a, toStream )
    case None => Stream empty
  }
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}

object FIFOStream {
  def apply[A]() = new LinkedBlockingQueue
}

这篇关于Scala中有FIFO流吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆