Accepted answer

When a stream stage throws an exception, the element that caused the error is typically lost, which leaves nothing to ack or nack. One option is to offload the Avro conversion to an actor that replies with both the result of the conversion and the original message. You can then ack or nack each message depending on whether it could be converted to Avro.

For example, the actor could look something like the following:

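import akka.actor.Actor
import scala.util.control.NonFatal

case class AvroResult(avro: Option[Avro], msg: CommittableIncomingMessage)
                                // ^ change this to whatever type you're using

class ConverterActor extends Actor {
  val converter = ???  // your JSON-to-Avro converter

  def receive = {
    case msg: CommittableIncomingMessage =>
      try {
        val json = msg.message.bytes.utf8String
        val avro = converter.convert(json)
        // Conversion succeeded: reply with the result and the original message.
        sender() ! AvroResult(Some(avro), msg)
      } catch {
        // NonFatal avoids swallowing fatal errors such as OutOfMemoryError.
        case NonFatal(_) =>
          // Conversion failed: still reply, so the stream can nack the message.
          sender() ! AvroResult(None, msg)
      }
  }
}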

Then you could query this actor from your stream with ask, which requires an implicit Timeout in scope:

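import akka.actor.Props
import akka.stream.scaladsl.Source
import akka.util.Timeout
import scala.concurrent.duration._

// ask needs an implicit timeout; 5 seconds is an arbitrary choice
implicit val timeout: Timeout = 5.seconds
// Future.map needs an execution context
import system.dispatcher

val converterActor = system.actorOf(Props[ConverterActor])

val source: Source[Avro, _] = amqpSource
  .ask[AvroResult](converterActor)
  .mapAsync(1) {
    case AvroResult(Some(avro), msg) => // ack
      msg.ack().map(_ => Some(avro))
    case AvroResult(None, msg) => // nack
      msg.nack().map(_ => None)
  }
  .collect {
    case Some(avro) => avro
  }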

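The resulting Source emits only the messages that were successfully converted to Avro, each acknowledged with ack before it is passed downstream. Messages that cannot be converted are rejected with nack and dropped from the stream. If nack is called with requeue enabled, the broker may redeliver such messages, so check that this is the behavior you want.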
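
The underlying idea, keeping each original message paired with the outcome of its conversion so that a failure never loses the message, can also be tried out without Akka or AMQP. The following is only a minimal sketch under stated assumptions: `PairedConversion`, `convert`, and `convertAll` are hypothetical names, plain strings stand in for AMQP messages, and integer parsing stands in for the Avro conversion.

```scala
import scala.util.{Failure, Success, Try}

object PairedConversion {
  // Hypothetical stand-in for the JSON-to-Avro converter:
  // parses an integer and throws NumberFormatException on bad input.
  def convert(raw: String): Int = raw.trim.toInt

  // Pair every original message with the outcome of its conversion,
  // so the message is still available when the conversion fails.
  def convertAll(msgs: List[String]): List[(String, Try[Int])] =
    msgs.map(m => (m, Try(convert(m))))

  def main(args: Array[String]): Unit = {
    val results = convertAll(List("1", "oops", "3"))

    // Decide per message: a success would be acked, a failure nacked.
    results.foreach {
      case (msg, Success(value)) => println(s"ack  '$msg' -> $value")
      case (msg, Failure(error)) => println(s"nack '$msg' (${error.getClass.getSimpleName})")
    }

    // Only the successful conversions continue downstream.
    val downstream = results.collect { case (_, Success(value)) => value }
    println(downstream)
  }
}
```

The `AvroResult(Option[Avro], msg)` reply in the actor plays the same role as the `(String, Try[Int])` pair here: the decision to ack or nack is made after the conversion, with the original message still in hand.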

