score:2
Accepted answer
Not a straightforward task, but one approach would be to:
- group each time and its corresponding value into a "map" of time-value pairs
- flatten it out into a column of time-value pairs
- perform a groupBy-pivot-agg transformation, using time as part of the groupBy key and types as the pivot column, to aggregate the time-corresponding value
Sample code below:
import org.apache.spark.sql.functions._
// Needed for toDF and the $"..." syntax; already in scope in spark-shell
import spark.implicits._

val df = Seq(
  ("G", "12/3/2018", "Import", "Voltage", 3.5, 6.8),
  ("H", "13/3/2018", "Import", "Voltage", 7.5, 9.8),
  ("H", "13/3/2018", "Export", "Watt", 4.5, 8.9),
  ("H", "13/3/2018", "Export", "Voltage", 5.6, 9.1)
).toDF("ID", "Date", "Type1", "Type2", "0:30", "1:00")

df.
  // Pair each time label with its value
  withColumn("TimeValMap", array(
    struct(lit("0:30").as("_1"), $"0:30".as("_2")),
    struct(lit("1:00").as("_1"), $"1:00".as("_2"))
  )).
  // One row per time-value pair
  withColumn("TimeVal", explode($"TimeValMap")).
  withColumn("Time", $"TimeVal._1").
  // Combined type, used as the pivot column
  withColumn("Types", concat_ws("-", array($"Type1", $"Type2"))).
  groupBy("ID", "Date", "Time").pivot("Types").agg(first($"TimeVal._2")).
  orderBy("ID", "Date", "Time").
  // Combinations missing from the input come out as null; replace with 0.0
  na.fill(0.0).
  show
// +---+---------+----+--------------+-----------+--------------+
// | ID|     Date|Time|Export-Voltage|Export-Watt|Import-Voltage|
// +---+---------+----+--------------+-----------+--------------+
// |  G|12/3/2018|0:30|           0.0|        0.0|           3.5|
// |  G|12/3/2018|1:00|           0.0|        0.0|           6.8|
// |  H|13/3/2018|0:30|           5.6|        4.5|           7.5|
// |  H|13/3/2018|1:00|           9.1|        8.9|           9.8|
// +---+---------+----+--------------+-----------+--------------+
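If there are many time columns, writing one struct per column by hand gets tedious. The following is a minimal sketch of the same three steps with the structs built from a list of column names. The helper name `unpivotThenPivot`, its parameters, and the struct field names `Time` and `Value` are made up for illustration, and it assumes all time columns share one numeric type.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// In spark-shell a session named `spark` already exists; this line is for standalone use.
val spark = SparkSession.builder.master("local[*]").appName("unpivot-pivot").getOrCreate()
import spark.implicits._

// Hypothetical helper (not a Spark API): unpivot `timeCols` into rows,
// then pivot on the "-"-joined `typeCols`.
def unpivotThenPivot(
    df: DataFrame,
    keyCols: Seq[String],
    typeCols: Seq[String],
    timeCols: Seq[String]): DataFrame = {
  // One (label, value) struct per time column; backticks protect names like "0:30"
  val timeVals = timeCols.map(c => struct(lit(c).as("Time"), col(s"`$c`").as("Value")))

  df.withColumn("TimeVal", explode(array(timeVals: _*)))   // one row per time-value pair
    .withColumn("Time", col("TimeVal.Time"))
    .withColumn("Types", concat_ws("-", typeCols.map(col): _*))
    .groupBy((keyCols :+ "Time").map(col): _*)
    .pivot("Types")
    .agg(first(col("TimeVal.Value")))
    .na.fill(0.0)                                          // missing combinations become 0.0
}

val df = Seq(
  ("G", "12/3/2018", "Import", "Voltage", 3.5, 6.8),
  ("H", "13/3/2018", "Import", "Voltage", 7.5, 9.8),
  ("H", "13/3/2018", "Export", "Watt", 4.5, 8.9),
  ("H", "13/3/2018", "Export", "Voltage", 5.6, 9.1)
).toDF("ID", "Date", "Type1", "Type2", "0:30", "1:00")

// Assumption: every column that is not a key or type column is a time column
val fixedCols = Set("ID", "Date", "Type1", "Type2")
val timeCols = df.columns.toSeq.filterNot(fixedCols.contains)

val result = unpivotThenPivot(df, Seq("ID", "Date"), Seq("Type1", "Type2"), timeCols)
result.orderBy("ID", "Date", "Time").show()
```

Calling `pivot("Types")` without an explicit value list makes Spark run an extra pass to find the distinct pivot values; if they are known up front, passing them as the second argument to `pivot` avoids that pass.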
Source: stackoverflow.com