Scalaz 7 Iteratee to process large zip file (OutOfMemoryError)

Problem description

I'm trying to use the scalaz iteratee package to process a large zip file in constant space. I have a long-running process that I need to perform on each file in the zip file, and those processes can (and should) run in parallel.

I created an EnumeratorT that inflates each ZipEntry into a File object. The signature looks like:

def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO]
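
(The full enumZipFile isn't included here. As a rough idea of its shape, a simplified, hypothetical sketch of the usual scalaz 7 recursive EnumeratorT pattern, enumerating a plain Iterator in IO rather than inflating real ZipEntry values, and with the made-up name enumIteratorIO, might look like the following.)

import scalaz._, iteratee._, Iteratee._
import scalaz.effect.IO

// Hypothetical sketch: feed each element of a (side-effecting) iterator to the
// continuation of the current step, then recurse; stop when the source is empty.
def enumIteratorIO[E](it: Iterator[E]): EnumeratorT[E, IO] =
  new EnumeratorT[E, IO] {
    def apply[A] = {
      def go(s: StepT[E, IO, A]): IterateeT[E, IO, A] =
        s.mapCont { k =>
          if (it.hasNext) k(elInput(it.next())) >>== go  // feed one element, then recurse
          else s.pointI                                   // source exhausted: finish with the current step
        }
      go _
    }
  }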

I want to attach an IterateeT that will perform the long-running process on each file. I basically end up with something like:

type IOE[A] = IoExceptionOr[A]

def action(f:File):IO[List[Promise[IOE[File]]]] = (
  consume[Promise[IOE[File]], IO, List] %=
  map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %=
  map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
  enumZipFile(f)
).run

def longRunningProcess(iof:IOE[File]):Promise[IOE[File]] =
  Promise { Thread.sleep(5000); iof }

When I try to run it:

action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get

I get a java.lang.OutOfMemoryError: Java heap space message. That makes sense to me, since it's trying to build up a massive in-memory list of all of these IO and Promise objects.
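
(To illustrate what's being held, an illustrative snippet only, with the imports assumed: consume's result is a List of every element the iteratee is fed, so with Promise[IOE[File]] elements, every inflated entry stays reachable until the whole zip has been enumerated.)

import scalaz._, iteratee._, Iteratee._
import scalaz.std.list._
import scalaz.concurrent.Promise
import scalaz.effect.IO

// Accumulates every element into a List, which is what holds the memory here.
val keepAll: IterateeT[Promise[IOE[File]], IO, List[Promise[IOE[File]]]] =
  consume[Promise[IOE[File]], IO, List]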

A couple of questions:

  • Does anyone have any ideas on how to avoid this? It feels like I'm approaching the problem incorrectly, because I really only care about longRunningProcess for its side effects.
  • Is the Enumerator approach the wrong approach here?

I'm pretty much out of ideas, so anything will help.

Thanks!

Update #1

Here is the stack trace:

[error] java.lang.OutOfMemoryError: Java heap space
[error]         at scalaz.Free.flatMap(Free.scala:46)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error]         at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error]         at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)

I am currently taking nadavwr's advice to ensure everything is acting the way I think it is. I will report back with any updates.

Update #2

Using ideas from both of the answers below, I found a decent solution. As huynhjl suggested (and as I verified using nadavwr's suggestion of analyzing the heap dump), consume was causing every inflated ZipEntry to be held in memory, which is why the process was running out of memory. I changed consume to foldM and updated the long-running process to return a Promise[IOE[Unit]] instead of a reference to the file, so that at the end I have a collection of all the IoExceptions. Here is the working solution:

def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
  foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %=
  map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %=
  map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
  enumZipFile(f)
).run

def longRunningProcess(iof:IOE[File]):Promise[IOE[Unit]] =
  Promise { Thread.sleep(5000); iof.map(println) }
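
For completeness, a usage sketch mirroring the earlier call site (it assumes the same implicits the code above already relies on: scalaz's Traverse syntax for List and an Applicative instance for Promise):

import java.io.File
import scalaz._, Scalaz._
import scalaz.concurrent.Promise

// Run the IO action, sequence the List[Promise[IOE[Unit]]] into a single Promise,
// and block until every upload has finished. Each IOE[Unit] is either a successful
// Unit or the IoException raised for that entry.
val outcomes: List[IOE[Unit]] =
  action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get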

This solution inflates each entry while asynchronously uploading them. At the end, I have a huge list of fulfilled Promise objects that contain any errors. I'm still not fully convinced this is the correct use of an Iteratee, but I do now have several reusable, composable pieces that I can use in other parts of our system (this is a very common pattern for us).

Thanks for all the help!

Accepted answer

Don't use consume. See my other recent answer: How to use IO with Scalaz7 Iteratees without overflowing the stack?

foldM may be a better choice.

Also try mapping the file to something else (such as a success return code) to see whether that allows the JVM to garbage-collect the inflated zip entries.
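
For example (just a sketch, not code from the answer; the Boolean is an arbitrary stand-in for a success return code), the long-running process could return only a small status value so the File is no longer referenced by the accumulated results:

def longRunningProcess(iof:IOE[File]):Promise[IOE[Boolean]] =
  Promise { Thread.sleep(5000); iof.map(_ => true) } // keep only a status; drop the File reference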
