Accepted answer

There are various ways this can be solved. One is to use an ActorPublisher: where you can just change the callback so that it sends a message to the actor. Depending how the callback works, you might be able to use mapAsync, too (converting a callback to a Future). That will only work if one request produces exactly one callback call.


Callback --> Source

Elaborating on Endre Varga's answer, below is the code that will create the DataConsumer callback function which will send messages into an akka stream Source.

Caution: There is a lot more to creating a functional ActorPublish than I am indicating below. In particular, buffering needs to be done to handle the case where the DataProducer is calling onData faster than the Sink is signalling demand (see this example). The below code just sets up the "wiring".



/**Incomplete ActorPublisher, see the example link.*/
class SourceActor extends ActorPublisher[DataType] {
  def receive : Receive = {
    case message : DataType => deliverBuf() //defined in example link

class ActorConsumer(sourceActor : ActorRef) extends DataConsumer {
  override def onData(data : DataType) = sourceActor.tell(data, noSender)

//setup the actor that will feed the stream Source
val sourceActorRef = actorFactory actorOf Props[SourceActor]

//setup the Consumer object that will feed the Actor
val actorConsumer = ActorConsumer(sourceActorRef)

//setup the akka stream Source
val source = Source(ActorPublisher[DataType](sourceActorRef))

//setup the incoming data feed from 3rd party library
val dataProducer  = DataProducer(actorConsumer)

Callback --> Whole Stream

The original question ask specifically for a callback to Source, but dealing with callbacks is easier to handle if the entire stream is already available (not just the Source). That is because the stream can be materialized into an ActorRef using the Source#actorRef function. As an example:

val overflowStrategy =

val bufferSize = 100

val streamRef = 
    .actorRef[DataType](bufferSize, overflowStrategy)

val streamConsumer = new DataConsumer {
  override def onData(data : DataType) : Unit = streamRef ! data

val dataProducer = DataProducer(streamConsumer)

Related Query

More Query from same tag