
In Spark you can use window aggregate functions directly; I will show that here in Scala.

Here is your input data (my preparation):

import scala.collection.JavaConversions._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val schema = StructType(
    StructField("col1", StringType, false) ::
    StructField("col2", IntegerType, false) :: Nil
)

val row = Seq(Row("a", 1), Row("b", 8), Row("b", 2), Row("a", 5), Row("b", 5), Row("a", 3))
val df = spark.createDataFrame(row, schema)
df.show(false)
//input:
// +----+----+
// |col1|col2|
// +----+----+
// |a   |1   |
// |b   |8   |
// |b   |2   |
// |a   |5   |
// |b   |5   |
// |a   |3   |
// +----+----+

Here is the code to obtain the desired logic:

import org.apache.spark.sql.expressions.Window

df
  // new column: build the list of values for each record over the window as the frame moves
  .withColumn(
    "collected_list",
    collect_list(col("col2")) over Window
      .partitionBy(col("col1"))
      .orderBy(col("col2"))
  )
  // new column: max size of the collected list in each window
  .withColumn(
    "max_size",
    max(size(col("collected_list"))) over Window.partitionBy(col("col1"))
  )
  // keep only the row whose array has the maximum size in its window
  .where(col("max_size") - size(col("collected_list")) === 0)
  .orderBy(col("col1"))
  .drop("col2", "max_size")
  .show(false)

// output:
// +----+--------------+
// |col1|collected_list|
// +----+--------------+
// |a   |[1, 3, 5]     |
// |b   |[2, 5, 8]     |
// +----+--------------+

Note:

  1. You can use the collect_list() aggregate function with groupBy directly, but then you cannot guarantee the order of the collected list (a groupBy sketch follows these notes).
  2. You can explore the collect_set() aggregate function if you want to eliminate duplicates (with some changes to the above query).
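
To illustrate those two notes, here is a minimal sketch of the groupBy route (sort_array, collect_list and collect_set all come from org.apache.spark.sql.functions). It only helps when ordering by the collected values themselves is enough; the window version above can order by any column:

// groupBy + collect_list gives no ordering guarantee, so sort the resulting array by value
df.groupBy(col("col1"))
  .agg(sort_array(collect_list(col("col2"))).as("collected_list"))
  .orderBy(col("col1"))
  .show(false)

// collect_set additionally drops duplicate values within each group
df.groupBy(col("col1"))
  .agg(sort_array(collect_set(col("col2"))).as("collected_set"))
  .orderBy(col("col1"))
  .show(false)
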
Edit 2: You can write your own collect_list() as a UDAF (UserDefinedAggregateFunction) like this in Scala Spark for DataFrames.

Online docs:

  1. For Spark 2.3.0
  2. For the latest version

The code below is for Spark version 2.3.0 (a sketch of the Aggregator-based alternative for newer Spark versions appears after the final note).
import scala.collection.mutable

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

object your_collect_array extends UserDefinedAggregateFunction {

    // type of the column that is passed to the aggregate function
    override def inputSchema: StructType = StructType(
      StructField("yourInputToAggFunction", LongType, false) :: Nil
    )

    // type of the value the aggregate function returns
    override def dataType: ArrayType = ArrayType(LongType, false)

    override def deterministic: Boolean = true

    // intermediate buffer that holds the values collected so far
    override def bufferSchema: StructType = {
      StructType(
        StructField("yourCollectedArray", ArrayType(LongType, false), false) :: Nil
      )
    }

    // start every group/window with an empty array
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0) = new Array[Long](0)
    }

    // append the incoming value to the buffer
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      buffer.update(
        0,
        buffer.getAs[mutable.WrappedArray[Long]](0) :+ input.getLong(0)
      )
    }

    // combine two partially aggregated buffers
    override def merge(
        buffer1: MutableAggregationBuffer,
        buffer2: Row
    ): Unit = {
      buffer1.update(
        0,
        buffer1.getAs[mutable.WrappedArray[Long]](0) ++ buffer2
          .getAs[mutable.WrappedArray[Long]](0)
      )
    }

    // return the collected array
    override def evaluate(buffer: Row): Any =
      buffer.getAs[mutable.WrappedArray[Long]](0)

}


// below is the same query with just one line changed, i.e. calling the custom UDAF defined above
df
  // new column: using our custom UDAF
  .withColumn(
    "your_collected_list",
    your_collect_array(col("col2")) over Window
      .partitionBy(col("col1"))
      .orderBy(col("col2"))
  )
  // new column: max size of the collected list in each window
  .withColumn(
    "max_size",
    max(size(col("your_collected_list"))) over Window.partitionBy(col("col1"))
  )
  // keep only the row whose array has the maximum size in its window
  .where(col("max_size") - size(col("your_collected_list")) === 0)
  .orderBy(col("col1"))
  .drop("col2", "max_size")
  .show(false)

//output:
// +----+-------------------+
// |col1|your_collected_list|
// +----+-------------------+
// |a   |[1, 3, 5]          |
// |b   |[2, 5, 8]          |
// +----+-------------------+

Note:

  1. UDFs are not that efficient in Spark, so use them only when you absolutely need them; they are mainly meant for custom data analytics that the built-in functions cannot express.
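
For the latest Spark versions (the second doc link above): UserDefinedAggregateFunction is deprecated in Spark 3.x in favour of the Aggregator API registered through functions.udaf. Below is only a minimal sketch of the same custom collect written that way; the names YourCollectArrayAgg and your_collect_array_v3 are my own, not part of any Spark API:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

// hypothetical Spark 3.x replacement for the UDAF above
object YourCollectArrayAgg extends Aggregator[Long, Seq[Long], Seq[Long]] {
  // empty buffer to start each group/window with
  override def zero: Seq[Long] = Seq.empty[Long]
  // append one input value to the running buffer
  override def reduce(buffer: Seq[Long], value: Long): Seq[Long] = buffer :+ value
  // combine two partially aggregated buffers
  override def merge(b1: Seq[Long], b2: Seq[Long]): Seq[Long] = b1 ++ b2
  // the buffer is already the final result
  override def finish(reduction: Seq[Long]): Seq[Long] = reduction
  override def bufferEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
  override def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
}

// register it as an untyped column function
val your_collect_array_v3 = udaf(YourCollectArrayAgg, Encoders.scalaLong)

your_collect_array_v3(col("col2")) can then replace your_collect_array(col("col2")) in the window query above, with no other changes.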
