score:2

Accepted answer

The program is running into a deadlock. The thread pool provided is of a fixed size, so the following happens: Future(longComputation) takes a thread from the pool and starts working. When the computation is done, onComplete requests another thread from the same pool to execute the provided callback.

Given that doing the work takes far longer than handling its completion, at some point all threads are busy doing work. When any one of them finishes, its onComplete needs a thread as well, so it requests one from the executor. The work cannot finish because all threads are busy, and the machine stops in a deadlock.
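To make the failure mode concrete, here is a hypothetical reconstruction of such a context (the question's actual definition is not shown here): a fixed pool over a bounded queue whose rejection handler blocks the submitter. A worker that finishes longComputation then blocks while scheduling its onComplete callback, and no thread is left to drain the queue.

import java.util.concurrent._
import scala.concurrent.ExecutionContext

// Hypothetical reconstruction: a fixed pool over a bounded queue whose
// rejection handler blocks the submitting thread until there is room.
val numThread = 4
val pool = new ThreadPoolExecutor(
  numThread, numThread, 0L, TimeUnit.SECONDS,
  new ArrayBlockingQueue[Runnable](numThread),
  new RejectedExecutionHandler {
    def rejectedExecution(r: Runnable, e: ThreadPoolExecutor): Unit =
      e.getQueue.put(r)  // block until the queue has space
  })
val fixedContext = ExecutionContext.fromExecutorService(pool)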

We can solve this producer-consumer deadlock by giving the consumer its own reserved resources. That way, work is still throttled by the fixed-size thread pool, but we ensure that any work that finishes can always be handled further.

The snippet below, where I've renamed context to fixedContext, shows the use of a separate context for processing the results, which solves the deadlock. I also got rid of the Promise, which served no real purpose other than proxying the Future.

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

val fixedContext = // same as in question
val singleThreadContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
...
...
def main(args: Array[String]) {

  val s = Stream.from(0)

  s.foreach { x =>
    println(x)
    val f = Future(longComputation)(fixedContext)
    f.onComplete { processResult }(singleThreadContext)
  }

  println("finished")
  fixedContext.shutdown()
}
}
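Note that the consumer context needs only a single thread because processResult never feeds work back into fixedContext; completed work can therefore always be drained, which is exactly the reserved resource the producer-consumer fix calls for.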

score:1

When a thread completes the longComputation, it tries to put the callback job on the executor's queue and gets blocked, because the queue is full. So eventually the first "batch" of jobs completes, but all threads are still busy, waiting on the queue to schedule their callbacks, and no thread is available to pull the next task off the queue.

A solution? Remove the limit from the queue. This way the threads trying to submit callbacks won't get blocked, and will become available to pick up the next task.
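As a sketch, assuming the original pool was built by hand over a bounded queue as above, Executors.newFixedThreadPool gives you the same fixed number of threads over an unbounded LinkedBlockingQueue, so callback submission never blocks:

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// newFixedThreadPool is backed by an unbounded LinkedBlockingQueue,
// so submitting a callback can never block a worker thread.
val unboundedContext = ExecutionContext.fromExecutorService(
  Executors.newFixedThreadPool(numThread))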

You might want to insert something into your producer loop to slow it down a little, so that your unlimited queue doesn't eat up all the memory. A Semaphore, perhaps?

import java.util.concurrent.Semaphore
import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Try, Success, Failure}

// numThread: the pool size from the question
val sem = new Semaphore(numThread * 2)

def processResult[T](r: Try[T]): Unit = blocking {
  r match {
    case Success(id) => println(s"thread result: $id")
    case Failure(t)  => println("An error has occurred: " + t.getMessage)
  }
  sem.release()
}

Stream.from(0).foreach { _ =>
  sem.acquire()  // throttle the producer before submitting more work
  Future(longComputation).onComplete(processResult)
}

You don't need your custom execution context with this; Scala's default will actually work better for what you want to do.
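For reference, the default context (scala.concurrent.ExecutionContext.Implicits.global) is a fork-join pool that cooperates with blocking: wrapping blocking sections, as processResult does above, lets the pool spawn compensating threads so blocked tasks don't starve the workers. A minimal sketch:

import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global

// The global pool detects blocking { ... } sections and temporarily grows
// beyond its core size, so a blocked task doesn't starve the pool.
val f = Future {
  blocking { Thread.sleep(1000) }  // stands in for a long, blocking call
  42
}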

