score:1
Accepted answer
Edit: self-accepting after 5 days since there have been no good answers.
I've extracted the actor-based implementation into a library, sparkka-streams, and it's been working for me thus far. When a better solution to this question shows up, I'll either update or deprecate the library.
Its usage is as follows:
// inputDStream can then be used to build elements of the graph that require integration with Spark
val (inputDStream, feedDInput) = Streaming.connection[Int]()

val source = Source.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val source = Source(1 to 10)

  val bCast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))

  val add1 = Flow[Int].map(_ + 1)
  val times3 = Flow[Int].map(_ * 3)

  source ~> bCast ~> add1   ~> merge
            bCast ~> times3 ~> feedDInput ~> merge

  SourceShape(merge.out)
})

val reducedFlow = source.runWith(Sink.fold(0)(_ + _))
whenReady(reducedFlow)(_ shouldBe 230)

val sharedVar = ssc.sparkContext.accumulator(0)
inputDStream.foreachRDD { rdd =>
  rdd.foreach { i =>
    sharedVar += i
  }
}
ssc.start()
eventually(sharedVar.value shouldBe 165)
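As a sanity check of the two asserted totals, the arithmetic can be reproduced in plain Scala without Spark or Akka (branch names below are illustrative, not part of the library):

```scala
// Plain-Scala check of the totals asserted above; no Spark or Akka required.
// Broadcast duplicates 1..10 into two branches, which Merge then recombines:
//   add1 branch:   (1+1) + (2+1) + ... + (10+1) = 65
//   times3 branch: (1*3) + (2*3) + ... + (10*3) = 165
// The fold over the merged stream sums both branches: 65 + 165 = 230. The
// DStream is fed from the times3 branch via feedDInput, so it accumulates 165.
val input = 1 to 10
val add1Branch = input.map(_ + 1).sum
val times3Branch = input.map(_ * 3).sum
assert(add1Branch == 65)
assert(times3Branch == 165)
assert(add1Branch + times3Branch == 230)
```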
score:0
Ref: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
You can do it like:
class StreamStopped extends RuntimeException("Stream stopped")

// Serializable factory class
case class SourceFactory(start: Int, end: Int) {
  def source = Source(start to end).map(_.toString)
}

class CustomReceiver(sourceFactory: SourceFactory)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  implicit val materializer = ....

  def onStart() {
    sourceFactory.source.runForeach { e =>
      if (isStopped) {
        // Stop the source
        throw new StreamStopped
      } else {
        store(e)
      }
    } onFailure {
      case _: StreamStopped => // ignore
      case ex: Throwable => reportError("Source exception", ex)
    }
  }

  def onStop() {}
}

val customReceiverStream = ssc.receiverStream(new CustomReceiver(SourceFactory(1, 100)))
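The receiver body hinges on one pattern: keep pumping elements while isStopped is false, abort the running stream by throwing once it flips, and swallow exactly that exception in the failure handler. A minimal, Spark/Akka-free sketch of the same pattern (StreamStopped, stored, and the stopped flag here are illustrative stand-ins, not Spark API):

```scala
// Spark/Akka-free sketch of the stop-on-flag pattern from CustomReceiver.onStart:
// consume elements until a stop flag flips, abort by throwing, and swallow
// exactly that exception, mirroring the onFailure handler above.
class StreamStopped extends RuntimeException("Stream stopped")

var stopped = false                                         // stand-in for isStopped
val stored = scala.collection.mutable.Buffer.empty[String]  // stand-in for store(e)

def runAll(elements: Iterator[String])(f: String => Unit): Unit =
  try elements.foreach(f)
  catch { case _: StreamStopped => () }                     // ignore, as in onFailure

runAll((1 to 100).iterator.map(_.toString)) { e =>
  if (stopped) throw new StreamStopped
  else {
    stored += e
    if (e == "5") stopped = true                            // simulate a stop mid-stream
  }
}

assert(stored.toList == List("1", "2", "3", "4", "5"))
```

The throw is the only way to terminate the element loop from inside the callback, which is why the dedicated StreamStopped type exists: it lets the failure handler distinguish a deliberate shutdown from a real source error.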
Source: stackoverflow.com