score:1
Accepted answer
Explode the nested JSON array, select the fields you want, then write the result to a file in JSON format.
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{ArrayType, StructType}

val jsonDF = spark.read.json(path)
val explodeColName = "value" // name of the array column we want to explode
val flattenColName = explodeColName + "_flat" // temporary name for the exploded column

// Field names of the struct elements inside the array column
val listOfColsFromArrayType =
  jsonDF.schema
    .find(s => s.name == explodeColName && s.dataType.isInstanceOf[ArrayType])
    .map(
      _.dataType
        .asInstanceOf[ArrayType]
        .elementType
        .asInstanceOf[StructType]
        .names
    )

val filterColList =
  listOfColsFromArrayType.getOrElse(throw new Exception("explode col name not found")) // or handle the error as needed

// Field names containing dots (e.g. "@odata.etag") must be wrapped in backticks
val flattenFilterCols = filterColList.map { c =>
  if (c.contains(".")) { col(s"$flattenColName.`$c`") } else {
    col(s"$flattenColName.$c")
  }
}

val flattenedDF = jsonDF
  .select(explode(col(explodeColName)).as(flattenColName))
  .select(flattenFilterCols: _*)

flattenedDF
  .write
  .json(outputPath)
The result will be:
{"@odata.etag":"w/\"jzq0ozlxadnzlys1wxbpbwfxae5mbfdkbvpnyjmrwdq1mmjsegdxvvhrtvrzuxc9mtswmdsn\"","e_no":345345,"g_2_code":"","g_code":"007"}
{"@odata.etag":"w/\"jzq0o0znwkf2ogd1dve2l21oqtdkr2g4yu05tldkmerpmupmwtrsazfkqzzutdq9mtswmdsn\"","e_no":234543,"g_2_code":"","g_code":"008"}
score:0
I made a few changes to your method and now it is working.
Please note that I haven't renamed any of the underlying columns. If you want to fetch them in further processing, use backticks (`).
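For example (a minimal sketch against the test data below), a column name containing a dot has to be quoted with backticks, otherwise Spark parses the dot as struct-field access:
// Backticks keep the dot as part of the column name; without them
// Spark would look for a struct "@odata" with a field "context".
df.selectExpr("`@odata.context`").show(false)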
Test data:
df.show(false)
df.printSchema()
/**
* +--------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
* |@odata.context|@odata.nextlink|value |
* +--------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
* |xxxx |xxxx |[[w/"jzq0ozlxadnzlys1wxbpbwfxae5mbfdkbvpnyjmrwdq1mmjsegdxvvhrtvrzuxc9mtswmdsn", 345345, , 007], [w/"jzq0o0znwkf2ogd1dve2l21oqtdkr2g4yu05tldkmerpmupmwtrsazfkqzzutdq9mtswmdsn", 234543, , 008]]|
* +--------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
*
* root
* |-- @odata.context: string (nullable = true)
* |-- @odata.nextlink: string (nullable = true)
* |-- value: array (nullable = true)
* | |-- element: struct (containsNull = true)
* | | |-- @odata.etag: string (nullable = true)
* | | |-- e_no: long (nullable = true)
* | | |-- g_2_code: string (nullable = true)
* | | |-- g_code: string (nullable = true)
*
*/
Flatten the nested columns of type array and struct:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, StructType}

def flattenDataFrame(df: DataFrame): DataFrame = {
  val fields = df.schema.fields
  val fieldNames = fields.map(x => x.name)
  for (i <- 0 to fields.length - 1) {
    val field = fields(i)
    val fieldType = field.dataType
    val fieldName = field.name
    fieldType match {
      case arrayType: ArrayType =>
        // Explode the array column (explode_outer keeps rows whose array
        // is null or empty) and recurse on the result
        val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName)
        val fieldNamesAndExplode = fieldNamesExcludingArray.map(c => s"`$c`") ++
          Array(s"explode_outer($fieldName) as $fieldName")
        val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
        return flattenDataFrame(explodedDf)
      case structType: StructType =>
        // Promote each struct field to a top-level column and recurse
        val childFieldNames = structType.fieldNames.map(childName => s"$fieldName.`$childName`")
        val newFieldNames = fieldNames.filter(_ != fieldName).map(c => s"`$c`") ++ childFieldNames
        val renamedCols = newFieldNames.map(x => col(x))
        val explodedDf = df.select(renamedCols: _*)
        return flattenDataFrame(explodedDf)
      case _ =>
    }
  }
  df
}
val flattenedJson = flattenDataFrame(df)
flattenedJson.show(false)
flattenedJson.printSchema()
/**
* +--------------+---------------+----------------------------------------------------------------------------+------+--------+------+
* |@odata.context|@odata.nextlink|@odata.etag |e_no |g_2_code|g_code|
* +--------------+---------------+----------------------------------------------------------------------------+------+--------+------+
* |xxxx |xxxx |w/"jzq0ozlxadnzlys1wxbpbwfxae5mbfdkbvpnyjmrwdq1mmjsegdxvvhrtvrzuxc9mtswmdsn"|345345| |007 |
* |xxxx |xxxx |w/"jzq0o0znwkf2ogd1dve2l21oqtdkr2g4yu05tldkmerpmupmwtrsazfkqzzutdq9mtswmdsn"|234543| |008 |
* +--------------+---------------+----------------------------------------------------------------------------+------+--------+------+
*
* root
* |-- @odata.context: string (nullable = true)
* |-- @odata.nextlink: string (nullable = true)
* |-- @odata.etag: string (nullable = true)
* |-- e_no: long (nullable = true)
* |-- g_2_code: string (nullable = true)
* |-- g_code: string (nullable = true)
*/
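If you also want to persist the flattened result as JSON files, the same write step from the accepted answer applies (outputPath is a placeholder):
// Write the flattened rows out as line-delimited JSON
flattenedJson.write.json(outputPath)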
Source: stackoverflow.com