score:1
Accepted answer
I found it's not easy to read Kafka Avro-format data. I developed code in Spark Streaming using Twitter Bijection, but I am getting an invert-byte error. Any suggestions?
Error: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): com.twitter.bijection.InversionFailure: Failed to invert: [B@5335860
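A likely cause, for context: if the topic is produced with the Confluent Avro serializer, every message carries a 5-byte prefix (magic byte 0x00 plus a 4-byte schema-registry ID) ahead of the Avro binary body. Bijection's GenericAvroCodecs.toBinary expects plain Avro binary, so invert fails on the prefixed bytes. Below is a minimal sketch of stripping that prefix first, assuming the Confluent wire format; the helper stripConfluentHeader is illustrative, not part of the original code:

def stripConfluentHeader(bytes: Array[Byte]): Array[Byte] = {
  // Confluent wire format: [0x00 magic byte][4-byte schema ID][Avro binary payload]
  require(bytes.length > 5 && bytes(0) == 0x00, "not in Confluent wire format")
  java.util.Arrays.copyOfRange(bytes, 5, bytes.length)
}

// Hypothetical usage inside the stream, before Bijection's invert:
// inputStream.map(msg => Injection.injection.invert(stripConfluentHeader(msg.value())).get)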
New code I used:
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.{KafkaAvroDecoder, KafkaAvroDeserializer}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.avro.generic.GenericRecord
import org.apache.avro.Schema
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object ReadKafkaAvro1 {

  // Fetches the latest value schema for the topic from the schema registry
  // and builds a Bijection injection for GenericRecord <-> Array[Byte].
  object Injection {
    val schemaRegistryUrl = "http://vtorppsdv01.corp.moneris.com:8081"
    val topics = "b24_tx_financial_formatted_clean"
    val subjectValueName = topics + "-value"
    val restService = new RestService(schemaRegistryUrl)
    val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)
    val parser = new Schema.Parser()
    // val schema = parser.parse(getClass.getResourceAsStream("src\\main\\resources\\b24_tx_financial_formatted_clean.avsc"))
    val schema = parser.parse(valueRestResponseSchema.getSchema)
    val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReadKafkaAvro").setMaster("local[*]")
    val streamingCtx = new StreamingContext(conf, Seconds(30))
    val schemaRegistryUrl1 = "http://vtorppsdv01.corp.moneris.com:8081"
    val topics = Array("b24_tx_financial_formatted_clean")

    streamingCtx.sparkContext.setLogLevel("ERROR")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "vtorppsdv01.corp.moneris.com:9093,vtorppsdv02.corp.moneris.com:9093,vtorppsdv03.corp.moneris.com:9093",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
      "group.id" -> "b24_ptlf_eim_processing",
      "auto.offset.reset" -> "earliest",
      "auto.commit.interval.ms" -> "2000",
      "schema.registry.url" -> schemaRegistryUrl1,
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "security.protocol" -> "SSL",
      "ssl.keystore.location" -> "c:\\users\\pawan.likhi\\desktop\\spark code\\simplekafkaconsumer\\kafka-eim-dev.jks",
      "ssl.keystore.password" -> "bw^1=|sy$j",
      "ssl.key.password" -> "bw^1=|sy$j",
      "ssl.truststore.location" -> "c:\\users\\pawan.likhi\\desktop\\spark code\\simplekafkaconsumer\\cpbp-ca-dev.jks",
      "ssl.truststore.password" -> "ib>3v$6m@9",
      "ssl.keystore.type" -> "JCEKS",
      "ssl.truststore.type" -> "JCEKS",
      "specific.avro.reader" -> "true"
    )

    val inputStream = KafkaUtils.createDirectStream[String, Array[Byte]](
      streamingCtx, PreferConsistent, Subscribe[String, Array[Byte]](topics, kafkaParams))

    // Invert the raw bytes back into a GenericRecord via the Bijection injection.
    val recordStream = inputStream.map(msg => Injection.injection.invert(msg.value()).get)
    //  .map(record => (record.get("authorizationtransactionsource"), record.get("authorizationtransactionsourceid")))

    inputStream.map(x => (x.key, x.value)).print()
    recordStream.print()

    streamingCtx.start()
    streamingCtx.awaitTermination()
  }
}
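An alternative worth trying, sketched below: skip Bijection entirely and let Confluent's KafkaAvroDeserializer decode the wire format against the schema registry, yielding GenericRecord values directly. This is a minimal sketch assuming the kafka-avro-serializer dependency is on the classpath; the bootstrap servers, topic, group id, and registry URL are placeholders, and the SSL settings from the original code would carry over unchanged.

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object ReadKafkaAvroWithRegistry {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReadKafkaAvroWithRegistry").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(30))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"   -> "localhost:9092",                 // placeholder
      "key.deserializer"    -> classOf[StringDeserializer],
      "value.deserializer"  -> classOf[KafkaAvroDeserializer],   // handles the 5-byte Confluent prefix
      "schema.registry.url" -> "http://localhost:8081",          // placeholder
      "group.id"            -> "avro-demo",                      // placeholder
      "auto.offset.reset"   -> "earliest",
      "enable.auto.commit"  -> (false: java.lang.Boolean)
    )

    // KafkaAvroDeserializer is typed Deserializer[Object]; cast each value to GenericRecord.
    val stream = KafkaUtils.createDirectStream[String, Object](
      ssc, PreferConsistent, Subscribe[String, Object](Array("some-topic"), kafkaParams))

    stream.map(record => record.value().asInstanceOf[GenericRecord]).print()

    ssc.start()
    ssc.awaitTermination()
  }
}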
Source: stackoverflow.com