1 /* 2 * Make.org Core API 3 * Copyright (C) 2018 Make.org 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 * 18 */ 19 20 package org.make.api 21 22 import com.sksamuel.avro4s.{Encoder, SchemaFor} 23 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig} 24 import org.apache.kafka.common.serialization.{Serializer, StringSerializer} 25 import org.make.api.docker.DockerKafkaService 26 27 import java.util.Properties 28 29 trait KafkaTest extends MakeUnitTest with DockerKafkaService { 30 def createProducer[T: SchemaFor: Encoder]: KafkaProducer[String, T] = { 31 val registryUrl = s"http://localhost:$registryExposedPort" 32 val props = new Properties() 33 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, s"localhost:$kafkaExposedPort") 34 props.put(ProducerConfig.ACKS_CONFIG, "all") 35 props.put(ProducerConfig.RETRIES_CONFIG, "3") 36 props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384") 37 props.put(ProducerConfig.LINGER_MS_CONFIG, "1") 38 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432") 39 val serializer: Serializer[T] = createSerializer[T](registryUrl) 40 new KafkaProducer(props, new StringSerializer(), serializer) 41 } 42 43 def createSerializer[T: SchemaFor: Encoder](registryUrl: String): Serializer[T] 44 }
| Line | Stmt Id | Pos | Tree | Symbol | Tests | Code |
|---|---|---|---|---|---|---|
| 32 | 22044 | 1256 - 1272 | Apply | java.util.Properties.<init> | org.make.api.kafkatest | new java.util.Properties() |
| 33 | 22115 | 1277 - 1359 | Apply | java.util.Properties.put | org.make.api.kafkatest | props.put("bootstrap.servers", ("localhost:".+(KafkaTest.this.kafkaExposedPort): String)) |
| 33 | 22201 | 1287 - 1326 | Literal | <nosymbol> | org.make.api.kafkatest | "bootstrap.servers" |
| 34 | 22070 | 1364 - 1408 | Apply | java.util.Properties.put | org.make.api.kafkatest | props.put("acks", "all") |
| 35 | 22231 | 1413 - 1458 | Apply | java.util.Properties.put | org.make.api.kafkatest | props.put("retries", "3") |
| 36 | 22122 | 1463 - 1515 | Apply | java.util.Properties.put | org.make.api.kafkatest | props.put("batch.size", "16384") |
| 37 | 22037 | 1520 - 1567 | Apply | java.util.Properties.put | org.make.api.kafkatest | props.put("linger.ms", "1") |
| 38 | 22198 | 1572 - 1630 | Apply | java.util.Properties.put | org.make.api.kafkatest | props.put("buffer.memory", "33554432") |
| 39 | 22143 | 1667 - 1699 | ApplyToImplicitArgs | org.make.api.KafkaTest.createSerializer | org.make.api.kafkatest | KafkaTest.this.createSerializer[T](registryUrl)(evidence$1, evidence$2) |
| 40 | 22055 | 1729 - 1751 | Apply | org.apache.kafka.common.serialization.StringSerializer.<init> | org.make.api.kafkatest | new org.apache.kafka.common.serialization.StringSerializer() |
| 40 | 22206 | 1704 - 1764 | Apply | org.apache.kafka.clients.producer.KafkaProducer.<init> | org.make.api.kafkatest | new org.apache.kafka.clients.producer.KafkaProducer[String,T](props, new org.apache.kafka.common.serialization.StringSerializer(), serializer) |