1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.extensions
21 
22 import akka.actor.typed.{Behavior, SupervisorStrategy}
23 import akka.actor.typed.scaladsl.Behaviors
24 import kamon.Kamon
25 import kamon.metric.Gauge
26 import org.make.api.technical.MonitorableExecutionContext
27 
28 import scala.concurrent.duration.DurationInt
29 
/**
  * Actor behavior that periodically copies thread-pool readings into Kamon gauges.
  *
  * Pools are registered by sending [[ThreadPoolMonitoringActor.MonitorThreadPool]]. On every
  * [[ThreadPoolMonitoringActor.Monitor]] tick (scheduled at a fixed rate of one second) the
  * readings exposed by each registered pool are pushed to the gauges created for it.
  */
object ThreadPoolMonitoringActor {

  /**
    * Builds the supervised monitoring behavior.
    *
    * The periodic `Monitor` timer is started exactly once here. Starting it inside the
    * recursive receive behavior would restart the timer (same key) each time a pool is
    * registered, resetting the fixed-rate schedule.
    *
    * @return the behavior, resumed on failure, starting with no registered pool
    */
  def apply(): Behavior[Protocol] = {
    val scheduled: Behavior[Protocol] = Behaviors.withTimers { timers =>
      timers.startTimerAtFixedRate(Monitor, 1.second)
      monitorThreadPools(Map.empty)
    }
    Behaviors.supervise(scheduled).onFailure(SupervisorStrategy.resume)
  }

  /**
    * Message handling for a given set of registered pools.
    *
    * @param executors registered pools and their gauges, keyed by pool name; registering
    *                  a name that is already present replaces the previous entry
    * @return a behavior that refreshes the gauges on `Monitor` and adds a pool on
    *         `MonitorThreadPool`
    */
  @SuppressWarnings(Array("org.wartremover.warts.Recursion"))
  private def monitorThreadPools(executors: Map[String, ExecutorWithGauges]): Behavior[Protocol] = {
    Behaviors.receiveMessage {
      case Monitor =>
        executors.foreachEntry {
          case (_, executorAndGauges) =>
            val executor = executorAndGauges.executor
            executorAndGauges.activeTasks.update(executor.activeTasks)
            executorAndGauges.currentTasks.update(executor.currentTasks)
            executorAndGauges.maxTasks.update(executor.maxTasks)
            executorAndGauges.waitingTasks.update(executor.waitingTasks)
        }
        Behaviors.same
      // The bound variable is called poolName so that it does not shadow the object's `name`.
      case MonitorThreadPool(newExecutor, poolName) =>
        // Every gauge of a pool carries the pool name in its "name" tag.
        def taggedGauge(metric: String): Gauge = Kamon.gauge(metric).withTag("name", poolName)

        val executorWithGauges = ExecutorWithGauges(
          executor = newExecutor,
          activeTasks = taggedGauge("executors-active-threads"),
          currentTasks = taggedGauge("executors-core-size"),
          maxTasks = taggedGauge("executors-max-threads"),
          waitingTasks = taggedGauge("executors-waiting")
        )
        monitorThreadPools(executors + (poolName -> executorWithGauges))
    }
  }

  /** Messages understood by the monitoring actor. */
  sealed trait Protocol

  /** Periodic tick asking the actor to refresh the gauges of every registered pool. */
  case object Monitor extends Protocol

  /**
    * Registers a pool to be monitored.
    *
    * @param pool the execution context whose readings are published
    * @param name value used for the "name" tag of the pool's gauges and as registration key
    */
  final case class MonitorThreadPool(pool: MonitorableExecutionContext, name: String) extends Protocol

  /**
    * A monitored pool together with the gauges its readings are written to.
    *
    * @param executor     the monitored execution context
    * @param activeTasks  gauge receiving `executor.activeTasks`
    * @param currentTasks gauge receiving `executor.currentTasks`
    * @param maxTasks     gauge receiving `executor.maxTasks`
    * @param waitingTasks gauge receiving `executor.waitingTasks`
    */
  final case class ExecutorWithGauges(
    executor: MonitorableExecutionContext,
    activeTasks: Gauge,
    currentTasks: Gauge,
    maxTasks: Gauge,
    waitingTasks: Gauge
  )

