1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.technical
21 
22 import akka.actor.typed.scaladsl.Behaviors
23 import akka.actor.typed._
24 import akka.stream.scaladsl.{Sink, Source}
25 import com.sksamuel.avro4s.{Decoder, SchemaFor}
26 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
27 import org.apache.kafka.common.serialization.StringDeserializer
28 import org.make.api.extensions.KafkaConfiguration
29 import org.make.api.kafka.kafkaDispatcher
30 import org.make.api.technical.KafkaConsumerBehavior._
31 import org.make.api.technical.tracing.Tracing
32 import org.make.core.{AvroSerializers, EventWrapper, SlugHelper}
33 
34 import java.util
35 import java.util.Properties
36 import scala.concurrent.duration.{DurationInt, FiniteDuration}
37 import scala.concurrent.{Await, Future}
38 import scala.jdk.CollectionConverters._
39 import scala.util.{Failure, Success, Try}
40 
/**
 * Base class for typed-actor Kafka consumers.
 *
 * Subclasses provide the consumer group, the topic key and a message handler; `createBehavior`
 * then builds a self-messaging actor that polls Kafka in a loop, pushes each polled batch
 * through an Akka stream, waits for the batch to complete and commits offsets synchronously.
 *
 * @tparam T the payload type, deserialized from Avro via the implicit `SchemaFor` / `Decoder`
 */
abstract class KafkaConsumerBehavior[T: SchemaFor: Decoder] extends AvroSerializers {

  /** Kafka consumer group id used for offset tracking. */
  protected val groupId: String
  /** Logical key resolved to the real topic name through `kafkaConfiguration.topic(topicKey)`. */
  protected val topicKey: String
  /** Handles one deserialized message; failures of the returned future are logged and swallowed. */
  protected def handleMessage(message: T)(system: ActorSystem[_]): Future[_]

  /** Max number of messages of one polled batch handled concurrently (used by `mapAsync`). */
  protected val handleMessagesParallelism: Int = 4
  /** Upper bound waited for a whole polled batch before logging a timeout and moving on. */
  protected def handleMessagesTimeout: FiniteDuration = 1.minute
  /** Extra consumer properties, applied on top of (and able to override) the defaults below. */
  protected def customProperties: Properties = new Properties()

