score:3

Accepted answer

I have done this in past, Amazon s3 needs 5Mb chunks, I was returning tuple at last, you could change as per your requirement.

val client = new AmazonS3Client(new BasicAWSCredentials(access_key, secret_key))

def my_parser = BodyParser { 

val consume_5MB = Traversable.takeUpTo[Array[Byte]](1024 * 1024 * 5) &>> Iteratee.consume()
val rechunkAdapter: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped(consume_5MB)

multipartFormData(Multipart.handleFilePart({

  case Multipart.FileInfo(partName, file_name, content_type) => {

    val object_id = java.util.UUID.randomUUID().toString().replaceAll("-", "")
    val object_id_key = if (content_type.getOrElse("").contains("video") || content_type.getOrElse("").contains("audio")) object_id else object_id + file_name.substring(file_name.lastIndexOf('.'))
    var position = 0
    val etags = new java.util.ArrayList[PartETag]()

    val initRequest = new InitiateMultipartUploadRequest(bucket, object_id_key)
    val initResponse = client.initiateMultipartUpload(initRequest)
    println("fileName = " + file_name)
    println("contentType = " + content_type)

    (rechunkAdapter &>> Iteratee.foldM[Array[Byte], Int](1) { (c, bytes) =>
      Future {
        println("got a chunk!  :" + c + " size in KB: " + (bytes.length / 1024));
        val is = new java.io.ByteArrayInputStream(bytes)

        val uploadRequest = new UploadPartRequest()
          .withBucketName(bucket).withKey(object_id_key)
          .withUploadId(initResponse.getUploadId())
          .withPartNumber(c)
          .withFileOffset(position)
          .withInputStream(is)
          .withPartSize(bytes.length)

        etags.add(client.uploadPart(uploadRequest).getPartETag)
        position = position + bytes.length

        c + 1
      }
    }).map { v =>
      try {
        val compRequest = new CompleteMultipartUploadRequest(
          bucket,
          object_id_key,
          initResponse.getUploadId(),
          etags)
        client.completeMultipartUpload(compRequest)
        println("Finished uploading " + file_name)   
        client.setObjectAcl(bucket, object_id_key, com.amazonaws.services.s3.model.CannedAccessControlList.PublicRead)
        (object_id_key, file_name, content_type.getOrElse("application/octet-stream")) 
      } catch {
        case e: Exception => {
          println("S3 upload Ex " + e.getMessage())
          val abortMPURequest = new AbortMultipartUploadRequest("xxxxxxx", object_id, initResponse.getUploadId())
          client.abortMultipartUpload(abortMPURequest);
         ("error", file_name, content_type.getOrElse("application/octet-stream"))
        }
      }
    }
  }
}))

}


Related Query

More Query from same tag