I think you can use aggregateMessages() twice.
In the first step, compute the in-degree of every node and store it on the node.
In the second step, send each node's stored in-degree along its outgoing edges and combine the values at the destination node.
Or, you can use outerJoinVertices() first, like:
yourGraph = followerGraph.outerJoinVertices(
followerGraph.ops().inDegrees(),
new setDegreesMsg() // your function that combines the in-degree info with each vertex of followerGraph
);
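And then use aggregateMessages() to collect the in-degree info stored on each neighbor, like:
degrees: RDD[(VertexId, Int)] = yourGraph.aggregateMessages(
new getInfoSendMsg(), // your function that sends the source vertex's stored in-degree to the destination
new getInfoMergeMsg() // your function that combines (e.g. sums) the values arriving at a vertex
);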
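For reference, the second option might look roughly like this in Scala. This is only a sketch: twoStepInDegrees is a made-up helper name, the inline functions stand in for setDegreesMsg, getInfoSendMsg and getInfoMergeMsg above, and it assumes that the "2 step" in-degree of a vertex means the sum of the in-degrees of its in-neighbors (so it counts 2-hop paths, not distinct vertices).

```scala
import scala.reflect.ClassTag

import org.apache.spark.graphx._

// Hypothetical helper: for every vertex, sum the in-degrees of its in-neighbors.
def twoStepInDegrees[VD: ClassTag, ED: ClassTag](followerGraph: Graph[VD, ED]): VertexRDD[Int] = {
  // Step 1: store each vertex's in-degree as its attribute.
  // Vertices without incoming edges are missing from inDegrees, so default to 0.
  val withDegrees: Graph[Int, ED] =
    followerGraph.outerJoinVertices(followerGraph.inDegrees) {
      (_, _, degOpt) => degOpt.getOrElse(0)
    }

  // Step 2: every edge forwards the source's stored in-degree to its destination,
  // and the values arriving at the same vertex are summed.
  withDegrees.aggregateMessages[Int](
    ctx => ctx.sendToDst(ctx.srcAttr),
    _ + _,
    TripletFields.Src // only the source attribute is read by the send function
  )
}
```

Vertices that receive no message (no incoming edges) do not appear in the result. If you need an explicit 0 for them, join the result back onto the graph's vertices with another outerJoinVertices().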
Source: stackoverflow.com