score:100
Spark Datasets require Encoders for the data type which is about to be stored. For common types (atomics, product types) there are a number of predefined encoders available, but you first have to import them from SparkSession.implicits to make it work:
import org.apache.spark.sql.SparkSession

val sparkSession: SparkSession = ???
import sparkSession.implicits._

// dataList as defined in the question (a List of SimpleTuple)
val dataset = sparkSession.createDataset(dataList)
Alternatively, you can directly provide an explicit Encoder for the stored type:
import org.apache.spark.sql.{Encoder, Encoders}
val dataset = sparkSession.createDataset(dataList)(Encoders.product[SimpleTuple])
or an implicit one:
implicit val enc: Encoder[SimpleTuple] = Encoders.product[SimpleTuple]
val dataset = sparkSession.createDataset(dataList)
Note that Encoders also provides a number of predefined Encoders for atomic types, and Encoders for complex ones can be derived with ExpressionEncoder.
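For illustration, a minimal sketch of both kinds of encoder (Map[String, Int] is just an example of a non-atomic type here):
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

// predefined encoders for atomic types
val intEncoder: Encoder[Int] = Encoders.scalaInt
val stringEncoder: Encoder[String] = Encoders.STRING

// an encoder for a more complex type, derived with ExpressionEncoder
val mapEncoder: Encoder[Map[String, Int]] = ExpressionEncoder[Map[String, Int]]()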
Further reading:
- For custom objects which are not covered by built-in encoders see How to store custom objects in Dataset? (a sketch of one common workaround follows this list)
- For Row objects you have to provide the Encoder explicitly, as shown in Encoder error while trying to map dataframe row to updated row
- The case class must be defined outside of the main object, see https://stackoverflow.com/a/34715827/3535853
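As a sketch of the first point, one common workaround discussed in the linked question is a generic Kryo-based encoder. Unsupported is a hypothetical class here, and sparkSession is assumed to be an existing SparkSession:
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

val sparkSession: SparkSession = ???  // assumed to exist

// a hypothetical class with no built-in encoder (not a Product, not an atomic type)
class Unsupported(val value: String)

// a generic binary encoder backed by Kryo serialization
implicit val unsupportedEncoder: Encoder[Unsupported] = Encoders.kryo[Unsupported]

val ds: Dataset[Unsupported] =
  sparkSession.createDataset(Seq(new Unsupported("a"), new Unsupported("b")))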
score:-1
I'd clarify, with an answer to my own question, that if the goal is to define a simple literal Spark DataFrame, rather than use Scala tuples and implicit conversion, the simpler route is to use the Spark API directly, like this:
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._
val simpleSchema = StructType(
  StructField("a", StringType) ::
  StructField("b", IntegerType) ::
  StructField("c", IntegerType) ::
  StructField("d", IntegerType) ::
  StructField("e", IntegerType) :: Nil)

val data = List(
  Row("001", 1, 0, 3, 4),
  Row("001", 3, 4, 1, 7),
  Row("001", null, 0, 6, 4),
  Row("003", 1, 4, 5, 7),
  Row("003", 5, 4, null, 2),
  Row("003", 4, null, 9, 2),
  Row("003", 2, 3, 0, 1)
)

val df = spark.createDataFrame(data.asJava, simpleSchema)
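If a typed Dataset is wanted afterwards, the same DataFrame can be converted with as[...]. A minimal sketch, assuming a top-level case class named Record whose Option[Int] fields keep the null values above representable:
case class Record(a: String, b: Option[Int], c: Option[Int], d: Option[Int], e: Option[Int])

import spark.implicits._
val typedDs = df.as[Record]  // picks up the implicit Encoder[Record] from spark.implicits._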
score:68
For other users (yours is correct), note that it's also important that the case class is defined outside of the object scope. So:
Fails:
import org.apache.spark.sql.SparkSession

object DatasetTest {
  case class SimpleTuple(id: Int, desc: String)

  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()

    // fails: Unable to find encoder for type stored in a Dataset
    val dataset = sparkSession.createDataset(dataList)
  }
}
Adding the implicits still fails with the same error:
import org.apache.spark.sql.SparkSession

object DatasetTest {
  case class SimpleTuple(id: Int, desc: String)

  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()

    import sparkSession.implicits._
    // still fails with the same encoder error
    val dataset = sparkSession.createDataset(dataList)
  }
}
Works:
import org.apache.spark.sql.SparkSession

case class SimpleTuple(id: Int, desc: String)

object DatasetTest {
  val dataList = List(
    SimpleTuple(5, "abc"),
    SimpleTuple(6, "bcd")
  )

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("example")
      .getOrCreate()

    import sparkSession.implicits._
    val dataset = sparkSession.createDataset(dataList)
  }
}
Here's the relevant bug: https://issues.apache.org/jira/browse/SPARK-13540, so hopefully it will be fixed in the next release of Spark 2.
(Edit: Looks like that bugfix is actually in Spark 2.0.0... So I'm not sure why this still fails).
Source: stackoverflow.com