Accepted answer
  1. Merge the array of RDDs into one RDD (line 1)
  2. Group the pairs by their String key (line 2)
  3. The expected output appears sorted; if that is required, sort the values (line 3)

val mergeIntoOne: RDD[(String, Double)] = array.fold(sparkSession.sparkContext.emptyRDD[(String, Double)])(_ ++ _)
val groupByKeys: RDD[(String, Iterable[Double])] = mergeIntoOne.groupByKey()
val sortedValues = groupByKeys.mapValues(_.toList.sorted)
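The same three steps can be sketched on plain Scala collections (no Spark needed) to see the data flow. `array` here is a hypothetical array of key/value batches standing in for the Array of RDDs:

```scala
object MergeGroupSortSketch extends App {
  // Hypothetical input: batches of key/value pairs, standing in for the RDDs.
  val array = Array(
    Array(("a", 2.0), ("b", 1.0)),
    Array(("a", 1.0), ("b", 3.0))
  )

  // 1. Merge the batches into one collection (mirrors the fold over RDD unions).
  val merged: Array[(String, Double)] = array.fold(Array.empty[(String, Double)])(_ ++ _)

  // 2. Group by the String key (mirrors groupByKey).
  val grouped: Map[String, Array[Double]] =
    merged.groupBy(_._1).view.mapValues(_.map(_._2)).toMap

  // 3. Sort each key's values (mirrors mapValues(_.toList.sorted)).
  val sorted: Map[String, List[Double]] =
    grouped.view.mapValues(_.toList.sorted).toMap

  println(sorted("a")) // List(1.0, 2.0)
  println(sorted("b")) // List(1.0, 3.0)
}
```

On an RDD the grouping and sorting happen per partition across the cluster, but the per-key result is the same.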


It depends on where you want to use it, but you can use a for loop to append one array to the other:

scala> var a = Array(("a1",1.1))
a: Array[(String, Double)] = Array((a1,1.1))

scala> var b = Array(("a2",1.2))
b: Array[(String, Double)] = Array((a2,1.2))

scala> for (i <- 0 until b.length) {
     |   a = a :+ b(i) }

scala> a
res2: Array[(String, Double)] = Array((a1,1.1), (a2,1.2))
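The loop above is equivalent to the built-in concatenation operator `++`, which avoids mutating `a` element by element; a minimal sketch:

```scala
object ConcatSketch extends App {
  val a = Array(("a1", 1.1))
  val b = Array(("a2", 1.2))

  // ++ concatenates the two arrays in one step, no loop or mutation needed.
  val merged: Array[(String, Double)] = a ++ b

  println(merged.toList) // List((a1,1.1), (a2,1.2))
}
```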


Assuming there are no duplicate keys within each RDD, you can foldLeft over the Array[RDD], combining the RDDs pairwise with fullOuterJoin:

val rdd1 = sc.parallelize(Seq(("x", 1.0), ("y", 2.0)))
val rdd2 = sc.parallelize(Seq(("x", 3.0), ("y", 4.0)))
val rdd3 = sc.parallelize(Seq(("x", 5.0), ("y", 6.0)))

val rdds = Array(rdd1, rdd2, rdd3)

val startRdd = sc.parallelize(Seq[(String, Seq[Option[Double]])]())

val result = rdds.foldLeft(startRdd)(
    (rdd1, rdd2) => rdd1.fullOuterJoin(rdd2).mapValues(
        p => p._1.getOrElse(Seq[Option[Double]]()) :+ p._2
    )
).mapValues(_.collect{ case Some(x) => x })

result.collect
// res15: Array[(String, Seq[Double])] = Array((x,List(1.0, 3.0, 5.0)), (y,List(2.0, 4.0, 6.0)))
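The foldLeft/fullOuterJoin mechanics can be sketched on plain Maps (one value per key each, matching the no-duplicates assumption) to show why every key ends up with one value per input RDD. `fullOuterJoin` here is a hypothetical helper mimicking the Spark operation:

```scala
object FoldJoinSketch extends App {
  // Hypothetical stand-ins for the three RDDs, as plain Maps.
  val m1 = Map("x" -> 1.0, "y" -> 2.0)
  val m2 = Map("x" -> 3.0, "y" -> 4.0)
  val m3 = Map("x" -> 5.0, "y" -> 6.0)
  val maps = Seq(m1, m2, m3)

  // Full outer join on Maps: every key from either side, missing values as None.
  def fullOuterJoin[K, A, B](l: Map[K, A], r: Map[K, B]): Map[K, (Option[A], Option[B])] =
    (l.keySet ++ r.keySet).map(k => k -> (l.get(k), r.get(k))).toMap

  // Fold: start from an empty accumulator and append each map's value (or None) per key.
  val start = Map.empty[String, Seq[Option[Double]]]
  val folded = maps.foldLeft(start) { (acc, m) =>
    fullOuterJoin(acc, m).map { case (k, (accVals, v)) =>
      k -> (accVals.getOrElse(Seq.empty[Option[Double]]) :+ v)
    }
  }

  // Drop the Nones, as the RDD version does with collect { case Some(x) => x }.
  val result = folded.map { case (k, vs) => k -> vs.collect { case Some(x) => x } }

  println(result("x")) // List(1.0, 3.0, 5.0)
  println(result("y")) // List(2.0, 4.0, 6.0)
}
```

A key absent from one input simply contributes a `None` at that fold step, which the final `collect` filters out, so the approach also tolerates keys missing from some RDDs.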
