score:2

Accepted answer

You could use a `UserDefinedAggregateFunction`. The code below was tested on Spark 1.6.2.

First, create a class that extends `UserDefinedAggregateFunction`:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/** Computes the statistical mode (most frequent value) of a string column.
  *
  * Intermediate state is a value -> occurrence-count map, so memory usage
  * grows with the cardinality of the input column.
  */
class ModeUDAF extends UserDefinedAggregateFunction {

  // The mode value itself is returned as a string.
  override def dataType: DataType = StringType

  // One string input column.
  override def inputSchema: StructType = new StructType().add("input", StringType)

  // Same input always produces the same mode.
  override def deterministic: Boolean = true

  // Aggregation buffer: map of value -> count seen so far.
  override def bufferSchema: StructType = new StructType().add("mode", MapType(StringType, LongType))

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Map.empty[Any, Long]
  }

  // Increment the count for the incoming value.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val counts = buffer.getMap[Any, Long](0)
    val value = input.get(0)
    buffer(0) = counts.updated(value, counts.getOrElse(value, 0L) + 1L)
  }

  // Combine two partial count maps by summing per-key counts.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val m1 = buffer1.getMap[Any, Long](0)
    val m2 = buffer2.getMap[Any, Long](0)
    buffer1(0) = m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0L)) }
  }

  // Return the key with the highest count. reduceOption (instead of
  // head/foldLeft over head) avoids a NoSuchElementException when the
  // aggregation saw no rows; in that case null is returned.
  override def evaluate(buffer: Row): Any = {
    val counts = buffer.getMap[Any, Long](0)
    counts.reduceOption((best, next) => if (next._2 > best._2) next else best)
      .map(_._1)
      .orNull
  }

}

Afterwards, you can use it with your DataFrame in the following manner:

// Apply the mode UDAF to each column of interest in a single aggregation pass.
val modeColumnList = List("some", "column", "names") // or df.columns.toList
val modeAgg = new ModeUDAF()
val aggCols = modeColumnList.map(c => modeAgg(df(c)))
val aggregatedModeDf = df.agg(aggCols.head, aggCols.tail: _*)
aggregatedModeDf.show()

You can also call `.collect` on the final DataFrame to bring the result into a Scala data structure.

Note: the performance (and memory footprint) of this solution depends on the cardinality of the input column, since every distinct value is held in the aggregation buffer.


Related Query

More Query from same tag