score:2

The code behaves that way because the stages store elements in their internal buffers. Because of that buffering, the back-pressure is not observed.

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, ClosedShape, FlowShape, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, MergeLatest, Partition, RunnableGraph, Sink, Source}

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._

/** Runnable demo of back-pressure in an Akka Streams graph.
  *
  * The stream is: source ~> delayed Int flow ~> printing map ~> (Partition ~> MergeLatest)
  * ~> delayed List[Int] flow ~> printing sink. Both delayed flows use
  * `OverflowStrategy.backpressure` and an input buffer of size 1, and one of them sits
  * directly after the source, ahead of the printing `map` stage.
  */
object graphwithbackpressure extends App {
  // Implicit vals carry explicit types so implicit resolution does not depend on inference.
  implicit val actorsysten: ActorSystem = ActorSystem("actory-system")
  implicit val executioncontext: ExecutionContextExecutor = actorsysten.dispatcher

  /** Chooses the `Partition` output port for `v`: the remainder of `v` divided by 2.
    *
    * @param v element flowing through the partitioner
    * @return `v % 2`; this is 0 or 1 for the positive values produced by `source` below
    */
  def partition2(v: Int): Int = v % 2

  // Flow-shaped partial graph: splits the input over two ports with `partition2`, then
  // feeds both ports into a two-input MergeLatest. Its inlet is the partitioner's inlet and
  // its outlet is the MergeLatest outlet.
  val partialgraphdsl = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partitioner = builder.add(Partition[Int](2, partition2))
    val mergelatest = builder.add(MergeLatest[Int](2))

    partitioner.out(0) ~> mergelatest
    partitioner.out(1) ~> mergelatest

    FlowShape(partitioner.in, mergelatest.out)
  }

  // One million integers, with an asynchronous boundary placed after the source.
  val source = Source(1 to 1000000)
    .async

  val maingraph = GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    val partialgraph = builder.add(partialgraphdsl)

    // Delays each List[Int] coming out of the partial graph by one second, back-pressuring
    // instead of dropping, with the input buffer set to 1 (initial and max).
    val delay = Flow[List[Int]]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { x => println("after delay."); x }
      .withAttributes(Attributes.inputBuffer(1, 1))

    // Same delay configuration for plain Ints; wired directly after the source so the
    // delay applies before the printing `map` stage.
    val newdelay = Flow[Int]
      .delay(1.seconds, OverflowStrategy.backpressure)
      .map { x => println("after delay."); x }
      .withAttributes(Attributes.inputBuffer(1, 1))

    source ~> newdelay ~> Flow[Int].map { x => println(x); x } ~> partialgraph ~> delay ~> Sink.foreach(println)

    ClosedShape
  }

  val runnable = RunnableGraph.fromGraph(maingraph)
  // NOTE(review): ActorMaterializer is deprecated in newer Akka releases; confirm the Akka
  // version in use before replacing this explicit materializer.
  val materialized = runnable.run()(ActorMaterializer())
}

If we place the delay before the flow, then we can see the back-pressure.

Note: check the `newdelay` flow in the code above.


Related Query

More Query from same tag