Assuming each id doesn't have too many matching records, you can use the `collect_list` aggregation function to collect all matching arrays into an array of arrays, and then a user-defined function (UDF) to sum over these nested arrays:
import org.apache.spark.sql.functions._
import spark.implicits._
import scala.collection.mutable

// Sum a collected array-of-arrays into a single Int
val flattenAndSum = udf[Int, mutable.Seq[mutable.Seq[Int]]] { seqOfArrays => seqOfArrays.flatten.sum }

val sums = df.groupBy($"id").agg(
  collect_list($"event.x0") as "arr0",
  collect_list($"event.x1") as "arr1",
  collect_list($"event.x2") as "arr2",
  collect_list($"event.x3") as "arr3"
).select($"id",
  flattenAndSum($"arr0") as "0",
  flattenAndSum($"arr1") as "1",
  flattenAndSum($"arr2") as "2",
  flattenAndSum($"arr3") as "3"
)

df.join(sums, "id")
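The flatten-and-sum step itself is plain Scala, so it can be checked without a Spark session. A minimal sketch, using made-up sample values standing in for one id's collected arrays:

```scala
// Hypothetical sample: three collected arrays for a single id,
// as collect_list would gather them into an array-of-arrays
val seqOfArrays: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3), Seq(4, 5))

// Flatten the nested sequences and sum all elements, exactly as the UDF body does
val total: Int = seqOfArrays.flatten.sum
```
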
Alternatively, if that assumption cannot be made, you can create a user-defined aggregate function (UDAF) to perform the flatten-and-sum on the fly. This is safer and potentially faster, but requires a bit more work:
// Implement a UDAF:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class FlattenAndSum extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = new StructType().add("arr", ArrayType(IntegerType))
  override def bufferSchema: StructType = new StructType().add("sum", IntegerType)
  override def dataType: DataType = IntegerType
  override def deterministic: Boolean = true

  // Start each group's running sum at zero
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, 0)

  // Add the sum of each incoming array to the buffer
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val current = buffer.getAs[Int](0)
    val toAdd = input.getAs[Seq[Int]](0).sum
    buffer.update(0, current + toAdd)
  }

  // Combine partial sums from different partitions
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0, buffer1.getAs[Int](0) + buffer2.getAs[Int](0))
  }

  override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)
}
// Use it in the aggregation:
val flattenAndSum = new FlattenAndSum()

val sums = df.groupBy($"id").agg(
  flattenAndSum($"event.x0") as "0",
  flattenAndSum($"event.x1") as "1",
  flattenAndSum($"event.x2") as "2",
  flattenAndSum($"event.x3") as "3"
)

df.join(sums, "id")
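To see why the UDAF never needs all arrays in memory at once, its update/merge semantics can be modelled in plain Scala. The sample data and the split into two partitions below are made up for illustration:

```scala
// Hypothetical: the input arrays for one id, split across two partitions
val partitions: Seq[Seq[Seq[Int]]] = Seq(
  Seq(Seq(1, 2), Seq(3)),  // partition 1
  Seq(Seq(4, 5))           // partition 2
)

// update: each partition folds its arrays' sums into its own buffer
val partialSums: Seq[Int] = partitions.map(_.map(_.sum).sum)

// merge: the partial buffers are combined into the final result
val result: Int = partialSums.sum
```

Because each partition only ever holds one running `Int`, the aggregation avoids materializing the array-of-arrays that `collect_list` would build.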
Source: stackoverflow.com