score:13
Accepted answer
From Spark's perspective, value is just a byte sequence. Spark has no knowledge of the serialization format or the content. To extract a field, you have to parse the value first.
If the data is serialized as a JSON string, you have two options. You can cast value to StringType, use from_json, and provide a schema:
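import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
// The $"..." syntax needs spark.implicits._ (already in scope in spark-shell).
// Replace each ??? with the DataType of that field, e.g. StringType.
val schema: StructType = StructType(Seq(
  StructField("column1", ???),
  StructField("column2", ???)
))
// value is binary, so cast it to a string before parsing it as JSON.
rawKafkaDF.select(from_json($"value".cast(StringType), schema))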
Alternatively, cast value to StringType and extract fields by path using get_json_object:
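import org.apache.spark.sql.functions.get_json_object
// Names of the top-level JSON fields to extract.
val columns: Seq[String] = ???
// s"$$.$c" builds the JSON path "$.<field>"; the cast makes the binary-to-string step explicit.
val exprs = columns.map(c => get_json_object($"value".cast("string"), s"$$.$c"))
rawKafkaDF.select(exprs: _*)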
Then cast the extracted values to the desired types, since get_json_object returns strings.
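To make both options concrete, here is a minimal sketch that runs them against a small static DataFrame standing in for rawKafkaDF. The payload shape (a string column1 and an integer column2), the object name KafkaJsonSketch, and the helper names withFromJson and withJsonPath are assumptions made for illustration. They are not part of the original answer, so adjust the schema to your actual messages.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json, get_json_object}
import org.apache.spark.sql.types._

object KafkaJsonSketch {
  // Assumed payload shape: {"column1": "<string>", "column2": <int>}
  val schema: StructType = StructType(Seq(
    StructField("column1", StringType),
    StructField("column2", IntegerType)
  ))

  // Option 1: parse the whole payload with from_json, then flatten the struct.
  def withFromJson(raw: DataFrame): DataFrame =
    raw
      .select(from_json(col("value").cast(StringType), schema).as("data"))
      .select("data.*")

  // Option 2: pull fields out by JSON path, then cast each to its target type.
  def withJsonPath(raw: DataFrame): DataFrame = {
    val json = col("value").cast(StringType)
    raw.select(
      get_json_object(json, "$.column1").as("column1"),
      get_json_object(json, "$.column2").cast(IntegerType).as("column2")
    )
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("kafka-json-sketch")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for rawKafkaDF: one binary column named value, as the Kafka source provides.
    val rawKafkaDF = Seq("""{"column1":"a","column2":1}""".getBytes("UTF-8")).toDF("value")

    withFromJson(rawKafkaDF).show()
    withJsonPath(rawKafkaDF).show()
    spark.stop()
  }
}
```

With a streaming source the same two functions should apply unchanged, because both only use select on the value column. from_json is usually the better fit when the schema is known up front, while get_json_object is convenient for picking a few fields out of a loosely structured payload.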
Source: stackoverflow.com