score:4
Accepted answer
The behaviour you are trying to use will not work in a distributed setting: as soon as the parallelism is greater than 1, it breaks. In your code the rates are actually updated, but in a different instance of the parallel operator.
What you want in this case is broadcast state, which was designed to solve exactly the issue you are facing.
In your specific use case it would look something like this:
val streamA: DataStream[(String, Double)] = ???
val streamABroadcasted = streamA.broadcast(<your map state definition>)
val streamB: DataStream[String] = ???
streamB.connect(streamABroadcasted)
Then you can use a BroadcastProcessFunction to implement your logic. More on the broadcast state pattern can be found in the Flink documentation.
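To make the wiring above more concrete, here is a minimal sketch of what the BroadcastProcessFunction could look like. It assumes the Flink Scala DataStream API (org.apache.flink.streaming.api.scala, which is deprecated in recent Flink releases), that streamA carries (name, rate) updates, and that streamB carries names to look up. The names BroadcastRatesSketch, ratesDescriptor, RateLookup and wire are hypothetical and not from the question, and the lookup logic is only a guess at what the original job needs.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object BroadcastRatesSketch {

  // Hypothetical descriptor for the broadcast state: name -> latest rate.
  // This takes the place of "<your map state definition>" above.
  val ratesDescriptor = new MapStateDescriptor[String, java.lang.Double](
    "rates",
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.DOUBLE_TYPE_INFO)

  // IN1 = elements of streamB, IN2 = broadcast elements of streamA,
  // OUT = (name, rate) pairs. The output type is an assumption.
  class RateLookup
      extends BroadcastProcessFunction[String, (String, Double), (String, Double)] {

    // Called for every element of the non-broadcast stream (streamB).
    // Broadcast state is read-only on this side.
    override def processElement(
        name: String,
        ctx: BroadcastProcessFunction[String, (String, Double), (String, Double)]#ReadOnlyContext,
        out: Collector[(String, Double)]): Unit = {
      val rate = ctx.getBroadcastState(ratesDescriptor).get(name)
      // No rate has been broadcast for this name yet: emit nothing.
      if (rate != null) out.collect((name, rate.doubleValue()))
    }

    // Called in every parallel instance for every broadcast element,
    // so each instance ends up with the same view of the rates.
    override def processBroadcastElement(
        update: (String, Double),
        ctx: BroadcastProcessFunction[String, (String, Double), (String, Double)]#Context,
        out: Collector[(String, Double)]): Unit = {
      ctx.getBroadcastState(ratesDescriptor).put(update._1, Double.box(update._2))
    }
  }

  // Connects the two streams as in the snippet above.
  def wire(
      streamA: DataStream[(String, Double)],
      streamB: DataStream[String]): DataStream[(String, Double)] = {
    val streamABroadcasted = streamA.broadcast(ratesDescriptor)
    streamB.connect(streamABroadcasted).process(new RateLookup)
  }
}
```

Because updates go through processBroadcastElement rather than a shared mutable map, every parallel instance receives every update, which is what the plain map could not guarantee.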
Source: stackoverflow.com