Score: 31

Following your code:

val byKey = x.map { case (id, uri, count) => (id, uri) -> count }

you could do:

val reducedByKey = byKey.reduceByKey(_ + _)

scala> reducedByKey.collect.foreach(println)
((a,d),1)
((a,b),2)
((c,b),1)
PairRDDFunctions[K, V].reduceByKey takes an associative reduce function that can be applied to the value type V of the RDD[(K, V)]. In other words, you need a function f(e1: V, e2: V): V. In this particular case, with sum on Ints, that is (x: Int, y: Int) => x + y, or _ + _ in short underscore notation.
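To make the snippet above self-contained, here is a minimal runnable sketch. The input data for x is hypothetical, chosen so that the collected output matches the result shown above:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical sample input, chosen to reproduce the output above.
val sc = new SparkContext(new SparkConf().setAppName("reduceByKey-demo").setMaster("local[*]"))
val x = sc.parallelize(Seq(("a", "b", 1), ("a", "b", 1), ("a", "d", 1), ("c", "b", 1)))

val byKey = x.map { case (id, uri, count) => (id, uri) -> count }
val reducedByKey = byKey.reduceByKey(_ + _)

// Ordering of the collected pairs is not guaranteed.
reducedByKey.collect.foreach(println)

sc.stop()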
For the record: reduceByKey performs better than groupByKey because it attempts to apply the reduce function locally before the shuffle/reduce phase, whereas groupByKey will force a shuffle of all elements before grouping.
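To see the difference in a sketch (assuming the byKey RDD from the code above): groupByKey ships every (key, count) pair across the network and only then lets you sum, while reduceByKey pre-aggregates on each partition first.

// Same result as byKey.reduceByKey(_ + _), but every element is
// shuffled before the per-key sums are computed.
val viaGroupByKey = byKey.groupByKey().mapValues(_.sum)
viaGroupByKey.collect.foreach(println)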
Score: 0

The signature (from the Java API) is below:

reduceByKey(func: Function2[V, V, V]): JavaPairRDD[K, V]

which says that, for each key in an RDD, it takes the values (which are necessarily all of the same type), applies the function provided, and returns a value of the same type as in the parent RDD.
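For comparison, a minimal sketch of the same call through the Scala API, where the method lives on PairRDDFunctions; the byKey RDD from the first answer is assumed:

// Scala API signature: def reduceByKey(func: (V, V) => V): RDD[(K, V)]
// The two calls below pass the same function in different notations.
val summedExplicit = byKey.reduceByKey((a: Int, b: Int) => a + b)
val summedShort = byKey.reduceByKey(_ + _)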
Score: 6

Your original data structure is RDD[(String, String, Int)], and reduceByKey can only be used if the data structure is RDD[(K, V)].

val kv = x.map(e => e._1 -> e._2 -> e._3)                // kv is RDD[((String, String), Int)]
val reduced = kv.reduceByKey(_ + _)                      // reduced is RDD[((String, String), Int)]
val kv2 = reduced.map(e => e._1._1 -> (e._1._2 -> e._2)) // kv2 is RDD[(String, (String, Int))]
val grouped = kv2.groupByKey()                           // grouped is RDD[(String, Iterable[(String, Int)])]
grouped.foreach(println)
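Note that foreach(println) prints on the executors, so on a cluster you would normally collect to the driver first. With the hypothetical sample input from the sketch in the first answer, collecting would print something like (key and value ordering not guaranteed):

grouped.collect.foreach(println)
// (a,CompactBuffer((b,2), (d,1)))
// (c,CompactBuffer((b,1)))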