  /**
   * Builds a `KafkaConsumer` subscribed to the configured topic.
   *
   * Auto-commit is disabled: offsets are committed manually in the `Consume` handler,
   * after the batch has been processed (or has timed out).
   *
   * @param kafkaConfiguration connection, schema-registry and topic configuration
   * @param name               used as the Kafka client id
   */
  protected def createConsumer(kafkaConfiguration: KafkaConfiguration, name: String): KafkaConsumer[String, T] = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.connectionString)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, name)
    // Manual offset management: commit happens only after a batch is handled (see createBehavior).
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    props.putAll(customProperties)
    val consumer = new KafkaConsumer[String, T](
      props,
      new StringDeserializer(),
      new MakeKafkaAvroDeserializer(kafkaConfiguration.schemaRegistry)
    )
    consumer.subscribe(util.Arrays.asList(kafkaConfiguration.topic(topicKey)))
    consumer
  }

  /**
   * Creates the polling behavior.
   *
   * The actor drives itself with `Consume` messages: each `Consume` immediately re-enqueues
   * another `Consume`, then polls, processes the batch and commits. `CheckState` reports
   * readiness based on whether partitions have been assigned yet.
   *
   * @param name Kafka client id for the underlying consumer
   */
  final def createBehavior(name: String): Behavior[Protocol] = {
    Behaviors.setup { context =>
      val kafkaConfiguration = KafkaConfiguration(context.system)
      val consumer = createConsumer(kafkaConfiguration, name)

      // Kick off the poll loop; each Consume re-sends itself (see below).
      context.self ! Consume

      implicit val system: ActorSystem[Nothing] = context.system
      // Dedicated dispatcher for the error-recovery callbacks of handleMessage futures.
      val dispatcher = system.dispatchers.lookup(DispatcherSelector.fromConfig(kafkaDispatcher))

      Behaviors
        .receiveMessage[Protocol] {
          case CheckState(replyTo) =>
            // Partitions assigned => the consumer is effectively part of the group and polling.
            if (consumer.assignment().size() > 0) {
              replyTo ! Ready
            } else {
              replyTo ! Waiting
            }
            Behaviors.same
          case Consume =>
            // Re-enqueue before processing so the loop continues even if this batch fails.
            context.self ! Consume
            val records = consumer.poll(kafkaConfiguration.pollTimeout)
            val handleRecords = Source
              .fromIterator(() => records.asScala.iterator)
              .map(_.value())
              .mapAsync(handleMessagesParallelism) { message =>
                // For wrapped events, trace/log with the inner event's class name, not the wrapper's.
                val messageClass = (message match {
                  case wrapper: EventWrapper[_] => wrapper.event
                  case _                        => message
                }).getClass.getSimpleName
                Tracing.entrypoint(SlugHelper(s"${getClass.getSimpleName}-$messageClass"))
                // Per-message failures are logged and swallowed so one bad message
                // does not abort the rest of the batch.
                handleMessage(message)(context.system).recover {
                  case e => context.log.error(s"Error while handling message of type [$messageClass]: $message", e)
                }(dispatcher)
              }
              .runWith(Sink.ignore)

            // Deliberately blocks the actor until the batch completes (or times out), so that
            // commitSync below only runs after handling — NOTE(review): this also blocks the
            // actor's thread; presumably acceptable on its dispatcher, confirm.
            Try(Await.ready(handleRecords, handleMessagesTimeout)) match {
              case Success(_) =>
              case Failure(e) => context.log.error("Timeout occurred while consuming message", e)
            }
            // TODO: manage failures — offsets are committed even when the batch timed out above,
            // so unprocessed messages may be skipped (at-most-once on timeout). Confirm intended.
            consumer.commitSync()
            Behaviors.same
        }
        .receiveSignal {
          case (_, PostStop) =>
            // Release the Kafka connection (and trigger a group rebalance) when the actor stops.
            consumer.close()
            Behaviors.same
        }
    }
  }

  /** No-op handler for events a subclass consumes but deliberately ignores. */
  final def doNothing(event: Any): Future[Unit] = {
    Future.unit
  }
}
124 
object KafkaConsumerBehavior {
  /** Messages accepted by a behavior created with `createBehavior`. */
  sealed trait Protocol
  /** Self-sent message triggering one poll / handle / commit cycle. */
  case object Consume extends Protocol
  /** Asks whether the consumer has been assigned partitions yet; answered with a [[ConsumerStatus]]. */
  final case class CheckState(replyTo: ActorRef[ConsumerStatus]) extends Protocol

