1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api
21 
22 import com.sksamuel.avro4s.{Encoder, SchemaFor}
23 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
24 import org.apache.kafka.common.serialization.{Serializer, StringSerializer}
25 import org.make.api.docker.DockerKafkaService
26 
27 import java.util.Properties
28 
/**
  * Test mix-in that builds Kafka producers pointing at `localhost` on the
  * `kafkaExposedPort` / `registryExposedPort` ports.
  *
  * NOTE(review): both ports are presumably supplied by [[DockerKafkaService]];
  * they are not defined in this trait.
  */
trait KafkaTest extends MakeUnitTest with DockerKafkaService {

  /**
    * Creates a producer with `String` keys and values of type `T`.
    *
    * Keys are serialized with a [[StringSerializer]]; values use the serializer
    * returned by [[createSerializer]], which is handed the schema registry URL.
    *
    * @tparam T type of the record values; requires an avro4s `SchemaFor` and `Encoder`
    * @return a new [[KafkaProducer]]; this trait never closes it
    */
  def createProducer[T: SchemaFor: Encoder]: KafkaProducer[String, T] = {
    val schemaRegistryUrl = s"http://localhost:$registryExposedPort"
    val producerProperties = new Properties()

    // Producer settings, applied in the order listed.
    val settings: Seq[(String, String)] = Seq(
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:$kafkaExposedPort",
      ProducerConfig.ACKS_CONFIG -> "all",
      ProducerConfig.RETRIES_CONFIG -> "3",
      ProducerConfig.BATCH_SIZE_CONFIG -> "16384",
      ProducerConfig.LINGER_MS_CONFIG -> "1",
      ProducerConfig.BUFFER_MEMORY_CONFIG -> "33554432"
    )
    settings.foreach {
      case (key, value) => producerProperties.put(key, value)
    }

    val valueSerializer: Serializer[T] = createSerializer[T](schemaRegistryUrl)
    new KafkaProducer(producerProperties, new StringSerializer(), valueSerializer)
  }

  /**
    * Supplies the value serializer used by [[createProducer]].
    *
    * @param registryUrl URL of the schema registry, as built by [[createProducer]]
    * @tparam T type of the values to serialize
    * @return the serializer for values of type `T`
    */
  def createSerializer[T: SchemaFor: Encoder](registryUrl: String): Serializer[T]
}
Line Stmt Id Pos Tree Symbol Tests Code
32 22044 1256 - 1272 Apply java.util.Properties.<init> org.make.api.kafkatest new java.util.Properties()
33 22115 1277 - 1359 Apply java.util.Properties.put org.make.api.kafkatest props.put("bootstrap.servers", ("localhost:".+(KafkaTest.this.kafkaExposedPort): String))
33 22201 1287 - 1326 Literal <nosymbol> org.make.api.kafkatest "bootstrap.servers"
34 22070 1364 - 1408 Apply java.util.Properties.put org.make.api.kafkatest props.put("acks", "all")
35 22231 1413 - 1458 Apply java.util.Properties.put org.make.api.kafkatest props.put("retries", "3")
36 22122 1463 - 1515 Apply java.util.Properties.put org.make.api.kafkatest props.put("batch.size", "16384")
37 22037 1520 - 1567 Apply java.util.Properties.put org.make.api.kafkatest props.put("linger.ms", "1")
38 22198 1572 - 1630 Apply java.util.Properties.put org.make.api.kafkatest props.put("buffer.memory", "33554432")
39 22143 1667 - 1699 ApplyToImplicitArgs org.make.api.KafkaTest.createSerializer org.make.api.kafkatest KafkaTest.this.createSerializer[T](registryUrl)(evidence$1, evidence$2)
40 22055 1729 - 1751 Apply org.apache.kafka.common.serialization.StringSerializer.<init> org.make.api.kafkatest new org.apache.kafka.common.serialization.StringSerializer()
40 22206 1704 - 1764 Apply org.apache.kafka.clients.producer.KafkaProducer.<init> org.make.api.kafkatest new org.apache.kafka.clients.producer.KafkaProducer[String,T](props, new org.apache.kafka.common.serialization.StringSerializer(), serializer)