
You need to make the following changes to your build file:

1> build.sbt

val sparkVersion = "2.3.1"

name := "StreamJoiner"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.8"  

scalacOptions ++= Seq(
  "-deprecation",
  "-feature",
  "-Xfuture",
  "-encoding",
  "UTF-8",
  "-unchecked",
  "-language:postfixOps"
)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-streaming" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-mllib" % sparkVersion % Provided,
  // Kafka source/sink for Structured Streaming lives in spark-sql-kafka-0-10
  // (not spark-streaming-kafka-0-10); %% resolves the Scala 2.11 artifact
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)

mainClass in assembly := Some("com.test.StreamProcess")

test in assembly := {}

assemblyJarName in assembly := s"${name.value}-${version.value}.jar"

assemblyMergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")       => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")   => MergeStrategy.discard
  case "reference.conf"                                 => MergeStrategy.concat
  case x: String if x.contains("UnusedStubClass.class") => MergeStrategy.first
  case _                                                => MergeStrategy.first
}

2> Create a file called plugins.sbt in the project/ folder. This is the folder that contains your build.properties file (the file specifying the sbt version), and it sits at the same level as the src/ folder.

Put the following line into the plugins.sbt file

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

Note: This plugin lets you create the uber JAR for your project. You simply have to run sbt assembly in your project's root.

3> Change your code to leverage Structured Streaming. This lets you read each of the topics in your source topic list as a streaming DataFrame.

import org.apache.spark.sql.SparkSession

val brokers = "serverip:9092"
val groupId = "sparkApplication"
val batchInterval = "2"
val pollTimeout = "1000"
val sourceTopics = "UberDataTopics" // comma-separated list; the indexing below assumes at least two topics
val sourceTopicArr = sourceTopics.split(",") // an ordered Array, so individual topics can be indexed
val kafka_topic: String = "UberDataResultDataTopics" // publish topic
val modelpath = "hdfs://serverip:9000/Machine-Learning-models/KmeansModel"

val spark = SparkSession.builder()
  .appName("ClusterUberStream")
  .master("local[2]")
  .getOrCreate()


val topicOneDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", sourceTopicArr(0))
  .option("startingOffsets", "earliest") // note: startingOffsets, plural
  .load()
// No groupId option here: in Spark 2.3 the Kafka source manages its own consumer group


val topicTwoDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", sourceTopicArr(1))
  .option("startingOffsets", "earliest")
  .load()
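The Kafka source exposes each record's payload as binary key and value columns, so you will usually cast value to a string and parse it before joining. A minimal sketch, assuming the messages are JSON; the field names (lat, lon) are placeholders for your actual schema:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{DoubleType, StructType}

// Hypothetical message schema -- replace with the real structure of your topic
val uberSchema = new StructType()
  .add("lat", DoubleType)
  .add("lon", DoubleType)

val topicOneParsed = topicOneDF
  .selectExpr("CAST(value AS STRING) AS json") // Kafka delivers the payload as bytes
  .select(from_json(col("json"), uberSchema).as("data"))
  .select("data.*")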


val staticDF = spark.read.format("format_of_file").load("path_to_static_data")

Joining a streaming DataFrame with a static DataFrame (the second argument is the join column(s); the join type goes third):

topicOneDF.join(staticDF, Seq("join_column"), "type_of_join")
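For example, assuming both sides share a hypothetical driverId column (the name is an assumption for illustration), an inner join of the parsed stream with the static data would look like this:

val enrichedDF = topicOneParsed.join(staticDF, Seq("driverId"), "inner")

Note that in Spark 2.3 a stream-static join supports inner and left outer joins when the streaming DataFrame is on the left.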

You can apply your ML algorithms to these DataFrames.
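For example, here is a minimal sketch that loads the saved KMeans model from modelpath and scores the joined stream; the feature column names and the hypothetical enrichedDF from above are assumptions for illustration:

import org.apache.spark.ml.clustering.KMeansModel
import org.apache.spark.ml.feature.VectorAssembler

// Load the model previously saved to HDFS
val model = KMeansModel.load(modelpath)

// Assemble the same feature columns the model was trained on (hypothetical names)
val assembler = new VectorAssembler()
  .setInputCols(Array("lat", "lon"))
  .setOutputCol("features")

val resultDF = model.transform(assembler.transform(enrichedDF)) // adds a "prediction" column

When you're done with your processing logic, you simply write the resulting DataFrame to Kafka as follows: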

Assuming your final DataFrame is called resultDF:

resultDF.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("topic", "OUTPUT_TOPIC_NAME_HERE")
  .option("checkpointLocation", "PATH_TO_CHECKPOINT_DIR") // the Kafka sink requires a checkpoint location
  .start()
  .awaitTermination() // keep the application running while the query executes

This keeps writing the streaming results to the output topic until the query is stopped.
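Note that the Kafka sink requires the output rows to contain a string or binary value column (and optionally a key). A minimal sketch, assuming you want to serialize all of resultDF's columns as JSON before writing:

import org.apache.spark.sql.functions.{col, struct, to_json}

// Pack every column into a single JSON string named "value", as the Kafka sink expects
val kafkaReadyDF = resultDF
  .select(to_json(struct(resultDF.columns.map(col): _*)).as("value"))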

