When an exception is thrown in a stream, the element that caused the error is typically lost. One idea is to offload the Avro conversion to an actor that returns both the result of the conversion and the original message. This approach would allow you to ack or nack a message depending on whether it was convertible to Avro.
For example, the actor could look something like the following:
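import akka.actor.Actor
import scala.util.control.NonFatal

case class AvroResult(avro: Option[Avro], msg: CommittableIncomingMessage)
                                            // ^ change this to whatever type you're using

class ConverterActor extends Actor {
  val converter = ???

  def receive = {
    case msg: CommittableIncomingMessage =>
      try {
        val json = msg.message.bytes.utf8String
        val avro = converter.convert(json)
        // conversion succeeded: reply with the Avro value and the original message
        sender() ! AvroResult(Some(avro), msg)
      } catch {
        // catch only non-fatal errors; a bare `case _` would also swallow fatal ones
        case NonFatal(_) =>
          sender() ! AvroResult(None, msg)
      }
  }
}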
Then you could query this actor from your stream with ask:
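import akka.actor.Props
import akka.stream.scaladsl.Source
import akka.util.Timeout
import scala.concurrent.duration._

// ask needs an implicit Timeout; 5 seconds is an arbitrary choice
implicit val askTimeout: Timeout = 5.seconds
// map on the Futures returned by ack/nack needs an ExecutionContext
import system.dispatcher

val converterActor = system.actorOf(Props[ConverterActor])
val source: Source[Avro, _] = amqpSource
  .ask[AvroResult](converterActor)
  .mapAsync(1) {
    case AvroResult(Some(avro), msg) => // ack
      msg.ack().map(_ => Some(avro))
    case AvroResult(None, msg) => // nack
      msg.nack().map(_ => None)
  }
  .collect {
    case Some(avro) => avro
  }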
The above Source emits downstream the Avro-converted messages that are acknowledged with ack. The messages that are not convertible are rejected with nack and are not passed downstream.
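The core idea, catching the failure next to the element so that the original message is still at hand, does not depend on Akka. The following is a minimal sketch using only the Scala standard library. The names Message, Converted, Result and convert are hypothetical stand-ins for the AMQP message type, the Avro type, AvroResult and the real converter, and a plain List takes the place of the stream.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-ins for the real message and Avro types.
final case class Message(id: Int, body: String)
final case class Converted(value: Int)

// Outcome of a conversion, always paired with the original message.
final case class Result(converted: Option[Converted], msg: Message)

object KeepOriginalDemo {
  // Hypothetical converter: throws NumberFormatException on non-numeric input.
  def convert(body: String): Converted = Converted(body.trim.toInt)

  // Catch the failure next to the element so the element is never lost.
  def tryConvert(msg: Message): Result =
    Try(convert(msg.body)) match {
      case Success(c) => Result(Some(c), msg)
      case Failure(_) => Result(None, msg)
    }

  def main(args: Array[String]): Unit = {
    val incoming = List(Message(1, "10"), Message(2, "oops"), Message(3, "30"))
    val results = incoming.map(tryConvert)

    // Messages that would be acked and passed downstream.
    val acked = results.collect { case Result(Some(c), _) => c }
    // Messages that would be nacked; the original is still available here.
    val nacked = results.collect { case Result(None, m) => m }

    println(s"acked: $acked")
    println(s"nacked: $nacked")
  }
}
```

Because Try only catches non-fatal exceptions, this behaves like a catch on NonFatal. In the stream version, the two collect steps correspond to the ack and nack branches.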
Source: stackoverflow.com