1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.technical
21 
22 import com.sksamuel.avro4s.{Decoder, DefaultFieldMapper, FieldMapper, SchemaFor}
23 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
24 import io.confluent.kafka.serializers.KafkaAvroDeserializer
25 import org.apache.avro.Schema
26 import org.apache.avro.generic.GenericRecord
27 import org.apache.kafka.common.serialization.Deserializer
28 
29 import java.util
30 
/**
  * Kafka [[Deserializer]] that first reads the payload with a Confluent
  * [[KafkaAvroDeserializer]] and then converts the result into `T` with avro4s.
  *
  * @param registryUrl URL passed to the [[CachedSchemaRegistryClient]] used by the delegate
  * @param fieldMapper avro4s field mapper used to derive the schema and to decode values
  * @tparam T target type; avro4s `Decoder` and `SchemaFor` instances must be in implicit scope
  */
class MakeKafkaAvroDeserializer[T: Decoder: SchemaFor](
  registryUrl: String,
  fieldMapper: FieldMapper = DefaultFieldMapper
) extends Deserializer[T] {

  /** Avro schema of `T`, computed once at construction with the supplied field mapper. */
  val schema: Schema = SchemaFor[T].schema(fieldMapper)

  /** avro4s decoder for `T`, resolved from the implicit scope. */
  val decoder: Decoder[T] = Decoder[T]

  // Second constructor argument of the schema registry client.
  private val identityMapCapacity = 1000

  private val schemaRegistryClient = new CachedSchemaRegistryClient(registryUrl, identityMapCapacity)

  // Confluent deserializer that does the actual byte-level work.
  private val delegate = new KafkaAvroDeserializer(schemaRegistryClient)

  /** Forwards the Kafka configuration unchanged to the underlying Confluent deserializer. */
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
    delegate.configure(configs, isKey)

  /** Closes the underlying Confluent deserializer. */
  override def close(): Unit =
    delegate.close()

  /**
    * Deserializes `data` with the delegate, casts the result to a [[GenericRecord]]
    * and decodes it into `T` using [[schema]] and the configured field mapper.
    *
    * @param topic topic the payload was read from, forwarded to the delegate
    * @param data  raw payload, forwarded to the delegate
    * @return the value produced by [[decoder]]
    */
  @SuppressWarnings(Array("org.wartremover.warts.AsInstanceOf"))
  override def deserialize(topic: String, data: Array[Byte]): T = {
    // The delegate returns an untyped object; the decoder below is fed a GenericRecord.
    val record = delegate.deserialize(topic, data).asInstanceOf[GenericRecord]
    decoder.decode(record, schema, fieldMapper)
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
36 6553 1344 - 1355 Select org.make.api.technical.MakeKafkaAvroDeserializer.fieldMapper MakeKafkaAvroDeserializer.this.fieldMapper
36 7378 1333 - 1333 Select org.make.api.technical.MakeKafkaAvroDeserializer.evidence$2 MakeKafkaAvroDeserializer.this.evidence$2
36 5703 1324 - 1356 Apply com.sksamuel.avro4s.SchemaFor.schema com.sksamuel.avro4s.SchemaFor.apply[T](MakeKafkaAvroDeserializer.this.evidence$2).schema(MakeKafkaAvroDeserializer.this.fieldMapper)
37 6625 1385 - 1395 ApplyToImplicitArgs com.sksamuel.avro4s.Decoder.apply com.sksamuel.avro4s.Decoder.apply[T](MakeKafkaAvroDeserializer.this.evidence$1)
37 7492 1392 - 1392 Select org.make.api.technical.MakeKafkaAvroDeserializer.evidence$1 MakeKafkaAvroDeserializer.this.evidence$1
39 5834 1433 - 1437 Literal <nosymbol> 1000
40 7326 1463 - 1554 Apply io.confluent.kafka.serializers.KafkaAvroDeserializer.<init> new io.confluent.kafka.serializers.KafkaAvroDeserializer(new io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient(MakeKafkaAvroDeserializer.this.registryUrl, MakeKafkaAvroDeserializer.this.identityMapCapacity))
40 5971 1489 - 1553 Apply io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init> new io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient(MakeKafkaAvroDeserializer.this.registryUrl, MakeKafkaAvroDeserializer.this.identityMapCapacity)
40 6419 1533 - 1552 Select org.make.api.technical.MakeKafkaAvroDeserializer.identityMapCapacity MakeKafkaAvroDeserializer.this.identityMapCapacity
40 7265 1520 - 1531 Select org.make.api.technical.MakeKafkaAvroDeserializer.registryUrl MakeKafkaAvroDeserializer.this.registryUrl
43 6524 1641 - 1675 Apply io.confluent.kafka.serializers.KafkaAvroDeserializer.configure MakeKafkaAvroDeserializer.this.delegate.configure(configs, isKey)
47 5666 1718 - 1734 Apply io.confluent.kafka.serializers.KafkaAvroDeserializer.close MakeKafkaAvroDeserializer.this.delegate.close()
52 7494 1892 - 1953 TypeApply scala.Any.asInstanceOf MakeKafkaAvroDeserializer.this.delegate.deserialize(topic, data).asInstanceOf[org.apache.avro.generic.GenericRecord]
52 6731 1955 - 1961 Select org.make.api.technical.MakeKafkaAvroDeserializer.schema MakeKafkaAvroDeserializer.this.schema
52 5794 1963 - 1974 Select org.make.api.technical.MakeKafkaAvroDeserializer.fieldMapper MakeKafkaAvroDeserializer.this.fieldMapper
52 7199 1877 - 1975 Apply com.sksamuel.avro4s.Decoder.decode MakeKafkaAvroDeserializer.this.decoder.decode(MakeKafkaAvroDeserializer.this.delegate.deserialize(topic, data).asInstanceOf[org.apache.avro.generic.GenericRecord], MakeKafkaAvroDeserializer.this.schema, MakeKafkaAvroDeserializer.this.fieldMapper)