
Accepted answer

Edit: self-accepting after 5 days, since there have been no good answers.

I've extracted the actor-based implementation into a library, sparkka-streams, and it has been working for me so far. If a better solution to this question shows up, I'll either update or deprecate the library.

Its usage is as follows:

import akka.stream.SourceShape
import akka.stream.scaladsl._

// Assumes an implicit ActorSystem/Materializer and a StreamingContext `ssc` are in scope
// (this snippet comes from a ScalaTest spec, hence whenReady/eventually/shouldBe).
// Streaming comes from sparkka-streams. feedDInput is a Flow placed inside the Akka
// Streams graph; elements passing through it are also emitted on inputDStream,
// which Spark Streaming consumes.
val (inputDStream, feedDInput) = Streaming.connection[Int]()
val source = Source.fromGraph(GraphDSL.create() { implicit builder =>

  import GraphDSL.Implicits._

  val source = Source(1 to 10)

  val bcast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))

  val add1 = Flow[Int].map(_ + 1)
  val times3 = Flow[Int].map(_ * 3)
  source ~> bcast ~> add1 ~> merge
            bcast ~> times3 ~> feedDInput ~> merge

  SourceShape(merge.out)
})

val reducedFlow = source.runWith(Sink.fold(0)(_ + _))
whenReady(reducedFlow)(_ shouldBe 230)

val sharedVar = ssc.sparkContext.accumulator(0)
inputDStream.foreachRDD { rdd =>
  rdd.foreach { i =>
    sharedVar += i
  }
}
ssc.start()
eventually(sharedVar.value shouldBe 165)
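To see where the two expected values in the test come from, here is a plain-Scala recomputation of the same arithmetic with no Akka or Spark involved (the object name `ExpectedTotals` is hypothetical). The fold after the merge sees both branches, while Spark only sees the branch routed through `feedDInput`:

```scala
object ExpectedTotals {
  // Same input as the graph: Source(1 to 10), broadcast to two branches
  val input: Seq[Int] = 1 to 10
  val add1Branch: Seq[Int] = input.map(_ + 1)   // 2..11
  val times3Branch: Seq[Int] = input.map(_ * 3) // 3..30, the branch through feedDInput

  // Sink.fold(0)(_ + _) runs after the merge, so it sums both branches
  val mergedTotal: Int = add1Branch.sum + times3Branch.sum
  // The Spark accumulator only receives elements that went through feedDInput
  val sparkTotal: Int = times3Branch.sum

  def main(args: Array[String]): Unit =
    println(s"merged = $mergedTotal, spark = $sparkTotal") // merged = 230, spark = 165
}
```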

Another answer

Ref: http://spark.apache.org/docs/latest/streaming-custom-receivers.html

You can do it with a custom receiver that runs the Akka Streams source and stores each element:

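import akka.stream.scaladsl.Source
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import scala.concurrent.ExecutionContext.Implicits.global

class StreamStopped extends RuntimeException("stream stopped")

// Serializable factory class: the receiver is serialized and shipped to an executor,
// so it carries a description of the source rather than the Source itself
case class SourceFactory(start: Int, end: Int) {
  def source = Source(start to end).map(_.toString)
}

// Logging is Spark's logging trait (its package depends on the Spark version)
class CustomReceiver(sourceFactory: SourceFactory)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  // @transient lazy: created on the executor instead of being serialized with the receiver
  @transient implicit lazy val materializer = ....

  def onStart(): Unit = {
    // runForeach returns a Future immediately, so onStart does not block
    sourceFactory.source.runForeach { e =>
      if (isStopped()) {
        // throwing fails the stream, which stops the source
        throw new StreamStopped
      } else {
        store(e)
      }
    }.failed.foreach {
      case _: StreamStopped => // expected when the receiver is stopped; ignore
      case ex: Throwable => reportError("source exception", ex)
    }
  }

  def onStop(): Unit = {}
}

val customReceiverStream = ssc.receiverStream(new CustomReceiver(SourceFactory(1, 100)))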
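The stop-by-exception control flow in `onStart` can be exercised without Spark or Akka. The sketch below is a hypothetical stand-in: `FakeReceiver`, `stopAfter` and `run` are made-up names, a plain iterator replaces the Akka source, and `isStopped` is simulated by flipping to true once a given number of elements has been stored:

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}

class StreamStopped extends RuntimeException("stream stopped")

// Hypothetical stand-in for CustomReceiver: only the control flow is modelled
final class FakeReceiver(stopAfter: Int) {
  val stored: ArrayBuffer[String] = ArrayBuffer.empty[String]
  var reportedErrors: Int = 0

  // Simulates Receiver.isStopped(): pretend stop() was called after `stopAfter` elements
  def isStopped: Boolean = stored.size >= stopAfter
  def store(e: String): Unit = stored += e
  def reportError(msg: String, ex: Throwable): Unit = reportedErrors += 1

  def run(elements: Iterator[String]): Unit = {
    val result = Try {
      elements.foreach { e =>
        if (isStopped) throw new StreamStopped // aborts the remaining elements
        else store(e)
      }
    }
    result match {
      case Failure(_: StreamStopped) => ()   // the normal way to stop: ignore
      case Failure(ex)               => reportError("source exception", ex)
      case Success(_)                => ()   // source completed on its own
    }
  }
}

object ReceiverStopDemo {
  def main(args: Array[String]): Unit = {
    val r = new FakeReceiver(stopAfter = 3)
    r.run((1 to 100).iterator.map(_.toString))
    println(r.stored.mkString(","))  // 1,2,3
    println(r.reportedErrors)        // 0
  }
}
```

Because the sentinel exception is filtered out in the failure handler, a normal stop leaves `reportedErrors` at 0, while any other exception coming from the source is forwarded to `reportError`.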
