score:4

Accepted answer

In general you can't safely use an arbitrary OutputStream with the Enumerator/Iteratee framework, because OutputStream has no non-blocking back-pressure mechanism: a write either completes or blocks the calling thread. However, if you control the writes to the OutputStream, you can hack together something like:

val baos = new ByteArrayOutputStream
val zos = new ZipOutputStream(baos)

val enumerator = Enumerator.generateM {
  Future.successful {
    if (moreDataToWrite) {
      // Write data into zos
      val r = Some(baos.toByteArray)
      baos.reset()
      r
    } else None
  }
}
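
To make the "write, then drain the buffer" step concrete, here is a minimal sketch that uses only the JDK and no Play classes. The helper name zipChunks and the idea of emitting one chunk per zip entry are assumptions for illustration, not part of the answer above. In the Enumerator version, each drained chunk corresponds to what one generateM step would return.

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets.UTF_8
import java.util.zip.{ZipEntry, ZipOutputStream}

// Hypothetical helper: writes each (name, content) pair as one zip entry and
// returns the bytes produced by each step as a separate chunk.
def zipChunks(entries: List[(String, String)]): List[Array[Byte]] = {
  val baos = new ByteArrayOutputStream
  val zos = new ZipOutputStream(baos)

  // Take whatever has accumulated in the buffer, then empty it.
  def drain(): Array[Byte] = {
    val bytes = baos.toByteArray
    baos.reset()
    bytes
  }

  val entryChunks = entries.map { case (name, content) =>
    zos.putNextEntry(new ZipEntry(name))
    zos.write(content.getBytes(UTF_8))
    zos.closeEntry()
    drain()
  }
  // close() writes the zip central directory, so drain once more afterwards.
  zos.close()
  entryChunks :+ drain()
}
```

Note the final drain after close(): if the stream is never closed, the trailing zip directory is never written, so the last chunk matters even though no entry data is left.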

If all you need is compression, take a look at the Enumeratee instances provided in play.filters.gzip.Gzip and the play.filters.gzip.GzipFilter filter.

score:0

The only backpressure mechanism for OutputStream is blocking the thread. So one way or another, there will have to be a thread that is able to be blocked.

One way is to use piped streams.

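import java.io.OutputStream
import java.io.PipedInputStream
import java.io.PipedOutputStream
import play.api.libs.iteratee.Enumerator
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

def outputStream2(a: OutputStream => Unit, bufferSize: Int)
    (implicit ec1: ExecutionContext, ec2: ExecutionContext) = {
  val outputStream = new PipedOutputStream
  // Connect the pipe before the writer starts; writing to an unconnected pipe throws
  val inputStream = new PipedInputStream(outputStream, bufferSize)
  // Run the blocking writer on ec1 and close the pipe afterwards so the reader sees end of input
  Future(try a(outputStream) finally outputStream.close())(ec1)
  // Read the other end of the pipe on ec2
  Enumerator.fromStream(inputStream)(ec2)
}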

Since the operations are blocking, you must take care to prevent deadlock.

Either pass two different thread pools as ec1 and ec2, or use a cached (unbounded) thread pool.
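
The deadlock risk comes from the pipe itself: the writer blocks when the buffer is full and the reader blocks when it is empty, so if both sides compete for the same single thread, neither can make progress. The following is a rough sketch of the two-pool arrangement using only the JDK and scala.concurrent. The function name roundTrip, the single-thread pools, and the 10-second timeouts are assumptions chosen for the demo.

```scala
import java.io.{PipedInputStream, PipedOutputStream}
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical demo: pushes `payload` through a pipe with a small buffer and
// returns the bytes the reader received.
def roundTrip(payload: Array[Byte], bufferSize: Int): Array[Byte] = {
  // Separate pools, so a blocked writer can never starve the reader.
  val writerPool = Executors.newSingleThreadExecutor()
  val readerPool = Executors.newSingleThreadExecutor()
  val writerEc = ExecutionContext.fromExecutor(writerPool)
  val readerEc = ExecutionContext.fromExecutor(readerPool)
  try {
    val out = new PipedOutputStream
    val in = new PipedInputStream(out, bufferSize) // connect before writing

    // Writer: blocks whenever the pipe buffer is full.
    val written = Future {
      try out.write(payload) finally out.close()
    }(writerEc)

    // Reader: blocks whenever the pipe buffer is empty; stops at end of stream.
    val read = Future {
      try Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
      finally in.close()
    }(readerEc)

    Await.result(written, 10.seconds)
    Await.result(read, 10.seconds)
  } finally {
    writerPool.shutdown()
    readerPool.shutdown()
  }
}
```

If both futures were submitted to one single-thread pool instead, the writer would fill the buffer and wait forever for a reader that never gets scheduled.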

