The one approach I've found that works is to use Concurrent.unicast and pass its channel into the combining function. There is probably a way to build an Iteratee/Enumerator that does all of this in one neat package, but that eludes me for now.

Updated combining function:

def virtualSystemGrouping(system: ParentSystem, output: Concurrent.Channel[Detail]): Iteratee[Detail, Detail] = {
  def step(state: Map[String, Detail])(input: Input[Detail]): Iteratee[Detail, Detail] = {
    input match {
      case Input.EOF =>
        // Flush unpaired values. foreach runs eagerly, whereas mapValues returns a lazy view.
        state.values.foreach(r => output.push(r))
        output.eofAndEnd()
        Done(null, Input.EOF)
      case Input.Empty => Cont[Detail, Detail](i => step(state)(i))
      case Input.El(e) =>
        if (!system.isVirtual) {
          // Non-virtual systems need no grouping: pass the value straight through.
          output.push(e)
          Done(e, Input.Empty)
        } else if (state.contains(e.name)) {
          // Second value with this name: push the combined result and forget the pair.
          val other = state(e.name)
          output.push(e + other)
          Cont[Detail, Detail](i => step(state - e.name)(i))
        } else {
          // First value with this name: hold it until its partner arrives.
          Cont[Detail, Detail](i => step(state + (e.name -> e))(i))
        }
    }
  }
  Cont(step(Map[String, Detail]()))
}

Here any combined values are pushed into the output channel and then subsequently processed.
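
To make the pairing logic easier to follow outside of the Iteratee machinery, here is a minimal plain-Scala sketch of the same state transitions. It is not the Play code itself: the `Detail` case class (with its `value` field) and the `GroupingSketch.group` helper are hypothetical stand-ins, and the returned list plays the role of the values pushed into the channel.

```scala
// Hypothetical stand-in for the answer's Detail type; only `name` and `+` are assumed.
final case class Detail(name: String, value: Int) {
  def +(other: Detail): Detail = Detail(name, value + other.value)
}

object GroupingSketch {
  // Returns the values that would be pushed to the channel, in order.
  def group(inputs: List[Detail]): List[Detail] = {
    // State: details still waiting for a partner, plus what has been emitted so far.
    val (pending, emitted) =
      inputs.foldLeft((Map.empty[String, Detail], Vector.empty[Detail])) {
        case ((state, out), e) =>
          state.get(e.name) match {
            case Some(other) => (state - e.name, out :+ (e + other)) // pair found: emit the sum
            case None        => (state + (e.name -> e), out)         // first sighting: hold it
          }
      }
    // At end of input, flush whatever never found a partner.
    (emitted ++ pending.values).toList
  }
}
```

For example, `GroupingSketch.group(List(Detail("a", 1), Detail("b", 2), Detail("a", 3)))` yields the combined `Detail("a", 4)` first, followed by the unpaired `Detail("b", 2)` flushed at the end.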

Usage looks like this:

   val systems: List[ParentSystem] = getSystems(parent)
   val start = Enumerator.empty[Detail]
   // Interleave one unicast Enumerator per child system.
   val concatDetail = systems.foldLeft(start) { (b, p) =>
     b interleave Concurrent.unicast[Detail] { channel =>
       implicit val timeout = Timeout(1 seconds)
       val actor = SystemsActor.lookupActor(p.name + "/details")
       actor map {
         case Some(a) => a ! SendRateInformation(channel)
         case None => channel.eofAndEnd()
       } recover {
         // Close the channel if the lookup fails so the interleave can finish.
         case t: Throwable => channel.eofAndEnd()
       }
     }
   }


   val combinedDetail = Concurrent.unicast[Detail]{channel => 
    concatDetail &> Enumeratee.grouped(virtualSystemGrouping(parent, channel)) |>> Iteratee.ignore
   }
   val send = combinedDetail |>> Iteratee.foreach(e => {output.push(e)})
   send.onComplete(t => output.eofAndEnd)

This is very similar to the original, except that the combining function is now called inside the unicast onStart block (where channel is defined). concatDetail is the Enumerator created from the interleaved results of the child systems. It is fed through the system grouping function, which in turn pushes any combined results (and any remaining results at EOF) through the provided channel.

The combinedDetail Enumerator is then consumed and its values are pushed through to the upstream output channel.

EDIT: virtualSystemGrouping can be generalized as:

  def enumGroup[E >: Null, K, M](
      key:(E) => K,
      merge:(E, Option[E]) => M,
      output:Concurrent.Channel[M]
      ): Iteratee[E, E] = {
    def step(state: Map[K, E])(input:Input[E]): Iteratee[E, E] = {
      input match {
        case Input.EOF => {
          // Push along any remaining values. foreach runs eagerly, whereas mapValues returns a lazy view.
          state.values.foreach(f => output.push(merge(f, None)))
          output.eofAndEnd()
          Done(null, Input.EOF)
        }
        case Input.Empty =>{ Cont[E, E](i => step(state)(i))}
        case Input.El(e) => {
          if (state.contains(key(e))) {
            output.push(merge(e, state.get(key(e))))
            Cont[E, E](i => step(state - key(e))(i))
          } else {
            Cont[E, E](i => step(state + (key(e) -> e))(i))
          }
        }
      }
    }

    Cont(step(Map[K,E]()))
  }

With a call such as:

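Enumeratee.grouped(
  enumGroup(
    // Parameter types are spelled out so that E, K and M can be inferred.
    (d: Detail) => d.name,
    (e1: Detail, e2: Option[Detail]) => e2.fold(e1)(v => e1 + v),
    channel)
)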
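
The `key` and `merge` arguments can be tried on their own, without any Iteratee plumbing. The sketch below assumes a hypothetical `Detail` case class with a `name`, an illustrative `value` field, and a `+` operator. `MergeSketch` is likewise just an illustrative holder. It shows the two situations `merge` has to handle: a matched pair (`Some`) and an unpaired value flushed at EOF (`None`).

```scala
// Hypothetical stand-in for the answer's Detail type; only `name` and `+` are assumed.
final case class Detail(name: String, value: Int) {
  def +(other: Detail): Detail = Detail(name, value + other.value)
}

object MergeSketch {
  // Groups details by their name.
  val key: Detail => String = _.name

  // Combines a detail with its partner if there is one, otherwise returns it unchanged.
  val merge: (Detail, Option[Detail]) => Detail =
    (e1, e2) => e2.fold(e1)(v => e1 + v)
}
```

Here `MergeSketch.merge(Detail("a", 1), Some(Detail("a", 2)))` gives `Detail("a", 3)`, while `MergeSketch.merge(Detail("a", 1), None)` returns `Detail("a", 1)` untouched.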
