I figured out my problem. When specifying the security protocol option, the option name must be prefixed with "kafka.". This is confusing because for a regular Kafka consumer the option is simply security.protocol, but when configuring Spark's Kafka source, both bootstrap.servers and security.protocol (and presumably any other Kafka properties you need) must carry the kafka. prefix. My original code was:
.option("security.protocol", "SASL_PLAINTEXT")
The correct option is:
.option("kafka.security.protocol", "SASL_PLAINTEXT")
Here's the complete code that works:
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}

object Main {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.INFO)
    Logger.getLogger("akka").setLevel(Level.INFO)

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("myapp")
      // Note the -D prefix: extraJavaOptions takes raw JVM flags, so a bare
      // "java.security.auth.login.config=..." would not set the system property.
      .config("spark.executor.extraJavaOptions", "-Djava.security.auth.login.config=c:/krb/jaas.conf")
      .getOrCreate()

    import spark.implicits._

    // Kafka client properties carry the "kafka." prefix; "subscribe" is an
    // option of the Spark source itself and takes no prefix.
    val lines = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9100,broker2:9100")
      .option("kafka.security.protocol", "SASL_PLAINTEXT")
      .option("subscribe", "mytopic")
      .load()

    val query = lines.select("value").writeStream.format("console").start()
    query.awaitTermination()
  }
}
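One optional tweak: the Kafka source delivers key and value as binary, so the console sink prints raw bytes. Casting value to a string (reusing the lines DataFrame from the code above) makes the output readable:

// value arrives as binary from the Kafka source; cast it for readable console output
val query = lines
  .selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .format("console")
  .start()
query.awaitTermination()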
For reference, here's the content of the jaas.conf file:
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="c:/krb/mykeytab.keytab"
  principal="myaccount@mydomain.int"
  storeKey=true
  useTicketCache=false
  serviceName="myservicename";
};
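As an aside, and purely as an untested sketch on my part: Kafka clients 0.10.2 and later can also take the JAAS entry inline through the sasl.jaas.config property, which under Spark's prefix rule becomes kafka.sasl.jaas.config. That would avoid shipping a separate jaas.conf file and the JVM flag entirely (same broker and keytab values as above, reusing the spark session from the code that works):

// Untested alternative: inline JAAS entry instead of a jaas.conf file.
// Requires a Kafka client that supports sasl.jaas.config (0.10.2+, KIP-85).
val jaas =
  "com.sun.security.auth.module.Krb5LoginModule required " +
  "useKeyTab=true keyTab=\"c:/krb/mykeytab.keytab\" " +
  "principal=\"myaccount@mydomain.int\" storeKey=true " +
  "useTicketCache=false serviceName=\"myservicename\";"

val lines = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9100,broker2:9100")
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .option("kafka.sasl.jaas.config", jaas)
  .option("subscribe", "mytopic")
  .load()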