| Line |
Stmt Id |
Pos |
Tree |
Symbol |
Tests |
Code |
|
47
|
6969
|
1842
-
1843
|
Literal
|
<nosymbol>
|
|
4
|
|
48
|
7550
|
1900
-
1908
|
Select
|
scala.concurrent.duration.DurationConversions.minute
|
|
scala.concurrent.duration.`package`.DurationInt(1).minute
|
|
48
|
6174
|
1900
-
1901
|
Literal
|
<nosymbol>
|
|
1
|
|
49
|
7116
|
1956
-
1972
|
Apply
|
java.util.Properties.<init>
|
|
new java.util.Properties()
|
|
52
|
6293
|
2105
-
2121
|
Apply
|
java.util.Properties.<init>
|
|
new java.util.Properties()
|
|
53
|
6025
|
2126
-
2213
|
Apply
|
java.util.Properties.put
|
|
props.put("bootstrap.servers", kafkaConfiguration.connectionString)
|
|
53
|
6796
|
2177
-
2212
|
Select
|
org.make.api.extensions.KafkaConfiguration.connectionString
|
|
kafkaConfiguration.connectionString
|
|
53
|
7693
|
2136
-
2175
|
Literal
|
<nosymbol>
|
|
"bootstrap.servers"
|
|
54
|
6973
|
2260
-
2267
|
Select
|
org.make.api.technical.KafkaConsumerBehavior.groupId
|
|
KafkaConsumerBehavior.this.groupId
|
|
54
|
7860
|
2228
-
2258
|
Literal
|
<nosymbol>
|
|
"group.id"
|
|
54
|
6092
|
2218
-
2268
|
Apply
|
java.util.Properties.put
|
|
props.put("group.id", KafkaConsumerBehavior.this.groupId)
|
|
55
|
7514
|
2273
-
2321
|
Apply
|
java.util.Properties.put
|
|
props.put("client.id", name)
|
|
56
|
6647
|
2326
-
2386
|
Apply
|
java.util.Properties.put
|
|
props.put("enable.auto.commit", "false")
|
|
57
|
7658
|
2391
-
2421
|
Apply
|
java.util.Properties.putAll
|
|
props.putAll(KafkaConsumerBehavior.this.customProperties)
|
|
57
|
6242
|
2404
-
2420
|
Select
|
org.make.api.technical.KafkaConsumerBehavior.customProperties
|
|
KafkaConsumerBehavior.this.customProperties
|
|
58
|
6749
|
2441
-
2592
|
Apply
|
org.apache.kafka.clients.consumer.KafkaConsumer.<init>
|
|
new org.apache.kafka.clients.consumer.KafkaConsumer[String,T](props, new org.apache.kafka.common.serialization.StringDeserializer(), new org.make.api.technical.MakeKafkaAvroDeserializer[T](kafkaConfiguration.schemaRegistry, technical.this.MakeKafkaAvroDeserializer.<init>$default$2[Nothing])(KafkaConsumerBehavior.this.evidence$2, KafkaConsumerBehavior.this.evidence$1))
|
|
60
|
6802
|
2490
-
2514
|
Apply
|
org.apache.kafka.common.serialization.StringDeserializer.<init>
|
|
new org.apache.kafka.common.serialization.StringDeserializer()
|
|
61
|
5995
|
2552
-
2585
|
Select
|
org.make.api.extensions.KafkaConfiguration.schemaRegistry
|
|
kafkaConfiguration.schemaRegistry
|
|
61
|
6931
|
2522
-
2522
|
Select
|
org.make.api.technical.KafkaConsumerBehavior.evidence$2
|
|
KafkaConsumerBehavior.this.evidence$2
|
|
61
|
7519
|
2522
-
2586
|
ApplyToImplicitArgs
|
org.make.api.technical.MakeKafkaAvroDeserializer.<init>
|
|
new org.make.api.technical.MakeKafkaAvroDeserializer[T](kafkaConfiguration.schemaRegistry, technical.this.MakeKafkaAvroDeserializer.<init>$default$2[Nothing])(KafkaConsumerBehavior.this.evidence$2, KafkaConsumerBehavior.this.evidence$1)
|
|
61
|
6077
|
2522
-
2522
|
Select
|
org.make.api.technical.KafkaConsumerBehavior.evidence$1
|
|
KafkaConsumerBehavior.this.evidence$1
|
|
61
|
7865
|
2522
-
2522
|
TypeApply
|
org.make.api.technical.MakeKafkaAvroDeserializer.<init>$default$2
|
|
technical.this.MakeKafkaAvroDeserializer.<init>$default$2[Nothing]
|
|
63
|
6342
|
2660
-
2668
|
Select
|
org.make.api.technical.KafkaConsumerBehavior.topicKey
|
|
KafkaConsumerBehavior.this.topicKey
|
|
63
|
7688
|
2635
-
2669
|
Apply
|
org.make.api.extensions.KafkaConfiguration.topic
|
|
kafkaConfiguration.topic(KafkaConsumerBehavior.this.topicKey)
|
|
63
|
6060
|
2597
-
2671
|
Apply
|
org.apache.kafka.clients.consumer.KafkaConsumer.subscribe
|
|
consumer.subscribe(java.util.Arrays.asList[String](kafkaConfiguration.topic(KafkaConsumerBehavior.this.topicKey)))
|
|
63
|
6777
|
2616
-
2670
|
Apply
|
java.util.Arrays.asList
|
|
java.util.Arrays.asList[String](kafkaConfiguration.topic(KafkaConsumerBehavior.this.topicKey))
|
|
68
|
5968
|
2759
-
4731
|
Apply
|
akka.actor.typed.scaladsl.Behaviors.setup
|
|
akka.actor.typed.scaladsl.Behaviors.setup[org.make.api.technical.KafkaConsumerBehavior.Protocol](((context: akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol]) => {
val kafkaConfiguration: org.make.api.extensions.KafkaConfiguration = org.make.api.extensions.KafkaConfiguration.apply(context.system);
val consumer: org.apache.kafka.clients.consumer.KafkaConsumer[String,T] = KafkaConsumerBehavior.this.createConsumer(kafkaConfiguration, name);
typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.Protocol](context.self).!(org.make.api.technical.KafkaConsumerBehavior.Consume);
implicit val system: akka.actor.typed.ActorSystem[Nothing] = context.system;
val dispatcher: scala.concurrent.ExecutionContextExecutor = system.dispatchers.lookup(akka.actor.typed.DispatcherSelector.fromConfig(org.make.api.kafka.kafkaDispatcher));
akka.actor.typed.scaladsl.Behaviors.receiveMessage[org.make.api.technical.KafkaConsumerBehavior.Protocol](((x0$1: org.make.api.technical.KafkaConsumerBehavior.Protocol) => x0$1 match {
case (replyTo: akka.actor.typed.ActorRef[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus]): org.make.api.technical.KafkaConsumerBehavior.CheckState((replyTo @ _)) => {
if (consumer.assignment().size().>(0))
typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus](replyTo).!(org.make.api.technical.KafkaConsumerBehavior.Ready)
else
typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus](replyTo).!(org.make.api.technical.KafkaConsumerBehavior.Waiting);
akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.KafkaConsumerBehavior.Protocol]
}
case org.make.api.technical.KafkaConsumerBehavior.Consume => {
typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.Protocol](context.self).!(org.make.api.technical.KafkaConsumerBehavior.Consume);
val records: org.apache.kafka.clients.consumer.ConsumerRecords[String,T] = consumer.poll(kafkaConfiguration.pollTimeout);
val handleRecords: scala.concurrent.Future[akka.Done] = akka.stream.scaladsl.Source.fromIterator[org.apache.kafka.clients.consumer.ConsumerRecord[String,T]]((() => scala.jdk.CollectionConverters.IterableHasAsScala[org.apache.kafka.clients.consumer.ConsumerRecord[String,T]](records).asScala.iterator)).map[T](((x$1: org.apache.kafka.clients.consumer.ConsumerRecord[String,T]) => x$1.value())).mapAsync[Any](KafkaConsumerBehavior.this.handleMessagesParallelism)(((message: T) => {
val messageClass: String = message match {
case (wrapper @ (_: org.make.core.EventWrapper[_])) => wrapper.event
case _ => message
}.getClass().getSimpleName();
org.make.api.technical.tracing.Tracing.entrypoint(org.make.core.SlugHelper.apply(("".+(KafkaConsumerBehavior.this.getClass().getSimpleName()).+("-").+(messageClass): String)));
KafkaConsumerBehavior.this.handleMessage(message)(context.system).recover[Any](({
@SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[Throwable,Unit] with java.io.Serializable {
def <init>(): <$anon: Throwable => Unit> = {
$anonfun.super.<init>();
()
};
final override def applyOrElse[A1 <: Throwable, B1 >: Unit](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match {
case (e @ _) => context.log.error(("Error while handling message of type [".+(messageClass).+("]: ").+(message): String), e)
case (defaultCase$ @ _) => default.apply(x1)
};
final def isDefinedAt(x1: Throwable): Boolean = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match {
case (e @ _) => true
case (defaultCase$ @ _) => false
}
};
new $anonfun()
}: PartialFunction[Throwable,Unit]))(dispatcher)
})).runWith[scala.concurrent.Future[akka.Done]](akka.stream.scaladsl.Sink.ignore)(stream.this.Materializer.matFromSystem(system));
scala.util.Try.apply[scala.concurrent.Future[akka.Done]](scala.concurrent.Await.ready[akka.Done](handleRecords, KafkaConsumerBehavior.this.handleMessagesTimeout)) match {
case (value: scala.concurrent.Future[akka.Done]): scala.util.Success[scala.concurrent.Future[akka.Done]](_) => ()
case (exception: Throwable): scala.util.Failure[scala.concurrent.Future[akka.Done]]((e @ _)) => context.log.error("Timeout occurred while consuming message", e)
};
consumer.commitSync();
akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.KafkaConsumerBehavior.Protocol]
}
})).receiveSignal(({
@SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[(akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal),akka.actor.typed.Behavior[org.make.api.technical.KafkaConsumerBehavior.Protocol]] with java.io.Serializable {
def <init>(): <$anon: ((akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)) => akka.actor.typed.Behavior[org.make.api.technical.KafkaConsumerBehavior.Protocol]> = {
$anonfun.super.<init>();
()
};
final override def applyOrElse[A1 <: (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal), B1 >: akka.actor.typed.Behavior[org.make.api.technical.KafkaConsumerBehavior.Protocol]](x2: A1, default: A1 => B1): B1 = ((x2.asInstanceOf[(akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)]: (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)): (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal) @unchecked) match {
case (_1: akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], _2: akka.actor.typed.Signal): (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)(_, akka.actor.typed.PostStop) => {
consumer.close();
akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.KafkaConsumerBehavior.Protocol]
}
case (defaultCase$ @ _) => default.apply(x2)
};
final def isDefinedAt(x2: (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)): Boolean = ((x2.asInstanceOf[(akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)]: (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)): (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal) @unchecked) match {
case (_1: akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], _2: akka.actor.typed.Signal): (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)(_, akka.actor.typed.PostStop) => true
case (defaultCase$ @ _) => false
}
};
new $anonfun()
}: PartialFunction[(akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal),akka.actor.typed.Behavior[org.make.api.technical.KafkaConsumerBehavior.Protocol]]))
}))
|
|
69
|
7039
|
2819
-
2853
|
Apply
|
akka.actor.typed.ExtensionId.apply
|
|
org.make.api.extensions.KafkaConfiguration.apply(context.system)
|
|
69
|
7731
|
2838
-
2852
|
Select
|
akka.actor.typed.scaladsl.ActorContext.system
|
|
context.system
|
|
70
|
6170
|
2875
-
2915
|
Apply
|
org.make.api.technical.KafkaConsumerBehavior.createConsumer
|
|
KafkaConsumerBehavior.this.createConsumer(kafkaConfiguration, name)
|
|
72
|
7485
|
2923
-
2935
|
Select
|
akka.actor.typed.scaladsl.ActorContext.self
|
|
context.self
|
|
72
|
6207
|
2923
-
2945
|
Apply
|
akka.actor.typed.ActorRef.ActorRefOps.!
|
|
typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.Protocol](context.self).!(org.make.api.technical.KafkaConsumerBehavior.Consume)
|
|
72
|
6757
|
2938
-
2945
|
Select
|
org.make.api.technical.KafkaConsumerBehavior.Consume
|
|
org.make.api.technical.KafkaConsumerBehavior.Consume
|
|
74
|
7650
|
2997
-
3011
|
Select
|
akka.actor.typed.scaladsl.ActorContext.system
|
|
context.system
|
|
75
|
7452
|
3035
-
3108
|
Apply
|
akka.actor.typed.Dispatchers.lookup
|
|
system.dispatchers.lookup(akka.actor.typed.DispatcherSelector.fromConfig(org.make.api.kafka.kafkaDispatcher))
|
|
75
|
6879
|
3091
-
3106
|
Select
|
org.make.api.kafka.kafkaDispatcher
|
|
org.make.api.kafka.kafkaDispatcher
|
|
75
|
6065
|
3061
-
3107
|
Apply
|
akka.actor.typed.DispatcherSelector.fromConfig
|
|
akka.actor.typed.DispatcherSelector.fromConfig(org.make.api.kafka.kafkaDispatcher)
|
|
80
|
6970
|
3216
-
3248
|
Apply
|
scala.Int.>
|
|
consumer.assignment().size().>(0)
|
|
81
|
6763
|
3266
-
3281
|
Block
|
akka.actor.typed.ActorRef.ActorRefOps.!
|
|
typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus](replyTo).!(org.make.api.technical.KafkaConsumerBehavior.Ready)
|
|
81
|
7510
|
3266
-
3281
|
Apply
|
akka.actor.typed.ActorRef.ActorRefOps.!
|
|
typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus](replyTo).!(org.make.api.technical.KafkaConsumerBehavior.Ready)
|
|
81
|
6131
|
3276
-
3281
|
Select
|
org.make.api.technical.KafkaConsumerBehavior.Ready
|
|
org.make.api.technical.KafkaConsumerBehavior.Ready
|
|
83
|
7655
|
3317
-
3334
|
Apply
|
akka.actor.typed.ActorRef.ActorRefOps.!
|
|
typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus](replyTo).!(org.make.api.technical.KafkaConsumerBehavior.Waiting)
|
|
83
|
6312
|
3327
-
3334
|
Select
|
org.make.api.technical.KafkaConsumerBehavior.Waiting
|
|
org.make.api.technical.KafkaConsumerBehavior.Waiting
|
|
83
|
6845
|
3317
-
3334
|
Block
|
akka.actor.typed.ActorRef.ActorRefOps.!
|
|
typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus](replyTo).!(org.make.api.technical.KafkaConsumerBehavior.Waiting)
|
|
85
|
5993
|
3361
-
3375
|
TypeApply
|
akka.actor.typed.scaladsl.Behaviors.same
|
|
akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.KafkaConsumerBehavior.Protocol]
|
|
87
|
6136
|
3414
-
3436
|
Apply
|
akka.actor.typed.ActorRef.ActorRefOps.!
|
|
typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.Protocol](context.self).!(org.make.api.technical.KafkaConsumerBehavior.Consume)
|
|
87
|
6955
|
3429
-
3436
|
Select
|
org.make.api.technical.KafkaConsumerBehavior.Consume
|
|
org.make.api.technical.KafkaConsumerBehavior.Consume
|
|
87
|
7324
|
3414
-
3426
|
Select
|
akka.actor.typed.scaladsl.ActorContext.self
|
|
context.self
|
|
88
|
6688
|
3463
-
3508
|
Apply
|
org.apache.kafka.clients.consumer.KafkaConsumer.poll
|
|
consumer.poll(kafkaConfiguration.pollTimeout)
|
|
88
|
7516
|
3477
-
3507
|
Select
|
org.make.api.extensions.KafkaConfiguration.pollTimeout
|
|
kafkaConfiguration.pollTimeout
|
|
90
|
6284
|
3582
-
3606
|
Select
|
scala.collection.IterableOnce.iterator
|
|
scala.jdk.CollectionConverters.IterableHasAsScala[org.apache.kafka.clients.consumer.ConsumerRecord[String,T]](records).asScala.iterator
|
|
91
|
7627
|
3627
-
3636
|
Apply
|
org.apache.kafka.clients.consumer.ConsumerRecord.value
|
|
x$1.value()
|
|
92
|
6850
|
3662
-
3687
|
Select
|
org.make.api.technical.KafkaConsumerBehavior.handleMessagesParallelism
|
|
KafkaConsumerBehavior.this.handleMessagesParallelism
|
|
96
|
5997
|
3738
-
3919
|
Apply
|
java.lang.Class.getSimpleName
|
|
message match {
case (wrapper @ (_: org.make.core.EventWrapper[_])) => wrapper.event
case _ => message
}.getClass().getSimpleName()
|
|
97
|
7374
|
3955
-
4009
|
Apply
|
org.make.core.SlugHelper.apply
|
|
org.make.core.SlugHelper.apply(("".+(KafkaConsumerBehavior.this.getClass().getSimpleName()).+("-").+(messageClass): String))
|
|
97
|
6956
|
3936
-
4010
|
Apply
|
org.make.api.technical.tracing.Tracing.entrypoint
|
|
org.make.api.technical.tracing.Tracing.entrypoint(org.make.core.SlugHelper.apply(("".+(KafkaConsumerBehavior.this.getClass().getSimpleName()).+("-").+(messageClass): String)))
|
|
100
|
6098
|
4027
-
4221
|
Apply
|
scala.concurrent.Future.recover
|
|
KafkaConsumerBehavior.this.handleMessage(message)(context.system).recover[Any](({
@SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[Throwable,Unit] with java.io.Serializable {
def <init>(): <$anon: Throwable => Unit> = {
$anonfun.super.<init>();
()
};
final override def applyOrElse[A1 <: Throwable, B1 >: Unit](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match {
case (e @ _) => context.log.error(("Error while handling message of type [".+(messageClass).+("]: ").+(message): String), e)
case (defaultCase$ @ _) => default.apply(x1)
};
final def isDefinedAt(x1: Throwable): Boolean = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match {
case (e @ _) => true
case (defaultCase$ @ _) => false
}
};
new $anonfun()
}: PartialFunction[Throwable,Unit]))(dispatcher)
|
|
102
|
7481
|
4261
-
4272
|
Select
|
akka.stream.scaladsl.Sink.ignore
|
|
akka.stream.scaladsl.Sink.ignore
|
|
102
|
6691
|
4260
-
4260
|
ApplyToImplicitArgs
|
akka.stream.Materializer.matFromSystem
|
|
stream.this.Materializer.matFromSystem(system)
|
|
102
|
5913
|
3541
-
4273
|
ApplyToImplicitArgs
|
akka.stream.scaladsl.Source.runWith
|
|
akka.stream.scaladsl.Source.fromIterator[org.apache.kafka.clients.consumer.ConsumerRecord[String,T]]((() => scala.jdk.CollectionConverters.IterableHasAsScala[org.apache.kafka.clients.consumer.ConsumerRecord[String,T]](records).asScala.iterator)).map[T](((x$1: org.apache.kafka.clients.consumer.ConsumerRecord[String,T]) => x$1.value())).mapAsync[Any](KafkaConsumerBehavior.this.handleMessagesParallelism)(((message: T) => {
val messageClass: String = message match {
case (wrapper @ (_: org.make.core.EventWrapper[_])) => wrapper.event
case _ => message
}.getClass().getSimpleName();
org.make.api.technical.tracing.Tracing.entrypoint(org.make.core.SlugHelper.apply(("".+(KafkaConsumerBehavior.this.getClass().getSimpleName()).+("-").+(messageClass): String)));
KafkaConsumerBehavior.this.handleMessage(message)(context.system).recover[Any](({
@SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[Throwable,Unit] with java.io.Serializable {
def <init>(): <$anon: Throwable => Unit> = {
$anonfun.super.<init>();
()
};
final override def applyOrElse[A1 <: Throwable, B1 >: Unit](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match {
case (e @ _) => context.log.error(("Error while handling message of type [".+(messageClass).+("]: ").+(message): String), e)
case (defaultCase$ @ _) => default.apply(x1)
};
final def isDefinedAt(x1: Throwable): Boolean = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match {
case (e @ _) => true
case (defaultCase$ @ _) => false
}
};
new $anonfun()
}: PartialFunction[Throwable,Unit]))(dispatcher)
})).runWith[scala.concurrent.Future[akka.Done]](akka.stream.scaladsl.Sink.ignore)(stream.this.Materializer.matFromSystem(system))
|
|
104
|
5960
|
4287
-
4341
|
Apply
|
scala.util.Try.apply
|
|
scala.util.Try.apply[scala.concurrent.Future[akka.Done]](scala.concurrent.Await.ready[akka.Done](handleRecords, KafkaConsumerBehavior.this.handleMessagesTimeout))
|
|
104
|
7631
|
4318
-
4339
|
Select
|
org.make.api.technical.KafkaConsumerBehavior.handleMessagesTimeout
|
|
KafkaConsumerBehavior.this.handleMessagesTimeout
|
|
104
|
6778
|
4291
-
4340
|
Apply
|
scala.concurrent.Await.ready
|
|
scala.concurrent.Await.ready[akka.Done](handleRecords, KafkaConsumerBehavior.this.handleMessagesTimeout)
|
|
105
|
7448
|
4380
-
4382
|
Literal
|
<nosymbol>
|
|
()
|
|
106
|
7000
|
4416
-
4480
|
Apply
|
org.slf4j.Logger.error
|
|
context.log.error("Timeout occurred while consuming message", e)
|
|
109
|
6102
|
4544
-
4565
|
Apply
|
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync
|
|
consumer.commitSync()
|
|
110
|
7486
|
4578
-
4592
|
TypeApply
|
akka.actor.typed.scaladsl.Behaviors.same
|
|
akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.KafkaConsumerBehavior.Protocol]
|
|
112
|
6766
|
3116
-
4725
|
Apply
|
akka.actor.typed.scaladsl.Behaviors.Receive.receiveSignal
|
|
akka.actor.typed.scaladsl.Behaviors.receiveMessage[org.make.api.technical.KafkaConsumerBehavior.Protocol](((x0$1: org.make.api.technical.KafkaConsumerBehavior.Protocol) => x0$1 match {
case (replyTo: akka.actor.typed.ActorRef[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus]): org.make.api.technical.KafkaConsumerBehavior.CheckState((replyTo @ _)) => {
if (consumer.assignment().size().>(0))
typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus](replyTo).!(org.make.api.technical.KafkaConsumerBehavior.Ready)
else
typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.ConsumerStatus](replyTo).!(org.make.api.technical.KafkaConsumerBehavior.Waiting);
akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.KafkaConsumerBehavior.Protocol]
}
case org.make.api.technical.KafkaConsumerBehavior.Consume => {
typed.this.ActorRef.ActorRefOps[org.make.api.technical.KafkaConsumerBehavior.Protocol](context.self).!(org.make.api.technical.KafkaConsumerBehavior.Consume);
val records: org.apache.kafka.clients.consumer.ConsumerRecords[String,T] = consumer.poll(kafkaConfiguration.pollTimeout);
val handleRecords: scala.concurrent.Future[akka.Done] = akka.stream.scaladsl.Source.fromIterator[org.apache.kafka.clients.consumer.ConsumerRecord[String,T]]((() => scala.jdk.CollectionConverters.IterableHasAsScala[org.apache.kafka.clients.consumer.ConsumerRecord[String,T]](records).asScala.iterator)).map[T](((x$1: org.apache.kafka.clients.consumer.ConsumerRecord[String,T]) => x$1.value())).mapAsync[Any](KafkaConsumerBehavior.this.handleMessagesParallelism)(((message: T) => {
val messageClass: String = message match {
case (wrapper @ (_: org.make.core.EventWrapper[_])) => wrapper.event
case _ => message
}.getClass().getSimpleName();
org.make.api.technical.tracing.Tracing.entrypoint(org.make.core.SlugHelper.apply(("".+(KafkaConsumerBehavior.this.getClass().getSimpleName()).+("-").+(messageClass): String)));
KafkaConsumerBehavior.this.handleMessage(message)(context.system).recover[Any](({
@SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[Throwable,Unit] with java.io.Serializable {
def <init>(): <$anon: Throwable => Unit> = {
$anonfun.super.<init>();
()
};
final override def applyOrElse[A1 <: Throwable, B1 >: Unit](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match {
case (e @ _) => context.log.error(("Error while handling message of type [".+(messageClass).+("]: ").+(message): String), e)
case (defaultCase$ @ _) => default.apply(x1)
};
final def isDefinedAt(x1: Throwable): Boolean = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match {
case (e @ _) => true
case (defaultCase$ @ _) => false
}
};
new $anonfun()
}: PartialFunction[Throwable,Unit]))(dispatcher)
})).runWith[scala.concurrent.Future[akka.Done]](akka.stream.scaladsl.Sink.ignore)(stream.this.Materializer.matFromSystem(system));
scala.util.Try.apply[scala.concurrent.Future[akka.Done]](scala.concurrent.Await.ready[akka.Done](handleRecords, KafkaConsumerBehavior.this.handleMessagesTimeout)) match {
case (value: scala.concurrent.Future[akka.Done]): scala.util.Success[scala.concurrent.Future[akka.Done]](_) => ()
case (exception: Throwable): scala.util.Failure[scala.concurrent.Future[akka.Done]]((e @ _)) => context.log.error("Timeout occurred while consuming message", e)
};
consumer.commitSync();
akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.KafkaConsumerBehavior.Protocol]
}
})).receiveSignal(({
@SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[(akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal),akka.actor.typed.Behavior[org.make.api.technical.KafkaConsumerBehavior.Protocol]] with java.io.Serializable {
def <init>(): <$anon: ((akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)) => akka.actor.typed.Behavior[org.make.api.technical.KafkaConsumerBehavior.Protocol]> = {
$anonfun.super.<init>();
()
};
final override def applyOrElse[A1 <: (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal), B1 >: akka.actor.typed.Behavior[org.make.api.technical.KafkaConsumerBehavior.Protocol]](x2: A1, default: A1 => B1): B1 = ((x2.asInstanceOf[(akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)]: (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)): (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal) @unchecked) match {
case (_1: akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], _2: akka.actor.typed.Signal): (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)(_, akka.actor.typed.PostStop) => {
consumer.close();
akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.KafkaConsumerBehavior.Protocol]
}
case (defaultCase$ @ _) => default.apply(x2)
};
final def isDefinedAt(x2: (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)): Boolean = ((x2.asInstanceOf[(akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)]: (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)): (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal) @unchecked) match {
case (_1: akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], _2: akka.actor.typed.Signal): (akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal)(_, akka.actor.typed.PostStop) => true
case (defaultCase$ @ _) => false
}
};
new $anonfun()
}: PartialFunction[(akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.KafkaConsumerBehavior.Protocol], akka.actor.typed.Signal),akka.actor.typed.Behavior[org.make.api.technical.KafkaConsumerBehavior.Protocol]]))
|
|
112
|
7596
|
4626
-
4626
|
Apply
|
org.make.api.technical.KafkaConsumerBehavior.$anonfun.<init>
|
|
new $anonfun()
|
|
114
|
6664
|
4672
-
4688
|
Apply
|
org.apache.kafka.clients.consumer.KafkaConsumer.close
|
|
consumer.close()
|
|
115
|
5921
|
4701
-
4715
|
TypeApply
|
akka.actor.typed.scaladsl.Behaviors.same
|
|
akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.KafkaConsumerBehavior.Protocol]
|
|
121
|
7408
|
4793
-
4804
|
Select
|
scala.concurrent.Future.unit
|
|
scala.concurrent.Future.unit
|