Avoiding memory leaks with Scalaz 7 zipWithIndex/group enumeratees

Background

As noted in this question, I'm using Scalaz 7 iteratees to process a large (i.e., unbounded) stream of data in constant heap space.

My code looks like this:

type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]


def processChunk(c: Chunk, idx: Long): Result


def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
  Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Vector.empty) { (rs, vs) =>
    rs ++ vs.map {
      case (c, i) => processChunk(c, i)
    }
  } &= (data.zipWithIndex mapE Iteratee.group(P))
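
For reference, zipWithIndex pairs each chunk with its ordinal and Iteratee.group(P) batches those pairs, so the fold sees inputs of type Vector[(Chunk, Long)]. Here's a rough illustration of that shape using plain collections (stand-in values only, not the iteratee machinery):

// strings stand in for Chunks; grouped(2) mimics group(P) with P = 2
val chunks = List("c0", "c1", "c2", "c3", "c4")
val batches = chunks.zipWithIndex
  .map { case (c, i) => (c, i.toLong) }
  .grouped(2)
  .toList
// batches: List(List((c0,0), (c1,1)), List((c2,2), (c3,3)), List((c4,4)))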

The problem

I seem to have hit a memory leak, though, and I don't know Scalaz/FP well enough to tell whether the bug is in Scalaz or in my code. Intuitively, I'd expect this code to require only (on the order of) P times the Chunk size in heap space; with the test parameters below (groups of 4, arrays of 1 << 25 Ints, i.e. 128 MiB each), that would be roughly 512 MiB.

Note: I found a similar question that also hit an OutOfMemoryError, but that code was using consume (which retains its entire input); mine isn't.

Tests

I ran some tests to try to isolate the problem. To summarize, the leak appears only when both zipWithIndex and group are used.

// no zipping/grouping
scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO
res47: Long = 4294967296


// grouping only
scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO
res49: Long = 4294967296


// zipping and grouping
scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO
java.lang.OutOfMemoryError: Java heap space


// zipping only
scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO
res51: Long = 4294967296


// no zipping/grouping, larger arrays
scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO
res53: Long = 17179869184


// zipping only, larger arrays
scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO
res54: Long = 17179869184

Test code:

import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._


// define an enumerator that produces a stream of new, zero-filled arrays
def enumArrs(sz: Int, n: Int) =
  Iteratee.enumIterator[Array[Int], IO](
    Iterator.continually(Array.fill(sz)(0)).take(n))


// define an iteratee that consumes a stream of arrays
// and computes its length
val i1 = Iteratee.fold[Array[Int], IO, Long](0) {
  (c, a) => c + a.length
}


// define an iteratee that consumes a grouped stream of arrays
// and computes its length
val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) {
  (c, as) => c + as.map(_.length).sum
}


// define an iteratee that consumes a grouped/zipped stream of arrays
// and computes its length
val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) {
  (c, vs) => c + vs.map(_._1.length).sum
}


// define an iteratee that consumes a zipped stream of arrays
// and computes its length
val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) {
  (c, v) => c + v._1.length
}

Questions

  • Is the bug in my code?
  • How can I make this work in constant heap space?

This will come as little consolation for anyone who's stuck with the older iteratee API, but I recently verified that an equivalent test passes against the scalaz-stream API. This is a newer stream processing API that is intended to replace iteratee.

For completeness, here's the test code:

import scalaz.concurrent.Task
import scalaz.stream._


// create a stream containing `n` arrays with `sz` Ints in each one
def streamArrs(sz: Int, n: Int): Process[Task, Array[Int]] =
  (Process emit Array.fill(sz)(0)).repeat take n


(streamArrs(1 << 25, 1 << 14).zipWithIndex
  pipe process1.chunk(4)
  pipe process1.fold(0L) {
    (c, vs) => c + vs.map(_._1.length.toLong).sum
  }).runLast.run

This should work with any value for the n parameter (provided you're willing to wait long enough) -- I tested with 2^14 arrays of 2^25 Ints apiece, i.e., 128 MiB per array and roughly 2 TiB of memory allocated over the course of the run.
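
For anyone who does need to stay on the iteratee API, one possible workaround -- a sketch only, which I haven't verified against the leak -- is to drop the zipWithIndex enumeratee and thread the running index through the fold's accumulator instead, since the tests in the question only show the leak when zipping and grouping are combined:

// sketch: number chunks inside the fold rather than via zipWithIndex,
// so that only Iteratee.group is applied to the stream
// (Chunk, Result, processChunk, and P are as defined in the question)
def processNumbered(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[Chunk], ErrorOr, (Long, Vector[Result])] =
  Iteratee.fold[Vector[Chunk], ErrorOr, (Long, Vector[Result])]((0L, Vector.empty)) {
    case ((idx, rs), cs) =>
      // number this batch starting from the running index
      val processed = cs.zipWithIndex.map { case (c, i) => processChunk(c, idx + i) }
      (idx + cs.length, rs ++ processed)
  } &= (data mapE Iteratee.group(P))

Whether this actually sidesteps the problem depends on where the bug lives, but it at least avoids the zip/group combination that the tests implicate.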