Accepted answer
You could use a UserDefinedAggregateFunction. The code below was tested with Spark 1.6.2.
First, create a class that extends UserDefinedAggregateFunction:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Computes the mode (most frequent value) of a string column.
class ModeUDAF extends UserDefinedAggregateFunction {
  override def dataType: DataType = StringType
  override def inputSchema: StructType = new StructType().add("input", StringType)
  override def deterministic: Boolean = true
  // The aggregation buffer is a single value -> count map.
  override def bufferSchema: StructType = new StructType().add("mode", MapType(StringType, LongType))

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Map.empty[Any, Long]
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // Skip nulls: a Spark map key must not be null.
    if (!input.isNullAt(0)) {
      val buff0 = buffer.getMap[Any, Long](0)
      val inp = input.get(0)
      buffer(0) = buff0.updated(inp, buff0.getOrElse(inp, 0L) + 1L)
    }
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val mp1 = buffer1.getMap[Any, Long](0)
    val mp2 = buffer2.getMap[Any, Long](0)
    // Add the counts of keys that appear in both partial maps.
    buffer1(0) = mp1 ++ mp2.map { case (k, v) => k -> (v + mp1.getOrElse(k, 0L)) }
  }

  override def evaluate(buffer: Row): Any = {
    val counts = buffer.getMap[Any, Long](0)
    // Return the value with the highest count, or null for an empty group
    // (the original st.head would throw on an empty map).
    if (counts.isEmpty) null else counts.maxBy(_._2)._1
  }
}
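To see how the four callbacks fit together, the sketch below replays the same count-map logic in plain Scala, without Spark: each inner Seq stands in for one partition, update folds rows into a partition-local map, merge combines the partial maps, and evaluate picks the most frequent key. The object and method names (ModeBufferSketch, updateBuf, mergeBufs, evaluateBuf) are hypothetical and exist only for this illustration; they are not part of the Spark API.

```scala
// Plain-Scala simulation of ModeUDAF's buffer lifecycle (no Spark needed).
// All names here are hypothetical; they only mirror the UDAF callbacks.
object ModeBufferSketch {
  type Buf = Map[Any, Long]

  // initialize: an empty value -> count map
  def initBuf: Buf = Map.empty[Any, Long]

  // update: count one input value, skipping nulls
  def updateBuf(buf: Buf, input: Any): Buf =
    if (input == null) buf else buf.updated(input, buf.getOrElse(input, 0L) + 1L)

  // merge: add the counts from a second partial map
  def mergeBufs(b1: Buf, b2: Buf): Buf =
    b1 ++ b2.map { case (k, v) => k -> (v + b1.getOrElse(k, 0L)) }

  // evaluate: the key with the highest count, or null if nothing was counted
  def evaluateBuf(buf: Buf): Any =
    if (buf.isEmpty) null else buf.maxBy(_._2)._1

  // Run the whole lifecycle over several "partitions" of one column.
  def mode(partitions: Seq[Seq[Any]]): Any = {
    val partials = partitions.map(_.foldLeft(initBuf)(updateBuf))
    evaluateBuf(partials.foldLeft(initBuf)(mergeBufs))
  }
}
```

For example, ModeBufferSketch.mode(Seq(Seq("a", "b", "a"), Seq("b", "b", "c"))) counts a twice, b three times and c once across the two partitions, so it returns "b".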
Afterwards, you can use it with your DataFrame as follows:
val modeColumnList = List("some", "column", "names") // or df.columns.toList
val modeAgg = new ModeUDAF()
// One aggregate expression per column
val aggCols = modeColumnList.map(c => modeAgg(df(c)))
val aggregatedModeDF = df.agg(aggCols.head, aggCols.tail: _*)
aggregatedModeDF.show()
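For small inputs it can help to cross-check the DataFrame result against a plain-Scala reference. The sketch below is a hypothetical helper, not part of Spark: it assumes an in-memory table represented as a Seq of Map[String, String] rows (an assumption made only for illustration) and computes the mode of each named column. As with the UDAF, ties are broken arbitrarily.

```scala
// Hypothetical reference implementation for cross-checking small inputs.
object MultiColumnMode {
  // Mode of each requested column; missing or null cells are ignored,
  // and a column with no values maps to None.
  def modes(rows: Seq[Map[String, String]], columns: Seq[String]): Map[String, Option[String]] =
    columns.map { c =>
      val values = rows.flatMap(r => r.get(c)).filter(_ != null)
      val mode =
        if (values.isEmpty) None
        else Some(values.groupBy(identity).maxBy(_._2.size)._1)
      c -> mode
    }.toMap
}
```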
You can also call .collect() on the final DataFrame to bring the result back to the driver as an Array[Row] and work with it as a Scala data structure.
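A sketch of that last step: df.agg produces a single row, so collect().head.toSeq yields the modes in column order, and zipping them with the column list gives a name-to-mode map. To keep the example runnable without Spark, a plain Seq[Any] stands in for the collected row's values; the object and method names (CollectSketch, toModeMap) are hypothetical.

```scala
// Pair column names with the values of the single aggregated row.
// In Spark, rowValues would come from aggregatedModeDF.collect().head.toSeq.
object CollectSketch {
  def toModeMap(columns: Seq[String], rowValues: Seq[Any]): Map[String, Any] = {
    require(columns.size == rowValues.size, "expected one value per aggregated column")
    columns.zip(rowValues).toMap
  }
}
```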
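Note: performance depends on the cardinality of each input column, because the aggregation buffer holds one map entry per distinct value, and these maps are shuffled and merged across partitions.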
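The cardinality remark follows from the buffer design: every partition keeps one map entry per distinct value it sees. The plain-Scala sketch below (hypothetical names, no Spark) simply measures that buffer size for a low-cardinality and a high-cardinality column of the same length.

```scala
object BufferSizeSketch {
  // Number of entries a value -> count buffer holds after folding one partition.
  def bufferEntries(values: Seq[String]): Int =
    values.foldLeft(Map.empty[String, Long]) { (m, v) =>
      m.updated(v, m.getOrElse(v, 0L) + 1L)
    }.size
}
```

With 1000 copies of one value the buffer has a single entry; with 1000 distinct IDs it has 1000 entries.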
Source: stackoverflow.com
Related Query
- Calculate a mode for multiple columns
- Apache Spark Dataframe Groupby agg() for multiple columns
- Failed to execute user defined function($anonfun$9: (string) => double) on using String Indexer for multiple columns
- How to do distinct on multiple columns after join and then sort and select latest for each group?
- Spark agg to collect a single list for multiple columns
- Spark Scala: moving average for multiple columns
- Spark - Iterating through all rows in dataframe comparing multiple columns for each row against another
- Calculate mean for several columns in Spark scala
- Update date format in spark dataframe for multiple spark columns
- Compare the values in multiple columns in one Dataframe with multiple rows in one single column for target dataframe for the same record in Scala?
- Spark SQL - Check for a value in multiple columns
- How to use group by for multiple columns with count?
- how to concat multiple columns in spark while getting the column names to be concatenated from another table (different for each row)
- Filtering rows on Spark for multiple columns sharing the same value
- spark UDF pattern matching for multiple columns and collection elements
- Spark - Appending multiple rows to create columns for a common column id
- groupBy Id and get multiple records for multiple columns in scala
- groupBy and get count of records for multiple columns in scala
- Exploding multiple array columns in spark for a changing input schema
- how to use spark.read.json for multiple json columns from a dataframe
- Pivot a column in Dataframe which is having multiple values for the pivoted columns
- Set all values to None for multiple columns
- Spark + scala new pipline for StringIndexer multiple columns
- Reader Monad for Dependency Injection: multiple dependencies, nested calls
- Derive multiple columns from a single column in a Spark DataFrame
- How do you update multiple columns using Slick Lifted Embedding?
- Optimal way to create a ml pipeline in Apache Spark for dataset with high number of columns
- Aggregating multiple columns with custom function in Spark
- Scala Spark DataFrame : dataFrame.select multiple columns given a Sequence of column names
- How to run sbt multiple command in interactive mode as one command?
More Query from same tag
- Akka scalability and performance benchmark testcases
- Play Framework - Schedule a task at precise time
- Scala: How to make an exception in a library to bubble up to the Application calling the library?
- Custom TypeConverters using spark cassandra connector
- Removing duplicate keys when concatenating maps in SCALA
- How to concatenate 2 values in StringBody in Gatling version 3.5.0
- Play sub-projects: how to convert to build.sbt
- How to make a POST call to self-certified server with akka-http
- Converting multiple columns from mysql recordset into Array
- Composing functions with multiple arguments
- One REPL to bind them all?
- error in initSerDe : java.lang.ClassNotFoundException class org.apache.hive.hcatalog.data.JsonSerDe not found
- Functional value, mutable state and thread-safety
- SQL DDL metadata into Spark schema metadata
- Auxiliary constructor invoking what seems to be an undefined method
- Convert all the columns of a spark dataframe into a json format and then include the json formatted data as a column in another/parent dataframe
- Slick MappedColumnType
- Caused by: java.io.NotSerializableException: java.util.function.Predicate$$Lambda
- Scala^Z3: Delete previous assertion
- Is there a way to maintain ordering with Scala's breakOut?
- Future does not complete reliably in Play Framework
- Scala, Using Responder to abstract a possible Asynchronous computation
- How do I store accented characters in S3 metadata?
- JVM: How does Scala's JavaConversions work inside Java code?
- How to perform Mutation in sangria-graphql?
- How can I get automatic dependency resolution in my scala scripts?
- Intellij Idea Kotlin plugin cannot see scala case class accessors
- Is there a way to rename methods in Scala (or Java) at runtime?
- Why does groupBy in Scala change the ordering of a list's items?
- Scala SBT - External repository