score:2
Accepted answer
Although it isn't immediately obvious, Flow[Message, Message, _] is enough for implementing most protocols. Remember that a Flow can build up almost arbitrary amounts of state via functions like statefulMapConcat or flatMapConcat. A Flow can even start emitting elements without having directly received an input to reply to, via functions like extrapolate or by merging with some ticking source.
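As a rough illustration of both points, here is a minimal sketch, assuming Akka 2.6 with akka-http on the classpath. The object name StatefulFlows and the two flow names are made up for this example; only statefulMapConcat, merge and Source.tick come from the libraries.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.duration._

// Hypothetical names, for illustration only.
object StatefulFlows {
  // Prefixes each strict text message with a running number.
  // The counter is created inside the factory function, so every
  // materialization (every WebSocket connection) gets its own copy.
  val numbered: Flow[Message, Message, NotUsed] =
    Flow[Message].statefulMapConcat[Message] { () =>
      var count = 0L
      msg => msg match {
        case TextMessage.Strict(text) =>
          count += 1
          List(TextMessage(s"#$count: $text"))
        case _ =>
          Nil // this sketch ignores binary and streamed messages
      }
    }

  // Same flow, but it also emits "tick" once a second without any client input.
  // eagerComplete = true completes the merged flow as soon as either side completes;
  // without it, the never-ending tick source would keep the flow open.
  val withTicks: Flow[Message, Message, NotUsed] =
    numbered.merge(
      Source.tick(1.second, 1.second, TextMessage("tick")),
      eagerComplete = true
    )
}
```

Either flow could be passed to handleWebSocketMessages as it is. Newer Akka versions also offer statefulMap, which may be the better choice there.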
In your case:
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Source}

val getSourceFiltered: Flow[Message, Message, _] = Flow[Message]
  .take(1) // you only care about the first thing that the client sends
  .flatMapConcat {
    case TextMessage.Strict(txtMsg: String) =>
      // Here is where you parse the client's message and build your filter from it
      val clientFilter: Message => Boolean = makeFilter(txtMsg)
      getSourceAll.filter(clientFilter)
    case _ => Source.single(TextMessage("Expected a single strict JSON message"))
  }
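To try this end to end, something along the following lines might work. It is only a sketch: the bodies of getSourceAll and makeFilter below are hypothetical stand-ins (a fixed list of words and a prefix match instead of real JSON parsing), and the object name FilteredFeed and the "feed" path are invented for the example.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.{Flow, Source}

object FilteredFeed {
  // Hypothetical stand-in for the real feed of outgoing messages.
  val getSourceAll: Source[Message, NotUsed] =
    Source(List("apple", "banana", "avocado")).map(TextMessage(_))

  // Hypothetical filter: treats the client's text as a required prefix.
  // A real implementation would parse the JSON the client sent.
  def makeFilter(txtMsg: String): Message => Boolean = {
    case TextMessage.Strict(text) => text.startsWith(txtMsg)
    case _                        => false
  }

  // The flow from the answer, wired to the stand-ins above.
  val getSourceFiltered: Flow[Message, Message, NotUsed] = Flow[Message]
    .take(1)
    .flatMapConcat {
      case TextMessage.Strict(txtMsg) =>
        getSourceAll.filter(makeFilter(txtMsg))
      case _ =>
        Source.single(TextMessage("Expected a single strict JSON message"))
    }

  // Each WebSocket connection materializes its own copy of the flow.
  val route: Route =
    path("feed") {
      handleWebSocketMessages(getSourceFiltered)
    }
}
```

Because the flow is an ordinary value, it can also be checked without a server by feeding it a single message, for example Source.single(TextMessage("a")).via(FilteredFeed.getSourceFiltered).runWith(Sink.seq) with an implicit ActorSystem in scope.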
Source: stackoverflow.com