score:1

Accepted answer

i found its not easy to read kafka avro format data. i developed code in spark streaming using twitter bijection but i am getting invert byte error any suggestion.

error : exception in thread "main" org.apache.spark.sparkexception: job aborted due to stage failure: task 0 in stage 1.0 failed 1 times, most recent failure: lost task 0.0 in stage 1.0 (tid 1, localhost, executor driver): com.twitter.bijection.inversionfailure: failed to invert: [b@5335860

new code i used:

import com.twitter.bijection.injection
import com.twitter.bijection.avro.genericavrocodecs
import io.confluent.kafka.schemaregistry.client.{cachedschemaregistryclient, schemaregistryclient}
import io.confluent.kafka.schemaregistry.client.rest.restservice
import io.confluent.kafka.serializers.{kafkaavrodecoder, kafkaavrodeserializer}
import org.apache.kafka.common.serialization.{bytearraydeserializer, stringdeserializer}
import org.apache.spark.sparkconf
import org.apache.spark.streaming.{seconds, streamingcontext}
import org.apache.avro.generic.genericrecord
import org.apache.avro.schema
import org.apache.spark.streaming.kafka010.kafkautils
import org.apache.spark.streaming.kafka010.locationstrategies.preferconsistent
import org.apache.spark.streaming.kafka010.consumerstrategies.subscribe
import org.apache.avro.schema
import org.apache.avro.hadoop.io.avrodeserializer
import org.apache.commons.codec.stringdecoder


object readkafkaavro1 {

   object injection {

  val schemaregistryurl = "http://vtorppsdv01.corp.moneris.com:8081"
 val topics = "b24_tx_financial_formatted_clean"
   val subjectvaluename = topics + "-value"
    val restservice = new restservice(schemaregistryurl)
    val valuerestresponseschema = restservice.getlatestversion(subjectvaluename)
  val parser = new schema.parser()
 //   val schema = parser.parse(getclass.getresourceasstream("src\\main\\resources\\b24_tx_financial_formatted_clean.avsc"))
    val schema = parser.parse((valuerestresponseschema.getschema))
        val injection: injection[genericrecord, array[byte]] = genericavrocodecs.tobinary(schema)
 }

  def main(args: array[string]): unit = {

    val conf = new sparkconf().setappname("readkafkaavro").setmaster("local[*]")
    val streamingctx = new streamingcontext(conf,seconds(30))
    val schemaregistryurl1 = "http://vtorppsdv01.corp.moneris.com:8081"
    val topics = array("b24_tx_financial_formatted_clean")

    streamingctx.sparkcontext.setloglevel("error")

    val kafkaparms = map[string,object]("bootstrap.servers" -> "vtorppsdv01.corp.moneris.com:9093,vtorppsdv02.corp.moneris.com:9093,vtorppsdv03.corp.moneris.com:9093",
      "key.deserializer" -> "org.apache.kafka.common.serialization.stringdeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.bytearraydeserializer",
      "group.id" -> "b24_ptlf_eim_processing" ,
      "auto.offset.reset" -> "earliest",
      "auto.commit.interval.ms" -> "2000",
      "schema.registry.url" -> schemaregistryurl1,
      "enable.auto.commit" -> (false: java.lang.boolean),
      "security.protocol" -> "ssl",
      "ssl.keystore.location" -> "c:\\users\\pawan.likhi\\desktop\\spark code\\simplekafkaconsumer\\kafka-eim-dev.jks",
      "ssl.keystore.password" -> "bw^1=|sy$j",
      "ssl.key.password" -> "bw^1=|sy$j",
      "ssl.truststore.location" -> "c:\\users\\pawan.likhi\\desktop\\spark code\\simplekafkaconsumer\\cpbp-ca-dev.jks",
      "ssl.truststore.password" -> "ib>3v$6m@9",
      "ssl.keystore.type" -> "jceks",
      "ssl.truststore.type" -> "jceks",
      "specific.avro.reader" -> "true"
    )

    val inputstream = kafkautils.createdirectstream[string,array[byte]](streamingctx,preferconsistent,subscribe[string,array[byte]](topics,kafkaparms))



    val recordstream = inputstream.map(msg => injection.injection.invert(msg.value()).get)
   // .map(record => (record.get("authorizationtransactionsource"),record.get("authorizationtransactionsourceid")))



    inputstream.map(x => (x.key,x.value)).print()

    //recordstream.print()


    recordstream.print()

    streamingctx.start()
    streamingctx.awaittermination()


  }
}

Related Query

More Query from same tag