score:2
The program is running into a deadlock. The thread pool provided is of a fixed size, so the following happens:
Future(longComputation) allocates a thread from the thread pool and starts working. When it's done, the onComplete callback needs a thread from the same pool to execute the provided function.
Given that doing work takes longer than finishing work, at some point all threads are busy doing work. When any one of them finishes, its onComplete needs a thread as well, so it asks the executor for one. Work cannot finish because all threads are busy, and the program stops in deadlock.
We can solve this producer-consumer deadlock by giving reserved resources to the consumer. That way, work is throttled by the fixed-size thread pool, but we ensure that any work that's finished can be further handled.
This snippet, where I've renamed context to fixedContext, shows the use of a separate context for processing the results, solving the deadlock. I also got rid of the Promise, which served no real purpose other than proxying the Future.
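import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

val fixedContext = // same as in question
// Dedicated context so that callbacks never compete with the workers for a thread
val singleThreadContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
...
...
def main(args: Array[String]): Unit = {
  val s = Stream.from(0)
  s.foreach { x =>
    println(x)
    // The work runs on the fixed pool, the result handling on the separate context
    val f = Future(longComputation)(fixedContext)
    f.onComplete { processResult }(singleThreadContext)
  }
  println("finished")
  fixedContext.shutdown()
}
}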
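For experimenting with the idea outside the original program, here is a minimal self-contained sketch. It rests on assumptions rather than on the question's code: the worker pool is modelled as a ThreadPoolExecutor with a bounded queue whose rejection handler blocks the submitter, longComputation is a stand-in that just sleeps, and the names SeparateContexts and run are made up for illustration. The loop is finite so that the program terminates.

```scala
import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, Executors}
import java.util.concurrent.{RejectedExecutionHandler, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

object SeparateContexts {
  // Hypothetical stand-in for the question's longComputation.
  def longComputation(id: Int): Int = { Thread.sleep(10); id }

  // Runs n tasks on a bounded worker pool and handles every result on a
  // dedicated single-thread context. Returns the number of callbacks that ran.
  def run(n: Int, workers: Int): Int = {
    // Assumption: bounded queue, and a full queue blocks whoever submits.
    val pool = new ThreadPoolExecutor(
      workers, workers, 0L, TimeUnit.MILLISECONDS,
      new ArrayBlockingQueue[Runnable](workers),
      new RejectedExecutionHandler {
        def rejectedExecution(r: Runnable, e: ThreadPoolExecutor): Unit =
          e.getQueue.put(r)
      })
    val fixedContext = ExecutionContext.fromExecutorService(pool)
    val singleThreadContext =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))

    val handled = new AtomicInteger(0)
    val done = new CountDownLatch(n)
    def processResult(r: Try[Int]): Unit = {
      handled.incrementAndGet()
      done.countDown()
    }

    (0 until n).foreach { x =>
      // Only the producer can block on the full queue; callbacks go elsewhere.
      Future(longComputation(x))(fixedContext)
        .onComplete(processResult)(singleThreadContext)
    }

    done.await(30, TimeUnit.SECONDS)
    fixedContext.shutdown()
    singleThreadContext.shutdown()
    handled.get
  }

  def main(args: Array[String]): Unit =
    println(s"callbacks handled: ${run(20, 2)}")
}
```

Because the callbacks are queued on their own executor, a worker thread that finishes never has to wait for a slot in the bounded queue, which is the situation the answer describes as the cause of the deadlock.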
score:1
When a thread completes the longComputation, it tries to put the job on the queue to execute the callback, and gets blocked because the queue is full. So, eventually, the first "batch" of jobs completes, but all threads are still busy waiting on the queue to schedule the callback, and nothing is available to pull jobs off the queue.
A solution? Remove the limit from the queue. This way the threads trying to submit callbacks won't get blocked, and will become available to pick up the next task.
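As a rough illustration of what "removing the limit" could look like, the sketch below builds a pool with a fixed number of threads and a LinkedBlockingQueue created without a capacity argument. The names UnboundedPool and create are hypothetical, and how this maps onto the pool in the question is an assumption.

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.concurrent.ExecutionContext

object UnboundedPool {
  // Fixed number of worker threads, but a queue with no practical limit
  // (capacity Int.MaxValue), so submitting a task does not block the caller.
  def create(numThreads: Int): (ThreadPoolExecutor, ExecutionContext) = {
    val pool = new ThreadPoolExecutor(
      numThreads, numThreads, 0L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue[Runnable]())
    (pool, ExecutionContext.fromExecutor(pool))
  }
}
```

The first element of the returned pair is kept so that the caller can still shut the pool down; the second can be passed wherever an ExecutionContext is expected.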
You might want to insert something into your producer loop to slow it down a little, so that your unlimited queue doesn't eat up all the memory. A Semaphore, perhaps?
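import java.util.concurrent.Semaphore
import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

// At most numThread * 2 futures may be outstanding at any time
val sem = new Semaphore(numThread * 2)
def processResult[T](r: Try[T]) = blocking {
  r match {
    case Success(id) => println(s"thread result: $id")
    case Failure(t) => println("an error has occurred: " + t.getMessage)
  }
  // Free a slot so that the producer loop can submit the next task
  sem.release()
}
Stream.from(0).foreach { _ =>
  // Blocks the producer while all permits are taken
  sem.acquire()
  Future(longComputation).onComplete(processResult)
}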
You don't need your custom execution context with this; Scala's default will actually work better for what you want to do.
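To check that a semaphore really caps the number of outstanding futures on the default context, a small finite experiment along these lines can help. It is only a sketch: the sleep stands in for longComputation, and SemaphoreThrottle and run are made-up names. The function returns the highest number of futures that were in flight at once, which should never exceed the number of permits.

```scala
import java.util.concurrent.{CountDownLatch, Semaphore, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global

object SemaphoreThrottle {
  // Submits n tasks, never allowing more than `permits` to be outstanding.
  // Returns the largest number of outstanding tasks seen by the producer.
  def run(n: Int, permits: Int): Int = {
    val sem = new Semaphore(permits)
    val inFlight = new AtomicInteger(0)
    val done = new CountDownLatch(n)
    var maxInFlight = 0 // only read and written by the producer thread

    (0 until n).foreach { x =>
      sem.acquire() // producer waits here while all permits are taken
      maxInFlight = math.max(maxInFlight, inFlight.incrementAndGet())
      Future { blocking { Thread.sleep(5) }; x }.onComplete { _ =>
        inFlight.decrementAndGet()
        sem.release() // hand the permit back once the result is handled
        done.countDown()
      }
    }

    done.await(30, TimeUnit.SECONDS)
    maxInFlight
  }
}
```

The counter is decremented before the permit is released, so the producer can never observe more outstanding tasks than there are permits.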
Source: stackoverflow.com