Accepted answer
When handling Futures inside a Flow, mapAsync is usually what you're looking for. To add to your specific example:
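import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.Flow
import scala.concurrent.Future

def asyncOp(msg: TextMessage): Future[SomeReply] = ???
def tailorResponse(r: SomeReply): TextMessage = ???

def wsFlow: Flow[Message, Message, Any] =
  Flow[Message]
    .mapConcat {...}                     // elided in the question; must emit TextMessage
    .mapAsync(parallelism = 4)(asyncOp)  // up to 4 Futures in flight, results kept in order
    .map(tailorResponse)                 // tailorResponse is a plain function, so map, not via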
mapAsyncUnordered can also be used if the order of the Futures' results is not relevant.
The parallelism argument sets how many Futures may be in flight at the same time; once that limit is reached, the stage backpressures upstream.
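To make the ordering difference concrete, here is a minimal sketch, not taken from the original answer. It assumes Akka 2.6 or later, where an implicit ActorSystem is enough to materialize a stream. The names MapAsyncDemo and slowDouble are hypothetical, and slowDouble is deliberately rigged so that earlier elements complete later.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object MapAsyncDemo {
  // Runs the same three elements through mapAsync and mapAsyncUnordered
  // and returns both result sequences.
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("map-async-demo")
    implicit val ec: ExecutionContext = system.dispatcher

    // Hypothetical async operation: smaller inputs complete later
    // (1 -> 300 ms, 2 -> 200 ms, 3 -> 100 ms), without blocking a thread.
    def slowDouble(n: Int): Future[Int] =
      akka.pattern.after(((4 - n) * 100).millis, system.scheduler)(
        Future.successful(n * 2)
      )

    // mapAsync emits results in upstream order, whatever the completion order.
    val ordered = Source(1 to 3)
      .mapAsync(parallelism = 3)(slowDouble)
      .runWith(Sink.seq)

    // mapAsyncUnordered emits each result as soon as its Future completes.
    val unordered = Source(1 to 3)
      .mapAsyncUnordered(parallelism = 3)(slowDouble)
      .runWith(Sink.seq)

    try (Await.result(ordered, 5.seconds), Await.result(unordered, 5.seconds))
    finally Await.result(system.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = {
    val (ordered, unordered) = run()
    println(s"mapAsync:          $ordered")   // elements in upstream order: 2, 4, 6
    println(s"mapAsyncUnordered: $unordered") // same elements, in completion order
  }
}
```

With parallelism = 3 all three Futures start right away. Lowering it to 1 would make either operator process one element at a time, so the two outputs would then match.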
Source: stackoverflow.com