In Spark you can use window aggregate functions directly; I will show that here in Scala.
Here is your input data (my preparation):
import scala.collection.JavaConversions._ // lets the Scala Seq[Row] below be passed where a java.util.List[Row] is expected
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val schema = StructType(
  StructField("col1", StringType, false) ::
  StructField("col2", IntegerType, false) :: Nil
)
val rows = Seq(Row("a", 1), Row("b", 8), Row("b", 2), Row("a", 5), Row("b", 5), Row("a", 3))
val df = spark.createDataFrame(rows, schema)
df.show(false)
//input:
// +----+----+
// |col1|col2|
// +----+----+
// |a |1 |
// |b |8 |
// |b |2 |
// |a |5 |
// |b |5 |
// |a |3 |
// +----+----+
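Equivalently, a sketch of a shorter way to build the same input with toDF() (assuming the usual spark session and its implicits are in scope):

import spark.implicits._
// same data; the schema is inferred from the tuple types
val df2 = Seq(("a", 1), ("b", 8), ("b", 2), ("a", 5), ("b", 5), ("a", 3))
  .toDF("col1", "col2")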
Here is the code to obtain the desired logic:
import org.apache.spark.sql.expressions.Window

df
  // new column: the list of values collected for each record as the frame moves
  // over the window; with orderBy, the default frame runs from unboundedPreceding
  // to currentRow, so the list grows row by row within each partition
  .withColumn(
    "collected_list",
    collect_list(col("col2")) over Window
      .partitionBy(col("col1"))
      .orderBy(col("col2"))
  )
  // new column: max size of the collected list in each window
  .withColumn(
    "max_size",
    max(size(col("collected_list"))) over Window.partitionBy(col("col1"))
  )
  // keep only the row(s) whose list reached the maximum size, i.e. the full list
  .where(size(col("collected_list")) === col("max_size"))
  .orderBy(col("col1"))
  .drop("col2", "max_size")
  .show(false)
// output:
// +----+--------------+
// |col1|collected_list|
// +----+--------------+
// |a |[1, 3, 5] |
// |b |[2, 5, 8] |
// +----+--------------+
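To make the filter step concrete, here is a sketch showing the intermediate running lists before the where step (row order in show may vary):

df
  .withColumn(
    "collected_list",
    collect_list(col("col2")) over Window
      .partitionBy(col("col1"))
      .orderBy(col("col2"))
  )
  .show(false)
// For partition a the list grows as [1], [1, 3], [1, 3, 5];
// for partition b as [2], [2, 5], [2, 5, 8]. Only the last
// (largest) list per partition survives the max_size filter.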
Note:
- You can use the collect_list() aggregate function with a plain groupBy directly, but then the order of the collected list is not guaranteed; sorting the array explicitly fixes that, as shown in the sketch after these notes.
- You can explore the collect_set() aggregate function if you want to eliminate duplicates (with some changes to the above query).
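For comparison, a minimal sketch of that groupBy route, using the built-in sort_array() to restore the ordering; it yields the same output as the window version without the max_size filter:

df
  .groupBy("col1")
  // collect_list gives no ordering guarantee after the shuffle,
  // so sort the array explicitly
  .agg(sort_array(collect_list(col("col2"))).as("collected_list"))
  .orderBy("col1")
  .show(false)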
Edit 2: You can write your custom collect_list() as a UDAF (UserDefinedAggregateFunction) like this in Scala Spark for DataFrames (see the online docs).
The code below is for Spark version 2.3.0:
import scala.collection.mutable
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}

object your_collect_array extends UserDefinedAggregateFunction {
  // input: one Long column (Spark implicitly casts the Int col2 to Long to match)
  override def inputSchema: StructType = StructType(
    StructField("yourInputToAggFunction", LongType, false) :: Nil
  )
  // the function returns an array of Longs
  override def dataType: ArrayType = ArrayType(LongType, false)
  override def deterministic: Boolean = true
  // intermediate buffer: the array collected so far
  override def bufferSchema: StructType = {
    StructType(
      StructField("yourCollectedArray", ArrayType(LongType, false), false) :: Nil
    )
  }
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = new Array[Long](0)
  }
  // append the incoming value to the buffered array
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer.update(
      0,
      buffer.getAs[mutable.WrappedArray[Long]](0) :+ input.getLong(0)
    )
  }
  // concatenate the arrays of two partial buffers
  override def merge(
      buffer1: MutableAggregationBuffer,
      buffer2: Row
  ): Unit = {
    buffer1.update(
      0,
      buffer1.getAs[mutable.WrappedArray[Long]](0) ++ buffer2
        .getAs[mutable.WrappedArray[Long]](0)
    )
  }
  override def evaluate(buffer: Row): Any =
    buffer.getAs[mutable.WrappedArray[Long]](0)
}
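As a side note, a UDAF built this way can also be registered and called from Spark SQL; a minimal sketch (the view name t is just for illustration):

spark.udf.register("your_collect_array", your_collect_array)
df.createOrReplaceTempView("t")
// plain grouped aggregation; the order inside the array is not guaranteed here
spark.sql("select col1, your_collect_array(col2) as arr from t group by col1").show(false)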
// Below is the query with just one line changed, i.e., calling the custom UDAF written above
df
  // new column: using our custom UDAF over the same running window
  .withColumn(
    "your_collected_list",
    your_collect_array(col("col2")) over Window
      .partitionBy(col("col1"))
      .orderBy(col("col2"))
  )
  // new column: max size of the collected list in each window
  .withColumn(
    "max_size",
    max(size(col("your_collected_list"))) over Window.partitionBy(col("col1"))
  )
  // keep only the row(s) whose list reached the maximum size
  .where(size(col("your_collected_list")) === col("max_size"))
  .orderBy(col("col1"))
  .drop("col2", "max_size")
  .show(false)
//output:
// +----+-------------------+
// |col1|your_collected_list|
// +----+-------------------+
// |a |[1, 3, 5] |
// |b |[2, 5, 8] |
// +----+-------------------+
Note:
- UDFs and UDAFs are not as efficient as the built-in functions, because Spark's optimizer cannot see inside them; use them only when the built-ins cannot express your logic. For typed Dataset code, the Aggregator interface is an alternative; see the sketch below.
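For completeness, a minimal sketch of the same collect logic written as a typed Aggregator (available for Datasets since Spark 2.0); the object name collectLongs and the use of ExpressionEncoder (an internal but commonly used API) are my assumptions for illustration:

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

object collectLongs extends Aggregator[Long, Seq[Long], Seq[Long]] {
  def zero: Seq[Long] = Seq.empty[Long]                         // empty buffer
  def reduce(buf: Seq[Long], in: Long): Seq[Long] = buf :+ in   // fold in one value
  def merge(b1: Seq[Long], b2: Seq[Long]): Seq[Long] = b1 ++ b2 // combine partial buffers
  def finish(buf: Seq[Long]): Seq[Long] = buf.sorted            // sort for a stable result
  def bufferEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
  def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
}

import spark.implicits._
df.as[(String, Int)]                 // view the rows as typed tuples
  .groupByKey(_._1)                  // group by col1
  .mapValues(_._2.toLong)            // feed col2 to the aggregator as Long
  .agg(collectLongs.toColumn.name("collected_list"))
  .toDF("col1", "collected_list")
  .orderBy("col1")
  .show(false)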