Score: 0
This can be obtained by parsing the JSON string with from_json and exploding the resulting array column, as below:
import org.apache.spark.sql.functions.{explode, from_json}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

// Each value of `event` is a JSON array of {sn, e, v} objects
val schema = ArrayType(StructType(Seq(
  StructField("sn", StringType),
  StructField("e", StringType),
  StructField("v", StringType))))

val structDF = fromBlobDF.withColumn("sig_array", from_json($"event", schema))

val signalsDF = structDF
  .withColumn("sig_array", explode($"sig_array"))
  .withColumn("signal", $"sig_array.sn")
  .withColumn("e", $"sig_array.e")
  .withColumn("v", $"sig_array.v")
  .select("num_id", "e", "signal", "v")
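To see what from_json plus explode do to a single row, here is a plain-Scala sketch (no Spark) with hypothetical sample values: the JSON array string in `event` becomes one record per element, and the row's num_id is repeated on each. A toy regex stands in for from_json and only handles this exact flat shape; Spark parses against the real schema.

```scala
// Regex for one {"sn":...,"e":...,"v":...} object in the event array
val objPattern = """\{"sn":"([^"]*)","e":"([^"]*)","v":"([^"]*)"\}""".r

// Hypothetical sample values for one input row
val numId = "xxxxx01"
val event = """[{"sn":"sig1","e":"1571599398000","v":"19.79"},{"sn":"sig2","e":"1571599406000","v":"25.3"}]"""

// One (num_id, e, signal, v) tuple per array element -- the "explode" step
val signals = objPattern.findAllMatchIn(event).map { m =>
  (numId, m.group(2), m.group(1), m.group(3))
}.toList

signals.foreach(println)
```

The point of the sketch is only the fan-out: one input row with two array elements yields two output rows, each carrying the parent's num_id.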
Score: 1
The `event` column above contains multiple records per row, so the data has to be flattened before any further processing. This flattening can be achieved with a flatMap transformation on the DataFrame.
The approach is to build a flattened dataset of JSON strings containing all the necessary keys and values, and finally convert those JSON strings to a DataFrame via the spark.read.json API.
import com.fasterxml.jackson.databind.ObjectMapper
import spark.implicits._

val mapper = new ObjectMapper()

val flatDF = df.flatMap(row => {
  val numId = row.getAs[String]("num_id")
  val event = row.getAs[String]("event")
  // Parse the JSON array in `event` into one map per element
  val data = mapper.readValue(event, classOf[Array[java.util.Map[String, String]]])
  data.map(jsonMap => {
    jsonMap.put("num_id", numId) // stamp the parent row's num_id onto each record
    mapper.writeValueAsString(jsonMap)
  })
})

val finalDF = spark.read.json(flatDF)
// finalDF output
+-------------+-------+----+-----+
| e| num_id| sn| v|
+-------------+-------+----+-----+
|1571599398000|xxxxx01|sig1|19.79|
|1571599406000|xxxxx01|sig1| 19.8|
|1571599406000|xxxxx01|sig2| 25.3|
|1571599414000|xxxxx02|sig1|19.79|
|1571599414000|xxxxx02|sig2| 19.8|
|1571599424000|xxxxx02|sig2| 25.3|
+-------------+-------+----+-----+
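The flatMap step above can be mimicked on plain Scala collections (no Spark or Jackson), with hypothetical sample rows: each input row carries several event maps, num_id is copied into each map, and every map is re-serialized to one flat JSON line. spark.read.json would then infer the schema from such lines.

```scala
// Hypothetical stand-in for one source row: a num_id plus its parsed events
case class InputRow(numId: String, events: Seq[Map[String, String]])

val rows = Seq(
  InputRow("xxxxx01", Seq(
    Map("sn" -> "sig1", "e" -> "1571599398000", "v" -> "19.79"),
    Map("sn" -> "sig1", "e" -> "1571599406000", "v" -> "19.8"))),
  InputRow("xxxxx02", Seq(
    Map("sn" -> "sig2", "e" -> "1571599414000", "v" -> "25.3")))
)

// Minimal JSON serializer for flat string-to-string maps (toy replacement
// for mapper.writeValueAsString; it does no escaping)
def toJson(m: Map[String, String]): String =
  m.map { case (k, v) => s""""$k":"$v"""" }.mkString("{", ",", "}")

// Flatten: one JSON line per inner event, num_id stamped onto each
val flatLines = rows.flatMap(r => r.events.map(e => toJson(e + ("num_id" -> r.numId))))

flatLines.foreach(println)
```

Two input rows with three events in total produce three JSON lines, which is exactly the shape spark.read.json expects.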
Source: stackoverflow.com