Iteratees in Scala that use lazy evaluation or fusion?

Question

I have heard that iteratees are lazy, but how lazy exactly are they? Also, can iteratees be fused with a postprocessing function, so that an intermediate data structure does not have to be built?

For example, can I build a 1-million-item Stream[Option[String]] from a java.io.BufferedReader in my iteratee and then filter out the Nones compositionally, without requiring the entire Stream to be held in memory? And can I guarantee at the same time that I don't blow the stack? Or something like that; it doesn't have to use a Stream.

I'm currently using Scalaz 6 but if other iteratee implementations are able to do this in a better way, I'd be interested to know.

Please provide a complete solution, including closing the BufferedReader and calling unsafePerformIO, if applicable.

Answer

Here's a quick iteratee example using the Scalaz 7 library that demonstrates the properties you're interested in: constant memory and stack usage.

First assume we've got a big text file with a string of decimal digits on each line, and we want to find all the lines that contain at least twenty zeros. We can generate some sample data like this:

val w = new java.io.PrintWriter("numbers.txt")
val r = new scala.util.Random(0)

(1 to 1000000).foreach(_ =>
  w.println((1 to 100).map(_ => r.nextInt(10)).mkString)
)

w.close()

Now we've got a file named numbers.txt. Let's open it with a BufferedReader:

val reader = new java.io.BufferedReader(new java.io.FileReader("numbers.txt"))

It's not excessively large (~97 megabytes), but it's big enough for us to see easily whether our memory use is actually staying constant while we process it.
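As a quick sanity check on that size: each line the generator writes is 100 ASCII digits plus whatever line separator println appends. The sketch below does the arithmetic. It assumes a default encoding with one byte per digit, and FileSizeEstimate is a hypothetical name used only for illustration.

```scala
object FileSizeEstimate {
  val lines = 1000000L
  val digitsPerLine = 100L

  // println appends the platform line separator: "\n" (1 byte) on Unix-like
  // systems, "\r\n" (2 bytes) on Windows. Each digit is assumed to be 1 byte.
  def bytes(separatorLength: Int): Long = lines * (digitsPerLine + separatorLength)

  // Convert a byte count to mebibytes (1 MiB = 1024 * 1024 bytes).
  def mebibytes(b: Long): Double = b / (1024.0 * 1024.0)

  // Estimate for the platform this code is running on.
  val here: Long = bytes(System.lineSeparator.length)
}
```

bytes(1) works out to 101,000,000 bytes (about 96.3 MiB) and bytes(2) to 102,000,000 bytes (about 97.3 MiB). Either way, the file is large enough that holding it all in memory would be obvious.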

First for some imports:

import scalaz._, Scalaz._, effect.IO, iteratee.{ Iteratee => I }

And an enumerator (note that I'm changing the IoExceptionOrs into Options for the sake of convenience):

val enum = I.enumReader(reader).map(_.toOption)
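To picture what this enumerator produces, here is a rough standard-library analogue. It is not Scalaz's implementation: it is a lazy Iterator[Option[Char]] over a java.io.Reader that reads one character per element. The name CharEnumeratorModel is hypothetical. scala.util.Try stands in for Scalaz's IoExceptionOr, which is an assumption about the error model rather than a description of the library (Try catches any non-fatal exception, not just IOException).

```scala
import java.io.{Reader, StringReader}
import scala.util.{Failure, Success, Try}

object CharEnumeratorModel {
  // Stdlib stand-in for I.enumReader(reader).map(_.toOption).
  // It reads one character ahead. A successful read becomes Some(c).
  // A failed read becomes a single None, after which the iterator stops.
  def chars(reader: Reader): Iterator[Option[Char]] = new Iterator[Option[Char]] {
    private var pending: Option[Try[Int]] = Some(Try(reader.read()))

    def hasNext: Boolean = pending match {
      case Some(Success(c)) => c != -1 // -1 signals end of stream
      case Some(Failure(_)) => true    // the failure still has to be reported
      case None             => false
    }

    def next(): Option[Char] = pending match {
      case Some(Success(c)) if c != -1 =>
        pending = Some(Try(reader.read()))
        Some(c.toChar)
      case Some(Failure(_)) =>
        pending = None
        None
      case _ => throw new NoSuchElementException("end of stream")
    }
  }
}
```

For example, CharEnumeratorModel.chars(new StringReader("ab\nc")).toList gives Some('a'), Some('b'), Some('\n'), Some('c'). Unlike the Scalaz version, nothing is wrapped in IO, so reads happen as soon as the iterator is consumed.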

Scalaz 7 doesn't currently provide a nice way to enumerate a file's lines, so we're chunking through the file one character at a time. This will of course be painfully slow, but I'm not going to worry about that here, since the goal of this demo is to show that we can process this large-ish file in constant memory and without blowing the stack. The final section of this answer gives an approach with better performance, but here we'll just split on line breaks:

val split = I.splitOn[Option[Char], List, IO](_.cata(_ != '\n', false))

And if the fact that splitOn takes a predicate that specifies where not to split confuses you, you're not alone. split is our first example of an enumeratee. We'll go ahead and wrap our enumerator in it:

val lines = split.run(enum).map(_.sequence.map(_.mkString))

Now we've got an enumerator of Option[String]s in the IO monad.
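The splitOn and sequence steps are easier to follow in a small list-based model in plain Scala. This is only a sketch. LineSplittingModel and its methods are hypothetical names, and the way adjacent separators are handled is this model's choice rather than documented Scalaz behavior. Unlike the enumeratee, the model works on an in-memory List with ordinary recursion, so it only suits small inputs.

```scala
object LineSplittingModel {
  // Simplified model of I.splitOn(keep): runs of elements for which `keep`
  // holds become chunks, and each element that fails `keep` is a split point
  // that gets dropped. In this model, two adjacent split points yield an
  // empty chunk. The recursion is not stack-safe; it is for small inputs only.
  def splitOn[A](xs: List[A])(keep: A => Boolean): List[List[A]] = {
    val (chunk, rest) = xs.span(keep)
    rest match {
      case Nil       => if (chunk.isEmpty) Nil else List(chunk)
      case _ :: tail => chunk :: splitOn(tail)(keep)
    }
  }

  // The `.sequence` step: Some(list) if every element is Some, otherwise None.
  def sequence[A](xs: List[Option[A]]): Option[List[A]] =
    xs.foldRight(Option(List.empty[A])) { (oa, acc) =>
      for (a <- oa; as <- acc) yield a :: as
    }

  // Same predicate as _.cata(_ != '\n', false): newlines (and Nones) fail it,
  // so they act as split points. Each chunk is then sequenced and joined.
  def lines(chars: List[Option[Char]]): List[Option[String]] =
    splitOn(chars)(_.fold(false)(_ != '\n')).map(chunk => sequence(chunk).map(_.mkString))
}
```

For example, LineSplittingModel.lines("12\n34\n".toList.map(Option(_))) gives List(Some("12"), Some("34")).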

Next for our predicate. Remember that we said we wanted lines with at least twenty zeros:

val pred = (_: String).count(_ == '0') >= 20

We can turn this into a filtering enumeratee and wrap our enumerator in that:

val filtered = I.filter[Option[String], IO](_.cata(pred, true)).run(lines)
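The only non-obvious part of that filter is _.cata(pred, true). Here is a plain-Scala sketch of the same test. It relies on the correspondence between Scalaz's opt.cata(some, none) and the standard library's opt.fold(none)(some). FilterModel is a hypothetical name.

```scala
object FilterModel {
  val pred = (_: String).count(_ == '0') >= 20

  // Scalaz's opt.cata(some, none) behaves like the standard library's
  // opt.fold(none)(some), so _.cata(pred, true) is _.fold(true)(pred):
  // a Some must satisfy pred, while a None (a failed read) is always kept.
  val keep: Option[String] => Boolean = _.fold(true)(pred)
}
```

A Some survives only if its string has at least twenty zeros. A None always passes, so failed reads still reach whatever consumes the filtered stream.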

We'll set up a simple action that just prints everything that makes it through this filter:

val printAction = (I.putStrTo[Option[String]](System.out) &= filtered).run

Of course we've not actually read anything yet. To do that we use unsafePerformIO:

printAction.unsafePerformIO()

Now we can watch the Some("0946943140969200621607610...")s slowly scroll by while our memory usage remains constant. It's slow and the error handling and output are a little clunky, but not too bad I think for about nine lines of code.
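For comparison, here is roughly how the same foreach-style pass looks with a plain Scala Iterator instead of iteratees. This is a different technique from the one used above. It has no IO wrapper and no error values, so an IOException would simply be thrown. ForeachModel is a hypothetical name. What it shares with the iteratee version is that each line is dropped once it has been handled, so memory use does not grow with the file.

```scala
import java.io.{BufferedReader, StringReader}

object ForeachModel {
  // Each line is read, tested and handed to `action` before the next one
  // is read, so nothing accumulates.
  def forEachMatching(reader: BufferedReader)(pred: String => Boolean)(action: String => Unit): Unit =
    Iterator
      .continually(reader.readLine())
      .takeWhile(_ != null) // readLine returns null at end of stream
      .filter(pred)
      .foreach(action)
}
```

On the real file, this would be called with the same twenty-zero predicate and println as the action.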

That was the foreach-ish usage. We can also create an iteratee that works more like a fold, for example by gathering up the elements that make it through the filter and returning them in a list. Just repeat everything above up to the printAction definition, and then write this instead:

val gatherAction = (I.consume[Option[String], IO, List] &= filtered).run

And run it:

val xs: Option[List[String]] = gatherAction.unsafePerformIO().sequence

Now go get a coffee (it might need to be pretty far away). When you come back you'll either have a None (in the case of an IOException somewhere along the way) or a Some containing a list of 1,943 strings.
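The fold-style counterpart in the same plain-Iterator style is below. Again, this is a sketch with a hypothetical name, not the iteratee code. It keeps only the lines that pass the filter, so its memory use grows with the number of matches rather than with the size of the file.

```scala
import java.io.{BufferedReader, StringReader}

object FoldModel {
  // Fold-style: like I.consume after the filter, accumulate every matching
  // line. Lines are prepended (cheap for List) and reversed at the end.
  def gatherMatching(reader: BufferedReader)(pred: String => Boolean): List[String] =
    Iterator
      .continually(reader.readLine())
      .takeWhile(_ != null) // readLine returns null at end of stream
      .foldLeft(List.empty[String])((acc, line) => if (pred(line)) line :: acc else acc)
      .reverse
}
```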

To answer your question about closing the reader, here's a complete working example that's roughly equivalent to the second program above, but with an enumerator that takes responsibility for opening and closing the reader. It's also much, much faster, since it reads lines, not characters. First for imports and a couple of helper methods:

import java.io.{ BufferedReader, File, FileReader }
import scalaz._, Scalaz._, effect._, iteratee.{ Iteratee => I, _ }

def tryIO[A, B](action: IO[B]) = I.iterateeT[A, IO, Either[Throwable, B]](
  action.catchLeft.map(
    r => I.sdone(r, r.fold(_ => I.eofInput, _ => I.emptyInput))
  )
)

def enumBuffered(r: => BufferedReader) =
  new EnumeratorT[Either[Throwable, String], IO] {
    lazy val reader = r
    def apply[A] = (s: StepT[Either[Throwable, String], IO, A]) => s.mapCont(
      k =>
        tryIO(IO(reader.readLine())).flatMap {
          case Right(null) => s.pointI
          case Right(line) => k(I.elInput(Right(line))) >>== apply[A]
          case e => k(I.elInput(e))
        }
    )
  }
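The three cases in enumBuffered (end of input, a line, an error) may be easier to see in a standard-library model. This sketch uses Try(...).toEither as a stand-in for tryIO and IO's catchLeft. That substitution is an assumption: Try only catches non-fatal exceptions. EnumBufferedModel is a hypothetical name. Like enumBuffered, it stops after emitting the first error.

```scala
import java.io.{BufferedReader, StringReader}
import scala.util.Try

object EnumBufferedModel {
  // Mirrors the cases in enumBuffered:
  //   Right(null) -> end of input, stop
  //   Right(line) -> emit the line and keep reading
  //   Left(e)     -> emit the error once, then stop
  def linesOf(reader: BufferedReader): Iterator[Either[Throwable, String]] =
    Iterator.unfold[Either[Throwable, String], Boolean](false) { stopped =>
      if (stopped) None
      else
        Try(reader.readLine()).toEither match {
          case Right(null) => None
          case Right(line) => Some((Right(line), false))
          case Left(e)     => Some((Left(e), true))
        }
    }
}
```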

Now for the enumerator:

def enumFile(f: File): EnumeratorT[Either[Throwable, String], IO] =
  new EnumeratorT[Either[Throwable, String], IO] {
    def apply[A] = (s: StepT[Either[Throwable, String], IO, A]) => s.mapCont(
      k =>
        tryIO(IO(new BufferedReader(new FileReader(f)))).flatMap {
          case Right(reader) => I.iterateeT(
            enumBuffered(reader).apply(s).value.ensuring(IO(reader.close()))
          )
          case Left(e) => k(I.elInput(Left(e)))
        }
    )
  }

And we're ready to go:

val action = (
  I.consume[Either[Throwable, String], IO, List] %=
  I.filter(_.fold(_ => true, _.count(_ == '0') >= 20)) &=
  enumFile(new File("numbers.txt"))
).run

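Running action.unsafePerformIO() performs the processing and returns the list of lines that made it through the filter, with any error appearing as a Left. The reader will be closed when the processing is done.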
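The guarantee that enumFile gets from ensuring(IO(reader.close())) has a close counterpart in the standard library: scala.util.Using, available in Scala 2.13 and later. Here is a sketch of that pattern. EnumFileModel and withLines are hypothetical names. Unlike enumFile, this version lets a failure to open the file propagate as an exception instead of emitting a Left.

```scala
import java.io.{BufferedReader, StringReader}
import scala.util.Using

object EnumFileModel {
  // Open, process, and always close: Using.resource closes the reader after
  // `body` returns or throws, much like .ensuring(IO(reader.close())).
  // `body` must fully consume the iterator before returning, because the
  // reader is closed as soon as Using.resource finishes.
  def withLines[A](open: => BufferedReader)(body: Iterator[String] => A): A =
    Using.resource(open) { reader =>
      body(Iterator.continually(reader.readLine()).takeWhile(_ != null))
    }
}
```

If body returned the lazy iterator itself, any later attempt to read from it would hit a closed reader. That is why the filtering and collecting happen inside body.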
