score:3

Accepted answer

OK, to cut the long discussion short, here's a working solution. You had two separate issues here:

  1. You expected Spark to be able to parse an arbitrary Java class into a DataFrame. That is not the case: Spark can only parse specific types, which are generally Scala collections, primitives, java.sql.Date, and any subclass of scala.Product (all case classes and tuples, for instance). So, as discussed in the comments, the first thing to do is to convert your existing structure into such types.

  2. Your schema didn't match your Java class either. There were a few differences:

    • The schema's playType was an array of GlobalizedPlayTimeWindows, while your code created a single item and not an array
    • The GlobalizedPlayTimeWindows schema contained benefitIds, which doesn't exist in the Java class
    • The playTypeIds schema was an array, while the field with the same name in the Java class was a Map
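To illustrate the first point, here is a minimal sketch, without Spark, of what "subclass of scala.Product" means in practice. The names WindowCase, WindowPlain and isProduct are hypothetical and exist only for this illustration. It shows only the Product relationship; Spark additionally requires every field to be of a supported type.

```scala
// Hypothetical stand-ins for illustration; none of these names come from the question.
case class WindowCase(startTime: java.sql.Date, endTime: java.sql.Date)
class WindowPlain(val startTime: java.util.Date, val endTime: java.util.Date)

// Case classes and tuples extend scala.Product; an ordinary class does not.
def isProduct(value: Any): Boolean = value.isInstanceOf[Product]

val caseIsProduct  = isProduct(WindowCase(new java.sql.Date(0L), new java.sql.Date(1000L)))
val tupleIsProduct = isProduct(("id-1", 3))
val plainIsProduct = isProduct(new WindowPlain(new java.util.Date(0L), new java.util.Date(1000L)))

println(s"case class: $caseIsProduct, tuple: $tupleIsProduct, plain class: $plainIsProduct")
```

An ordinary Java bean behaves like WindowPlain here, which is why it has to be converted first.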

So I corrected all of these (I changed the schema to match the data; you can choose to fix them differently, as long as the two match) and completed the conversion of the Java classes into case classes:

import org.apache.spark.sql.types._

// corrected schemas:
val playTimeWindow =
  StructType(
    StructField("startTime", DateType, true) ::
      StructField("endTime", DateType, true) :: Nil)

val globalizedPlayTimeWindows =
  StructType(
    StructField("countries", ArrayType(StringType, true), true) ::
      StructField("purchase", ArrayType(playTimeWindow, true), true) ::
      StructField("rental", ArrayType(playTimeWindow, true), true) ::
      StructField("free", ArrayType(playTimeWindow, true), true) ::
      StructField("download", ArrayType(playTimeWindow, true), true) ::
      StructField("advertisement", ArrayType(playTimeWindow, true), true) ::
      StructField("preorderExclusive", ArrayType(playTimeWindow, true), true) ::
      StructField("playTypeIds", MapType(StringType, ArrayType(playTimeWindow, true), true), true) ::
      Nil)

val schema = StructType(
  StructField("id", StringType, true) ::
    StructField("jazzCount", IntegerType, true) ::
    StructField("rockCount", IntegerType, true) ::
    StructField("classicCount", IntegerType, true) ::
    StructField("nonClassicCount", IntegerType, true) ::
    StructField("musicType", StringType, true) ::
    StructField("playType", globalizedPlayTimeWindows, true) :: Nil)

// note the use of java.sql.Date; java.util.Date is not supported
case class PlayTimeWindowScala(startTime: java.sql.Date, endTime: java.sql.Date)

case class GlobalizedPlayTimeWindowsScala(countries: List[String],
                                          purchase: List[PlayTimeWindowScala],
                                          rental: List[PlayTimeWindowScala],
                                          free: List[PlayTimeWindowScala],
                                          download: List[PlayTimeWindowScala],
                                          advertisement: List[PlayTimeWindowScala],
                                          preorderExclusive: List[PlayTimeWindowScala],
                                          playTypeIds: Map[String, List[PlayTimeWindowScala]])

// some conversion methods:
def toSqlDate(jDate: java.util.Date): java.sql.Date = new java.sql.Date(jDate.getTime)

import scala.collection.JavaConverters._

def toScalaWindowList(l: java.util.List[PlayTimeWindow]): List[PlayTimeWindowScala] = {
  l.asScala.map(javaWindow => PlayTimeWindowScala(toSqlDate(javaWindow.startTime), toSqlDate(javaWindow.endTime))).toList
}

def toScalaGlobalizedWindows(javaObj: GlobalizedPlayTimeWindows): GlobalizedPlayTimeWindowsScala = {
  GlobalizedPlayTimeWindowsScala(
    javaObj.countries.asScala.toList,
    toScalaWindowList(javaObj.purchase),
    toScalaWindowList(javaObj.rental),
    toScalaWindowList(javaObj.free),
    toScalaWindowList(javaObj.download),
    toScalaWindowList(javaObj.advertisement),
    toScalaWindowList(javaObj.preorderExclusive),
    javaObj.playTypeIds.asScala.mapValues(toScalaWindowList).toMap
  )
}
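As a quick check of the conversion idea above, here is a minimal Spark-free sketch that runs the same kind of helpers on sample data. JavaWindow and WindowScala are hypothetical stand-ins for your Java class and its case-class counterpart, and the sample key "p1" and the dates are made up. It assumes the Java collections are never null.

```scala
import scala.collection.JavaConverters._

// Hypothetical stand-in for the Java class (assumed to expose two java.util.Date fields).
class JavaWindow(val startTime: java.util.Date, val endTime: java.util.Date)

// Scala counterpart built only from types Spark can handle.
case class WindowScala(startTime: java.sql.Date, endTime: java.sql.Date)

// java.util.Date -> java.sql.Date, keeping the same epoch milliseconds.
def toSqlDate(d: java.util.Date): java.sql.Date = new java.sql.Date(d.getTime)

// java.util.List of Java objects -> immutable Scala List of case classes.
def toScalaWindows(l: java.util.List[JavaWindow]): List[WindowScala] =
  l.asScala.map(w => WindowScala(toSqlDate(w.startTime), toSqlDate(w.endTime))).toList

// Made-up sample data: one window covering the first day after the epoch.
val javaWindows: java.util.List[JavaWindow] =
  java.util.Arrays.asList(new JavaWindow(new java.util.Date(0L), new java.util.Date(86400000L)))

val javaMap = new java.util.HashMap[String, java.util.List[JavaWindow]]()
javaMap.put("p1", javaWindows)

// java.util.Map -> immutable Scala Map, converting each value list on the way.
val scalaMap: Map[String, List[WindowScala]] =
  javaMap.asScala.map { case (key, windows) => key -> toScalaWindows(windows) }.toMap

println(scalaMap)
```

If some of the Java lists can be null in your data, wrapping them in Option(...) before calling asScala would be one way to avoid a NullPointerException.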

val parsedJavaData: RDD[(String, Int, Int, Int, Int, String, GlobalizedPlayTimeWindows)] = mappingFile.map(x => {
   // your code producing the tuple
})

// convert to Scala objects and into a Row:
val inputData = parsedJavaData.map {
  case (id, jazzCount, rockCount, classicCount, nonClassicCount, musicType, javaPlayType) =>
    val scalaPlayType = toScalaGlobalizedWindows(javaPlayType)
    Row(id, jazzCount, rockCount, classicCount, nonClassicCount, musicType, scalaPlayType)
}

// now this works
val inputDataDF = sqlContext.createDataFrame(inputData, schema)
