score:0

scan may not be better enough, but works

     sealed trait Ele
      case class FailureEle(e: Throwable) extends Ele
      case class SuccessEle(r: String) extends Ele

      def parse(p: Process[Task, Ele], error: Int): Process[Task, (Seq[SuccessEle],   Seq[FailureEle])] = {
            p.scan(Seq[SuccessEle]() -> Seq[FailureEle]()) { (r, e) =>
              val (s, f) = r
              e match {
                case fail: FailureEle =>
                  s  -> (f :+ fail)
                case succ: SuccessEle =>
                  (s :+ succ) -> f
              }
            }.dropWhile { case (succ, fail) => fail.size < error }.take(1)
          }

        def test() {
            def randomFail = {
              val nInt =  scala.util.Random.nextInt()
              println("getting" +  nInt)
              if(nInt % 5 == 0 )
                FailureEle(new Exception("fooo"))
              else
                SuccessEle(nInt.toString)
            }
            val infinite = Process.repeatEval(Task.delay(randomFail))
            val r = parse(infinite, 3).runLast.run
            println(r)
}

Related Query

More Query from same tag