Scala - grouping on an ordered iterator lazily


Problem description

I have an Iterator[Record] which is ordered on record.id this way:

record.id=1
record.id=1
...
record.id=1
record.id=2
record.id=2
..
record.id=2

Records with a given ID can occur a large number of times, so I want to write a function that takes this iterator as input and returns an Iterator[Iterator[Record]] in a lazy manner.

I was able to come up with the following, but it fails with a StackOverflowError after 500K records or so:

def groupByIter[T, B](iterO: Iterator[T])(func: T => B): Iterator[Iterator[T]] = new Iterator[Iterator[T]] {
  var iter = iterO
  def hasNext = iter.hasNext

  def next() = {
    val first = iter.next()
    val firstValue = func(first)
    val (i1, i2) = iter.span(el => func(el) == firstValue)
    iter = i2
    Iterator(first) ++ i1
  }
}

What am I doing wrong?

Answer

The trouble here is that each Iterator.span call creates another stacked closure for the trailing iterator, and without any trampolining it is very easy to overflow the stack.

Actually, I don't think there is an implementation that avoids memoizing the elements of the prefix iterator, since the trailing iterator could be accessed before the prefix is drained.

Even the .span implementation itself keeps a Queue to memoize elements in its Leading definition.
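To illustrate why that buffering is unavoidable (this snippet is not from the original answer, just a sketch of span's documented behavior): if you consume the trailing iterator first, the prefix elements must survive somewhere for the leading iterator to serve later.

```scala
// Sketch: span must memoize the prefix, because the trailing iterator
// may be consumed before the leading one is drained.
val (prefix, rest) = Iterator(1, 1, 2, 3).span(_ == 1)
val restList = rest.toList     // skips past the prefix, which gets queued internally
val prefixList = prefix.toList // still served from that internal queue
```

Here restList should be List(2, 3) and prefixList should still be List(1, 1), even though rest was drained first.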

So the easiest implementation I can imagine is the following, via Stream:

implicit class StreamChopOps[T](xs: Stream[T]) {
  // Lazily split the stream into maximal runs of elements sharing the same f-value
  def chopBy[U](f: T => U): Stream[Stream[T]] = xs match {
    case x #:: _ =>
      def eq(e: T) = f(e) == f(x)
      xs.takeWhile(eq) #:: xs.dropWhile(eq).chopBy(f)
    case _ => Stream.empty
  }
}

It may not be the most performant option, since it memoizes a lot, but if you iterate over it properly, GC should take care of the excess intermediate streams.

您可以将其用作 myIterator.toStream.chopBy(f)
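As an aside (not part of the original answer): on Scala 2.13+, Stream is deprecated in favour of LazyList, and the same trick carries over directly. A self-contained sketch, with LazyListChopOps as a hypothetical name:

```scala
implicit class LazyListChopOps[T](xs: LazyList[T]) {
  // Lazily split into maximal runs of elements sharing the same f-value
  def chopBy[U](f: T => U): LazyList[LazyList[T]] = xs match {
    case x #:: _ =>
      def sameKey(e: T) = f(e) == f(x)
      xs.takeWhile(sameKey) #:: xs.dropWhile(sameKey).chopBy(f)
    case _ => LazyList.empty
  }
}

// Usage: group consecutive equal elements lazily
val groups = LazyList(1, 1, 2, 2, 3).chopBy(identity).map(_.toList).toList
// groups == List(List(1, 1), List(2, 2), List(3))
```

The only changes from the Stream version are the type names; the #:: pattern and the lazy cons both exist for LazyList as well.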

A simple check validates that the following code runs without a StackOverflowError:

Iterator.fill(10000000)(Iterator(1,1,2)).flatten //1,1,2,1,1,2,...
  .toStream.chopBy(identity)                     //(1,1),(2),(1,1),(2),...
  .map(xs => xs.sum * xs.size).sum               //60000000
