score:2

Accepted answer

Although it isn't immediately obvious, Flow[Message, Message, _] is enough to implement most protocols. Remember that a Flow can accumulate almost arbitrary state through operators such as statefulMapConcat or flatMapConcat. A Flow can even emit messages that are not direct replies to any input, for example by using extrapolate or by merging with a ticking source.
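To make those two ideas concrete, here is a minimal sketch, assuming Akka HTTP and Akka Streams on the classpath. The names numbered, keepAlive and withKeepAlive, the "ping" payload and the 30-second interval are made up for illustration; they are not from the question.

```scala
import akka.NotUsed
import akka.actor.Cancellable
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Source}
import scala.concurrent.duration._

// State inside a Flow: number every strict text message.
// The outer function runs once per materialization, so each
// WebSocket connection gets its own counter.
val numbered: Flow[Message, Message, NotUsed] =
  Flow[Message].statefulMapConcat { () =>
    var count = 0
    msg =>
      msg match {
        case TextMessage.Strict(text) =>
          count += 1
          List(TextMessage(s"#$count: $text"))
        case _ => Nil // drop streamed and binary messages in this sketch
      }
  }

// Output without input: a ticking source that emits on its own schedule.
val keepAlive: Source[Message, Cancellable] =
  Source.tick(30.seconds, 30.seconds, TextMessage("ping"))

// Still a Flow[Message, Message, _], so it fits the same handler slot.
val withKeepAlive: Flow[Message, Message, NotUsed] =
  numbered.merge(keepAlive)
```

Because the tick source never completes, withKeepAlive keeps the outgoing side open until the connection is closed or the stream fails, which is usually what you want for a push-style WebSocket.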

In your case:

// Assumes Akka HTTP and Akka Streams
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Source}

val getSourceFiltered: Flow[Message, Message, _] = Flow[Message]
  .take(1)  // you only care about the first thing that the client sends
  .flatMapConcat {
    case TextMessage.Strict(txtMsg: String) =>
      // Here is where you parse the client's message and build your filter from it
      val clientFilter: Message => Boolean = makeFilter(txtMsg)
      getSourceAll.filter(clientFilter)

    // Anything else (streamed text or a binary message) gets a one-off error reply
    case _ => Source.single(TextMessage("Expected a single strict JSON message"))
  }
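If you want to try the flow without a real WebSocket client, something like the following sketch should work as a script or in a REPL, again assuming Akka HTTP and Akka Streams. The getSourceAll and makeFilter here are hypothetical stand-ins (a fixed list of words and a prefix match instead of JSON parsing), and the flow is restated in compact form only so the snippet runs on its own.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-in: the real getSourceAll would be your full stream.
val getSourceAll: Source[Message, NotUsed] =
  Source(List("apple", "banana", "avocado")).map(TextMessage(_))

// Hypothetical stand-in: treat the client's text as a prefix, not JSON.
def makeFilter(prefix: String): Message => Boolean = {
  case TextMessage.Strict(text) => text.startsWith(prefix)
  case _                        => false
}

// Same shape as the flow above, condensed.
val getSourceFiltered: Flow[Message, Message, NotUsed] = Flow[Message]
  .take(1)
  .flatMapConcat {
    case TextMessage.Strict(txtMsg) => getSourceAll.filter(makeFilter(txtMsg))
    case _ => Source.single(TextMessage("Expected a single strict JSON message"))
  }

implicit val system: ActorSystem = ActorSystem("demo")

// Simulate a client: only the first message is used, the second is dropped by take(1).
val client: Source[Message, NotUsed] =
  Source(List[Message](TextMessage("a"), TextMessage("ignored")))

val replies = Await.result(client.via(getSourceFiltered).runWith(Sink.seq), 5.seconds)
val texts   = replies.collect { case TextMessage.Strict(t) => t }
```

With these stand-ins, texts contains "apple" and "avocado". In a server you would pass getSourceFiltered to handleWebSocketMessages rather than running it by hand, and call system.terminate() when you are done experimenting.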
