score:1

Accepted answer

it doesn't sum values because there is no sum in your code. moreover your logic is wrong. mergemsg receives messages not (message, current) pairs. try something like this:

import breeze.linalg.densevector

def compareattrs(xs: seq[any], ys: seq[any]) = 
  densevector(xs.zip(ys).map{ case (x, y) => if (x == y) 1l else 0l}.toarray)

val result = graph.aggregatemessages[(long, densevector[long])](
  triplet => {
    val comparedattrs = compareattrs(triplet.dstattr, triplet.srcattr)
    triplet.sendtodst(1l, comparedattrs)
    triplet.sendtosrc(1l, comparedattrs)
  },
  { case ((cnt1, v1), (cnt2, v2)) => (cnt1 + cnt2, v1 + v2) }
)

result.mapvalues(kv => (kv._2.map(_.todouble) / kv._1.todouble)).collect
// array(
//   (1,densevector(0.5, 0.0, 0.5, 1.0, 1.0, 1.0, 1.0)),
//   (2,densevector(0.5, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0)), 
//   (3,densevector(0.0, 0.0, 0.5, 1.0, 1.0, 1.0, 1.0)))

Related Query

More Query from same tag