There can be some difficulties working with tuples. Below is working code, with an explanation to follow.
val data = Array((2, (2.1463120403829962, 7340)), (1, (1.4532644653720025, 4280)))

def tupleSum(t1: (Int, (Double, Int)), t2: (Int, (Double, Int))): (Int, (Double, Int)) =
  (0, (t1._2._1 + t2._2._1, t1._2._2 + t2._2._2))

val mainMean = data.reduce(tupleSum)._2
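The name mainMean suggests the end goal is an average. If so (an assumption on my part; the snippet above stops at the summed pair), the reduced tuple can be turned into a mean by dividing the summed Double by the summed Int count:

```scala
// Hypothetical continuation, assuming the Int field is a count to average over.
val data = Array((2, (2.1463120403829962, 7340)), (1, (1.4532644653720025, 4280)))

def tupleSum(t1: (Int, (Double, Int)), t2: (Int, (Double, Int))): (Int, (Double, Int)) =
  (0, (t1._2._1 + t2._2._1, t1._2._2 + t2._2._2))

// Destructure the summed inner tuple, then divide
val (sum, count) = data.reduce(tupleSum)._2
val mean = sum / count
```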
We can write the reduce call with explicit arguments, like
data.reduce((tuple1, tuple2) => tupleSum(tuple1, tuple2))
where tuple1 acts as a kind of accumulator: on the first iteration it takes the first element of the array, and every subsequent element is added to the accumulator's value.
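This left-to-right accumulation is easy to watch on plain Ints; a minimal illustration (not from the original answer):

```scala
// reduce on a List folds left to right: the first parameter carries
// the running result, the second is the next element of the list
val total = List(1, 2, 3, 4).reduce { (acc, x) =>
  println(s"acc = $acc, x = $x")
  acc + x
}
// prints acc = 1, x = 2; then acc = 3, x = 3; then acc = 6, x = 4
```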
So if you want to perform the reduce using pattern matching, it will look like this:
val mainMean = data.reduce((tuple1, tuple2) => {
  val t1 = tuple1 match { case (i, t) => t }
  val t2 = tuple2 match { case (i, t) => t }
  // now t1 and t2 represent the inner tuples of the input tuples
  (0, (t1._1 + t2._1, t1._2 + t2._2))
})
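As a stylistic alternative (not part of the original answer), the pattern match can go directly into the function literal using Scala's { case ... } syntax, which destructures both arguments in one clause and skips the intermediate match expressions:

```scala
val data = Array((3, (3.0, 3)), (2, (2.0, 2)), (1, (1.0, 1)))

// One case clause destructures both input tuples; the keys are ignored with _
val summed = data.reduce {
  case ((_, (d1, n1)), (_, (d2, n2))) => (0, (d1 + d2, n1 + n2))
}
```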
Update:
I have rewritten the previous listing, adding type annotations and println statements; I hope this helps to make the point clearer. An explanation follows after the code.
val data = Array((3, (3.0, 3)), (2, (2.0, 2)), (1, (1.0, 1)))
val mainMean = data.reduce((tuple1: (Int, (Double, Int)),
                            tuple2: (Int, (Double, Int))) => {
  println("tuple1: " + tuple1)
  println("tuple2: " + tuple2)
  val t1: (Double, Int) = tuple1 match {
    case (i: Int, t: (Double, Int)) => t
  }
  val t2: (Double, Int) = tuple2 match {
    case (i: Int, t: (Double, Int)) => t
  }
  // now t1 and t2 represent the inner tuples of the input tuples
  (0, (t1._1 + t2._1, t1._2 + t2._2))
})
println("mainMean: " + mainMean)
And the output will be:
tuple1: (3,(3.0,3)) // 1st element of the array
tuple2: (2,(2.0,2)) // 2nd element of the array
tuple1: (0,(5.0,5)) // sum of 1st and 2nd elements
tuple2: (1,(1.0,1)) // 3rd element
mainMean: (0,(6.0,6)) // resulting sum
tuple1 and tuple2 have type (Int, (Double, Int)). We know they will always be exactly this type, which is why we use only one case in the pattern match. We unpack tuple1 into i: Int and t: (Double, Int); since we are not interested in the key, we return only t. Now t1 represents the inner tuple of tuple1, and the same goes for tuple2 and t2.
You can find more information about fold functions here and here.
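On the subject of folds: foldLeft computes the same sum with an explicit starting accumulator, which also makes the empty-array case safe (reduce throws on an empty collection). A sketch, using the same sample data:

```scala
val data = Array((3, (3.0, 3)), (2, (2.0, 2)), (1, (1.0, 1)))

// Seed the accumulator with (0.0, 0); each step adds the inner tuple's fields.
// An empty array would simply return the seed instead of throwing.
val (sumD, sumN) = data.foldLeft((0.0, 0)) {
  case ((accD, accN), (_, (d, n))) => (accD + d, accN + n)
}
```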
Source: stackoverflow.com