  /** Replies to [[CheckState]]. */
  sealed trait ConsumerStatus
  /** At least one partition is assigned: the consumer is effectively consuming. */
  case object Ready extends ConsumerStatus
  /** No partition assigned yet (still joining the group). */
  case object Waiting extends ConsumerStatus
}
Line Stmt Id Pos Tree Symbol Tests Code
47 6969 1842 - 1843 Literal <nosymbol> 4
48 7550 1900 - 1908 Select scala.concurrent.duration.DurationConversions.minute scala.concurrent.duration.`package`.DurationInt(1).minute
48 6174 1900 - 1901 Literal <nosymbol> 1
49 7116 1956 - 1972 Apply java.util.Properties.<init> new java.util.Properties()
52 6293 2105 - 2121 Apply java.util.Properties.<init> new java.util.Properties()
53 6025 2126 - 2213 Apply java.util.Properties.put props.put("bootstrap.servers", kafkaConfiguration.connectionString)
53 6796 2177 - 2212 Select org.make.api.extensions.KafkaConfiguration.connectionString kafkaConfiguration.connectionString
53 7693 2136 - 2175 Literal <nosymbol> "bootstrap.servers"
54 6973 2260 - 2267 Select org.make.api.technical.KafkaConsumerBehavior.groupId KafkaConsumerBehavior.this.groupId
54 7860 2228 - 2258 Literal <nosymbol> "group.id"
54 6092 2218 - 2268 Apply java.util.Properties.put props.put("group.id", KafkaConsumerBehavior.this.groupId)
55 7514 2273 - 2321 Apply java.util.Properties.put props.put("client.id", name)
56 6647 2326 - 2386 Apply java.util.Properties.put props.put("enable.auto.commit", "false")
57 7658 2391 - 2421 Apply java.util.Properties.putAll props.putAll(KafkaConsumerBehavior.this.customProperties)
57 6242 2404 - 2420 Select org.make.api.technical.KafkaConsumerBehavior.customProperties KafkaConsumerBehavior.this.customProperties
58 6749 2441 - 2592 Apply org.apache.kafka.clients.consumer.KafkaConsumer.<init> new org.apache.kafka.clients.consumer.KafkaConsumer[String,T](props, new org.apache.kafka.common.serialization.StringDeserializer(), new org.make.api.technical.MakeKafkaAvroDeserializer[T](kafkaConfiguration.schemaRegistry, technical.this.MakeKafkaAvroDeserializer.<init>$default$2[Nothing])(KafkaConsumerBehavior.this.evidence$2, KafkaConsumerBehavior.this.evidence$1))
60 6802 2490 - 2514 Apply org.apache.kafka.common.serialization.StringDeserializer.<init> new org.apache.kafka.common.serialization.StringDeserializer()
61 5995 2552 - 2585 Select org.make.api.extensions.KafkaConfiguration.schemaRegistry kafkaConfiguration.schemaRegistry
61 6931 2522 - 2522 Select org.make.api.technical.KafkaConsumerBehavior.evidence$2 KafkaConsumerBehavior.this.evidence$2
61 7519 2522 - 2586 ApplyToImplicitArgs org.make.api.technical.MakeKafkaAvroDeserializer.<init> new org.make.api.technical.MakeKafkaAvroDeserializer[T](kafkaConfiguration.schemaRegistry, technical.this.MakeKafkaAvroDeserializer.<init>$default$2[Nothing])(KafkaConsumerBehavior.this.evidence$2, KafkaConsumerBehavior.this.evidence$1)
61 6077 2522 - 2522 Select org.make.api.technical.KafkaConsumerBehavior.evidence$1 KafkaConsumerBehavior.this.evidence$1
61 7865 2522 - 2522 TypeApply org.make.api.technical.MakeKafkaAvroDeserializer.<init>$default$2 technical.this.MakeKafkaAvroDeserializer.<init>$default$2[Nothing]
63 6342 2660 - 2668 Select org.make.api.technical.KafkaConsumerBehavior.topicKey KafkaConsumerBehavior.this.topicKey
63 7688 2635 - 2669 Apply org.make.api.extensions.KafkaConfiguration.topic kafkaConfiguration.topic(KafkaConsumerBehavior.this.topicKey)
63 6060 2597 - 2671 Apply org.apache.kafka.clients.consumer.KafkaConsumer.subscribe consumer.subscribe(java.util.Arrays.asList[String](kafkaConfiguration.topic(KafkaConsumerBehavior.this.topicKey)))
63 6777 2616 - 2670 Apply java.util.Arrays.asList java.util.Arrays.asList[String](kafkaConfiguration.topic(KafkaConsumerBehavior.this.topicKey))
68 5968 2759 - 4731 Apply akka.actor.typed.scaladsl.Behaviors.setup akka.actor.typed.scaladsl.Behaviors.setup[org.make.api.technical.KafkaConsumerBehavior.Protocol](((context: akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol]) => { val kafkaConfiguration: org.make.api.extensions.KafkaConfiguration = org.make.api.extensions.KafkaConfiguration.apply(context.system); val consumer: org.apache.kafka.clients.consumer.KafkaConsumer[String,T] = KafkaConsumerBehavior.this.createConsumer(kafkaConfiguration, name); typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.Protocol](context.self).!(org.make.api.technical.KafkaConsumerBehavior.Consume); implicit val system: akka.actor.typed.ActorSystem[Nothing] = context.system; val dispatcher: scala.concurrent.ExecutionContextExecutor = system.dispatchers.lookup(akka.actor.typed.DispatcherSelector.fromConfig(org.make.api.kafka.kafkaDispatcher)); akka.actor.typed.scaladsl.Behaviors.receiveMessage[org.make.api.technical.KafkaConsumerBehavior.Protocol](((x0$1: org.make.api.technical.KafkaConsumerBehavior.Protocol) => x0$1 match { case (replyTo: akka.actor.typed.ActorRef[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus]): org.make.api.technical.KafkaConsumerBehavior.CheckState((replyTo @ _)) => { if (consumer.assignment().size().>(0)) typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus](replyTo).!(org.make.api.technical.KafkaConsumerBehavior.Ready) else typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus](replyTo).!(org.make.api.technical.KafkaConsumerBehavior.Waiting); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.KafkaConsumerBehavior.Protocol] } case org.make.api.technical.KafkaConsumerBehavior.Consume => { 
typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.Protocol](context.self).!(org.make.api.technical.KafkaConsumerBehavior.Consume); val records: org.apache.kafka.clients.consumer.ConsumerRecords[String,T] = consumer.poll(kafkaConfiguration.pollTimeout); val handleRecords: scala.concurrent.Future[akka.Done] = akka.stream.scaladsl.Source.fromIterator[org.apache.kafka.clients.consumer.ConsumerRecord[String,T]]((() => scala.jdk.CollectionConverters.IterableHasAsScala[org.apache.kafka.clients.consumer.ConsumerRecord[String,T]](records).asScala.iterator)).map[T](((x$1: org.apache.kafka.clients.consumer.ConsumerRecord[String,T]) => x$1.value())).mapAsync[Any](KafkaConsumerBehavior.this.handleMessagesParallelism)(((message: T) => { val messageClass: String = message match { case (wrapper @ (_: org.make.core.EventWrapper[_])) => wrapper.event case _ => message }.getClass().getSimpleName(); org.make.api.technical.tracing.Tracing.entrypoint(org.make.core.SlugHelper.apply(("".+(KafkaConsumerBehavior.this.getClass().getSimpleName()).+("-").+(messageClass): String))); KafkaConsumerBehavior.this.handleMessage(message)(context.system).recover[Any](({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[Throwable,Unit] with java.io.Serializable { def <init>(): <$anon: Throwable => Unit> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: Throwable, B1 >: Unit](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match { case (e @ _) => context.log.error(("Error while handling message of type [".+(messageClass).+("]: ").+(message): String), e) case (defaultCase$ @ _) => default.apply(x1) }; final def isDefinedAt(x1: Throwable): Boolean = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match { case (e @ _) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[Throwable,Unit]))(dispatcher) 
})).runWith[scala.concurrent.Future[akka.Done]](akka.stream.scaladsl.Sink.ignore)(stream.this.Materializer.matFromSystem(system)); scala.util.Try.apply[scala.concurrent.Future[akka.Done]](scala.concurrent.Await.ready[akka.Done](handleRecords, KafkaConsumerBehavior.this.handleMessagesTimeout)) match { case (value: scala.concurrent.Future[akka.Done]): scala.util.Success[scala.concurrent.Future[akka.Done]](_) => () case (exception: Throwable): scala.util.Failure[scala.concurrent.Future[akka.Done]]((e @ _)) => context.log.error("Timeout occurred while consuming message", e) }; consumer.commitSync(); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.KafkaConsumerBehavior.Protocol] } })).receiveSignal(({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[(akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal),akka.actor.typed.Behavior[org.make.api.technical.KafkaConsumerBehavior.Protocol]] with java.io.Serializable { def <init>(): <$anon: ((akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)) => akka.actor.typed.Behavior[org.make.api.technical.KafkaConsumerBehavior.Protocol]> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal), B1 >: akka.actor.typed.Behavior[org.make.api.technical.KafkaConsumerBehavior.Protocol]](x2: A1, default: A1 => B1): B1 = ((x2.asInstanceOf[(akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)]: (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)): (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal) 
@unchecked) match { case (_1: akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], _2: akka.actor.typed.Signal): (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)(_, akka.actor.typed.PostStop) => { consumer.close(); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.KafkaConsumerBehavior.Protocol] } case (defaultCase$ @ _) => default.apply(x2) }; final def isDefinedAt(x2: (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)): Boolean = ((x2.asInstanceOf[(akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)]: (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)): (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal) @unchecked) match { case (_1: akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], _2: akka.actor.typed.Signal): (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)(_, akka.actor.typed.PostStop) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[(akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal),akka.actor.typed.Behavior[org.make.api.technical.KafkaConsumerBehavior.Protocol]])) }))
69 7039 2819 - 2853 Apply akka.actor.typed.ExtensionId.apply org.make.api.extensions.KafkaConfiguration.apply(context.system)
69 7731 2838 - 2852 Select akka.actor.typed.scaladsl.ActorContext.system context.system
70 6170 2875 - 2915 Apply org.make.api.technical.KafkaConsumerBehavior.createConsumer KafkaConsumerBehavior.this.createConsumer(kafkaConfiguration, name)
72 7485 2923 - 2935 Select akka.actor.typed.scaladsl.ActorContext.self context.self
72 6207 2923 - 2945 Apply akka.actor.typed.ActorRef.ActorRefOps.! typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.Protocol](context.self).!(org.make.api.technical.KafkaConsumerBehavior.Consume)
72 6757 2938 - 2945 Select org.make.api.technical.KafkaConsumerBehavior.Consume org.make.api.technical.KafkaConsumerBehavior.Consume
74 7650 2997 - 3011 Select akka.actor.typed.scaladsl.ActorContext.system context.system
75 7452 3035 - 3108 Apply akka.actor.typed.Dispatchers.lookup system.dispatchers.lookup(akka.actor.typed.DispatcherSelector.fromConfig(org.make.api.kafka.kafkaDispatcher))
75 6879 3091 - 3106 Select org.make.api.kafka.kafkaDispatcher org.make.api.kafka.kafkaDispatcher
75 6065 3061 - 3107 Apply akka.actor.typed.DispatcherSelector.fromConfig akka.actor.typed.DispatcherSelector.fromConfig(org.make.api.kafka.kafkaDispatcher)
80 6970 3216 - 3248 Apply scala.Int.> consumer.assignment().size().>(0)
81 6763 3266 - 3281 Block akka.actor.typed.ActorRef.ActorRefOps.! typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus](replyTo).!(org.make.api.technical.KafkaConsumerBehavior.Ready)
81 7510 3266 - 3281 Apply akka.actor.typed.ActorRef.ActorRefOps.! typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus](replyTo).!(org.make.api.technical.KafkaConsumerBehavior.Ready)
81 6131 3276 - 3281 Select org.make.api.technical.KafkaConsumerBehavior.Ready org.make.api.technical.KafkaConsumerBehavior.Ready
83 7655 3317 - 3334 Apply akka.actor.typed.ActorRef.ActorRefOps.! typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus](replyTo).!(org.make.api.technical.KafkaConsumerBehavior.Waiting)
83 6312 3327 - 3334 Select org.make.api.technical.KafkaConsumerBehavior.Waiting org.make.api.technical.KafkaConsumerBehavior.Waiting
83 6845 3317 - 3334 Block akka.actor.typed.ActorRef.ActorRefOps.! typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus](replyTo).!(org.make.api.technical.KafkaConsumerBehavior.Waiting)
85 5993 3361 - 3375 TypeApply akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.KafkaConsumerBehavior.Protocol]
87 6136 3414 - 3436 Apply akka.actor.typed.ActorRef.ActorRefOps.! typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.Protocol](context.self).!(org.make.api.technical.KafkaConsumerBehavior.Consume)
87 6955 3429 - 3436 Select org.make.api.technical.KafkaConsumerBehavior.Consume org.make.api.technical.KafkaConsumerBehavior.Consume
87 7324 3414 - 3426 Select akka.actor.typed.scaladsl.ActorContext.self context.self
88 6688 3463 - 3508 Apply org.apache.kafka.clients.consumer.KafkaConsumer.poll consumer.poll(kafkaConfiguration.pollTimeout)
88 7516 3477 - 3507 Select org.make.api.extensions.KafkaConfiguration.pollTimeout kafkaConfiguration.pollTimeout
90 6284 3582 - 3606 Select scala.collection.IterableOnce.iterator scala.jdk.CollectionConverters.IterableHasAsScala[org.apache.kafka.clients.consumer.ConsumerRecord[String,T]](records).asScala.iterator
91 7627 3627 - 3636 Apply org.apache.kafka.clients.consumer.ConsumerRecord.value x$1.value()
92 6850 3662 - 3687 Select org.make.api.technical.KafkaConsumerBehavior.handleMessagesParallelism KafkaConsumerBehavior.this.handleMessagesParallelism
96 5997 3738 - 3919 Apply java.lang.Class.getSimpleName message match { case (wrapper @ (_: org.make.core.EventWrapper[_])) => wrapper.event case _ => message }.getClass().getSimpleName()
97 7374 3955 - 4009 Apply org.make.core.SlugHelper.apply org.make.core.SlugHelper.apply(("".+(KafkaConsumerBehavior.this.getClass().getSimpleName()).+("-").+(messageClass): String))
97 6956 3936 - 4010 Apply org.make.api.technical.tracing.Tracing.entrypoint org.make.api.technical.tracing.Tracing.entrypoint(org.make.core.SlugHelper.apply(("".+(KafkaConsumerBehavior.this.getClass().getSimpleName()).+("-").+(messageClass): String)))
100 6098 4027 - 4221 Apply scala.concurrent.Future.recover KafkaConsumerBehavior.this.handleMessage(message)(context.system).recover[Any](({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[Throwable,Unit] with java.io.Serializable { def <init>(): <$anon: Throwable => Unit> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: Throwable, B1 >: Unit](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match { case (e @ _) => context.log.error(("Error while handling message of type [".+(messageClass).+("]: ").+(message): String), e) case (defaultCase$ @ _) => default.apply(x1) }; final def isDefinedAt(x1: Throwable): Boolean = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match { case (e @ _) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[Throwable,Unit]))(dispatcher)
102 7481 4261 - 4272 Select akka.stream.scaladsl.Sink.ignore akka.stream.scaladsl.Sink.ignore
102 6691 4260 - 4260 ApplyToImplicitArgs akka.stream.Materializer.matFromSystem stream.this.Materializer.matFromSystem(system)
102 5913 3541 - 4273 ApplyToImplicitArgs akka.stream.scaladsl.Source.runWith akka.stream.scaladsl.Source.fromIterator[org.apache.kafka.clients.consumer.ConsumerRecord[String,T]]((() => scala.jdk.CollectionConverters.IterableHasAsScala[org.apache.kafka.clients.consumer.ConsumerRecord[String,T]](records).asScala.iterator)).map[T](((x$1: org.apache.kafka.clients.consumer.ConsumerRecord[String,T]) => x$1.value())).mapAsync[Any](KafkaConsumerBehavior.this.handleMessagesParallelism)(((message: T) => { val messageClass: String = message match { case (wrapper @ (_: org.make.core.EventWrapper[_])) => wrapper.event case _ => message }.getClass().getSimpleName(); org.make.api.technical.tracing.Tracing.entrypoint(org.make.core.SlugHelper.apply(("".+(KafkaConsumerBehavior.this.getClass().getSimpleName()).+("-").+(messageClass): String))); KafkaConsumerBehavior.this.handleMessage(message)(context.system).recover[Any](({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[Throwable,Unit] with java.io.Serializable { def <init>(): <$anon: Throwable => Unit> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: Throwable, B1 >: Unit](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match { case (e @ _) => context.log.error(("Error while handling message of type [".+(messageClass).+("]: ").+(message): String), e) case (defaultCase$ @ _) => default.apply(x1) }; final def isDefinedAt(x1: Throwable): Boolean = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match { case (e @ _) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[Throwable,Unit]))(dispatcher) })).runWith[scala.concurrent.Future[akka.Done]](akka.stream.scaladsl.Sink.ignore)(stream.this.Materializer.matFromSystem(system))
104 5960 4287 - 4341 Apply scala.util.Try.apply scala.util.Try.apply[scala.concurrent.Future[akka.Done]](scala.concurrent.Await.ready[akka.Done](handleRecords, KafkaConsumerBehavior.this.handleMessagesTimeout))
104 7631 4318 - 4339 Select org.make.api.technical.KafkaConsumerBehavior.handleMessagesTimeout KafkaConsumerBehavior.this.handleMessagesTimeout
104 6778 4291 - 4340 Apply scala.concurrent.Await.ready scala.concurrent.Await.ready[akka.Done](handleRecords, KafkaConsumerBehavior.this.handleMessagesTimeout)
105 7448 4380 - 4382 Literal <nosymbol> ()
106 7000 4416 - 4480 Apply org.slf4j.Logger.error context.log.error("Timeout occurred while consuming message", e)
109 6102 4544 - 4565 Apply org.apache.kafka.clients.consumer.KafkaConsumer.commitSync consumer.commitSync()
110 7486 4578 - 4592 TypeApply akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.KafkaConsumerBehavior.Protocol]
112 6766 3116 - 4725 Apply akka.actor.typed.scaladsl.Behaviors.Receive.receiveSignal akka.actor.typed.scaladsl.Behaviors.receiveMessage[org.make.api.technical.KafkaConsumerBehavior.Protocol](((x0$1: org.make.api.technical.KafkaConsumerBehavior.Protocol) => x0$1 match { case (replyTo: akka.actor.typed.ActorRef[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus]): org.make.api.technical.KafkaConsumerBehavior.CheckState((replyTo @ _)) => { if (consumer.assignment().size().>(0)) typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus](replyTo).!(org.make.api.technical.KafkaConsumerBehavior.Ready) else typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus](replyTo).!(org.make.api.technical.KafkaConsumerBehavior.Waiting); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.KafkaConsumerBehavior.Protocol] } case org.make.api.technical.KafkaConsumerBehavior.Consume => { typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.Protocol](context.self).!(org.make.api.technical.KafkaConsumerBehavior.Consume); val records: org.apache.kafka.clients.consumer.ConsumerRecords[String,T] = consumer.poll(kafkaConfiguration.pollTimeout); val handleRecords: scala.concurrent.Future[akka.Done] = akka.stream.scaladsl.Source.fromIterator[org.apache.kafka.clients.consumer.ConsumerRecord[String,T]]((() => scala.jdk.CollectionConverters.IterableHasAsScala[org.apache.kafka.clients.consumer.ConsumerRecord[String,T]](records).asScala.iterator)).map[T](((x$1: org.apache.kafka.clients.consumer.ConsumerRecord[String,T]) => x$1.value())).mapAsync[Any](KafkaConsumerBehavior.this.handleMessagesParallelism)(((message: T) => { val messageClass: String = message match { case (wrapper @ (_: org.make.core.EventWrapper[_])) => wrapper.event case _ => message }.getClass().getSimpleName(); 
org.make.api.technical.tracing.Tracing.entrypoint(org.make.core.SlugHelper.apply(("".+(KafkaConsumerBehavior.this.getClass().getSimpleName()).+("-").+(messageClass): String))); KafkaConsumerBehavior.this.handleMessage(message)(context.system).recover[Any](({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[Throwable,Unit] with java.io.Serializable { def <init>(): <$anon: Throwable => Unit> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: Throwable, B1 >: Unit](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match { case (e @ _) => context.log.error(("Error while handling message of type [".+(messageClass).+("]: ").+(message): String), e) case (defaultCase$ @ _) => default.apply(x1) }; final def isDefinedAt(x1: Throwable): Boolean = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match { case (e @ _) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[Throwable,Unit]))(dispatcher) })).runWith[scala.concurrent.Future[akka.Done]](akka.stream.scaladsl.Sink.ignore)(stream.this.Materializer.matFromSystem(system)); scala.util.Try.apply[scala.concurrent.Future[akka.Done]](scala.concurrent.Await.ready[akka.Done](handleRecords, KafkaConsumerBehavior.this.handleMessagesTimeout)) match { case (value: scala.concurrent.Future[akka.Done]): scala.util.Success[scala.concurrent.Future[akka.Done]](_) => () case (exception: Throwable): scala.util.Failure[scala.concurrent.Future[akka.Done]]((e @ _)) => context.log.error("Timeout occurred while consuming message", e) }; consumer.commitSync(); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.KafkaConsumerBehavior.Protocol] } })).receiveSignal(({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[(akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], 
akka.actor.typed.Signal),akka.actor.typed.Behavior[org.make.api.technical.KafkaConsumerBehavior.Protocol]] with java.io.Serializable { def <init>(): <$anon: ((akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)) => akka.actor.typed.Behavior[org.make.api.technical.KafkaConsumerBehavior.Protocol]> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal), B1 >: akka.actor.typed.Behavior[org.make.api.technical.KafkaConsumerBehavior.Protocol]](x2: A1, default: A1 => B1): B1 = ((x2.asInstanceOf[(akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)]: (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)): (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal) @unchecked) match { case (_1: akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], _2: akka.actor.typed.Signal): (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)(_, akka.actor.typed.PostStop) => { consumer.close(); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.KafkaConsumerBehavior.Protocol] } case (defaultCase$ @ _) => default.apply(x2) }; final def isDefinedAt(x2: (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)): Boolean = ((x2.asInstanceOf[(akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)]: (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)): 
(akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal) @unchecked) match { case (_1: akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], _2: akka.actor.typed.Signal): (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)(_, akka.actor.typed.PostStop) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[(akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal),akka.actor.typed.Behavior[org.make.api.technical.KafkaConsumerBehavior.Protocol]]))
112 7596 4626 - 4626 Apply org.make.api.technical.KafkaConsumerBehavior.$anonfun.<init> new $anonfun()
114 6664 4672 - 4688 Apply org.apache.kafka.clients.consumer.KafkaConsumer.close consumer.close()
115 5921 4701 - 4715 TypeApply akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.KafkaConsumerBehavior.Protocol]
121 7408 4793 - 4804 Select scala.concurrent.Future.unit scala.concurrent.Future.unit