score:4

Accepted answer

The behaviour you are trying to use will not work in a distributed setting: as soon as you have parallelism > 1, it breaks. In your code the rates are actually updated, but in a different instance of the parallel operator than the one that reads them.

What you want in this case is broadcast state, which was designed to solve exactly the issue you are facing.

In your specific use case it would look something like this:

val streamA: DataStream[(String, Double)] = ???
val streamABroadcasted = streamA.broadcast(<your map state definition>)
val streamB: DataStream[String] = ???
streamB.connect(streamABroadcasted)

Then you can use a BroadcastProcessFunction to implement your logic. More on the broadcast state pattern can be found in the Flink documentation.
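As a rough sketch of what that function could look like with the Flink Scala DataStream API: the names `ratesDescriptor`, `ApplyRates` and `wire` are made up for illustration, and it assumes each element of the second stream is a key whose rate should be looked up. Adjust both to your actual logic.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object BroadcastRatesSketch {

  // Hypothetical descriptor: key (e.g. a currency code) -> latest rate.
  val ratesDescriptor = new MapStateDescriptor[String, java.lang.Double](
    "rates",
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.DOUBLE_TYPE_INFO)

  // IN1 = non-broadcast element, IN2 = broadcast element, OUT = result.
  class ApplyRates
      extends BroadcastProcessFunction[String, (String, Double), (String, Double)] {

    // Called on every parallel instance for each broadcast element,
    // so all instances end up with the same rates.
    override def processBroadcastElement(
        value: (String, Double),
        ctx: BroadcastProcessFunction[String, (String, Double), (String, Double)]#Context,
        out: Collector[(String, Double)]): Unit = {
      ctx.getBroadcastState(ratesDescriptor).put(value._1, value._2)
    }

    // Called for each element of the non-broadcast stream;
    // the broadcast state is read-only here.
    override def processElement(
        value: String,
        ctx: BroadcastProcessFunction[String, (String, Double), (String, Double)]#ReadOnlyContext,
        out: Collector[(String, Double)]): Unit = {
      val rate = ctx.getBroadcastState(ratesDescriptor).get(value)
      // Assumption: silently drop elements whose rate has not arrived yet.
      if (rate != null) out.collect((value, rate.doubleValue()))
    }
  }

  // Connects the two streams and applies the function.
  def wire(
      streamA: DataStream[(String, Double)],
      streamB: DataStream[String]): DataStream[(String, Double)] =
    streamB.connect(streamA.broadcast(ratesDescriptor)).process(new ApplyRates)
}
```

Note that there is no ordering guarantee between the two streams, so an element may be processed before its rate has been broadcast. Whether to drop, buffer or default such elements is a design choice left to you.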

