score:0

This is obtained by parsing the JSON in the `event` column and exploding the resulting array, as below.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = ArrayType(StructType(Seq(
  StructField("sn", StringType),
  StructField("e", StringType),
  StructField("v", StringType))))

val structDF = fromBlobDF.withColumn("sig_array", from_json($"event", schema))

val signalsDF = structDF
  .withColumn("sig_array", explode($"sig_array"))
  .withColumn("signal", $"sig_array.sn")
  .withColumn("e", $"sig_array.e")
  .withColumn("v", $"sig_array.v")
  .select("num_id", "e", "signal", "v")
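To illustrate what `from_json` followed by `explode` produces, here is a minimal plain-Scala sketch (no Spark) using hypothetical sample data; the `Signal` case class and the values are assumptions, not from the original question:

```scala
// Stand-in for one parsed struct from the JSON array: signal name, epoch, value.
case class Signal(sn: String, e: String, v: String)

// One input row: a num_id plus the array that from_json would produce.
val parsedRow = ("xxxxx01", Seq(
  Signal("sig1", "1571599398000", "19.79"),
  Signal("sig2", "1571599406000", "25.3")))

// explode emits one output row per array element,
// repeating the other columns (num_id) on every row.
val exploded = parsedRow._2.map(s => (parsedRow._1, s.e, s.sn, s.v))

exploded.foreach(println)
```

Each element of `exploded` corresponds to one row of the final `signalsDF`.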

score:1

The `event` column above contains multiple records per row, so the data has to be flattened before processing it further. Flattening can be achieved with a flatMap transformation on the DataFrame.

The approach is to build a flattened set of JSON strings carrying all the necessary keys and values, and finally convert that JSON to a DataFrame via the Spark read JSON API.
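The flatten step itself can be sketched without Spark or Jackson; the sample rows below are hypothetical stand-ins for the parsed JSON, with each inner map playing the role of one signal record:

```scala
// Each input row: an id plus the list of records parsed from its event JSON.
val rows = Seq(
  ("xxxxx01", Seq(
    Map("sn" -> "sig1", "e" -> "1571599398000", "v" -> "19.79"),
    Map("sn" -> "sig2", "e" -> "1571599406000", "v" -> "25.3"))))

// flatMap expands each row into one record per inner map,
// copying num_id into every record (mirrors the Spark flatMap below).
val flattened = rows.flatMap { case (numId, records) =>
  records.map(_ + ("num_id" -> numId))
}
```

After this step every record is self-contained, which is what lets `spark.read.json` infer a flat schema in the final conversion.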

import com.fasterxml.jackson.databind.ObjectMapper
import spark.implicits._

val mapper = new ObjectMapper()

val flatDF = df.flatMap(row => {
  val numId = row.getAs[String]("num_id")
  val event = row.getAs[String]("event")
  val data = mapper.readValue(event, classOf[Array[java.util.Map[String, String]]])

  data.map(jsonMap => {
    jsonMap.put("num_id", numId)
    mapper.writeValueAsString(jsonMap)
  })
})

val finalDF = spark.read.json(flatDF)

// finalDF output
+-------------+-------+----+-----+
|            e| num_id|  sn|    v|
+-------------+-------+----+-----+
|1571599398000|xxxxx01|sig1|19.79|
|1571599406000|xxxxx01|sig1| 19.8|
|1571599406000|xxxxx01|sig2| 25.3|
|1571599414000|xxxxx02|sig1|19.79|
|1571599414000|xxxxx02|sig2| 19.8|
|1571599424000|xxxxx02|sig2| 25.3|
+-------------+-------+----+-----+
