score:1

Accepted answer

When handling Futures inside a Flow, mapAsync is usually what you're looking for. To add to your specific example:

def asyncOp(msg: TextMessage): Future[SomeReply] = ???
def tailorResponse(r: SomeReply): TextMessage = ???

def wsFlow: Flow[Message, Message, Any] =
  Flow[Message]
   .mapConcat {...}
   .mapAsync(parallelism = 4)(asyncOp)
   .via(tailorResponse)

mapAsyncUnordered can also be used, in case the order of the Futures result is not relevant. Parallelism indicates how many Futures can be run at the same time, before the stage backpressures.

See also

  • stage docs
  • how to use in conjunction with ask - here

Related Query

More Query from same tag