
I figured out my problem. When specifying the security protocol option, the option name must be prefixed with "kafka.". This is confusing because for a regular Kafka consumer the property is simply security.protocol, but when configuring the Spark Kafka source, both bootstrap.servers and security.protocol (and, as the Spark Kafka integration guide describes, any other Kafka client property you need) must be prefixed with "kafka.". My original code was:

.option("security.protocol", "SASL_PLAINTEXT")

The correct option is:

.option("kafka.security.protocol", "SASL_PLAINTEXT")
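If you already have a set of plain Kafka consumer properties, a small helper can add the prefix for you. This is a minimal sketch: `KafkaOptions.toSparkKafkaOptions` is a hypothetical name, not part of Spark. It only prefixes Kafka client properties; Spark's own source options such as "subscribe" must still be passed without the prefix.

```scala
// Hypothetical helper (not part of Spark): turns plain Kafka client
// properties into Spark Kafka source options by adding the "kafka." prefix.
object KafkaOptions {
  def toSparkKafkaOptions(kafkaProps: Map[String, String]): Map[String, String] =
    kafkaProps.map { case (key, value) => s"kafka.$key" -> value }
}
```

The resulting map can be handed to the reader in one call, e.g. `spark.readStream.format("kafka").options(KafkaOptions.toSparkKafkaOptions(props)).option("subscribe", "mytopic")`, since `DataStreamReader.options` accepts a `Map[String, String]`.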

Here's the complete code that works:

import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}

object Main {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.INFO)
    Logger.getLogger("akka").setLevel(Level.INFO)

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("myapp")
      // JVM options need the -D prefix to define a system property
      .config("spark.executor.extraJavaOptions", "-Djava.security.auth.login.config=c:/krb/jaas.conf")
      .getOrCreate()

    import spark.implicits._

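    // Kafka client properties take the "kafka." prefix; Spark's own
    // source options (such as "subscribe") do not
    val lines = spark.readStream.format("kafka")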
      .option("kafka.bootstrap.servers", "broker1:9100,broker2:9100")
      .option("kafka.security.protocol", "SASL_PLAINTEXT")
      .option("subscribe", "mytopic")
      .load()

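    // The Kafka "value" column is binary; cast it to a string for readable console output
    val query = lines.selectExpr("CAST(value AS STRING)").writeStream.format("console").start()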
    query.awaitTermination()
  }
}
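One caveat with `local[*]`: in local mode the executors run inside the driver JVM, so `spark.executor.extraJavaOptions` only takes effect for separately launched executor JVMs (e.g. on a cluster). A minimal sketch of an alternative for local runs, assuming the same `c:/krb/jaas.conf` path, is to set the standard `java.security.auth.login.config` system property on the driver before building the SparkSession. `JaasSetup` is a hypothetical name.

```scala
// Hypothetical helper: point JAAS at the config file via a JVM system property.
object JaasSetup {
  // Assumed path, taken from the setup above.
  val JaasPath = "c:/krb/jaas.conf"

  // In local[*] mode the Kafka consumers Spark creates live in this same JVM,
  // so they will see the property. JAAS caches its configuration once loaded,
  // so call this before the first Kafka client is created.
  def configureJaas(path: String = JaasPath): Unit =
    System.setProperty("java.security.auth.login.config", path)
}
```

Call `JaasSetup.configureJaas()` at the top of `main`, before `SparkSession.builder()`. Passing `-Djava.security.auth.login.config=c:/krb/jaas.conf` to the JVM directly (for example with spark-submit's `--driver-java-options`) achieves the same thing without code changes.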

For reference, here's the content of the jaas.conf file:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="c:/krb/mykeytab.keytab"
  principal="myaccount@mydomain.int"
  storeKey=true
  useTicketCache=false
  serviceName="myservicename";
};
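Because the "kafka." prefix applies to any Kafka client property, the Kerberos login can also be supplied inline through Kafka's `sasl.jaas.config` property (available since Kafka 0.10.2) instead of a separate file. This is a sketch under that assumption, mirroring the jaas.conf values above; `InlineJaas.kerberosOptions` is a hypothetical helper, and the service name is moved to the `sasl.kerberos.service.name` property rather than embedded in the JAAS string.

```scala
// Hypothetical helper: Spark Kafka source options carrying an inline JAAS login.
object InlineJaas {
  def kerberosOptions(keyTab: String, principal: String, serviceName: String): Map[String, String] = {
    // Same login module and flags as the KafkaClient section of jaas.conf
    val jaas =
      "com.sun.security.auth.module.Krb5LoginModule required " +
        "useKeyTab=true " +
        s"""keyTab="$keyTab" """ +
        s"""principal="$principal" """ +
        "storeKey=true " +
        "useTicketCache=false;"
    Map(
      "kafka.security.protocol" -> "SASL_PLAINTEXT",
      "kafka.sasl.mechanism" -> "GSSAPI", // Kerberos
      "kafka.sasl.kerberos.service.name" -> serviceName,
      "kafka.sasl.jaas.config" -> jaas
    )
  }
}
```

These options could then be passed with `.options(InlineJaas.kerberosOptions("c:/krb/mykeytab.keytab", "myaccount@mydomain.int", "myservicename"))` alongside `kafka.bootstrap.servers` and `subscribe`, removing the need for the extraJavaOptions setting.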
