score:0

Accepted answer

The answer from @Sathiyan S led me to the following solution (presenting it here because it did not completely solve my problem, but served as a pointer in the right direction):

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.{StructField, StructType}

// create the expected schema from the case class
val schema = Encoders.product[SchemaClass].schema

// convert all fields to nullable
val newSchema = StructType(schema.map {
  case StructField(c, t, _, m) => StructField(c, t, nullable = true, m)
})

// apply the expected, nullable schema when parsing the JSON strings
session.read.schema(newSchema).json(jsonIncompleteRddString).as[SchemaClass]

Benefits:

  • All missing fields are set to null, independent of data type
  • Additional fields in the JSON string which are not part of the case class will be ignored
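
For completeness, here is a minimal self-contained sketch of this approach (the SparkSession setup, the object name NullableSchemaExample, and the two sample records are my assumptions; SchemaClass matches the one used in the answers below):

import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.types.{StructField, StructType}

object NullableSchemaExample extends App {

  // assumption: a local session; in the answer above, `session` already exists
  val session = SparkSession.builder()
    .master("local[*]")
    .appName("nullable-schema-example")
    .getOrCreate()

  import session.implicits._

  case class SchemaClass(a: String, b: Int)

  // one record missing "b", one record with an extra field "m"
  val jsonLines = Seq(
    """{"a": "foo"}""",
    """{"a": "bar", "b": 1, "m": "extra"}"""
  )

  val schema = Encoders.product[SchemaClass].schema

  // mark every field nullable so missing fields parse as null
  val newSchema = StructType(schema.map {
    case StructField(c, t, _, m) => StructField(c, t, nullable = true, m)
  })

  val ds = session.read.schema(newSchema).json(jsonLines.toDS).as[SchemaClass]
  ds.show()
  // expected: "b" is null for the first record, "m" is dropped for the second.
  // note: collecting these rows as SchemaClass would NPE on the null b;
  // use Option[Int] in the case class if you need to collect incomplete rows.
}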

score:1

package spark

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, Encoders, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.{col, lit}

object JsonDF extends App {

  val spark = SparkSession.builder()
    .master("local")
    .appName("dataframe-example")
    .getOrCreate()

  import spark.implicits._

  case class SchemaClass(a: String, b: Int)

  val jsonDataIncomplete: String = """{"a": "foo", "m": "eee"}"""
  val jsonIncompleteRddString: RDD[String] = spark.sparkContext.parallelize(List(jsonDataIncomplete))

  val dsIncomplete = spark.read.json(jsonIncompleteRddString)  // .as[SchemaClass]

  lazy val schema: StructType      = Encoders.product[SchemaClass].schema
  lazy val fields: Array[String]   = schema.fieldNames
  lazy val colNames: Array[Column] = fields.map(col(_))

  // fields expected by the case class but missing from the parsed data:
  // add each one as a null column of the expected type
  val sch = dsIncomplete.schema
  val schemaDiff = schema.diff(sch)
  val rr = schemaDiff.foldLeft(dsIncomplete)((acc, col) => {
    acc.withColumn(col.name, lit(null).cast(col.dataType))
  })

  // fields present in the parsed data but not in the case class: drop them,
  // then select the columns in the order of the expected schema
  val schF = dsIncomplete.schema
  val schDiff = schF.diff(schema)

  val rrr = schDiff.foldLeft(rr)((acc, col) => {
      acc.drop(col.name)
    })
    .select(colNames: _*)

}

score:1

If you are using Spark 2.2+, you can now skip loading the JSON as an RDD and then reading it as a DataFrame, and instead read it directly with val dsIncomplete = spark.read.json(Seq(jsonDataIncomplete).toDS).

  1. Load your JSON data
  2. Extract your schema from the case class, or define it manually
  3. Get the list of missing fields
  4. Default the value to lit(null).cast(col.dataType) for each missing column

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}

object DefaultFieldValue {

  def main(args: Array[String]): Unit = {

    val spark = Constant.getSparkSess  // answerer's helper returning a SparkSession

    import spark.implicits._
    val jsonDataIncomplete: String = """{"a": "foo"}"""
    val dsIncomplete = spark.read.json(Seq(jsonDataIncomplete).toDS)
    val schema: StructType = Encoders.product[SchemaClass].schema

    val fields: Array[StructField] = schema.fields

    // keep only the expected fields that are absent from the parsed data
    // (diffing StructFields against plain column names would never match,
    // which would overwrite existing columns with null)
    val missing = fields.filterNot(f => dsIncomplete.columns.contains(f.name))

    val outDf = missing.foldLeft(dsIncomplete)((acc, f) => {
      acc.withColumn(f.name, lit(null).cast(f.dataType))
    })

    outDf.printSchema()
    outDf.show()

  }
}

case class SchemaClass(a: String, b: Int, c: String, d: Double)
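
For reference, with the single incomplete record above and the corrected diff logic, the output should look roughly like this (a sketch, not verified output; the appended null columns follow the case-class field order):

root
 |-- a: string (nullable = true)
 |-- b: integer (nullable = true)
 |-- c: string (nullable = true)
 |-- d: double (nullable = true)

+---+----+----+----+
|  a|   b|   c|   d|
+---+----+----+----+
|foo|null|null|null|
+---+----+----+----+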



score:1

It will work the same way if you have different JSON strings in the same RDD. When you have only the one that does not match the schema, it will throw an error.

E.g.

val jsonIncompleteRddString: RDD[String] = spark.sparkContext.parallelize(List(jsonDataIncomplete, jsonData))

import spark.implicits._
val dsIncomplete = spark.read.json(jsonIncompleteRddString).as[SchemaClass]

dsIncomplete.printSchema()
dsIncomplete.show()

scala> dsIncomplete.show()
+---+----+
|  a|   b|
+---+----+
|foo|null|
|foo| bar|
+---+----+
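
For contrast, here is a sketch of the failing case mentioned above: with only the incomplete string in the RDD, the inferred schema has no b column, so .as[SchemaClass] fails at analysis time (the exact error message is my assumption):

// only the incomplete record: the inferred schema is [a, m], with no "b"
val onlyIncomplete: RDD[String] = spark.sparkContext.parallelize(List(jsonDataIncomplete))
val failing = spark.read.json(onlyIncomplete).as[SchemaClass]
// throws something like:
// org.apache.spark.sql.AnalysisException: cannot resolve '`b`' given input columns: [a, m]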

One way you can do this: instead of converting it with as[Person], you can build the schema (StructType) from the case class and apply it while reading the JSON files:

import org.apache.spark.sql.Encoders

val schema = Encoders.product[Person].schema

val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.schema(schema).json(path).as[Person]
peopleDS.show
+-------+----+
|   name| age|
+-------+----+
|Michael|null|
+-------+----+

The content of the JSON file is:

{"name":"Michael"}
