score:1
Accepted answer
It doesn't sum values because there is no sum in your code. Moreover, your logic is wrong: mergeMsg receives two messages, not (message, current) pairs. Try something like this:
import breeze.linalg.DenseVector

// Compare two attribute lists position by position: 1 where equal, 0 otherwise.
def compareAttrs(xs: Seq[Any], ys: Seq[Any]) =
  DenseVector(xs.zip(ys).map { case (x, y) => if (x == y) 1L else 0L }.toArray)

val result = graph.aggregateMessages[(Long, DenseVector[Long])](
  // sendMsg: each edge sends (1, comparison vector) to both of its endpoints.
  triplet => {
    val comparedAttrs = compareAttrs(triplet.dstAttr, triplet.srcAttr)
    triplet.sendToDst((1L, comparedAttrs))
    triplet.sendToSrc((1L, comparedAttrs))
  },
  // mergeMsg: combine two messages by summing the counts and the vectors.
  { case ((cnt1, v1), (cnt2, v2)) => (cnt1 + cnt2, v1 + v2) }
)

// Divide the summed matches by the neighbor count to get a per-attribute ratio.
result.mapValues(kv => (kv._2.map(_.toDouble) / kv._1.toDouble)).collect
// Array(
//   (1,DenseVector(0.5, 0.0, 0.5, 1.0, 1.0, 1.0, 1.0)),
//   (2,DenseVector(0.5, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0)),
//   (3,DenseVector(0.0, 0.0, 0.5, 1.0, 1.0, 1.0, 1.0)))
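To make the role of mergeMsg more concrete, here is a minimal sketch in plain Scala collections, with no Spark or Breeze involved. It only imitates what aggregateMessages does per vertex: collect the messages addressed to a vertex, then reduce them pairwise with mergeMsg. The object name MergeMsgSketch, the similarity helper, the use of Vector in place of DenseVector, and the sample data in the usage note are all assumptions made for illustration, not part of the original question or of the GraphX API.

```scala
// Plain-Scala sketch (no Spark): imitates the per-vertex reduce of aggregateMessages.
object MergeMsgSketch {
  // A message is (neighbor count, per-attribute match counts).
  type Msg = (Long, Vector[Long])

  // Same idea as compareAttrs in the answer, with Vector standing in for DenseVector.
  def compareAttrs(xs: Seq[Any], ys: Seq[Any]): Vector[Long] =
    xs.zip(ys).map { case (x, y) => if (x == y) 1L else 0L }.toVector

  // mergeMsg combines two messages of the same type; it never sees the vertex state.
  def mergeMsg(a: Msg, b: Msg): Msg =
    (a._1 + b._1, a._2.zip(b._2).map { case (p, q) => p + q })

  // Hypothetical helper: vertices map id -> attributes, edges are (src, dst) pairs.
  def similarity(
      vertices: Map[Long, Seq[Any]],
      edges: Seq[(Long, Long)]
  ): Map[Long, Vector[Double]] = {
    // "sendMsg": every edge sends the same message to both endpoints.
    val messages: Seq[(Long, Msg)] = edges.flatMap { case (src, dst) =>
      val compared = compareAttrs(vertices(src), vertices(dst))
      Seq((dst, (1L, compared)), (src, (1L, compared)))
    }
    // Group by receiving vertex, reduce with mergeMsg, then normalize by the count.
    messages.groupBy(_._1).map { case (id, msgs) =>
      val (cnt, sums) = msgs.map(_._2).reduce(mergeMsg)
      (id, sums.map(_.toDouble / cnt))
    }
  }
}
```

For example, with made-up vertices 1 -> Seq("a", 1, true), 2 -> Seq("a", 2, true), 3 -> Seq("b", 1, true) and edges (1, 2) and (1, 3), vertex 1 receives two messages and ends up with Vector(0.5, 0.5, 1.0), while vertices 2 and 3 each receive one. Because the reduce can apply mergeMsg in any order, the function should be commutative and associative, which element-wise addition is.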
Source: stackoverflow.com