score:1

Accepted answer

Explode the nested JSON array, select the fields you want, then write to file in JSON format.

import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{ArrayType, StructType}

val jsonDF = spark.read.json(path)
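// Hedged note (not in the original answer): if the file is a single
// multi-line JSON document, as OData responses usually are, the reader's
// multiLine option may be needed:
//   val jsonDF = spark.read.option("multiLine", "true").json(path)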

val explodeColName = "value" // name of the column we want to explode
val flattenColName = explodeColName + "_flat" // temp name

val listOfColsFromArrayType =
  jsonDF.schema
    .find(s => s.name == explodeColName && s.dataType.isInstanceOf[ArrayType])
    .map(
      _.dataType
        .asInstanceOf[ArrayType]
        .elementType
        .asInstanceOf[StructType]
        .names
    )

val filterColList =
  listOfColsFromArrayType.getOrElse(throw new Exception("explode col name not found")) // or handle the error as needed
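// Hedged alternative (not in the original answer): instead of a bare
// Exception, fail with a message that names the column, e.g.
//   listOfColsFromArrayType.getOrElse(
//     sys.error(s"column '$explodeColName' is missing or is not an array of structs"))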

val flattenFilterCols = filterColList.map { c =>
  if (c.contains(".")) col(s"$flattenColName.`$c`") // backticks guard dotted names
  else col(s"$flattenColName.$c")
}

val flattenDF = jsonDF
  .select(explode(col(explodeColName)).as(flattenColName))
  .select(flattenFilterCols: _*)
   
flattenDF
  .write
  .json(outputPath)
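// Hedged note (not in the original answer): on reruns you may want to
// overwrite previous output, e.g.
//   flattenDF.write.mode("overwrite").json(outputPath)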

The result will be:

{"@odata.etag":"w/\"jzq0ozlxadnzlys1wxbpbwfxae5mbfdkbvpnyjmrwdq1mmjsegdxvvhrtvrzuxc9mtswmdsn\"","e_no":345345,"g_2_code":"","g_code":"007"}
{"@odata.etag":"w/\"jzq0o0znwkf2ogd1dve2l21oqtdkr2g4yu05tldkmerpmupmwtrsazfkqzzutdq9mtswmdsn\"","e_no":234543,"g_2_code":"","g_code":"008"}

score:0

I made a few changes to your method and now it is working.

Please note that I haven't renamed any of the underlying columns. If you want to fetch them in further processing, use backticks (`).
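For example, to read the dotted column back out of the flattened result produced below (a minimal sketch; flattenedJson is defined further down):

    flattenedJson.select(col("`@odata.etag`"), col("e_no")).show(false)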

Test data:

    df.show(false)
    df.printSchema()

    /**
      * +--------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      * |@odata.context|@odata.nextLink|value                                                                                                                                                                                         |
      * +--------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      * |xxxx          |xxxx           |[[w/"jzq0ozlxadnzlys1wxbpbwfxae5mbfdkbvpnyjmrwdq1mmjsegdxvvhrtvrzuxc9mtswmdsn", 345345, , 007], [w/"jzq0o0znwkf2ogd1dve2l21oqtdkr2g4yu05tldkmerpmupmwtrsazfkqzzutdq9mtswmdsn", 234543, , 008]]|
      * +--------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
      *
      * root
      * |-- @odata.context: string (nullable = true)
      * |-- @odata.nextLink: string (nullable = true)
      * |-- value: array (nullable = true)
      * |    |-- element: struct (containsNull = true)
      * |    |    |-- @odata.etag: string (nullable = true)
      * |    |    |-- e_no: long (nullable = true)
      * |    |    |-- g_2_code: string (nullable = true)
      * |    |    |-- g_code: string (nullable = true)
      *
      */

Flatten the nested columns of type array and struct:


    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.{ArrayType, StructType}

    def flattenDataFrame(df: DataFrame): DataFrame = {

      val fields = df.schema.fields
      val fieldNames = fields.map(x => x.name)

      for (i <- fields.indices) {
        val field = fields(i)
        val fieldType = field.dataType
        val fieldName = field.name
        fieldType match {
          case _: ArrayType =>
            // explode the array column, keeping every other column as-is
            val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName)
            val fieldNamesAndExplode = fieldNamesExcludingArray.map(c => s"`$c`") ++
              Array(s"explode_outer($fieldName) as $fieldName")
            val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
            return flattenDataFrame(explodedDf)
          case structType: StructType =>
            // promote each child of the struct to a top-level column
            val childFieldNames = structType.fieldNames.map(childName => s"$fieldName.`$childName`")
            val newFieldNames = fieldNames.filter(_ != fieldName).map(c => s"`$c`") ++ childFieldNames
            val renamedCols = newFieldNames.map(x => col(x))
            val explodedDf = df.select(renamedCols: _*)
            return flattenDataFrame(explodedDf)
          case _ =>
        }
      }
      df
    }
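    // Roughly what happens for the schema above (illustrative walkthrough only):
    //   pass 1: `value` is an ArrayType  -> selectExpr keeps the other columns and adds explode_outer(value)
    //   pass 2: `value` is now a StructType -> its children are promoted to top-level columns
    //   pass 3: no array or struct columns remain -> the DataFrame is returned unchanged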

    val flattenedJson = flattenDataFrame(df)
    flattenedJson.show(false)
    flattenedJson.printSchema()

    /**
      * +--------------+---------------+----------------------------------------------------------------------------+------+--------+------+
      * |@odata.context|@odata.nextLink|@odata.etag                                                                 |e_no  |g_2_code|g_code|
      * +--------------+---------------+----------------------------------------------------------------------------+------+--------+------+
      * |xxxx          |xxxx           |w/"jzq0ozlxadnzlys1wxbpbwfxae5mbfdkbvpnyjmrwdq1mmjsegdxvvhrtvrzuxc9mtswmdsn"|345345|        |007   |
      * |xxxx          |xxxx           |w/"jzq0o0znwkf2ogd1dve2l21oqtdkr2g4yu05tldkmerpmupmwtrsazfkqzzutdq9mtswmdsn"|234543|        |008   |
      * +--------------+---------------+----------------------------------------------------------------------------+------+--------+------+
      *
      * root
      * |-- @odata.context: string (nullable = true)
      * |-- @odata.nextLink: string (nullable = true)
      * |-- @odata.etag: string (nullable = true)
      * |-- e_no: long (nullable = true)
      * |-- g_2_code: string (nullable = true)
      * |-- g_code: string (nullable = true)
      */
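To reproduce the accepted answer's file output from this flattened frame, one could drop the two top-level OData columns before writing. A minimal sketch, with outputPath assumed; note the backticks inside col(...), as mentioned above, because the column names contain dots:

    flattenedJson
      .drop(col("`@odata.context`"))
      .drop(col("`@odata.nextLink`"))
      .write
      .json(outputPath)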
