score:0
The answer from @Sathiyan S led me to the following solution (presenting it here because it did not completely solve my problem, but served as a pointer in the right direction):
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.{StructField, StructType}

// create the expected schema from the case class
val schema = Encoders.product[SchemaClass].schema

// convert all fields to nullable
val newSchema = StructType(schema.map {
  case StructField(c, t, _, m) => StructField(c, t, nullable = true, m)
})

// apply the expected, nullable schema while parsing the JSON strings
session.read.schema(newSchema).json(jsonIncompleteRddString).as[SchemaClass]
Benefits:

- All missing fields are set to null, independent of data type.
- Additional fields in the JSON string which are not part of the case class will be ignored.
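For context, here is a minimal self-contained sketch of that approach; the SchemaClass case class, the session value and the sample JSON are illustrative assumptions, not part of the original answer:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.types.{StructField, StructType}

// hypothetical target case class and session, for illustration only
case class SchemaClass(a: String, b: Int)

val session = SparkSession.builder().master("local").appName("nullable-schema").getOrCreate()
import session.implicits._

// one record is missing "b" and carries an extra field "m"
val jsonIncompleteRddString: RDD[String] =
  session.sparkContext.parallelize(List("""{"a": "foo", "m": "eee"}"""))

val schema = Encoders.product[SchemaClass].schema
val newSchema = StructType(schema.map {
  case StructField(c, t, _, m) => StructField(c, t, nullable = true, m)
})

// "b" comes back as null, the unknown field "m" is dropped
val ds = session.read.schema(newSchema).json(jsonIncompleteRddString).as[SchemaClass]
ds.show()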
score:1
package spark

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, Encoders, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.{col, lit}

object JsonDF extends App {

  val spark = SparkSession.builder()
    .master("local")
    .appName("dataframe-example")
    .getOrCreate()

  import spark.implicits._

  case class SchemaClass(a: String, b: Int)

  val jsonDataIncomplete: String = """{"a": "foo", "m": "eee"}"""
  val jsonIncompleteRddString: RDD[String] = spark.sparkContext.parallelize(List(jsonDataIncomplete))

  val dsIncomplete = spark.read.json(jsonIncompleteRddString) // .as[SchemaClass]

  lazy val schema: StructType      = Encoders.product[SchemaClass].schema
  lazy val fields: Array[String]   = schema.fieldNames
  lazy val colNames: Array[Column] = fields.map(col(_))

  val sch = dsIncomplete.schema

  // add every column that is expected by the schema but missing from the data, as a typed null
  val schemaDiff = schema.diff(sch)
  val rr = schemaDiff.foldLeft(dsIncomplete)((acc, col) => {
    acc.withColumn(col.name, lit(null).cast(col.dataType))
  })

  // drop every column that is present in the data but not in the expected schema
  val schf = dsIncomplete.schema
  val schDiff = schf.diff(schema)
  val rrr = schDiff.foldLeft(rr)((acc, col) => {
      acc.drop(col.name)
    })
    .select(colNames: _*)
}
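A short usage check for the JsonDF example above; this is a hedged addition (the expected values are reasoned from the single sample record, not taken from the original answer) and belongs inside the same object:

  // expected: columns a (string) and b (int), both nullable
  rrr.printSchema()
  // expected: one row with a = "foo" and b = null; the extra field "m" has been dropped
  rrr.show()
  // the typed view renders the same values
  rrr.as[SchemaClass].show()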
score:1
You can now skip loading the JSON as an RDD and then reading it as a DF; if you are using Spark 2.2+, read it directly:

val dsIncomplete = spark.read.json(Seq(jsonDataIncomplete).toDS)

- Load your JSON data
- Extract your schema from the case class, or define it manually
- Get the list of missing fields
- Default the value to lit(null).cast(col.dataType) for each missing column
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}

object DefaultFieldValue {

  def main(args: Array[String]): Unit = {
    // Constant.getSparkSess is the answer author's helper that returns a SparkSession
    val spark = Constant.getSparkSess
    import spark.implicits._

    val jsonDataIncomplete: String = """{"a": "foo"}"""
    val dsIncomplete = spark.read.json(Seq(jsonDataIncomplete).toDS)

    val schema: StructType = Encoders.product[SchemaClass].schema
    val fields: Array[StructField] = schema.fields

    // for every expected field that is missing from the data (compared by field name),
    // add it as a null column cast to the expected type
    val outDf = fields
      .filterNot(f => dsIncomplete.columns.contains(f.name))
      .foldLeft(dsIncomplete)((acc, col) => {
        acc.withColumn(col.name, lit(null).cast(col.dataType))
      })

    outDf.printSchema()
    outDf.show()
  }
}
case class SchemaClass(a: String, b: Int, c: String, d: Double)
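If a typed Dataset is needed rather than a plain DataFrame, the filled-in result can be viewed through the case class as well. This is a hypothetical continuation of the main method above (it reuses outDf and schema from that scope), and nulls in the primitive fields b and d are only safe for display; collecting such rows would fail:

    // hypothetical continuation: select the columns in case-class order and view as a typed Dataset
    val typed = outDf.select(schema.fieldNames.map(col): _*).as[SchemaClass]
    typed.printSchema()
    typed.show()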
score:1
It will work the same way if you have different JSON strings in the same RDD. When you have only one record that does not match the schema, it will throw an error.

e.g.

val jsonIncompleteRddString: RDD[String] = spark.sparkContext.parallelize(List(jsonDataIncomplete, jsonData))

import spark.implicits._
val dsIncomplete = spark.read.json(jsonIncompleteRddString).as[SchemaClass]
dsIncomplete.printSchema()
dsIncomplete.show()

scala> dsIncomplete.show()
+---+----+
|  a|   b|
+---+----+
|foo|null|
|foo| bar|
+---+----+
One way you can do it is: instead of converting it with as[Person], build the schema (StructType) from the case class and apply it while reading the JSON files:

import org.apache.spark.sql.Encoders

val schema = Encoders.product[Person].schema
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.schema(schema).json(path).as[Person]
peopleDS.show
+-------+----+
|   name| age|
+-------+----+
|Michael|null|
+-------+----+
The content of the JSON file (people.json) is:

{"name":"Michael"}
Source: stackoverflow.com