score:0

I have tried to increase stack size, localize the utility functions, using spark.util.Vector, but unfortunately none of them work it out. Then I tried to downgrade Spark from 0.7.2 to 0.6.3 (https://github.com/mesos/spark/tree/branch-0.6). And it works and no more Stack Overflow even for a 10,000 by 10,000 matrix. I don't know how exactly it fix it, so I post the difference between the reduce function in RDD.scala:

--- spark-0.6.3/core/src/main/scala/spark/RDD.scala 2013-06-27 11:31:12.628017194 -0700
+++ spark-0.7.2/core/src/main/scala/spark/RDD.scala 2013-06-27 13:42:22.844686240 -0700
@@ -316,39 +468,93 @@
   def reduce(f: (T, T) => T): T = {
     val cleanF = sc.clean(f)
+    // println("RDD.reduce: after sc.clean")
     val reducePartition: Iterator[T] => Option[T] = iter => {
       if (iter.hasNext) {
         Some(iter.reduceLeft(cleanF))
-      }else {
+      } else {
         None
       }
     }
-    val options = sc.runJob(this, reducePartition)
-    val results = new ArrayBuffer[T]
-    for (opt <- options; elem <- opt) {
-      results += elem
-    }
-    if (results.size == 0) {
-      throw new UnsupportedOperationException("empty collection")
-    } else {
-      return results.reduceLeft(cleanF)
+    // println("RDD.reduce: after reducePartition")
+    var jobResult: Option[T] = None
+    val mergeResult = (index: Int, taskResult: Option[T]) => {
+      if (taskResult != None) {
+        jobResult = jobResult match {
+          case Some(value) => Some(f(value, taskResult.get))
+          case None => taskResult
+        }
+      }
     }
+    // println("RDD.reduce: after jobResult")
+    sc.runJob(this, reducePartition, mergeResult)
+    // println("RDD.reduce: after sc.runJob")
+    // Get the final result out of our Option, or throw an exception if the RDD was empty
+    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
+    // println("RDD.reduce: finished")
   }

   /**
    * Aggregate the elements of each partition, and then the results for all the partitions, using a
-   * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to 
+   * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
    * modify t1 and return it as its result value to avoid object allocation; however, it should not
    * modify t2.
    */
   def fold(zeroValue: T)(op: (T, T) => T): T = {
+    // Clone the zero value since we will also be serializing it as part of tasks
+    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
     val cleanOp = sc.clean(op)
-    val results = sc.runJob(this, (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp))
-    return results.fold(zeroValue)(cleanOp)
+    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
+    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
+    sc.runJob(this, foldPartition, mergeResult)
+    jobResult
   }

Related Query

More Query from same tag