First of all, while it's not necessary, go ahead and use Scala tuples. It'll make things easier overall, unless you have to interoperate with Flink's Java tuple classes (`org.apache.flink.api.java.tuple.Tuple2` and friends) for some reason.
Next, don't implement `org.apache.flink.api.java.functions.KeySelector` yourself. You want to be using this `keyBy` from `org.apache.flink.streaming.api.scala.DataStream`:
```scala
/**
 * Groups the elements of a DataStream by the given K key to
 * be used with grouped operators like grouped reduce or grouped aggregations.
 */
def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] = {
  val cleanFun = clean(fun)
  val keyType: TypeInformation[K] = implicitly[TypeInformation[K]]
  val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
    def getKey(in: T) = cleanFun(in)
    override def getProducedType: TypeInformation[K] = keyType
  }
  asScalaStream(new JavaKeyedStream(stream, keyExtractor, keyType))
}
```
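The reason a plain lambda is enough is the context bound `K: TypeInformation`: the compiler supplies the key's type information implicitly, and `keyBy` wraps your function in a Java-style `KeySelector` for you. Here is a minimal sketch of that same pattern that runs without Flink. `TypeInfo`, `MiniKeySelector`, `MiniStream` and `MiniKeyed` are hypothetical stand-ins that only mirror the shape of the quoted code; they are not Flink classes, and the real `KeyedStream` partitions a distributed stream rather than grouping a local `Seq`.

```scala
// Hypothetical mock of the keyBy pattern quoted above. None of these types
// are Flink classes; they only mirror the structure of the real code.
object KeyByMock {

  // Stand-in for Flink's TypeInformation[K]: evidence describing the key type.
  trait TypeInfo[K] { def name: String }

  object TypeInfo {
    implicit val stringInfo: TypeInfo[String] = new TypeInfo[String] { val name = "String" }
    implicit val intInfo: TypeInfo[Int] = new TypeInfo[Int] { val name = "Int" }
    // Evidence for tuple (composite) keys is derived from their components.
    implicit def tuple2Info[A, B](implicit a: TypeInfo[A], b: TypeInfo[B]): TypeInfo[(A, B)] =
      new TypeInfo[(A, B)] { val name = s"(${a.name}, ${b.name})" }
  }

  // Stand-in for the Java-style key selector interface.
  trait MiniKeySelector[T, K] { def getKey(in: T): K }

  // Result of keyBy: keeps the selector and key type, and can group elements locally.
  final case class MiniKeyed[T, K](elems: Seq[T], selector: MiniKeySelector[T, K], keyType: TypeInfo[K]) {
    def groups: Map[K, Seq[T]] = elems.groupBy(e => selector.getKey(e))
  }

  final case class MiniStream[T](elems: Seq[T]) {
    // Same shape as the Scala API: a plain function plus implicit key-type evidence.
    def keyBy[K: TypeInfo](fun: T => K): MiniKeyed[T, K] = {
      val keyType = implicitly[TypeInfo[K]]
      // Wrap the Scala function in the Java-style interface, as the real code does.
      val selector = new MiniKeySelector[T, K] { def getKey(in: T): K = fun(in) }
      MiniKeyed(elems, selector, keyType)
    }
  }

  def main(args: Array[String]): Unit = {
    // K is inferred as String from the lambda; TypeInfo[String] is found implicitly.
    val keyed = MiniStream(Seq(("a", 1), ("b", 2), ("a", 3))).keyBy(_._1)
    println(keyed.keyType.name) // String
    println(keyed.groups("a"))  // List((a,1), (a,3))
  }
}
```

If no implicit evidence exists for the key type, the call fails to compile, which is the mock's equivalent of the "could not find implicit value for evidence parameter" error you get in Flink when the key's `TypeInformation` isn't in scope.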
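In other words, just pass a function that maps each stream element to its key (in general, Flink's Scala API tries to be idiomatic Scala). The implicit `TypeInformation[K]` that `keyBy` needs is provided by `import org.apache.flink.streaming.api.scala._`, so make sure that import is in scope. Something like this should then do the job: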
```scala
aa_stats_stream_w_timestamps.keyBy[String](value => value.set1.sEntId)
```
Update:
For the composite key case, key by a Scala tuple of the fields:
```scala
aa_stats_stream_w_timestamps.keyBy[(String, String)](x => (x.set1.sEntId, x.set1.field2))
```
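Since `keyBy` only needs a function `T => K`, the key extractors can be written and checked as plain Scala values. The sketch below assumes hypothetical record types `Set1` and `AaStats`, shaped after the field names in the question (the real types aren't shown), and uses `groupBy` on a local `Seq` only as a stand-in for the way elements with equal keys end up together after `keyBy`; it is not Flink code.

```scala
// Hypothetical record shapes based on the field names used above
// (set1.sEntId, set1.field2); the real types are not shown in the question.
object CompositeKeyDemo {
  final case class Set1(sEntId: String, field2: String)
  final case class AaStats(set1: Set1, count: Long)

  // Single-field key, the kind of function passed to keyBy[String].
  val entityKey: AaStats => String = x => x.set1.sEntId

  // Composite key, the kind of function passed to keyBy[(String, String)].
  // Scala tuples compare (equals/hashCode) by all of their components.
  val compositeKey: AaStats => (String, String) = x => (x.set1.sEntId, x.set1.field2)

  val sample: Seq[AaStats] = Seq(
    AaStats(Set1("e1", "x"), 1L),
    AaStats(Set1("e1", "y"), 2L),
    AaStats(Set1("e1", "x"), 3L)
  )

  // Local analogue of a keyed sum: total count per composite key.
  def countsByCompositeKey: Map[(String, String), Long] =
    sample.groupBy(compositeKey).map { case (k, v) => k -> v.map(_.count).sum }

  def main(args: Array[String]): Unit = {
    println(sample.groupBy(entityKey).size)    // 1
    println(sample.groupBy(compositeKey).size) // 2
    println(countsByCompositeKey)
  }
}
```

Structural equality on tuples (and case classes) is what lets `(sEntId, field2)` act as a single key: two elements land in the same group only when both fields match.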
Source: stackoverflow.com