1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.technical
21 
22 import com.sksamuel.avro4s.{DefaultFieldMapper, Encoder, FieldMapper, SchemaFor}
23 import grizzled.slf4j.Logging
24 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
25 import io.confluent.kafka.serializers.KafkaAvroSerializer
26 import org.apache.avro.Schema
27 import org.apache.kafka.common.serialization.Serializer
28 
29 import java.util
30 import scala.jdk.CollectionConverters._
31 
/**
  * Kafka [[org.apache.kafka.common.serialization.Serializer]] for values of type `T`.
  *
  * Each value is first encoded with the avro4s `Encoder[T]` and the result is then handed to a
  * Confluent `KafkaAvroSerializer` delegate, which produces the bytes.
  *
  * The Avro schema is derived once, at construction time, from `SchemaFor[T]` using `fieldMapper`.
  * The same mapper is used again when encoding, so schema derivation and encoding stay consistent
  * when a non-default mapper is supplied.
  *
  * @param registryUrl URL of the schema registry; passed to the `CachedSchemaRegistryClient` and
  *                    to the delegate's configuration under `schema.registry.url`
  * @param fieldMapper avro4s field mapper used for both schema derivation and encoding;
  *                    defaults to `DefaultFieldMapper`
  * @tparam T type of the serialized values; an avro4s `Encoder` and `SchemaFor` must be available
  */
