Scala 中有 FIFO 流吗? [英] Is there a FIFO stream in Scala?

查看:26
本文介绍了Scala 中有 FIFO 流吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在 Scala 中寻找 FIFO 流,即提供

I'm looking for a FIFO stream in Scala, i.e., something that provides the functionality of

  • immutable.Stream (a stream that can be finite and memorizes the elements that have already been read)
  • mutable.Queue (which allows for added elements to the FIFO)

流应该是可关闭的,并且应该阻止对下一个元素的访问,直到该元素被添加或流被关闭.

The stream should be closable and should block access to the next element until the element has been added or the stream has been closed.

实际上我有点惊讶收藏库没有(似乎)包含这样的数据结构,因为它是 IMO 一个非常经典的数据结构.

Actually I'm a bit surprised that the collection library does not (seem to) include such a data structure, since it is IMO a quite classical one.

我的问题:

  • 1) 我是否忽略了什么?是否已有提供此功能的类?

  • 1) Did I overlook something? Is there already a class providing this functionality?

2) 好的,如果它没有包含在集合库中,那么它可能只是现有集合类的简单组合.然而,我试图找到这个微不足道的代码,但对于这样一个简单的问题,我的实现看起来仍然相当复杂.这种 FifoStream 有没有更简单的解决方案?

2) OK, if it's not included in the collection library then it might by just a trivial combination of existing collection classes. However, I tried to find this trivial code but my implementation looks still quite complex for such a simple problem. Is there a simpler solution for such a FifoStream?

class FifoStream[T] extends Closeable {

val queue = new Queue[Option[T]]

lazy val stream = nextStreamElem

private def nextStreamElem: Stream[T] = next() match {
    case Some(elem) => Stream.cons(elem, nextStreamElem)
    case None       => Stream.empty
}

/** Returns next element in the queue (may wait for it to be inserted). */
private def next() = {
    queue.synchronized {
        if (queue.isEmpty) queue.wait()
        queue.dequeue()
    }
}

/** Adds new elements to this stream. */
def enqueue(elems: T*) {
    queue.synchronized {
        queue.enqueue(elems.map{Some(_)}: _*)
        queue.notify()
    }
}

/** Closes this stream. */
def close() {
    queue.synchronized {
        queue.enqueue(None)
        queue.notify()
    }
}
}

感谢您的建议.我稍微修改了范式的解决方案,以便 toStream 返回一个不可变的流(允许可重复读取),以满足我的需要.为了完整起见,这里是代码:

Thanks for your suggestions. I slightly modified paradigmatic's solution so that toStream returns an immutable stream (allows for repeatable reads) so that it fits my needs. Just for completeness, here is the code:

import collection.JavaConversions._
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}

class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] = new LinkedBlockingQueue[Option[A]]() ) {
  lazy val toStream: Stream[A] = queue2stream
  private def queue2stream: Stream[A] = queue take match {
    case Some(a) => Stream cons ( a, queue2stream )
    case None    => Stream empty
  }
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}

推荐答案

在 Scala 中,流是函数式迭代器".人们期望它们是纯粹的(没有副作用)和不可变的.在你的情况下,每次你在流上迭代时,你都会修改队列(所以它不是纯的).这会造成很多误解,因为对同一个流迭代两次,会产生两种不同的结果.

In Scala, streams are "functional iterators". People expect them to be pure (no side effects) and immutable. In you case, everytime you iterate on the stream you modify the queue (so it's no pure). This can create a lot of misunderstandings, because iterating twice the same stream, will have two different results.

话虽如此,您还是应该使用 Java BlockingQueues,而不是滚动您自己的实现.它们被认为在安全和性能方面得到了很好的实施.这是我能想到的最干净的代码(使用您的方法):

That being said, you should rather use Java BlockingQueues, rather than rolling your own implementation. They are considered well implemented in term of safety and performances. Here is the cleanest code I can think of (using your approach):

import java.util.concurrent.BlockingQueue
import scala.collection.JavaConversions._

class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] ) {
  def toStream: Stream[A] = queue take match {
    case Some(a) => Stream cons ( a, toStream )
    case None => Stream empty
  }
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}

object FIFOStream {
  def apply[A]() = new LinkedBlockingQueue
}

这篇关于Scala 中有 FIFO 流吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