score:1

as far as i can tell all you need here is aggregate with zerovalue, seqop and combop corresponding to the operations which are performed by your accumulators.

val zerovalue: (a, b, c) = ??? // (accum1.zero, accum2.zero, accum3.zero)

def seqop(r: (a, b, c), t: t): (a, b, c) = r match {
  case (a, b, c) =>  {
     // apply operations equivalent to
     // accum1.addaccumulator(a, t)
     // accum2.addaccumulator(c, t))
     // accum3.addaccumulator(c, t)
     // and return the first argument
     // r
  }
}

def combop(r1: (a, b, c), r2: (a, b, c)): (a, b, c) = (r1, r2) match {

  case ((a1, b1, c1), (a2, b2, c2)) => {
     // apply operations equivalent to
     // acc1.addinplace(a1, a2)
     // acc2.addinplace(b1, b2)
     // acc3.addinplace(c1, c2)
     // and return the first argument
     // r1
  }
}

val rdd: rdd[t] = ???

val accums: (a, b, c) = rdd.aggregate(zerovalue)(seqop, combop)

Related Query

More Query from same tag