score:27
The solution is to use mapMaterializedValue
on the source to get a future of its queue materialization :
def sourceQueueAction = Action {
val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))
futureQueue.map { queue =>
Source.tick(0.second, 1.second, "tick")
.runForeach (t => queue.offer(t))
}
Ok.chunked(queueSource)
}
//T is the source type, here String
//M is the materialization type, here a SourceQueue[String]
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
val p = Promise[M]
val s = src.mapMaterializedValue { m =>
p.trySuccess(m)
m
}
(s, p.future)
}
score:1
Would like to share an insight I got today, though it may not be appropriate to your case with Play.
Instead of thinking of a Source
to trigger, one can often turn the problem upside down and provide a Sink
to the function that does the sourcing.
In such a case, the Sink
would be the "recipe" (non-materialized) stage and we can now use Source.queue
and materialize it right away. Got queue. Got the flow that it runs.
Source: stackoverflow.com
Related Query
- How to use an Akka Streams SourceQueue with PlayFramework
- How to resolve - method using in object WebSocket is deprecated: Use accept with an Akka streams flow instead
- How to use stackable trait pattern with Akka actors?
- How to download a HTTP resource to a file with Akka Streams and HTTP?
- How to use Futures with Kafka Streams
- How to consume grouped sub streams with mapAsync in akka streams
- How to run Akka Streams graph on a separate dispatcher with timeout?
- How to use Oracle stored procedures with Scala Anorm in Playframework
- How to use futures with Akka for asynchronous results
- How to permanently disable javadsl namespace when coding with Akka Streams
- Akka Streams how to write a GraphStage with OrElse
- How to use the Akka sample cluster kubernetes with Scala and minikube?
- Play framework How to use akka streams output to a websocket
- How to use Typesafe Akka with Node.js
- How to use templatesImport in build.sbt with Playframework 2.5.x
- How to get started with Akka Streams?
- How to use Column.isin with list?
- How to use third party libraries with Scala REPL?
- How to use IntelliJ with Play Framework and Scala
- In Scala, how to use Ordering[T] with List.min or List.max and keep code readable
- How to use orderby() with descending order in Spark window functions?
- How to use scala trait with `self` reference?
- How to use s3 with Apache spark 2.2 in the Spark shell
- How to http post with an empty body request using WS API in Playframework 2 / Scala?
- How to use sbt with Google App Engine?
- How to use java.time.LocalDate in Datasets (fails with java.lang.UnsupportedOperationException: No Encoder found)?
- How to deal with long initialization of an Akka child actor?
- How can I use a Future inside an Akka HTTP Directive?
- Scala How to use extends with an anonymous class
- How can I use Kleisli composition with functions returning Validations?
More Query from same tag
- scala: check if a prime number using a while loop
- Flattening Nested Scalaz Validations
- Scala: How to define anonymous function with implicit parameter?
- Scala: Making concrete classes of trait with type member that has a representation type
- Why does scala allow private case class fields?
- Object Proto already defined
- Unable to find encoder for type stored in a Dataset in Spark Scala
- WebSocket Missing implicit Message Flow Transformer for custom data type in play 2.6
- What to import to make < work for jodatime in Scala
- Unable to connect to Spark master
- Nested For Expressions
- Not able to apply function to Spark Dataframe Column
- Snowflake Table from Databricks using Python/Scala
- How to manipulate a map using a parameterized class as the key
- Scala function to change attribute of class
- Gatling where to place JSON files?
- Eclipse Scala Worksheet error value is not a member of object
- How to find item by type from a list in scala?
- Lift Ajax multi select box
- How to invoke a method that expects a Type function with arguments
- create dataset from 2 other datasets spark
- How to read multiple parquet tables?
- What is the diffrence between JsObject and JsValue in Scala?
- The argument types of an anonymous function must be fully known. (SLS 8.5) when word2vec applied for dataframe
- How to group tuple dataset by one of its elements in Scala Spark?
- How to test stackable trait
- Shapeless: batch update using Record
- Scala syntax in Kafka
- Are there specific rules or precendence while using '+=' to insert data into a Map in Scala?
- Spark Scala, merging two columnar dataframes duplicating the second dataframe each time