class MakeKafkaAvroSerializer[T: Encoder: SchemaFor](registryUrl: String, fieldMapper: FieldMapper = DefaultFieldMapper)
    extends Serializer[T]
    with Logging {

  // Derive the schema with the configured mapper (not a hard-coded DefaultFieldMapper) so that it
  // agrees with the mapper passed to `encoder.encode` in `serialize`.
  private val schema: Schema = SchemaFor[T].schema(fieldMapper)
  private val encoder: Encoder[T] = Encoder[T]

  // Capacity given to the CachedSchemaRegistryClient below.
  private val identityMapCapacity = 1000

  // Confluent serializer doing the actual byte-level work, created with its own registry client.
  private val delegate: Serializer[Object] = new KafkaAvroSerializer(
    new CachedSchemaRegistryClient(registryUrl, identityMapCapacity),
    Map("value.schema" -> schema.toString, "schema.registry.url" -> registryUrl).asJava
  )

  /**
    * Forwards the configuration to the delegate serializer.
    *
    * NOTE(review): this re-configures a delegate that was already configured in the constructor;
    * confirm that `configs` carries every setting the delegate requires.
    *
    * @param configs configuration entries, passed through unchanged
    * @param isKey   whether the serializer is used for keys, passed through unchanged
    */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {
    delegate.configure(configs, isKey)
  }

  /**
    * Encodes `data` with avro4s against the derived schema and serializes the result
    * through the delegate.
    *
    * @param topic topic the record is produced to, passed through to the delegate
    * @param data  value to serialize
    * @return the bytes returned by the delegate serializer
    */
  override def serialize(topic: String, data: T): Array[Byte] = {
    delegate.serialize(topic, encoder.encode(data, schema, fieldMapper))
  }

  /** Closes the delegate serializer. */
  override def close(): Unit = {
    delegate.close()
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
36 6048 1384 - 1402 Select com.sksamuel.avro4s.DefaultFieldMapper org.make.api.makekafkatest com.sksamuel.avro4s.DefaultFieldMapper
36 7420 1364 - 1403 Apply com.sksamuel.avro4s.SchemaFor.schema org.make.api.makekafkatest com.sksamuel.avro4s.SchemaFor.apply[T](MakeKafkaAvroSerializer.this.evidence$2).schema(com.sksamuel.avro4s.DefaultFieldMapper)
36 6376 1373 - 1373 Select org.make.api.technical.MakeKafkaAvroSerializer.evidence$2 org.make.api.makekafkatest MakeKafkaAvroSerializer.this.evidence$2
37 5672 1440 - 1450 ApplyToImplicitArgs com.sksamuel.avro4s.Encoder.apply org.make.api.makekafkatest com.sksamuel.avro4s.Encoder.apply[T](MakeKafkaAvroSerializer.this.evidence$1)
37 6487 1447 - 1447 Select org.make.api.technical.MakeKafkaAvroSerializer.evidence$1 org.make.api.makekafkatest MakeKafkaAvroSerializer.this.evidence$1
39 7094 1488 - 1492 Literal <nosymbol> org.make.api.makekafkatest 1000
40 7261 1538 - 1724 Apply io.confluent.kafka.serializers.KafkaAvroSerializer.<init> org.make.api.makekafkatest new io.confluent.kafka.serializers.KafkaAvroSerializer(new io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient(MakeKafkaAvroSerializer.this.registryUrl, MakeKafkaAvroSerializer.this.identityMapCapacity), scala.jdk.CollectionConverters.MapHasAsJava[String, String](scala.Predef.Map.apply[String, String](scala.Predef.ArrowAssoc[String]("value.schema").->[String](MakeKafkaAvroSerializer.this.schema.toString()), scala.Predef.ArrowAssoc[String]("schema.registry.url").->[String](MakeKafkaAvroSerializer.this.registryUrl))).asJava)
41 6737 1598 - 1609 Select org.make.api.technical.MakeKafkaAvroSerializer.registryUrl org.make.api.makekafkatest MakeKafkaAvroSerializer.this.registryUrl
41 5797 1611 - 1630 Select org.make.api.technical.MakeKafkaAvroSerializer.identityMapCapacity org.make.api.makekafkatest MakeKafkaAvroSerializer.this.identityMapCapacity
41 7258 1567 - 1631 Apply io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init> org.make.api.makekafkatest new io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient(MakeKafkaAvroSerializer.this.registryUrl, MakeKafkaAvroSerializer.this.identityMapCapacity)
42 7425 1641 - 1674 Apply scala.Predef.ArrowAssoc.-> org.make.api.makekafkatest scala.Predef.ArrowAssoc[String]("value.schema").->[String](MakeKafkaAvroSerializer.this.schema.toString())
42 5899 1637 - 1720 Select scala.collection.convert.AsJavaExtensions.MapHasAsJava.asJava org.make.api.makekafkatest scala.jdk.CollectionConverters.MapHasAsJava[String, String](scala.Predef.Map.apply[String, String](scala.Predef.ArrowAssoc[String]("value.schema").->[String](MakeKafkaAvroSerializer.this.schema.toString()), scala.Predef.ArrowAssoc[String]("schema.registry.url").->[String](MakeKafkaAvroSerializer.this.registryUrl))).asJava
42 6548 1676 - 1697 Literal <nosymbol> org.make.api.makekafkatest "schema.registry.url"
42 6386 1641 - 1655 Literal <nosymbol> org.make.api.makekafkatest "value.schema"
42 6036 1659 - 1674 Apply org.apache.avro.Schema.toString org.make.api.makekafkatest MakeKafkaAvroSerializer.this.schema.toString()
42 7103 1676 - 1712 Apply scala.Predef.ArrowAssoc.-> org.make.api.makekafkatest scala.Predef.ArrowAssoc[String]("schema.registry.url").->[String](MakeKafkaAvroSerializer.this.registryUrl)
42 5745 1701 - 1712 Select org.make.api.technical.MakeKafkaAvroSerializer.registryUrl org.make.api.makekafkatest MakeKafkaAvroSerializer.this.registryUrl
42 6717 1637 - 1713 Apply scala.collection.MapFactory.apply org.make.api.makekafkatest scala.Predef.Map.apply[String, String](scala.Predef.ArrowAssoc[String]("value.schema").->[String](MakeKafkaAvroSerializer.this.schema.toString()), scala.Predef.ArrowAssoc[String]("schema.registry.url").->[String](MakeKafkaAvroSerializer.this.registryUrl))
46 6457 1811 - 1845 Apply org.apache.kafka.common.serialization.Serializer.configure MakeKafkaAvroSerializer.this.delegate.configure(configs, isKey)
50 5750 1921 - 1989 Apply org.apache.kafka.common.serialization.Serializer.serialize org.make.api.kafkaconsumertest MakeKafkaAvroSerializer.this.delegate.serialize(topic, MakeKafkaAvroSerializer.this.encoder.encode(data, MakeKafkaAvroSerializer.this.schema, MakeKafkaAvroSerializer.this.fieldMapper))
50 6037 1968 - 1974 Select org.make.api.technical.MakeKafkaAvroSerializer.schema org.make.api.kafkaconsumertest MakeKafkaAvroSerializer.this.schema
50 7430 1976 - 1987 Select org.make.api.technical.MakeKafkaAvroSerializer.fieldMapper org.make.api.kafkaconsumertest MakeKafkaAvroSerializer.this.fieldMapper
50 6522 1947 - 1988 Apply com.sksamuel.avro4s.Encoder.encode org.make.api.kafkaconsumertest MakeKafkaAvroSerializer.this.encoder.encode(data, MakeKafkaAvroSerializer.this.schema, MakeKafkaAvroSerializer.this.fieldMapper)
54 7081 2032 - 2048 Apply org.apache.kafka.common.serialization.Serializer.close org.make.api.kafkaconsumertest MakeKafkaAvroSerializer.this.delegate.close()