I think the problem is that the rdd (the micro-batch RDD) is reclaimed by Spark's BlockManager before the code inside writeRDD(rdd) gets a chance to run, because that code executes asynchronously inside a Future.

Hence this error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task creation failed: org.apache.spark.SparkException: Attempted to use BlockRDD[820] at actorStream at Tests.scala:149 after its blocks have been removed!

You can fix this by first collecting the micro-batch on the driver and then passing the materialized collection to your write function, so the Future no longer references the RDD. Something like this:

dStreams.foreach(_.foreachRDD { rdd =>

  // Materialize the micro-batch on the driver while its blocks still exist
  val coll = rdd.collect()
  // The Future now closes over a plain in-memory collection, not the RDD
  Future { writeCollection(coll) }
})
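Note that Future needs an implicit ExecutionContext in scope, which the snippet above assumes. Here is a minimal, Spark-free sketch of the same collect-then-hand-off pattern, with a hypothetical writeCollection that just sums the batch (stand-in names, not your actual code):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CollectThenFuture {
  // Hypothetical stand-in for a real sink writer: returns the batch sum
  def writeCollection(coll: Array[Int]): Int = coll.sum

  def main(args: Array[String]): Unit = {
    // Stands in for rdd.collect(): a plain in-memory array on the driver
    val coll = Array(1, 2, 3)
    // The Future closes over coll, which Spark cannot reclaim
    val f = Future { writeCollection(coll) }
    println(Await.result(f, 5.seconds))
  }
}
```

The key point is that coll is an ordinary JVM object owned by your code, so it stays valid no matter when the Future's body eventually runs.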
