OK - to cut the long discussion short, here's a working solution. Basically, you had two separate issues here:

You expected Spark to be able to parse an arbitrary Java class into a DataFrame - that is not the case. Spark can only parse specific types, which are generally: Scala collections; primitives; java.sql.Date; and any subclass of scala.Product - all case classes and tuples, for instance. So - as discussed in the comments, the first thing to do is to convert your existing structure into such types.
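To see why case classes and tuples work where an arbitrary Java class doesn't, here is a minimal sketch outside of Spark: both extend scala.Product, which lets their fields be enumerated positionally - the reflection Spark relies on. The Song class below is a hypothetical stand-in, purely for illustration:

```scala
// A case class automatically extends scala.Product, so its fields can be
// enumerated positionally - unlike an arbitrary Java class.
case class Song(id: String, playCount: Int)

object ProductDemo {
  def main(args: Array[String]): Unit = {
    val song = Song("abc123", 5)
    val tuple = ("abc123", 5) // Tuple2 is also a Product

    println(song.productArity)      // number of fields: 2
    println(song.productElement(0)) // first field: abc123
    println(tuple.productArity)     // 2
  }
}
```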
Your schema didn't match your Java class either - there were a few differences:
- the schema's playType was an array of GlobalizedPlayTimeWindows, while your code created a single item and not an array
- the GlobalizedPlayTimeWindows schema contained benefitIds, which doesn't exist in the Java class
- the schema's playTypeIds was an array, while the field with the same name in the Java class was a Map
So - I corrected all these (I changed the schema to match the data; you can choose to fix them differently, as long as schema and data match) and completed the conversion of the Java classes into case classes:
// Corrected schemas:
val playTimeWindow =
  StructType(
    StructField("startTime", DateType, true) ::
    StructField("endTime", DateType, true) :: Nil)

val globalizedPlayTimeWindows =
  StructType(
    StructField("countries", ArrayType(StringType, true), true) ::
    StructField("purchase", ArrayType(playTimeWindow, true), true) ::
    StructField("rental", ArrayType(playTimeWindow, true), true) ::
    StructField("free", ArrayType(playTimeWindow, true), true) ::
    StructField("download", ArrayType(playTimeWindow, true), true) ::
    StructField("advertisement", ArrayType(playTimeWindow, true), true) ::
    StructField("preOrderExclusive", ArrayType(playTimeWindow, true), true) ::
    StructField("playTypeIds", MapType(StringType, ArrayType(playTimeWindow, true), true), true) ::
    Nil)

val schema = StructType(
  StructField("id", StringType, true) ::
  StructField("jazzCount", IntegerType, true) ::
  StructField("rockCount", IntegerType, true) ::
  StructField("classicCount", IntegerType, true) ::
  StructField("nonClassicCount", IntegerType, true) ::
  StructField("musicType", StringType, true) ::
  StructField("playType", globalizedPlayTimeWindows, true) :: Nil)
// Note the use of java.sql.Date - java.util.Date is not supported
case class PlayTimeWindowScala(startTime: java.sql.Date, endTime: java.sql.Date)

case class GlobalizedPlayTimeWindowsScala(countries: List[String],
                                          purchase: List[PlayTimeWindowScala],
                                          rental: List[PlayTimeWindowScala],
                                          free: List[PlayTimeWindowScala],
                                          download: List[PlayTimeWindowScala],
                                          advertisement: List[PlayTimeWindowScala],
                                          preOrderExclusive: List[PlayTimeWindowScala],
                                          playTypeIds: Map[String, List[PlayTimeWindowScala]])
// Some conversion methods:
def toSqlDate(jDate: java.util.Date): java.sql.Date = new java.sql.Date(jDate.getTime)

import scala.collection.JavaConverters._

def toScalaWindowList(l: java.util.List[PlayTimeWindow]): List[PlayTimeWindowScala] = {
  l.asScala.map(javaWindow => PlayTimeWindowScala(toSqlDate(javaWindow.startTime), toSqlDate(javaWindow.endTime))).toList
}

def toScalaGlobalizedWindows(javaObj: GlobalizedPlayTimeWindows): GlobalizedPlayTimeWindowsScala = {
  GlobalizedPlayTimeWindowsScala(
    javaObj.countries.asScala.toList,
    toScalaWindowList(javaObj.purchase),
    toScalaWindowList(javaObj.rental),
    toScalaWindowList(javaObj.free),
    toScalaWindowList(javaObj.download),
    toScalaWindowList(javaObj.advertisement),
    toScalaWindowList(javaObj.preOrderExclusive),
    javaObj.playTypeIds.asScala.mapValues(toScalaWindowList).toMap
  )
}
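The date and collection conversions can be sanity-checked without a Spark session. Here is a self-contained sketch that stubs the Java side with a minimal class - the JPlayTimeWindow stub, the Demo names, and the sample dates are all hypothetical, standing in for your real Java classes:

```scala
import scala.collection.JavaConverters._

// Hypothetical stand-in for the Java-side class:
class JPlayTimeWindow(val startTime: java.util.Date, val endTime: java.util.Date)

case class PlayTimeWindowScalaDemo(startTime: java.sql.Date, endTime: java.sql.Date)

object ConversionDemo {
  // Same idea as in the answer: java.util.Date -> java.sql.Date via epoch millis
  def toSqlDate(jDate: java.util.Date): java.sql.Date = new java.sql.Date(jDate.getTime)

  // java.util.List -> scala List, converting each element's dates
  def toScalaWindowList(l: java.util.List[JPlayTimeWindow]): List[PlayTimeWindowScalaDemo] =
    l.asScala.map(w => PlayTimeWindowScalaDemo(toSqlDate(w.startTime), toSqlDate(w.endTime))).toList

  def main(args: Array[String]): Unit = {
    val javaList = new java.util.ArrayList[JPlayTimeWindow]()
    javaList.add(new JPlayTimeWindow(new java.util.Date(0L), new java.util.Date(86400000L)))

    val scalaList = toScalaWindowList(javaList)
    println(scalaList.head.startTime.getTime) // 0
    println(scalaList.head.endTime.getTime)   // 86400000
  }
}
```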
val parsedJavaData: RDD[(String, Int, Int, Int, Int, String, GlobalizedPlayTimeWindows)] = mappingFile.map(x => {
  // your code producing the tuple
})

// Convert to Scala objects and into a Row:
val inputData = parsedJavaData.map {
  case (id, jazzCount, rockCount, classicCount, nonClassicCount, musicType, javaPlayType) =>
    val scalaPlayType = toScalaGlobalizedWindows(javaPlayType)
    Row(id, jazzCount, rockCount, classicCount, nonClassicCount, musicType, scalaPlayType)
}

// Now - this works:
val inputDataDF = sqlContext.createDataFrame(inputData, schema)
Source: stackoverflow.com