You need to make the following changes to your build file:

1> build.sbt

val sparkVersion = "2.3.1"

name := "StreamJoiner"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.8"

scalacOptions ++= Seq("-deprecation", "-unchecked")

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-streaming" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
  "org.apache.spark" %% "spark-mllib" % sparkVersion % Provided,
  // Structured Streaming's Kafka source, needed for readStream.format("kafka");
  // note the %%, and that this replaces the DStream-based spark-streaming-kafka-0-10
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)

mainClass in assembly := Some("com.test.StreamProcess")

test in assembly := {}

assemblyJarName in assembly := s"${name.value}-${version.value}.jar"

assemblyMergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")        => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")    => MergeStrategy.discard
  case "reference.conf"                                  => MergeStrategy.concat
  case x: String if x.contains("UnusedStubClass.class")  => MergeStrategy.first
  case _                                                 => MergeStrategy.first
}

2> Create a file called plugins.sbt in the project/ folder. This is the folder that contains your build.properties file (the file specifying the sbt version) and sits at the same level as the src/ folder.

Put the following line into the plugins.sbt file

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

Note: This plugin creates the uber JAR for your project. You simply have to run sbt assembly in your project's root.
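For example, from the project root (the master URL below is an assumption; the class and JAR names come from the build file above):

sbt assembly
spark-submit --class com.test.StreamProcess --master "local[2]" target/scala-2.11/StreamJoiner-0.0.1-SNAPSHOT.jar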

3> Change your code to leverage Structured Streaming. This allows you to read all the topics in your sourceTopicsSet as streaming DataFrames.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val brokers = "serverip:9092"
val groupId = "sparkApplication"
val batchInterval = "2"
val pollTimeout = "1000"
// List both source topics here, comma-separated
val sourceTopics = "UberDataTopics"
// Keep this as an Array: a Set cannot be indexed positionally
val sourceTopicsSet = sourceTopics.split(",")
val kafka_topic: String = "UberDataResultDataTopics" // publish topic
val modelpath = "hdfs://serverip:9000/Machine-Learning-models/KmeansModel"

val sparkConf = new SparkConf().setAppName("ClusterUberStream")
val spark = SparkSession.builder().config(sparkConf).master("local[2]").getOrCreate()

// The option name is "startingOffsets" (plural). Spark's Kafka source manages
// its own consumer group, so do not set group.id in these options.
val topicOneDF = spark.readStream.format("kafka")
  .options(Map("kafka.bootstrap.servers" -> brokers, "subscribe" -> sourceTopicsSet(0), "startingOffsets" -> "earliest"))
  .load()

val topicTwoDF = spark.readStream.format("kafka")
  .options(Map("kafka.bootstrap.servers" -> brokers, "subscribe" -> sourceTopicsSet(1), "startingOffsets" -> "earliest"))
  .load()

// Static (batch) side of the join; replace "format_of_file" with your file's
// format (e.g. "csv" or "parquet") and "path_to_file" with its location
val staticDF = spark.read.format("format_of_file").load("path_to_file")
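The Kafka source exposes each record as binary key and value columns, so cast them to strings before applying your logic. A minimal sketch, assuming the payloads are plain strings:

val topicOneValues = topicOneDF.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
val topicTwoValues = topicTwoDF.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")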

Joining a streaming DataFrame with a static DataFrame:
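A minimal sketch, assuming both sides share a key column (the name "joinKey" is a placeholder for whatever key your data actually has):

val joinedDF = topicOneValues.join(staticDF, Seq("joinKey"))

Stream-static inner joins like this are supported out of the box: the static side is simply looked up for every micro-batch of the stream.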


You can apply your ML algorithms to these DataFrames. For instance, the modelpath defined above points to a saved KMeans model; below is a sketch of loading and applying it, assuming joinedDF already carries the features column the model was trained on:
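import org.apache.spark.ml.clustering.KMeansModel

val model = KMeansModel.load(modelpath)
val resultDF = model.transform(joinedDF) // appends a "prediction" column

When you're done with your processing logic, you simply need to write the resulting DataFrame to Kafka.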

Assuming your final DataFrame is called resultDF:
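A sketch of the Kafka sink. The Kafka writer requires a string or binary value column, and the checkpointLocation below is an assumption; point it at a durable path (e.g. on HDFS):

val query = resultDF
  .selectExpr("CAST(prediction AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("topic", kafka_topic)
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
  .start()

query.awaitTermination()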


This will keep writing the DataFrame results to the output topic.
