score:2
The correct way of testing Sources, Flows and Sinks is by using test probes.
Given following source the logic of witch you want to test
val source = Source(Seq("TestValue")).collect {
case s @ "TestValue" => Right[String, String](s)
case _ => Left[String, String]("error")
}
(I know that Left
will never trigger here but it's just an example)
Now, you can define a TestSink
that is connected to the given source. The resulting graph is executed in following way
"assert correct value" in {
implicit val system: ActorSystem = ???
val probe = source.toMat(TestSink.probe)(Keep.right).run()
probe.request(1)
probe.expectNext(Right("TestValue"))
}
TestSink.probe
is a sink that materializes into a TestSubscriber.Probe[T]
and it provides controls over the stream. Since it's a sink, it needs to emit demand for an element. It does it via request(1)
or request one element. Then it's followed by assert sink.expectNext(Right("TestValue"))
that checks that correct value is received.
There is also a counterpart called TestSource.probe
that allows testing of a Sink
.
And by combing both of them, you can test a Flow
.
Source: stackoverflow.com
Related Query
- How to test an akka stream closed shape runnable graph with an encapsulated source and sink
- Validating akka-stream Source in unit test
- Accessing the underlying ActorRef of an akka stream Source created by Source.actorRef
- Split Akka Stream Source into two
- How to mock an Akka Actor to Unit Test a class?
- Http Websocket as Akka Stream Source
- How to unit test an Akka actor that sends a message to itself, without using Thread.sleep
- Idiomatic way to use Spark DStream as Source for an Akka stream
- How do I unit test akka actors in Play Framework 2.2.0 Scala (spec2, Mockito)
- Converting a callback-method implementation into an akka stream Source
- How do I unit test an akka Actor synchronously?
- Java/Kotlin- Akka Stream Source.reduce does not work if null in Source
- Akka Actor restarts after exception during Unit test
- Akka Streams Kafka - unit test for consumer
- Stop Akka stream Source when web socket connection is closed by the client
- Akka stream source - cassandra resultset
- How to make cleanup when creating a source stream from iterator with Akka stream?
- Akka stream test with testkit and scalatest
- Akka Stream source code repository
- Recommended Akka Props style throws IllegalArgumentException only inside unit test
- Akka stream best practice for dynamic Source and Flow controlled by websocket messages
- How to create an Akka Stream Source that generates items recursively
- Akka Stream test flow when Supervision.Resume implemented
- Unit test akka actor method
- Scala simple funsuite unit test with akka actors fails
- Reactive akka stream : How to delay the graph shutdown until the source has dried out?
- Feeding an Akka stream sink into an akka stream source
- Unit test cases for web sockets using akka http
- Akka Stream Graphs - How to test PartitionWith from akka.stream.contrib
- #Repr[A] in Akka Stream source type
More Query from same tag
- Transient equivalent in Slick
- Modify/rewrite ErrorToken String in scala
- Julian to Gregorian date conversion in Scala
- not found: value UADetectorServiceFactory
- Incremental update in rdd or dataframe apache spark
- Intellij Scala- unable to find Scala App
- Concatenate Sparse Vectors in Spark?
- SBT can not resolve sbt-echo-play
- Exception while connecting to Hbase using Spark
- Cancel a Future after stopping an actor
- “required: scala.collection.GenTraversableOnce[?]” error from this for-comprehension
- scala programming without vars
- Spark streaming group by custom function
- Testing a utility function by writing a unit test in apache spark scala
- Gatling: Enable runMultipleSimulations
- Ordering a PriorityQueue by a field of a non-case class
- Function evaluation
- difference between pipe and comma delimiter in spark-scala
- Why does Scala's semicolon inference fail here?
- Interoperability: Scala Case Classes + Java Pojos
- scala spark convert a struct type column to json data
- Why does Spark's GaussianMixture return identical clusters?
- Message passing using akka with case
- How to specify tuple type signature
- Correct pattern for accumulating state in an Akka actor
- Scala + stax compile problem during deploy process
- Getting all records as NULL while reading a Simple JSON file in Spark scala
- How to define a custom SBT task that takes a File argument and returns a File?
- Scala Map indexed by Type
- Applying evolutions to ScalikeJDBC in-memory test DB