Avoiding memory leaks with Scalaz 7 zipWithIndex/group enumeratees
Question
Background
As noted in this question, I'm using Scalaz 7 iteratees to process a large (i.e., unbounded) stream of data in constant heap space.
My code looks like this:
type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]

def processChunk(c: Chunk, idx: Long): Result

def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
  Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Nil) { (rs, vs) =>
    rs ++ vs map {
      case (c, i) => processChunk(c, i)
    }
  } &= (data.zipWithIndex mapE Iteratee.group(P))
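To make the intended semantics concrete, here is a plain-Scala sketch (no scalaz; `Chunk` and `processChunk` below are hypothetical stand-ins, not the real definitions) of what the pipeline above computes: pair each chunk with its index, batch the pairs into groups of `P`, and fold the processed results together.

```scala
// Plain-Scala sketch of the pipeline's semantics; Chunk and processChunk
// are stand-ins, not the real definitions from the question.
object GroupingSketch {
  type Chunk = Array[Int]

  // stand-in: "process" a chunk by reporting its length
  def processChunk(c: Chunk, idx: Long): Long = c.length.toLong

  def main(args: Array[String]): Unit = {
    val P = 4
    // ten 8-element arrays, produced lazily like the enumerator
    val chunks: Iterator[Chunk] = Iterator.continually(Array.fill(8)(0)).take(10)
    val results = chunks.zipWithIndex        // pair each chunk with its index
      .grouped(P)                            // batch into groups of size P
      .foldLeft(Vector.empty[Long]) { (rs, vs) =>
        rs ++ vs.map { case (c, i) => processChunk(c, i.toLong) }
      }
    println(results.sum)                     // 10 arrays x 8 elements = 80
  }
}
```

Because `Iterator.grouped` is lazy, this version retains only one group of `P` chunks at a time, which is the heap behavior the iteratee pipeline is expected (but, as shown next, fails) to exhibit.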
The problem
I seem to have run into a memory leak, but I'm not familiar enough with Scalaz/FP to know whether the bug is in Scalaz or in my code. Intuitively, I expect this code to require only (on the order of) P times the Chunk-size space.
Note: I found a similar question in which an OutOfMemoryError was encountered, but my code is not using consume.
Tests
I ran some tests to try to isolate the problem. To summarize, the leak only appears to arise when both zipWithIndex and group are used.
// no zipping/grouping
scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO
res47: Long = 4294967296
// grouping only
scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO
res49: Long = 4294967296
// zipping and grouping
scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO
java.lang.OutOfMemoryError: Java heap space
// zipping only
scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO
res51: Long = 4294967296
// no zipping/grouping, larger arrays
scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO
res53: Long = 17179869184
// zipping only, larger arrays
scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO
res54: Long = 17179869184
Test code:
import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._

// define an enumerator that produces a stream of new, zero-filled arrays
def enumArrs(sz: Int, n: Int) =
  Iteratee.enumIterator[Array[Int], IO](
    Iterator.continually(Array.fill(sz)(0)).take(n))

// define an iteratee that consumes a stream of arrays
// and computes its length
val i1 = Iteratee.fold[Array[Int], IO, Long](0) {
  (c, a) => c + a.length
}

// define an iteratee that consumes a grouped stream of arrays
// and computes its length
val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) {
  (c, as) => c + as.map(_.length).sum
}

// define an iteratee that consumes a grouped/zipped stream of arrays
// and computes its length
val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) {
  (c, vs) => c + vs.map(_._1.length).sum
}

// define an iteratee that consumes a zipped stream of arrays
// and computes its length
val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) {
  (c, v) => c + v._1.length
}
Questions
- Is the bug in my code?
- How can I make this work in constant heap space?
Answer
This will come as little consolation for anyone who's stuck with the older iteratee API, but I recently verified that an equivalent test passes against the scalaz-stream API. This is a newer stream processing API that is intended to replace iteratee.
For completeness, here's the test code:
// create a stream containing `n` arrays with `sz` Ints in each one
def streamArrs(sz: Int, n: Int): Process[Task, Array[Int]] =
  (Process emit Array.fill(sz)(0)).repeat take n

(streamArrs(1 << 25, 1 << 14).zipWithIndex
  pipe process1.chunk(4)
  pipe process1.fold(0L) {
    (c, vs) => c + vs.map(_._1.length.toLong).sum
  }).runLast.run
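As a sanity check on the arithmetic (not on the constant-heap property itself), the same zip/chunk/fold shape can be sketched with plain Scala iterators and small stand-in sizes:

```scala
// Plain-Scala analogue of the scalaz-stream pipeline above, using small
// stand-in sizes so it runs instantly; these are not the sizes tested.
object StreamArrsSketch {
  def main(args: Array[String]): Unit = {
    val sz = 1 << 5 // 32 Ints per array (stand-in for 1 << 25)
    val n  = 1 << 4 // 16 arrays         (stand-in for 1 << 14)
    val total = Iterator.continually(Array.fill(sz)(0)).take(n)
      .zipWithIndex                // like .zipWithIndex on the Process
      .grouped(4)                  // like process1.chunk(4)
      .foldLeft(0L) { (c, vs) =>   // like process1.fold(0L)
        c + vs.map(_._1.length.toLong).sum
      }
    println(total) // 16 arrays x 32 Ints = 512
  }
}
```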
This should work with any value for the n parameter (provided you're willing to wait long enough) -- I tested with 2^14 32 MiB arrays (i.e., a total of half a TiB of memory allocated over time).