score:2
the code behaves like that because it store the data in internal buffer. due to that it is not getting back pressure.
import akka.actor.actorsystem
import akka.stream.{actormaterializer, attributes, closedshape, flowshape, overflowstrategy}
import akka.stream.scaladsl.{flow, graphdsl, mergelatest, partition, runnablegraph, sink, source}
import scala.concurrent.duration._
object graphwithbackpressure extends app {
implicit val actorsysten = actorsystem("actory-system")
implicit val executioncontext = actorsysten.dispatcher
def partition2(v : int): int = {
v % 2
}
val partialgraphdsl = graphdsl.create() { implicit builder =>
import graphdsl.implicits._
val partitioner = builder.add(partition[int](2, partition2))
val mergelatest = builder.add(mergelatest[int](2))
partitioner.out(0) ~> mergelatest
partitioner.out(1) ~> mergelatest
flowshape(partitioner.in, mergelatest.out)
}
val source = source(1 to 1000000)
.async
val maingraph = graphdsl.create() { implicit builder =>
import graphdsl.implicits._
val partialgraph = builder.add(partialgraphdsl)
val delay = flow[list[int]]
.delay(1.seconds, overflowstrategy.backpressure)
.map { x => println("after delay."); x }
.withattributes(attributes.inputbuffer(1,1))
val newdelay = flow[int]
.delay(1.seconds, overflowstrategy.backpressure)
.map { x => println("after delay."); x }
.withattributes(attributes.inputbuffer(1,1))
source ~> newdelay ~> flow[int].map { x => println(x); x } ~> partialgraph ~> delay ~> sink.foreach(println)
closedshape
}
val runnable = runnablegraph.fromgraph(maingraph)
val materialized = runnable.run()(actormaterializer())
}
if we start the delay before flow then we can see the back-pressure.
note: check the newdelay method
Source: stackoverflow.com
Related Query
- Akka streams delay and back pressure
- Ways to maintain back pressure in Akka Streams involving multiple JVMs
- integrating with a non thread safe service, while maintaining back pressure, in a concurrent environment using Futures, akka streams and akka actors
- How to download a HTTP resource to a file with Akka Streams and HTTP?
- Websocket Proxy using Play 2.6 and akka streams
- Akka streams - resuming graph with broadcast and zip after failure
- Difference between Balance and Broadcast fan out in Akka Streams
- Reading from postgres using Akka Streams 2.4.2 and Slick 3.0
- Scala Akka Streams Merging Filter And Map
- Akka Streams with Akka HTTP Server and Client
- Play framework Scala: Create infinite source using scala akka streams and keep Server sent events connection open on server
- Akka Streams Inlets and Outlets match
- Akka Streams and Scala Play server
- Akka HTTP and Akka streams - using Flows with high level routes API
- An akka streams function that creates a sink and a source that emits whatever the sink receives
- When materialiser is actually used in Akka Streams Flows and when do we need to Keep values
- How to pass and consume Flow1 materialized value into Flow2 using Akka streams
- Dead letters using Akka HTTP client and Akka Streams
- Akka Streams File Handling and Termination
- Equivalent to balancer, broadcast and merge in pure akka streams
- Akka Streams scala DSL and Op-Rabbit
- Akka Streams - Understanding when and how materialisation works
- Akka in Scala, exclamation mark and question mark
- Difference between forward and tell in akka actors
- Difference in message-passing model of Akka and Vert.x
- Akka Stream Kafka vs Kafka Streams
- Traversing lists and streams with a function returning a future
- In Scala Akka futures, what is the difference between map and flatMap?
- Akka SLF4J logback configuration and usage
- Creating a flow from actor in Akka Streams
More Query from same tag
- How to calculate unique session id per user per minute in a click stream dataset using Spark-SQL?
- Typesafe stack and Scala 2.10
- DynamoDB QueryRequest not returning all the results
- Gatling. ConnectException: connection timed out:
- Play for Scala: In an ActionBuilder, is it possible to modify the session of a wrapped action's result?
- Unresolved dependency with specs2 scalaz-stream 0.5a
- Typesafe Play WS as dependency in SBT project
- Akka Streams Covarience for SourceQueueWithComplete
- Case class inheritance is prohibited, but how to model dependency from library?
- How to remove all inferred implicits in a macro?
- Calculating the mean of elements in a list in Scala
- Difference between ::: and ++
- How to build and run zinc (scala incremental compiler) on ubuntu
- Cake pattern with overriding abstract type don't work with Upper Type Bounds
- Scala - TrieMap vs Vector
- ScalaTest - check for "almost equal" for floats and objects containing floats
- Play framework show States list from DB based on selected Country Id using AJAX
- Building an SBT project using Maven
- Scala: Converting 2D list into 1D
- Is using Try[Unit] the proper way?
- Get a value from a CSV Gatling feeder to create a POST request body
- How to use gcs-connector and google-cloud-storage alongside in Scala
- Scala: copy case class from common supertype
- Cannot connect to remote MongoDB from EMR cluster with spark-shell
- Recursive tree-like table query with Slick
- How to measure execution time of each loop step in Scala?
- Order Scala methods alphabetically (by name) using Intellij IDEA
- Understanding a Scala function from Play framework zentask example
- Match a word but not its inverse using [^] syntax
- Finding an overloaded method using Scala Reflections