score:1
There are various ways this can be solved. One is to use an ActorPublisher: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/stream-integrations.html#Integrating_with_Actors where you can just change the callback so that it sends a message to the actor. Depending how the callback works, you might be able to use mapAsync, too (converting a callback to a Future). That will only work if one request produces exactly one callback call.
score:6
Callback --> Source
Elaborating on Endre Varga's answer, below is the code that will create the DataConsumer
callback function which will send messages into an akka stream Source
.
Caution: There is a lot more to creating a functional ActorPublish than I am indicating below. In particular, buffering needs to be done to handle the case where the DataProducer
is calling onData
faster than the Sink
is signalling demand (see this example). The below code just sets up the "wiring".
import akka.actor.ActorRef
import akka.actor.Actor.noSender
import akka.stream.Actor.ActorPublisher
/**Incomplete ActorPublisher, see the example link.*/
class SourceActor extends ActorPublisher[DataType] {
def receive : Receive = {
case message : DataType => deliverBuf() //defined in example link
}
}
class ActorConsumer(sourceActor : ActorRef) extends DataConsumer {
override def onData(data : DataType) = sourceActor.tell(data, noSender)
}
//setup the actor that will feed the stream Source
val sourceActorRef = actorFactory actorOf Props[SourceActor]
//setup the Consumer object that will feed the Actor
val actorConsumer = ActorConsumer(sourceActorRef)
//setup the akka stream Source
val source = Source(ActorPublisher[DataType](sourceActorRef))
//setup the incoming data feed from 3rd party library
val dataProducer = DataProducer(actorConsumer)
Callback --> Whole Stream
The original question ask specifically for a callback to Source, but dealing with callbacks is easier to handle if the entire stream is already available (not just the Source). That is because the stream can be materialized into an ActorRef
using the Source#actorRef function. As an example:
val overflowStrategy = akka.stream.OverflowStrategy.dropHead
val bufferSize = 100
val streamRef =
Source
.actorRef[DataType](bufferSize, overflowStrategy)
.via(someFlow)
.to(someSink)
.run()
val streamConsumer = new DataConsumer {
override def onData(data : DataType) : Unit = streamRef ! data
}
val dataProducer = DataProducer(streamConsumer)
Source: stackoverflow.com
Related Query
- Converting a callback-method implementation into an akka stream Source
- Split Akka Stream Source into two
- Feeding an Akka stream sink into an akka stream source
- Accessing the underlying ActorRef of an akka stream Source created by Source.actorRef
- Converting Scala @suspendable Method into a Future
- Http Websocket as Akka Stream Source
- Idiomatic way to use Spark DStream as Source for an Akka stream
- Akka stream - Splitting a stream of ByteString into multiple files
- How to test an akka stream closed shape runnable graph with an encapsulated source and sink
- Akka Stream from within a Spark Job to write into kafka
- Java/Kotlin- Akka Stream Source.reduce does not work if null in Source
- akka stream integrating akka-htpp web request call into stream
- Idiomatic way to turn an Akka Source into a Spark InputDStream
- Stop Akka stream Source when web socket connection is closed by the client
- Akka stream source - cassandra resultset
- How to make cleanup when creating a source stream from iterator with Akka stream?
- Pass callback into function to register on scala future onComplete method
- Create Source from a polling method in Akka
- Akka Stream - Splitting flow into multiple Sources
- Akka Stream source code repository
- Akka stream best practice for dynamic Source and Flow controlled by websocket messages
- How to create an Akka Stream Source that generates items recursively
- How to manage Http into Akka Stream and send the message to Kafka?
- Multiplex akka streams source into two copies
- Akka stream GraphStage is not running the final invocation of an async callback
- Akka HTTP. Streaming source from callback
- How to make source function that polls an http endpoint into flink stream for every 1 hour?
- Reactive akka stream : How to delay the graph shutdown until the source has dried out?
- #Repr[A] in Akka Stream source type
- Operator or method for turning a Scala stream into a stream of streams (suffixes of given stream)
More Query from same tag
- Run Scala Dotty project using Intellij IDE
- Twitter Logging and Scala Custom Handlers
- generate unique sequence id per type
- Clean a Play framework build in IntelliJ
- How to get the current local time or system time in spark-scala dataframe?
- Output contents of DStream in Scala Apache Spark
- How to catch an exception within loop/react of an actor?
- spark sql error while querying sql server table
- Extract Key Value Pairs from Input Using Scala, Spark
- Scala: function as a parameter to map()
- What are the differences between these two Scala code segments regarding postfix toString method?
- Neo4j Spark connector error: import.org.neo4j.spark._ object neo4j is not found in package org
- Making a public accessor from an inherited protected Java field
- error while loading CharSequence in Scala 2.11.4 and sbt 0.12.4
- Match may not be exhaustive warning is incorrect
- Deleted Intellij sbt project keeps being recreated
- How to initialize transient fields during deserialization?
- How do I know, which implicit that is needs?
- Limit product to value types in scala
- Difference between these two scala function definitions
- Alternative for Either pattern matching
- How to set the name of the next Future thread in Scala?
- Catching an exception as a response of an actor in Akka
- What are the differences between asInstanceOf[T] and (o: T) in Scala?
- What does apply function do in Scala?
- Akka member(actor) lookup in cluster
- What is the idiomatic way to pattern match sequence comprehensions?
- Why global ExecutionContext is not a default parameter in future block?
- Spark files not found in cluster deploy mode
- In clauses with sql interpolation