Scalaz 7 Iteratee to process large zip file (OutOfMemoryError)
Question
I'm trying to use the scalaz iteratee package to process a large zip file in constant space. I have a long-running process that I need to perform on each file in the zip file. Those processes can (and should) run in parallel.
I created an EnumeratorT that inflates each ZipEntry into a File object. The signature looks like:
def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO]
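The body of enumZipFile is not shown in the post, so the following is only a rough sketch of the underlying idea: walking a zip archive lazily and inflating one entry at a time. It uses a plain standard-library Iterator instead of a scalaz EnumeratorT, and the names ZipEntries and inflated are hypothetical, not part of the original code.

```scala
import java.io.{File, InputStream}
import java.nio.file.{Files, StandardCopyOption}
import java.util.zip.ZipInputStream

object ZipEntries {
  // Hypothetical helper (not the asker's enumZipFile): lazily yields one
  // temporary file per non-directory zip entry. An entry is only inflated
  // when the iterator is advanced, so entries are handled one at a time.
  def inflated(in: InputStream): Iterator[File] = {
    val zis = new ZipInputStream(in)
    Iterator
      .continually(zis.getNextEntry)   // moves the stream to the next entry
      .takeWhile(_ != null)            // null marks the end of the archive
      .filterNot(_.isDirectory)
      .map { _ =>
        val tmp = File.createTempFile("entry-", ".tmp")
        // ZipInputStream reports end-of-stream at the end of the current
        // entry, so this copy stops at the entry boundary.
        Files.copy(zis, tmp.toPath, StandardCopyOption.REPLACE_EXISTING)
        tmp
      }
  }
}
```

A caller would pass something like new FileInputStream(f) and is responsible for closing the stream and deleting the temporary files afterwards; error handling (the IoExceptionOr part of the real signature) is left out here.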
I want to attach an IterateeT that will perform the long-running process on each file. I basically end up with something like:
type IOE[A] = IoExceptionOr[A]
def action(f:File):IO[List[Promise[IOE[File]]]] = (
  consume[Promise[IOE[File]], IO, List] %=
  map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %=
  map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
  enumZipFile(f)
).run
def longRunningProcess(iof:IOE[File]):Promise[IOE[File]] =
  Promise { Thread.sleep(5000); iof }
When I try to run it:
action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get
I get a java.lang.OutOfMemoryError: Java heap space message. That makes sense to me, since it's trying to build up a massive list in memory of all these IO and Promise objects.
A couple of questions:
- Does anyone have any ideas on how to avoid this? It feels like I'm approaching the problem incorrectly, because I really only care about longRunningProcess for its side effects.
- Is the Enumerator approach here the wrong approach?
I'm pretty much out of ideas, so anything will help.
Thanks!
Update #1
Here is the stack trace:
[error] java.lang.OutOfMemoryError: Java heap space
[error] at scalaz.Free.flatMap(Free.scala:46)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61)
[error] at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222)
[error] at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62)
I am currently following nadavwr's advice to make sure everything is behaving the way I think it is. I will report back with any updates.
Update #2
Using ideas from both the answers below, I found a decent solution. As huynhjl suggested (and I verified using nadavwr's suggestion of analyzing the heap dump), consume was causing every inflated ZipEntry to be held in memory, which is why the process was running out of memory. I changed consume to foldM and updated the long-running process to just return a Promise[IOE[Unit]] instead of a reference to the file. That way I have a collection of all IoExceptions at the end. Here is the working solution:
def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
  foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %=
  map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %=
  map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &=
  enumZipFile(f)
).run
def longRunningProcess(iof:IOE[File]):Promise[IOE[Unit]] =
  Promise { Thread.sleep(5000); iof.map(println) }
This solution inflates each entry while asynchronously uploading them. At the end, I have a huge list of fulfilled Promise objects that contain any errors. I'm still not fully convinced this is the correct use of an Iteratee, but I do now have several reusable, composable pieces that I can use in other parts of our system (this is a very common pattern for us).
Thanks for all your help!
Recommended answer
Don't use consume. See my other recent answer: How to use IO with Scalaz7 Iteratees without overflowing the stack?
foldM may be a better choice.
Also try to map the file to something else (like a success return code) to see if that allows the JVM to garbage collect the inflated zip entries.
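To make the suggestion concrete without depending on scalaz, here is a minimal sketch of the same idea using only the Scala standard library: fold over the items one at a time, run the process for its side effect, and keep only a small summary (here, the failures) instead of the items themselves. FoldErrors and run are hypothetical names, and this is a sequential stand-in for foldM, not the scalaz API.

```scala
import scala.util.{Failure, Success, Try}

object FoldErrors {
  // Runs `process` on each item for its side effect and accumulates only
  // the failures. A successfully processed item is not referenced by the
  // accumulator, so it can be garbage collected after its step.
  def run[A](items: Iterator[A])(process: A => Unit): List[Throwable] =
    items.foldLeft(List.empty[Throwable]) { (errors, item) =>
      Try(process(item)) match {
        case Success(_) => errors      // nothing retained for successes
        case Failure(e) => e :: errors // keep only the error
      }
    }
}
```

Unlike a consume-style collection of every result, the accumulator here grows only with the number of failures. Note that errors are prepended, so the returned list is in reverse order of occurrence.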