  /** Name identifying this actor. */
  val name: String = "ThreadPoolMonitoringActor"

}
Line Stmt Id Pos Tree Symbol Tests Code
32 50698 1155 - 1180 Select akka.actor.typed.SupervisorStrategy.resume akka.actor.typed.SupervisorStrategy.resume
32 45228 1123 - 1123 Select org.make.api.extensions.ThreadPoolMonitoringActor.monitorThreadPools$default$1 ThreadPoolMonitoringActor.this.monitorThreadPools$default$1
32 42288 1103 - 1181 ApplyToImplicitArgs akka.actor.typed.scaladsl.Behaviors.Supervise.onFailure akka.actor.typed.scaladsl.Behaviors.supervise[org.make.api.extensions.ThreadPoolMonitoringActor.Protocol](ThreadPoolMonitoringActor.this.monitorThreadPools(ThreadPoolMonitoringActor.this.monitorThreadPools$default$1)).onFailure[Nothing](akka.actor.typed.SupervisorStrategy.resume)((ClassTag.Nothing: scala.reflect.ClassTag[Nothing]))
32 37377 1123 - 1143 Apply org.make.api.extensions.ThreadPoolMonitoringActor.monitorThreadPools ThreadPoolMonitoringActor.this.monitorThreadPools(ThreadPoolMonitoringActor.this.monitorThreadPools$default$1)
37 37906 1366 - 2484 Apply akka.actor.typed.scaladsl.Behaviors.withTimers akka.actor.typed.scaladsl.Behaviors.withTimers[org.make.api.extensions.ThreadPoolMonitoringActor.Protocol](((timers: akka.actor.typed.scaladsl.TimerScheduler[org.make.api.extensions.ThreadPoolMonitoringActor.Protocol]) => { timers.startTimerAtFixedRate(ThreadPoolMonitoringActor.this.Monitor, scala.concurrent.duration.`package`.DurationInt(1).second); akka.actor.typed.scaladsl.Behaviors.receiveMessage[org.make.api.extensions.ThreadPoolMonitoringActor.Protocol](((x0$1: org.make.api.extensions.ThreadPoolMonitoringActor.Protocol) => x0$1 match { case ThreadPoolMonitoringActor.this.Monitor => { executors.foreachEntry[kamon.metric.Gauge](((x0$2: String, x1$1: org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges) => scala.Tuple2.apply[String, org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges](x0$2, x1$1) match { case (_1: String, _2: org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges): (String, org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges)(_, (executorAndGauges @ _)) => { val executor: org.make.api.technical.MonitorableExecutionContext = executorAndGauges.executor; executorAndGauges.activeTasks.update(executor.activeTasks.toDouble); executorAndGauges.currentTasks.update(executor.currentTasks.toDouble); executorAndGauges.maxTasks.update(executor.maxTasks.toDouble); executorAndGauges.waitingTasks.update(executor.waitingTasks.toDouble) } })); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.extensions.ThreadPoolMonitoringActor.Protocol] } case (pool: org.make.api.technical.MonitorableExecutionContext, name: String): org.make.api.extensions.ThreadPoolMonitoringActor.MonitorThreadPool((newExecutor @ _), (name @ _)) => { val executorWithGauges: org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges = ThreadPoolMonitoringActor.this.ExecutorWithGauges.apply(newExecutor, 
kamon.Kamon.gauge("executors-active-threads").withTag("name", name), kamon.Kamon.gauge("executors-core-size").withTag("name", name), kamon.Kamon.gauge("executors-max-threads").withTag("name", name), kamon.Kamon.gauge("executors-waiting").withTag("name", name)); ThreadPoolMonitoringActor.this.monitorThreadPools(executors.+[org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges](scala.Predef.ArrowAssoc[String](name).->[org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges](executorWithGauges))) } })) }))
38 35302 1434 - 1441 Select org.make.api.extensions.ThreadPoolMonitoringActor.Monitor ThreadPoolMonitoringActor.this.Monitor
38 30852 1443 - 1444 Literal <nosymbol> 1
38 35801 1405 - 1452 Apply akka.actor.typed.scaladsl.TimerScheduler.startTimerAtFixedRate timers.startTimerAtFixedRate(ThreadPoolMonitoringActor.this.Monitor, scala.concurrent.duration.`package`.DurationInt(1).second)
38 43914 1443 - 1451 Select scala.concurrent.duration.DurationConversions.second scala.concurrent.duration.`package`.DurationInt(1).second
39 41790 1459 - 2478 Apply akka.actor.typed.scaladsl.Behaviors.receiveMessage akka.actor.typed.scaladsl.Behaviors.receiveMessage[org.make.api.extensions.ThreadPoolMonitoringActor.Protocol](((x0$1: org.make.api.extensions.ThreadPoolMonitoringActor.Protocol) => x0$1 match { case ThreadPoolMonitoringActor.this.Monitor => { executors.foreachEntry[kamon.metric.Gauge](((x0$2: String, x1$1: org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges) => scala.Tuple2.apply[String, org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges](x0$2, x1$1) match { case (_1: String, _2: org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges): (String, org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges)(_, (executorAndGauges @ _)) => { val executor: org.make.api.technical.MonitorableExecutionContext = executorAndGauges.executor; executorAndGauges.activeTasks.update(executor.activeTasks.toDouble); executorAndGauges.currentTasks.update(executor.currentTasks.toDouble); executorAndGauges.maxTasks.update(executor.maxTasks.toDouble); executorAndGauges.waitingTasks.update(executor.waitingTasks.toDouble) } })); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.extensions.ThreadPoolMonitoringActor.Protocol] } case (pool: org.make.api.technical.MonitorableExecutionContext, name: String): org.make.api.extensions.ThreadPoolMonitoringActor.MonitorThreadPool((newExecutor @ _), (name @ _)) => { val executorWithGauges: org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges = ThreadPoolMonitoringActor.this.ExecutorWithGauges.apply(newExecutor, kamon.Kamon.gauge("executors-active-threads").withTag("name", name), kamon.Kamon.gauge("executors-core-size").withTag("name", name), kamon.Kamon.gauge("executors-max-threads").withTag("name", name), kamon.Kamon.gauge("executors-waiting").withTag("name", name)); 
ThreadPoolMonitoringActor.this.monitorThreadPools(executors.+[org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges](scala.Predef.ArrowAssoc[String](name).->[org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges](executorWithGauges))) } }))
41 48853 1520 - 1945 Apply scala.collection.MapOps.foreachEntry executors.foreachEntry[kamon.metric.Gauge](((x0$2: String, x1$1: org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges) => scala.Tuple2.apply[String, org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges](x0$2, x1$1) match { case (_1: String, _2: org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges): (String, org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges)(_, (executorAndGauges @ _)) => { val executor: org.make.api.technical.MonitorableExecutionContext = executorAndGauges.executor; executorAndGauges.activeTasks.update(executor.activeTasks.toDouble); executorAndGauges.currentTasks.update(executor.currentTasks.toDouble); executorAndGauges.maxTasks.update(executor.maxTasks.toDouble); executorAndGauges.waitingTasks.update(executor.waitingTasks.toDouble) } }))
43 49095 1617 - 1643 Select org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges.executor executorAndGauges.executor
44 41285 1695 - 1715 Select scala.Int.toDouble executor.activeTasks.toDouble
44 37420 1658 - 1716 Apply kamon.metric.Gauge.update executorAndGauges.activeTasks.update(executor.activeTasks.toDouble)
45 51203 1769 - 1790 Select scala.Int.toDouble executor.currentTasks.toDouble
45 42329 1731 - 1791 Apply kamon.metric.Gauge.update executorAndGauges.currentTasks.update(executor.currentTasks.toDouble)
46 35506 1840 - 1857 Select scala.Int.toDouble executor.maxTasks.toDouble
46 30609 1806 - 1858 Apply kamon.metric.Gauge.update executorAndGauges.maxTasks.update(executor.maxTasks.toDouble)
47 36860 1873 - 1933 Apply kamon.metric.Gauge.update executorAndGauges.waitingTasks.update(executor.waitingTasks.toDouble)
47 43953 1911 - 1932 Select scala.Int.toDouble executor.waitingTasks.toDouble
49 42038 1956 - 1970 TypeApply akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[org.make.api.extensions.ThreadPoolMonitoringActor.Protocol]
51 31358 2059 - 2399 Apply org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges.apply ThreadPoolMonitoringActor.this.ExecutorWithGauges.apply(newExecutor, kamon.Kamon.gauge("executors-active-threads").withTag("name", name), kamon.Kamon.gauge("executors-core-size").withTag("name", name), kamon.Kamon.gauge("executors-max-threads").withTag("name", name), kamon.Kamon.gauge("executors-waiting").withTag("name", name))
53 37173 2116 - 2177 Apply kamon.metric.Tagging.withTag kamon.Kamon.gauge("executors-active-threads").withTag("name", name)
54 51245 2191 - 2247 Apply kamon.metric.Tagging.withTag kamon.Kamon.gauge("executors-core-size").withTag("name", name)
55 42360 2261 - 2319 Apply kamon.metric.Tagging.withTag kamon.Kamon.gauge("executors-max-threads").withTag("name", name)
56 35255 2333 - 2387 Apply kamon.metric.Tagging.withTag kamon.Kamon.gauge("executors-waiting").withTag("name", name)
58 44460 2442 - 2468 Apply scala.Predef.ArrowAssoc.-> scala.Predef.ArrowAssoc[String](name).->[org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges](executorWithGauges)
58 49640 2410 - 2470 Apply org.make.api.extensions.ThreadPoolMonitoringActor.monitorThreadPools ThreadPoolMonitoringActor.this.monitorThreadPools(executors.+[org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges](scala.Predef.ArrowAssoc[String](name).->[org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges](executorWithGauges)))
58 36900 2429 - 2469 Apply scala.collection.immutable.MapOps.+ executors.+[org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges](scala.Predef.ArrowAssoc[String](name).->[org.make.api.extensions.ThreadPoolMonitoringActor.ExecutorWithGauges](executorWithGauges))
74 51283 2858 - 2885 Literal <nosymbol> "ThreadPoolMonitoringActor"