1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.technical
21 
22 import akka.actor.typed.eventstream.EventStream.Subscribe
23 import akka.actor.typed.scaladsl.Behaviors
24 import akka.actor.typed.{Behavior, LogOptions, PostStop}
25 import com.sksamuel.avro4s.{Encoder, SchemaFor}
26 import org.apache.kafka.clients.producer._
27 import org.apache.kafka.common.serialization.{Serializer, StringSerializer}
28 import org.make.api.extensions.KafkaConfiguration
29 import org.make.core.{AvroSerializers, WithEventId}
30 import org.slf4j.event.Level
31 
32 import scala.concurrent.duration.DurationInt
33 import java.util.{Properties, UUID}
34 import scala.reflect.ClassTag
35 
36 abstract class KafkaProducerBehavior[Event: ClassTag, Wrapper: SchemaFor: Encoder] extends AvroSerializers {
37 
38   protected val topicKey: String
39 
40   protected def createProducerConfiguration(kafkaConfiguration: KafkaConfiguration, name: String): Properties = {
41     val props = new Properties()
42     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.connectionString)
43     props.put(ProducerConfig.ACKS_CONFIG, "all")
44     props.put(ProducerConfig.RETRIES_CONFIG, "3")
45     props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384")
46     props.put(ProducerConfig.LINGER_MS_CONFIG, "1")
47     props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.make.api.technical.MakePartitioner")
48     props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432")
49     props.put(ProducerConfig.CLIENT_ID_CONFIG, name)
50     props
51   }
52 
53   protected def createProducer(kafkaConfiguration: KafkaConfiguration, name: String): KafkaProducer[String, Wrapper] = {
54     val props: Properties = createProducerConfiguration(kafkaConfiguration, name)
55     val valueSerializer: Serializer[Wrapper] =
56       new MakeKafkaAvroSerializer[Wrapper](kafkaConfiguration.schemaRegistry)
57     new KafkaProducer(props, new StringSerializer(), valueSerializer)
58   }
59 
60   protected def wrapEvent(event: Event): Wrapper
61 
62   @SuppressWarnings(Array("org.wartremover.warts.Null"))
63   def createBehavior(name: String): Behavior[Event] = {
64     Behaviors.setup { context =>
65       val kafkaConfiguration = KafkaConfiguration(context.system)
66       val topic = kafkaConfiguration.topic(topicKey)
67       val producer = createProducer(kafkaConfiguration, name)
68       context.system.eventStream ! Subscribe(context.self)
69 
70       val sendCallBack: Callback = (r: RecordMetadata, e: Exception) => {
71         val topic = Option(r).map(_.topic()).getOrElse("unknown")
72         Option(e).foreach(e => context.log.error(s"Error when producing message on topic $topic", e))
73       }
74       context.log.info(s"Starting producer for topic $topic")
75       Behaviors.logMessages[Event](
76         LogOptions().withLevel(Level.DEBUG),
77         Behaviors
78           .receiveMessage[Event] { event =>
79             if (producer.partitionsFor(topic).size > 0) {
80               val record = wrapEvent(event)
81               producer.send(
82                 new ProducerRecord[String, Wrapper](
83                   topic,
84                   None.orNull,
85                   System.currentTimeMillis(),
86                   extractKey(record).getOrElse(UUID.randomUUID().toString),
87                   record
88                 ),
89                 sendCallBack
90               )
91               Behaviors.same
92             } else {
93               context.scheduleOnce(100.milliseconds, context.self, event)
94               Behaviors.same
95             }
96           }
97           .receiveSignal {
98             case (_, PostStop) =>
99               context.log.info(s"Stopping producer for topic $topic")
100               producer.close()
101               Behaviors.same
102           }
103       )
104     }
105   }
106 
107   private def extractKey(message: Wrapper): Option[String] = {
108     message match {
109       case event: WithEventId => event.eventId.map(_.value)
110       case _                  => None
111     }
112   }
113 }
Line Stmt Id Pos Tree Symbol Tests Code
41 6618 1620 - 1636 Apply java.util.Properties.<init> new java.util.Properties()
42 6133 1651 - 1690 Literal <nosymbol> "bootstrap.servers"
42 7463 1692 - 1727 Select org.make.api.extensions.KafkaConfiguration.connectionString kafkaConfiguration.connectionString
42 6727 1641 - 1728 Apply java.util.Properties.put props.put("bootstrap.servers", kafkaConfiguration.connectionString)
43 5875 1733 - 1777 Apply java.util.Properties.put props.put("acks", "all")
44 7699 1782 - 1827 Apply java.util.Properties.put props.put("retries", "3")
45 6847 1832 - 1884 Apply java.util.Properties.put props.put("batch.size", "16384")
46 5934 1889 - 1936 Apply java.util.Properties.put props.put("linger.ms", "1")
47 7414 1941 - 2033 Apply java.util.Properties.put props.put("partitioner.class", "org.make.api.technical.MakePartitioner")
48 6485 2038 - 2096 Apply java.util.Properties.put props.put("buffer.memory", "33554432")
49 6094 2101 - 2149 Apply java.util.Properties.put props.put("client.id", name)
54 7556 2314 - 2367 Apply org.make.api.technical.KafkaProducerBehavior.createProducerConfiguration KafkaProducerBehavior.this.createProducerConfiguration(kafkaConfiguration, name)
56 6806 2421 - 2421 Select org.make.api.technical.KafkaProducerBehavior.evidence$2 KafkaProducerBehavior.this.evidence$2
56 5884 2421 - 2421 TypeApply org.make.api.technical.MakeKafkaAvroSerializer.<init>$default$2 technical.this.MakeKafkaAvroSerializer.<init>$default$2[Wrapper]
56 7630 2421 - 2421 Select org.make.api.technical.KafkaProducerBehavior.evidence$3 KafkaProducerBehavior.this.evidence$3
56 5958 2421 - 2492 ApplyToImplicitArgs org.make.api.technical.MakeKafkaAvroSerializer.<init> new org.make.api.technical.MakeKafkaAvroSerializer[Wrapper](kafkaConfiguration.schemaRegistry, technical.this.MakeKafkaAvroSerializer.<init>$default$2[Wrapper])(KafkaProducerBehavior.this.evidence$3, KafkaProducerBehavior.this.evidence$2)
56 6711 2458 - 2491 Select org.make.api.extensions.KafkaConfiguration.schemaRegistry kafkaConfiguration.schemaRegistry
57 7388 2522 - 2544 Apply org.apache.kafka.common.serialization.StringSerializer.<init> new org.apache.kafka.common.serialization.StringSerializer()
57 6589 2497 - 2562 Apply org.apache.kafka.clients.producer.KafkaProducer.<init> new org.apache.kafka.clients.producer.KafkaProducer[String,Wrapper](props, new org.apache.kafka.common.serialization.StringSerializer(), valueSerializer)
64 5850 2735 - 4306 Apply akka.actor.typed.scaladsl.Behaviors.setup org.scalatest.testsuite akka.actor.typed.scaladsl.Behaviors.setup[Event](((context: akka.actor.typed.scaladsl.ActorContext[Event]) => { val kafkaConfiguration: org.make.api.extensions.KafkaConfiguration = org.make.api.extensions.KafkaConfiguration.apply(context.system); val topic: String = kafkaConfiguration.topic(KafkaProducerBehavior.this.topicKey); val producer: org.apache.kafka.clients.producer.KafkaProducer[String,Wrapper] = KafkaProducerBehavior.this.createProducer(kafkaConfiguration, name); typed.this.ActorRef.ActorRefOps[akka.actor.typed.eventstream.EventStream.Command](context.system.eventStream).!(akka.actor.typed.eventstream.EventStream.Subscribe.apply[Event](context.self)(KafkaProducerBehavior.this.evidence$1)); val sendCallBack: org.apache.kafka.clients.producer.Callback = ((r: org.apache.kafka.clients.producer.RecordMetadata, e: Exception) => { val topic: String = scala.Option.apply[org.apache.kafka.clients.producer.RecordMetadata](r).map[String](((x$1: org.apache.kafka.clients.producer.RecordMetadata) => x$1.topic())).getOrElse[String]("unknown"); scala.Option.apply[Exception](e).foreach[Unit](((e: Exception) => context.log.error(("Error when producing message on topic ".+(topic): String), e))) }); context.log.info(("Starting producer for topic ".+(topic): String)); akka.actor.typed.scaladsl.Behaviors.logMessages[Event](akka.actor.typed.LogOptions.apply().withLevel(DEBUG), akka.actor.typed.scaladsl.Behaviors.receiveMessage[Event](((event: Event) => if (producer.partitionsFor(topic).size().>(0)) { val record: Wrapper = KafkaProducerBehavior.this.wrapEvent(event); producer.send(new org.apache.kafka.clients.producer.ProducerRecord[String,Wrapper](topic, scala.None.orNull[Null](scala.this.<:<.refl[Null]), scala.Predef.long2Long(java.lang.System.currentTimeMillis()), KafkaProducerBehavior.this.extractKey(record).getOrElse[String](java.util.UUID.randomUUID().toString()), record), sendCallBack); akka.actor.typed.scaladsl.Behaviors.same[Event] } else { context.scheduleOnce[Event](scala.concurrent.duration.`package`.DurationInt(100).milliseconds, context.self, event); akka.actor.typed.scaladsl.Behaviors.same[Event] })).receiveSignal(({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[(akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal),akka.actor.typed.Behavior[Event]] with java.io.Serializable { def <init>(): <$anon: ((akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)) => akka.actor.typed.Behavior[Event]> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal), B1 >: akka.actor.typed.Behavior[Event]](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[(akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)]: (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)): (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal) @unchecked) match { case (_1: akka.actor.typed.scaladsl.ActorContext[Event], _2: akka.actor.typed.Signal): (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)(_, akka.actor.typed.PostStop) => { context.log.info(("Stopping producer for topic ".+(topic): String)); producer.close(); akka.actor.typed.scaladsl.Behaviors.same[Event] } case (defaultCase$ @ _) => default.apply(x1) }; final def isDefinedAt(x1: (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)): Boolean = ((x1.asInstanceOf[(akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)]: (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)): (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal) @unchecked) match { case (_1: akka.actor.typed.scaladsl.ActorContext[Event], _2: akka.actor.typed.Signal): (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)(_, akka.actor.typed.PostStop) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[(akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal),akka.actor.typed.Behavior[Event]]))) }))
65 7523 2795 - 2829 Apply akka.actor.typed.ExtensionId.apply org.make.api.extensions.KafkaConfiguration.apply(context.system)
65 6099 2814 - 2828 Select akka.actor.typed.scaladsl.ActorContext.system context.system
66 5893 2848 - 2882 Apply org.make.api.extensions.KafkaConfiguration.topic kafkaConfiguration.topic(KafkaProducerBehavior.this.topicKey)
66 6660 2873 - 2881 Select org.make.api.technical.KafkaProducerBehavior.topicKey KafkaProducerBehavior.this.topicKey
67 7217 2904 - 2944 Apply org.make.api.technical.KafkaProducerBehavior.createProducer KafkaProducerBehavior.this.createProducer(kafkaConfiguration, name)
68 6811 2951 - 2977 Select akka.actor.typed.ActorSystem.eventStream context.system.eventStream
68 7358 2989 - 2989 Select org.make.api.technical.KafkaProducerBehavior.evidence$1 KafkaProducerBehavior.this.evidence$1
68 6066 2990 - 3002 Select akka.actor.typed.scaladsl.ActorContext.self context.self
68 6517 2980 - 3003 ApplyToImplicitArgs akka.actor.typed.eventstream.EventStream.Subscribe.apply akka.actor.typed.eventstream.EventStream.Subscribe.apply[Event](context.self)(KafkaProducerBehavior.this.evidence$1)
68 6080 2951 - 3003 Apply akka.actor.typed.ActorRef.ActorRefOps.! typed.this.ActorRef.ActorRefOps[akka.actor.typed.eventstream.EventStream.Command](context.system.eventStream).!(akka.actor.typed.eventstream.EventStream.Subscribe.apply[Event](context.self)(KafkaProducerBehavior.this.evidence$1))
71 7460 3099 - 3144 Apply scala.Option.getOrElse scala.Option.apply[org.apache.kafka.clients.producer.RecordMetadata](r).map[String](((x$1: org.apache.kafka.clients.producer.RecordMetadata) => x$1.topic())).getOrElse[String]("unknown")
72 5821 3153 - 3246 Apply scala.Option.foreach scala.Option.apply[Exception](e).foreach[Unit](((e: Exception) => context.log.error(("Error when producing message on topic ".+(topic): String), e)))
72 6667 3176 - 3245 Apply org.slf4j.Logger.error context.log.error(("Error when producing message on topic ".+(topic): String), e)
74 7222 3261 - 3316 Apply org.slf4j.Logger.info context.log.info(("Starting producer for topic ".+(topic): String))
75 6739 3323 - 4300 Apply akka.actor.typed.scaladsl.Behaviors.logMessages akka.actor.typed.scaladsl.Behaviors.logMessages[Event](akka.actor.typed.LogOptions.apply().withLevel(DEBUG), akka.actor.typed.scaladsl.Behaviors.receiveMessage[Event](((event: Event) => if (producer.partitionsFor(topic).size().>(0)) { val record: Wrapper = KafkaProducerBehavior.this.wrapEvent(event); producer.send(new org.apache.kafka.clients.producer.ProducerRecord[String,Wrapper](topic, scala.None.orNull[Null](scala.this.<:<.refl[Null]), scala.Predef.long2Long(java.lang.System.currentTimeMillis()), KafkaProducerBehavior.this.extractKey(record).getOrElse[String](java.util.UUID.randomUUID().toString()), record), sendCallBack); akka.actor.typed.scaladsl.Behaviors.same[Event] } else { context.scheduleOnce[Event](scala.concurrent.duration.`package`.DurationInt(100).milliseconds, context.self, event); akka.actor.typed.scaladsl.Behaviors.same[Event] })).receiveSignal(({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[(akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal),akka.actor.typed.Behavior[Event]] with java.io.Serializable { def <init>(): <$anon: ((akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)) => akka.actor.typed.Behavior[Event]> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal), B1 >: akka.actor.typed.Behavior[Event]](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[(akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)]: (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)): (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal) @unchecked) match { case (_1: akka.actor.typed.scaladsl.ActorContext[Event], _2: akka.actor.typed.Signal): (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)(_, akka.actor.typed.PostStop) => { context.log.info(("Stopping producer for topic ".+(topic): String)); producer.close(); akka.actor.typed.scaladsl.Behaviors.same[Event] } case (defaultCase$ @ _) => default.apply(x1) }; final def isDefinedAt(x1: (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)): Boolean = ((x1.asInstanceOf[(akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)]: (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)): (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal) @unchecked) match { case (_1: akka.actor.typed.scaladsl.ActorContext[Event], _2: akka.actor.typed.Signal): (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)(_, akka.actor.typed.PostStop) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[(akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal),akka.actor.typed.Behavior[Event]])))
76 6782 3361 - 3396 Apply akka.actor.typed.LogOptions.withLevel akka.actor.typed.LogOptions.apply().withLevel(DEBUG)
79 5931 3476 - 3514 Apply scala.Int.> producer.partitionsFor(topic).size().>(0)
79 6585 3516 - 3953 Block <nosymbol> { val record: Wrapper = KafkaProducerBehavior.this.wrapEvent(event); producer.send(new org.apache.kafka.clients.producer.ProducerRecord[String,Wrapper](topic, scala.None.orNull[Null](scala.this.<:<.refl[Null]), scala.Predef.long2Long(java.lang.System.currentTimeMillis()), KafkaProducerBehavior.this.extractKey(record).getOrElse[String](java.util.UUID.randomUUID().toString()), record), sendCallBack); akka.actor.typed.scaladsl.Behaviors.same[Event] }
80 7360 3545 - 3561 Apply org.make.api.technical.KafkaProducerBehavior.wrapEvent KafkaProducerBehavior.this.wrapEvent(event)
81 5935 3576 - 3910 Apply org.apache.kafka.clients.producer.KafkaProducer.send producer.send(new org.apache.kafka.clients.producer.ProducerRecord[String,Wrapper](topic, scala.None.orNull[Null](scala.this.<:<.refl[Null]), scala.Predef.long2Long(java.lang.System.currentTimeMillis()), KafkaProducerBehavior.this.extractKey(record).getOrElse[String](java.util.UUID.randomUUID().toString()), record), sendCallBack)
82 6784 3607 - 3864 Apply org.apache.kafka.clients.producer.ProducerRecord.<init> new org.apache.kafka.clients.producer.ProducerRecord[String,Wrapper](topic, scala.None.orNull[Null](scala.this.<:<.refl[Null]), scala.Predef.long2Long(java.lang.System.currentTimeMillis()), KafkaProducerBehavior.this.extractKey(record).getOrElse[String](java.util.UUID.randomUUID().toString()), record)
84 6082 3687 - 3698 ApplyToImplicitArgs scala.Option.orNull scala.None.orNull[Null](scala.this.<:<.refl[Null])
84 6578 3692 - 3692 TypeApply scala.<:<.refl scala.this.<:<.refl[Null]
85 7584 3718 - 3744 Apply java.lang.System.currentTimeMillis java.lang.System.currentTimeMillis()
85 6629 3718 - 3744 ApplyImplicitView scala.Predef.long2Long scala.Predef.long2Long(java.lang.System.currentTimeMillis())
86 5880 3793 - 3819 Apply java.util.UUID.toString java.util.UUID.randomUUID().toString()
86 7291 3764 - 3820 Apply scala.Option.getOrElse KafkaProducerBehavior.this.extractKey(record).getOrElse[String](java.util.UUID.randomUUID().toString())
91 7329 3925 - 3939 TypeApply akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[Event]
92 6809 3959 - 4077 Block <nosymbol> { context.scheduleOnce[Event](scala.concurrent.duration.`package`.DurationInt(100).milliseconds, context.self, event); akka.actor.typed.scaladsl.Behaviors.same[Event] }
93 6632 4014 - 4026 Select akka.actor.typed.scaladsl.ActorContext.self context.self
93 7586 3996 - 4012 Select scala.concurrent.duration.DurationConversions.milliseconds scala.concurrent.duration.`package`.DurationInt(100).milliseconds
93 5848 3975 - 4034 Apply akka.actor.typed.scaladsl.ActorContext.scheduleOnce context.scheduleOnce[Event](scala.concurrent.duration.`package`.DurationInt(100).milliseconds, context.self, event)
93 5772 3996 - 3999 Literal <nosymbol> 100
94 7296 4049 - 4063 TypeApply akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[Event]
97 7525 3406 - 4292 Apply akka.actor.typed.scaladsl.Behaviors.Receive.receiveSignal akka.actor.typed.scaladsl.Behaviors.receiveMessage[Event](((event: Event) => if (producer.partitionsFor(topic).size().>(0)) { val record: Wrapper = KafkaProducerBehavior.this.wrapEvent(event); producer.send(new org.apache.kafka.clients.producer.ProducerRecord[String,Wrapper](topic, scala.None.orNull[Null](scala.this.<:<.refl[Null]), scala.Predef.long2Long(java.lang.System.currentTimeMillis()), KafkaProducerBehavior.this.extractKey(record).getOrElse[String](java.util.UUID.randomUUID().toString()), record), sendCallBack); akka.actor.typed.scaladsl.Behaviors.same[Event] } else { context.scheduleOnce[Event](scala.concurrent.duration.`package`.DurationInt(100).milliseconds, context.self, event); akka.actor.typed.scaladsl.Behaviors.same[Event] })).receiveSignal(({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[(akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal),akka.actor.typed.Behavior[Event]] with java.io.Serializable { def <init>(): <$anon: ((akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)) => akka.actor.typed.Behavior[Event]> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal), B1 >: akka.actor.typed.Behavior[Event]](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[(akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)]: (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)): (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal) @unchecked) match { case (_1: akka.actor.typed.scaladsl.ActorContext[Event], _2: akka.actor.typed.Signal): (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)(_, akka.actor.typed.PostStop) => { context.log.info(("Stopping producer for topic ".+(topic): String)); producer.close(); akka.actor.typed.scaladsl.Behaviors.same[Event] } case (defaultCase$ @ _) => default.apply(x1) }; final def isDefinedAt(x1: (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)): Boolean = ((x1.asInstanceOf[(akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)]: (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)): (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal) @unchecked) match { case (_1: akka.actor.typed.scaladsl.ActorContext[Event], _2: akka.actor.typed.Signal): (akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal)(_, akka.actor.typed.PostStop) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[(akka.actor.typed.scaladsl.ActorContext[Event], akka.actor.typed.Signal),akka.actor.typed.Behavior[Event]]))
97 5775 4115 - 4115 Apply org.make.api.technical.KafkaProducerBehavior.$anonfun.<init> new $anonfun()
99 6050 4165 - 4220 Apply org.slf4j.Logger.info context.log.info(("Stopping producer for topic ".+(topic): String))
100 7390 4235 - 4251 Apply org.apache.kafka.clients.producer.KafkaProducer.close producer.close()
101 6545 4266 - 4280 TypeApply akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[Event]
109 7260 4446 - 4453 Select org.make.core.EventId.value x$2.value
109 6412 4428 - 4454 Apply scala.Option.map event.eventId.map[String](((x$2: org.make.core.EventId) => x$2.value))
110 6005 4488 - 4492 Select scala.None scala.None