Score: 1
I can't think of an easy way to achieve what you want using only native Spark functions, but it can be done using a user-defined function (UDF) as below:
import org.apache.spark.sql.functions.{col, collect_list, explode, udf}

// Split each collected list of ids into chunks of at most two.
val groupBy2 = udf((s: Seq[Int]) => s.grouped(2).toList)
ss.udf.register("groupBy2", groupBy2)
val dsGrouped: Dataset[AggrBook] = bookDS.groupBy("city", "state").agg(collect_list("bookingId") as "books")
  .withColumn("books", explode(groupBy2(col("books"))))
  .as[AggrBook]
The UDF takes a Seq[Int] and returns a Seq[Seq[Int]], where each inner sequence has a length of two or less. This is then 'expanded' using the native Spark explode function to give you (potentially) multiple rows for each 'city-state' pair, but only two ids in the 'books' column.
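
For reference, here is a self-contained sketch of the same approach that you can run as a local Spark application. The Book and AggrBook case classes, the column names, and the sample data are my assumptions, since the question's schema isn't shown:

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, explode, udf}

// Hypothetical case classes standing in for the question's schema.
case class Book(city: String, state: String, bookingId: Int)
case class AggrBook(city: String, state: String, books: Seq[Int])

object GroupBy2Example {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName("groupBy2").getOrCreate()
    import spark.implicits._ // needed for toDS() and the AggrBook encoder

    val bookDS: Dataset[Book] = Seq(
      Book("Austin", "TX", 1), Book("Austin", "TX", 2),
      Book("Austin", "TX", 3), Book("Dallas", "TX", 4)
    ).toDS()

    // grouped(2) chunks the collected ids: Seq(1, 2, 3) => List(Seq(1, 2), Seq(3))
    val groupBy2 = udf((s: Seq[Int]) => s.grouped(2).toList)

    bookDS.groupBy("city", "state")
      .agg(collect_list("bookingId") as "books")
      .withColumn("books", explode(groupBy2(col("books"))))
      .as[AggrBook]
      .show()

    spark.stop()
  }
}

Running this, the 'Austin'/'TX' group collects ids [1, 2, 3] and comes back as two rows, one with books = [1, 2] and one with books = [3].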
Source: stackoverflow.com