1 /* 2 * Make.org Core API 3 * Copyright (C) 2020 Make.org 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 * 18 */ 19 20 package org.make.api.technical.healthcheck 21 22 import akka.util.Timeout 23 import org.apache.kafka.clients.admin.{AdminClient, DescribeClusterOptions} 24 import org.apache.kafka.clients.producer.ProducerConfig 25 import org.make.api.extensions.KafkaConfiguration 26 import org.make.api.technical.TimeSettings 27 import org.make.api.technical.healthcheck.HealthCheck.Status 28 29 import java.util.Properties 30 import scala.concurrent.{ExecutionContext, Future} 31 import scala.util.{Failure, Success, Using} 32 33 class KafkaHealthCheck(kafkaConfiguration: KafkaConfiguration) extends HealthCheck { 34 35 override val techno: String = "kafka" 36 37 val timeout: Timeout = TimeSettings.defaultTimeout 38 39 private def createClient() = { 40 val properties = new Properties 41 properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.connectionString) 42 AdminClient.create(properties) 43 } 44 45 override def healthCheck()(implicit ctx: ExecutionContext): Future[Status] = { 46 Future { 47 Using(createClient()) { client => 48 client 49 .describeCluster(new DescribeClusterOptions().timeoutMs(kafkaConfiguration.pollTimeout.toInt)) 50 .clusterId() 51 .get(timeout.duration.length, timeout.duration.unit) 52 } match { 53 case Success(_) => Status.OK 54 case Failure(e) => Status.NOK(Some(e.getMessage)) 55 } 56 } 57 } 58 }
| Line | Stmt Id | Pos | Tree | Symbol | Tests | Code |
|---|---|---|---|---|---|---|
| 35 | 48367 | 1343 - 1350 | Literal | <nosymbol> | org.scalatest.testsuite | "kafka" |
| 37 | 40497 | 1377 - 1404 | Select | org.make.api.technical.TimeSettings.defaultTimeout | org.scalatest.testsuite | org.make.api.technical.TimeSettings.defaultTimeout |
| 40 | 32966 | 1460 - 1474 | Apply | java.util.Properties.<init> | new java.util.Properties() | |
| 41 | 50001 | 1502 - 1541 | Literal | <nosymbol> | "bootstrap.servers" | |
| 41 | 34016 | 1479 - 1579 | Apply | java.util.Properties.setProperty | properties.setProperty("bootstrap.servers", KafkaHealthCheck.this.kafkaConfiguration.connectionString) | |
| 41 | 41593 | 1543 - 1578 | Select | org.make.api.extensions.KafkaConfiguration.connectionString | KafkaHealthCheck.this.kafkaConfiguration.connectionString | |
| 42 | 46479 | 1584 - 1614 | Apply | org.apache.kafka.clients.admin.AdminClient.create | org.apache.kafka.clients.admin.AdminClient.create(properties) | |
| 46 | 39504 | 1705 - 2084 | ApplyToImplicitArgs | scala.concurrent.Future.apply | org.scalatest.testsuite | scala.concurrent.Future.apply[org.make.api.technical.healthcheck.HealthCheck.Status](scala.util.Using.apply[org.apache.kafka.clients.admin.AdminClient, String](KafkaHealthCheck.this.createClient())(((client: org.apache.kafka.clients.admin.AdminClient) => client.describeCluster(new org.apache.kafka.clients.admin.DescribeClusterOptions().timeoutMs(scala.Predef.int2Integer(KafkaHealthCheck.this.kafkaConfiguration.pollTimeout.toInt))).clusterId().get(KafkaHealthCheck.this.timeout.duration.length, KafkaHealthCheck.this.timeout.duration.unit)))(Using.this.Releasable.AutoCloseableIsReleasable) match { case (value: String): scala.util.Success[String](_) => org.make.api.technical.healthcheck.HealthCheck.Status.OK case (exception: Throwable): scala.util.Failure[String]((e @ _)) => org.make.api.technical.healthcheck.HealthCheck.Status.NOK.apply(scala.Some.apply[String](e.getMessage())) })(ctx) |