1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2020 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.technical.healthcheck
21 
22 import akka.util.Timeout
23 import org.apache.kafka.clients.admin.{AdminClient, DescribeClusterOptions}
24 import org.apache.kafka.clients.producer.ProducerConfig
25 import org.make.api.extensions.KafkaConfiguration
26 import org.make.api.technical.TimeSettings
27 import org.make.api.technical.healthcheck.HealthCheck.Status
28 
29 import java.util.Properties
30 import scala.concurrent.{ExecutionContext, Future}
31 import scala.util.{Failure, Success, Using}
32 
/**
  * Health check that probes the Kafka cluster described by `kafkaConfiguration`.
  *
  * Every probe creates a short-lived [[AdminClient]], requests the cluster id and closes the client
  * again, whatever the outcome.
  *
  * @param kafkaConfiguration provides the bootstrap servers (`connectionString`) and the timeout, in
  *                           milliseconds, applied to the describe-cluster request (`pollTimeout`)
  */
class KafkaHealthCheck(kafkaConfiguration: KafkaConfiguration) extends HealthCheck {

  override val techno: String = "kafka"

  /** Upper bound on how long a probe waits for the cluster id to become available. */
  val timeout: Timeout = TimeSettings.defaultTimeout

  /** Builds a fresh admin client pointing at the configured bootstrap servers. */
  private def createClient(): AdminClient = {
    val properties = new Properties
    // The producer constant resolves to "bootstrap.servers", the key the admin client reads as well.
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfiguration.connectionString)
    AdminClient.create(properties)
  }

  /**
    * Asks the cluster for its id and maps the outcome to a [[Status]].
    *
    * @param ctx execution context on which the (blocking) probe runs
    * @return `Status.OK` when the cluster id was retrieved, otherwise `Status.NOK` carrying a
    *         non-null description of the failure
    */
  override def healthCheck()(implicit ctx: ExecutionContext): Future[Status] = {
    Future {
      // The probe parks the current thread until Kafka answers or a timeout fires, so signal it to
      // the execution context instead of silently holding one of its threads.
      val probe = scala.concurrent.blocking {
        // Using closes the client on success and on failure, and also captures client creation errors.
        Using(createClient()) { client =>
          client
            .describeCluster(new DescribeClusterOptions().timeoutMs(kafkaConfiguration.pollTimeout.toInt))
            .clusterId()
            .get(timeout.duration.length, timeout.duration.unit)
        }
      }

      probe match {
        case Success(_) => Status.OK
        case Failure(e) =>
          // Some exceptions (e.g. a bare java.util.concurrent.TimeoutException) carry no message;
          // fall back to the exception's string form rather than reporting Some(null).
          Status.NOK(Some(Option(e.getMessage).getOrElse(e.toString)))
      }
    }
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
35 48367 1343 - 1350 Literal <nosymbol> org.scalatest.testsuite "kafka"
37 40497 1377 - 1404 Select org.make.api.technical.TimeSettings.defaultTimeout org.scalatest.testsuite org.make.api.technical.TimeSettings.defaultTimeout
40 32966 1460 - 1474 Apply java.util.Properties.<init> new java.util.Properties()
41 50001 1502 - 1541 Literal <nosymbol> "bootstrap.servers"
41 34016 1479 - 1579 Apply java.util.Properties.setProperty properties.setProperty("bootstrap.servers", KafkaHealthCheck.this.kafkaConfiguration.connectionString)
41 41593 1543 - 1578 Select org.make.api.extensions.KafkaConfiguration.connectionString KafkaHealthCheck.this.kafkaConfiguration.connectionString
42 46479 1584 - 1614 Apply org.apache.kafka.clients.admin.AdminClient.create org.apache.kafka.clients.admin.AdminClient.create(properties)
46 39504 1705 - 2084 ApplyToImplicitArgs scala.concurrent.Future.apply org.scalatest.testsuite scala.concurrent.Future.apply[org.make.api.technical.healthcheck.HealthCheck.Status](scala.util.Using.apply[org.apache.kafka.clients.admin.AdminClient, String](KafkaHealthCheck.this.createClient())(((client: org.apache.kafka.clients.admin.AdminClient) => client.describeCluster(new org.apache.kafka.clients.admin.DescribeClusterOptions().timeoutMs(scala.Predef.int2Integer(KafkaHealthCheck.this.kafkaConfiguration.pollTimeout.toInt))).clusterId().get(KafkaHealthCheck.this.timeout.duration.length, KafkaHealthCheck.this.timeout.duration.unit)))(Using.this.Releasable.AutoCloseableIsReleasable) match { case (value: String): scala.util.Success[String](_) => org.make.api.technical.healthcheck.HealthCheck.Status.OK case (exception: Throwable): scala.util.Failure[String]((e @ _)) => org.make.api.technical.healthcheck.HealthCheck.Status.NOK.apply(scala.Some.apply[String](e.getMessage())) })(ctx)