score:1

Accepted answer

First of all, while it's not strictly necessary, go ahead and use Scala tuples. They make things easier overall, unless you have to interoperate with Java Tuples for some reason.

Next, don't use org.apache.flink.api.java.functions.KeySelector directly. Instead, use this keyBy overload from org.apache.flink.streaming.api.scala.DataStream:

/**
 * Groups the elements of a DataStream by the given K key to
 * be used with grouped operators like grouped reduce or grouped aggregations.
 */
def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] = {

  val cleanFun = clean(fun)
  val keyType: TypeInformation[K] = implicitly[TypeInformation[K]]

  val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
    def getKey(in: T) = cleanFun(in)
    override def getProducedType: TypeInformation[K] = keyType
  }
  asScalaStream(new JavaKeyedStream(stream, keyExtractor, keyType))
}

In other words, just pass a function that maps each stream element to its key value (in general, Flink's Scala API tries to be idiomatic). So something like this should do the job:

aa_stats_stream_w_timestamps.keyBy[String](value => value.set1.sEntId)

Update:

For the composite key case, use

aa_stats_stream_w_timestamps.keyBy[(String, String)](x => (x.set1.sEntId, x.set1.field2))
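
To put both variants in context, here is a minimal end-to-end sketch. The case classes `Set1` and `AaStats`, the `count` field, and the sample data are assumptions made up for illustration, since the question's real element type isn't shown here; only `sEntId` and `field2` come from the snippets above. It also assumes the Flink Scala streaming API is on the classpath.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical element types; the real classes from the question are not shown.
case class Set1(sEntId: String, field2: String)
case class AaStats(set1: Set1, count: Long)

object KeyByExample {
  // Key extractors are ordinary Scala functions from element to key.
  val singleKey: AaStats => String = _.set1.sEntId
  val compositeKey: AaStats => (String, String) =
    x => (x.set1.sEntId, x.set1.field2)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Made-up sample data standing in for aa_stats_stream_w_timestamps.
    val stats: DataStream[AaStats] = env.fromElements(
      AaStats(Set1("a", "x"), 1L),
      AaStats(Set1("a", "y"), 2L),
      AaStats(Set1("b", "x"), 3L)
    )

    // Single-field key: elements with the same sEntId go to the same group.
    val bySEntId: KeyedStream[AaStats, String] = stats.keyBy(singleKey)

    // Composite key: a Scala tuple of both fields.
    val byBoth: KeyedStream[AaStats, (String, String)] = stats.keyBy(compositeKey)

    // Rolling sum of count per key, printed as results arrive.
    bySEntId.reduce((a, b) => a.copy(count = a.count + b.count)).print()
    byBoth.reduce((a, b) => a.copy(count = a.count + b.count)).print()

    env.execute("keyBy sketch")
  }
}
```

The wildcard import of `org.apache.flink.streaming.api.scala._` matters here: it brings the implicit `TypeInformation` derivation into scope, which the `K: TypeInformation` context bound on keyBy requires. Declaring the extractors as typed function values also keeps overload resolution pointed at the `T => K` variant.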
