1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.technical
21 
22 import akka.actor.typed.scaladsl.AskPattern.Askable
23 import akka.actor.typed.scaladsl.Behaviors
24 import akka.actor.typed.{Behavior, Scheduler}
25 import akka.cluster.sharding.ShardRegion.ClusterShardingStats
26 import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
27 import akka.cluster.sharding.typed.{scaladsl, GetClusterShardingStats}
28 import akka.cluster.typed.Cluster
29 import akka.pattern.AskTimeoutException
30 import akka.util.Timeout
31 import com.typesafe.config.Config
32 import kamon.Kamon
33 import kamon.metric.Gauge
34 
35 import scala.concurrent.duration.{Duration, FiniteDuration}
36 import scala.jdk.CollectionConverters._
37 import scala.util.{Failure, Success}
38 
/**
  * Actor publishing cluster-sharding statistics as Kamon gauges.
  *
  * On every [[ClusterShardingMonitor.Monitor]] tick the actor asks the sharding extension for the
  * statistics of each configured region and pipes the outcome back to itself, either as a
  * [[ClusterShardingMonitor.StatsSuccess]] (gauges are updated) or as a
  * [[ClusterShardingMonitor.StatsFailure]] (the failure is logged).
  */
object ClusterShardingMonitor {

  /** Settings used by the monitor. */
  trait ClusterShardingMonitorConfiguration {

    /** Period of the fixed-rate timer emitting `Monitor` ticks. */
    def refreshInterval: FiniteDuration

    /** Timeout carried by each `GetClusterShardingStats` query; the ask itself waits twice as long. */
    def statsTimeout: FiniteDuration

    /** Names of the regions to query; each one is used as an entity type key name and as a gauge tag. */
    def shardedRegions: Seq[String]
  }

  object ClusterShardingMonitorConfiguration {

    /**
      * Builds the monitor settings from the `kamon.cluster-sharding.*` keys of `config`.
      *
      * The three values are plain `val`s, so they are read once, when the configuration is built.
      *
      * @param config configuration holding the `refresh-interval`, `stats-timeout` and `regions` keys
      * @return the settings read from `config`
      */
    def apply(config: Config): ClusterShardingMonitorConfiguration = {
      new ClusterShardingMonitorConfiguration {
        override val refreshInterval: FiniteDuration =
          millisDuration(config, "kamon.cluster-sharding.refresh-interval")

        override val statsTimeout: FiniteDuration =
          millisDuration(config, "kamon.cluster-sharding.stats-timeout")

        override val shardedRegions: Seq[String] =
          config.getStringList("kamon.cluster-sharding.regions").asScala.toSeq
      }
    }

    /** Reads the duration stored at `path` and converts it to a millisecond-precision [[FiniteDuration]]. */
    private def millisDuration(config: Config, path: String): FiniteDuration =
      Duration(config.getDuration(path).toMillis, scala.concurrent.duration.MILLISECONDS)
  }

  /**
    * Creates the monitor behavior, wrapped by `ActorSystemHelper.superviseWithBackoff`.
    *
    * @return the supervised monitoring behavior
    */
  def apply(): Behavior[Protocol] = {
    ActorSystemHelper.superviseWithBackoff(monitorClusterSharding())
  }

  /**
    * Tells whether `error` is a timeout whose message mentions a terminated recipient.
    *
    * The message may be `null`, hence the `Option` wrapper: a missing message must not make the
    * actor fail with a `NullPointerException` while it is merely trying to log a failure.
    *
    * `AskTimeoutException` is a subclass of `java.util.concurrent.TimeoutException`; it stays listed
    * so the intent of the original check remains visible.
    * NOTE(review): the typed ask pattern is documented to fail with a plain `TimeoutException`
    * rather than an `AskTimeoutException` - confirm against the Akka version in use.
    */
  private def isTerminatedRecipient(error: Throwable): Boolean = error match {
    case _: AskTimeoutException | _: java.util.concurrent.TimeoutException =>
      Option(error.getMessage).exists(_.contains("terminated"))
    case _ => false
  }

  private def monitorClusterSharding(): Behavior[Protocol] = {
    Behaviors.setup { context =>
      val configuration = ClusterShardingMonitorConfiguration(context.system.settings.config)

      // One set of gauges per configured region, created once for the lifetime of the behavior.
      val gauges: Map[String, ShardingGauges] =
        configuration.shardedRegions.map(region => region -> new ShardingGauges(region)).toMap

      // The ask waits twice as long as the timeout carried by the query itself.
      implicit val timeout: Timeout = Timeout(configuration.statsTimeout * 2)
      implicit val scheduler: Scheduler = context.system.scheduler

      Behaviors.withTimers { timers =>
        timers.startTimerAtFixedRate(Monitor, configuration.refreshInterval)

        Behaviors.receiveMessage {
          case Monitor =>
            val sharding = scaladsl.ClusterSharding(context.system)
            gauges.foreachEntry { (region, regionGauges) =>
              val statsFuture = sharding.shardState
                .ask(GetClusterShardingStats(EntityTypeKey(region), configuration.statsTimeout, _))

              // The outcome comes back as a regular message of this actor's protocol.
              context.pipeToSelf(statsFuture) {
                case Success(stats) => StatsSuccess(regionGauges, stats)
                case Failure(e)     => StatsFailure(region, e)
              }
            }
            Behaviors.same

          case StatsSuccess(regionGauges, stats) =>
            // Sum of the per-shard counts over every node present in the reply.
            val totalActors = stats.regions.values.map(_.stats.values.sum).sum
            regionGauges.totalActorsCount.update(totalActors.toDouble)

            // Node-level gauges are only refreshed when the reply contains an entry for this node;
            // otherwise they keep their previous value.
            stats.regions.get(Cluster(context.system).selfMember.address).foreach { nodeStats =>
              regionGauges.nodeActorsCount.update(nodeStats.stats.values.sum.toDouble)
              regionGauges.nodeShardsCount.update(nodeStats.stats.size.toDouble)
            }
            Behaviors.same

          case StatsFailure(region, e) if isTerminatedRecipient(e) =>
            // Logged as a warning, without the stack trace.
            context.log.warn(s"Unable to retrieve stats for $region due to terminated actor: ${e.getMessage}")
            Behaviors.same

          case StatsFailure(region, e) =>
            context.log.error(s"Unable to retrieve stats for $region:", e)
            Behaviors.same
        }
      }
    }
  }

  /** Messages handled by the monitor. */
  sealed trait Protocol

  /** Timer tick triggering one statistics query per configured region. */
  case object Monitor extends Protocol

  /** Statistics received for a region, together with the gauges they must be written to. */
  final case class StatsSuccess(gauges: ShardingGauges, stats: ClusterShardingStats) extends Protocol

  /** Failure of the statistics query issued for `region`. */
  final case class StatsFailure(region: String, throwable: Throwable) extends Protocol

  // Presumably the name under which the monitor actor is spawned - verify against the caller.
  val name: String = "actor-sharding-monitor"

  /**
    * Kamon gauges of a single region, each tagged with `region` = `name`.
    *
    * @param name region name used as the value of the `region` tag
    */
  class ShardingGauges(name: String) {
    val totalActorsCount: Gauge = Kamon.gauge("sharding-total-actors").withTag("region", name)
    val nodeActorsCount: Gauge = Kamon.gauge("sharding-node-actors").withTag("region", name)
    val nodeShardsCount: Gauge = Kamon.gauge("sharding-node-shards").withTag("region", name)
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
48 39192 1746 - 1749 Apply org.make.api.technical.ClusterShardingMonitor.ClusterShardingMonitorConfiguration.$anon.<init> new $anon()
50 35785 1855 - 2011 Apply scala.concurrent.duration.Duration.apply scala.concurrent.duration.Duration.apply(config.getDuration("kamon.cluster-sharding.refresh-interval").toMillis(), MILLISECONDS)
51 31624 1877 - 1947 Apply java.time.Duration.toMillis config.getDuration("kamon.cluster-sharding.refresh-interval").toMillis()
52 43632 1961 - 1999 Literal <nosymbol> MILLISECONDS
57 38164 2087 - 2240 Apply scala.concurrent.duration.Duration.apply scala.concurrent.duration.Duration.apply(config.getDuration("kamon.cluster-sharding.stats-timeout").toMillis(), MILLISECONDS)
58 32639 2109 - 2176 Apply java.time.Duration.toMillis config.getDuration("kamon.cluster-sharding.stats-timeout").toMillis()
59 45251 2190 - 2228 Literal <nosymbol> MILLISECONDS
64 50174 2315 - 2369 Apply com.typesafe.config.Config.getStringList config.getStringList("kamon.cluster-sharding.regions")
64 43077 2315 - 2383 Select scala.collection.IterableOnceOps.toSeq scala.jdk.CollectionConverters.ListHasAsScala[String](config.getStringList("kamon.cluster-sharding.regions")).asScala.toSeq
71 31659 2494 - 2518 Apply org.make.api.technical.ClusterShardingMonitor.monitorClusterSharding ClusterShardingMonitor.this.monitorClusterSharding()
71 44689 2455 - 2519 Apply org.make.api.technical.ActorSystemHelper.superviseWithBackoff ActorSystemHelper.superviseWithBackoff[org.make.api.technical.ClusterShardingMonitor.Protocol](ClusterShardingMonitor.this.monitorClusterSharding())
75 31701 2592 - 4690 Apply akka.actor.typed.scaladsl.Behaviors.setup akka.actor.typed.scaladsl.Behaviors.setup[org.make.api.technical.ClusterShardingMonitor.Protocol](((context: akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.ClusterShardingMonitor.Protocol]) => { val configuration: org.make.api.technical.ClusterShardingMonitor.ClusterShardingMonitorConfiguration = ClusterShardingMonitor.this.ClusterShardingMonitorConfiguration.apply(context.system.settings.config); val gauges: Map[String,org.make.api.technical.ClusterShardingMonitor.ShardingGauges] = configuration.shardedRegions.map[(String, org.make.api.technical.ClusterShardingMonitor.ShardingGauges)](((region: String) => scala.Predef.ArrowAssoc[String](region).->[org.make.api.technical.ClusterShardingMonitor.ShardingGauges](new ClusterShardingMonitor.this.ShardingGauges(region)))).toMap[String, org.make.api.technical.ClusterShardingMonitor.ShardingGauges](scala.this.<:<.refl[(String, org.make.api.technical.ClusterShardingMonitor.ShardingGauges)]); implicit val timeout: akka.util.Timeout = akka.util.Timeout.apply(configuration.statsTimeout.*(2L)); implicit val scheduler: akka.actor.typed.Scheduler = context.system.scheduler; akka.actor.typed.scaladsl.Behaviors.withTimers[org.make.api.technical.ClusterShardingMonitor.Protocol](((timers: akka.actor.typed.scaladsl.TimerScheduler[org.make.api.technical.ClusterShardingMonitor.Protocol]) => { timers.startTimerAtFixedRate(ClusterShardingMonitor.this.Monitor, configuration.refreshInterval); akka.actor.typed.scaladsl.Behaviors.receiveMessage[org.make.api.technical.ClusterShardingMonitor.Protocol](((x0$1: org.make.api.technical.ClusterShardingMonitor.Protocol) => x0$1 match { case ClusterShardingMonitor.this.Monitor => { val shardingInfo: akka.cluster.sharding.typed.scaladsl.ClusterSharding = akka.cluster.sharding.typed.scaladsl.ClusterSharding.apply(context.system); gauges.foreachEntry[Unit](((x0$2: String, x1$1: 
org.make.api.technical.ClusterShardingMonitor.ShardingGauges) => scala.Tuple2.apply[String, org.make.api.technical.ClusterShardingMonitor.ShardingGauges](x0$2, x1$1) match { case (_1: String, _2: org.make.api.technical.ClusterShardingMonitor.ShardingGauges): (String, org.make.api.technical.ClusterShardingMonitor.ShardingGauges)((region @ _), (regionGauges @ _)) => { val statsFuture: scala.concurrent.Future[akka.cluster.sharding.ShardRegion.ClusterShardingStats] = akka.actor.typed.scaladsl.AskPattern.Askable[akka.cluster.sharding.typed.ClusterShardingQuery](shardingInfo.shardState).ask[akka.cluster.sharding.ShardRegion.ClusterShardingStats](((x$1: akka.actor.typed.ActorRef[akka.cluster.sharding.ShardRegion.ClusterShardingStats]) => akka.cluster.sharding.typed.GetClusterShardingStats.apply(akka.cluster.sharding.typed.scaladsl.EntityTypeKey.apply[Any](region)((ClassTag.Any: scala.reflect.ClassTag[Any])), configuration.statsTimeout, x$1)))(timeout, scheduler); context.pipeToSelf[akka.cluster.sharding.ShardRegion.ClusterShardingStats](statsFuture)(((x0$3: scala.util.Try[akka.cluster.sharding.ShardRegion.ClusterShardingStats]) => x0$3 match { case (value: akka.cluster.sharding.ShardRegion.ClusterShardingStats): scala.util.Success[akka.cluster.sharding.ShardRegion.ClusterShardingStats]((stats @ _)) => ClusterShardingMonitor.this.StatsSuccess.apply(regionGauges, stats) case (exception: Throwable): scala.util.Failure[akka.cluster.sharding.ShardRegion.ClusterShardingStats]((e @ _)) => ClusterShardingMonitor.this.StatsFailure.apply(region, e) })) } })); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.ClusterShardingMonitor.Protocol] } case (gauges: org.make.api.technical.ClusterShardingMonitor.ShardingGauges, stats: akka.cluster.sharding.ShardRegion.ClusterShardingStats): org.make.api.technical.ClusterShardingMonitor.StatsSuccess((regionGauges @ _), (stats @ _)) => { val allProposals: Int = stats.regions.values.map[Int](((x$2: 
akka.cluster.sharding.ShardRegion.ShardRegionStats) => x$2.stats.values.sum[Int](math.this.Numeric.IntIsIntegral))).sum[Int](math.this.Numeric.IntIsIntegral); regionGauges.totalActorsCount.update(allProposals.toDouble); val maybeRegion: Option[akka.cluster.sharding.ShardRegion.ShardRegionStats] = stats.regions.get(akka.cluster.typed.Cluster.apply(context.system).selfMember.address); maybeRegion.foreach[kamon.metric.Gauge](((shardStats: akka.cluster.sharding.ShardRegion.ShardRegionStats) => { regionGauges.nodeActorsCount.update(shardStats.stats.values.sum[Int](math.this.Numeric.IntIsIntegral).toDouble); regionGauges.nodeShardsCount.update(shardStats.stats.size.toDouble) })); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.ClusterShardingMonitor.Protocol] } case (region: String, throwable: Throwable): org.make.api.technical.ClusterShardingMonitor.StatsFailure((region @ _), (e @ (_: akka.pattern.AskTimeoutException))) if e.getMessage().contains("terminated") => { context.log.warn(("Unable to retrieve stats for ".+(region).+(" due to terminated actor: ").+(e.getMessage()): String)); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.ClusterShardingMonitor.Protocol] } case (region: String, throwable: Throwable): org.make.api.technical.ClusterShardingMonitor.StatsFailure((region @ _), (e @ _)) => { context.log.error(("Unable to retrieve stats for ".+(region).+(":"): String), e); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.ClusterShardingMonitor.Protocol] } })) })) }))
76 32395 2647 - 2714 Apply org.make.api.technical.ClusterShardingMonitor.ClusterShardingMonitorConfiguration.apply ClusterShardingMonitor.this.ClusterShardingMonitorConfiguration.apply(context.system.settings.config)
76 36596 2683 - 2713 Select akka.actor.typed.Settings.config context.system.settings.config
78 45167 2826 - 2852 Apply org.make.api.technical.ClusterShardingMonitor.ShardingGauges.<init> new ClusterShardingMonitor.this.ShardingGauges(region)
78 38202 2816 - 2852 Apply scala.Predef.ArrowAssoc.-> scala.Predef.ArrowAssoc[String](region).->[org.make.api.technical.ClusterShardingMonitor.ShardingGauges](new ClusterShardingMonitor.this.ShardingGauges(region))
79 51234 2861 - 2861 TypeApply scala.<:<.refl scala.this.<:<.refl[(String, org.make.api.technical.ClusterShardingMonitor.ShardingGauges)]
79 43117 2763 - 2866 ApplyToImplicitArgs scala.collection.IterableOnceOps.toMap configuration.shardedRegions.map[(String, org.make.api.technical.ClusterShardingMonitor.ShardingGauges)](((region: String) => scala.Predef.ArrowAssoc[String](region).->[org.make.api.technical.ClusterShardingMonitor.ShardingGauges](new ClusterShardingMonitor.this.ShardingGauges(region)))).toMap[String, org.make.api.technical.ClusterShardingMonitor.ShardingGauges](scala.this.<:<.refl[(String, org.make.api.technical.ClusterShardingMonitor.ShardingGauges)])
81 39229 2914 - 2944 Apply scala.concurrent.duration.FiniteDuration.* configuration.statsTimeout.*(2L)
81 30791 2906 - 2945 Apply akka.util.Timeout.apply akka.util.Timeout.apply(configuration.statsTimeout.*(2L))
82 44727 2988 - 3012 Select akka.actor.typed.ActorSystem.scheduler context.system.scheduler
84 35275 3020 - 4684 Apply akka.actor.typed.scaladsl.Behaviors.withTimers akka.actor.typed.scaladsl.Behaviors.withTimers[org.make.api.technical.ClusterShardingMonitor.Protocol](((timers: akka.actor.typed.scaladsl.TimerScheduler[org.make.api.technical.ClusterShardingMonitor.Protocol]) => { timers.startTimerAtFixedRate(ClusterShardingMonitor.this.Monitor, configuration.refreshInterval); akka.actor.typed.scaladsl.Behaviors.receiveMessage[org.make.api.technical.ClusterShardingMonitor.Protocol](((x0$1: org.make.api.technical.ClusterShardingMonitor.Protocol) => x0$1 match { case ClusterShardingMonitor.this.Monitor => { val shardingInfo: akka.cluster.sharding.typed.scaladsl.ClusterSharding = akka.cluster.sharding.typed.scaladsl.ClusterSharding.apply(context.system); gauges.foreachEntry[Unit](((x0$2: String, x1$1: org.make.api.technical.ClusterShardingMonitor.ShardingGauges) => scala.Tuple2.apply[String, org.make.api.technical.ClusterShardingMonitor.ShardingGauges](x0$2, x1$1) match { case (_1: String, _2: org.make.api.technical.ClusterShardingMonitor.ShardingGauges): (String, org.make.api.technical.ClusterShardingMonitor.ShardingGauges)((region @ _), (regionGauges @ _)) => { val statsFuture: scala.concurrent.Future[akka.cluster.sharding.ShardRegion.ClusterShardingStats] = akka.actor.typed.scaladsl.AskPattern.Askable[akka.cluster.sharding.typed.ClusterShardingQuery](shardingInfo.shardState).ask[akka.cluster.sharding.ShardRegion.ClusterShardingStats](((x$1: akka.actor.typed.ActorRef[akka.cluster.sharding.ShardRegion.ClusterShardingStats]) => akka.cluster.sharding.typed.GetClusterShardingStats.apply(akka.cluster.sharding.typed.scaladsl.EntityTypeKey.apply[Any](region)((ClassTag.Any: scala.reflect.ClassTag[Any])), configuration.statsTimeout, x$1)))(timeout, scheduler); context.pipeToSelf[akka.cluster.sharding.ShardRegion.ClusterShardingStats](statsFuture)(((x0$3: scala.util.Try[akka.cluster.sharding.ShardRegion.ClusterShardingStats]) => x0$3 match { case (value: 
akka.cluster.sharding.ShardRegion.ClusterShardingStats): scala.util.Success[akka.cluster.sharding.ShardRegion.ClusterShardingStats]((stats @ _)) => ClusterShardingMonitor.this.StatsSuccess.apply(regionGauges, stats) case (exception: Throwable): scala.util.Failure[akka.cluster.sharding.ShardRegion.ClusterShardingStats]((e @ _)) => ClusterShardingMonitor.this.StatsFailure.apply(region, e) })) } })); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.ClusterShardingMonitor.Protocol] } case (gauges: org.make.api.technical.ClusterShardingMonitor.ShardingGauges, stats: akka.cluster.sharding.ShardRegion.ClusterShardingStats): org.make.api.technical.ClusterShardingMonitor.StatsSuccess((regionGauges @ _), (stats @ _)) => { val allProposals: Int = stats.regions.values.map[Int](((x$2: akka.cluster.sharding.ShardRegion.ShardRegionStats) => x$2.stats.values.sum[Int](math.this.Numeric.IntIsIntegral))).sum[Int](math.this.Numeric.IntIsIntegral); regionGauges.totalActorsCount.update(allProposals.toDouble); val maybeRegion: Option[akka.cluster.sharding.ShardRegion.ShardRegionStats] = stats.regions.get(akka.cluster.typed.Cluster.apply(context.system).selfMember.address); maybeRegion.foreach[kamon.metric.Gauge](((shardStats: akka.cluster.sharding.ShardRegion.ShardRegionStats) => { regionGauges.nodeActorsCount.update(shardStats.stats.values.sum[Int](math.this.Numeric.IntIsIntegral).toDouble); regionGauges.nodeShardsCount.update(shardStats.stats.size.toDouble) })); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.ClusterShardingMonitor.Protocol] } case (region: String, throwable: Throwable): org.make.api.technical.ClusterShardingMonitor.StatsFailure((region @ _), (e @ (_: akka.pattern.AskTimeoutException))) if e.getMessage().contains("terminated") => { context.log.warn(("Unable to retrieve stats for ".+(region).+(" due to terminated actor: ").+(e.getMessage()): String)); 
akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.ClusterShardingMonitor.Protocol] } case (region: String, throwable: Throwable): org.make.api.technical.ClusterShardingMonitor.StatsFailure((region @ _), (e @ _)) => { context.log.error(("Unable to retrieve stats for ".+(region).+(":"): String), e); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.ClusterShardingMonitor.Protocol] } })) }))
85 49634 3099 - 3128 Select org.make.api.technical.ClusterShardingMonitor.ClusterShardingMonitorConfiguration.refreshInterval configuration.refreshInterval
85 45204 3061 - 3129 Apply akka.actor.typed.scaladsl.TimerScheduler.startTimerAtFixedRate timers.startTimerAtFixedRate(ClusterShardingMonitor.this.Monitor, configuration.refreshInterval)
85 36360 3090 - 3097 Select org.make.api.technical.ClusterShardingMonitor.Monitor ClusterShardingMonitor.this.Monitor
87 43396 3139 - 4676 Apply akka.actor.typed.scaladsl.Behaviors.receiveMessage akka.actor.typed.scaladsl.Behaviors.receiveMessage[org.make.api.technical.ClusterShardingMonitor.Protocol](((x0$1: org.make.api.technical.ClusterShardingMonitor.Protocol) => x0$1 match { case ClusterShardingMonitor.this.Monitor => { val shardingInfo: akka.cluster.sharding.typed.scaladsl.ClusterSharding = akka.cluster.sharding.typed.scaladsl.ClusterSharding.apply(context.system); gauges.foreachEntry[Unit](((x0$2: String, x1$1: org.make.api.technical.ClusterShardingMonitor.ShardingGauges) => scala.Tuple2.apply[String, org.make.api.technical.ClusterShardingMonitor.ShardingGauges](x0$2, x1$1) match { case (_1: String, _2: org.make.api.technical.ClusterShardingMonitor.ShardingGauges): (String, org.make.api.technical.ClusterShardingMonitor.ShardingGauges)((region @ _), (regionGauges @ _)) => { val statsFuture: scala.concurrent.Future[akka.cluster.sharding.ShardRegion.ClusterShardingStats] = akka.actor.typed.scaladsl.AskPattern.Askable[akka.cluster.sharding.typed.ClusterShardingQuery](shardingInfo.shardState).ask[akka.cluster.sharding.ShardRegion.ClusterShardingStats](((x$1: akka.actor.typed.ActorRef[akka.cluster.sharding.ShardRegion.ClusterShardingStats]) => akka.cluster.sharding.typed.GetClusterShardingStats.apply(akka.cluster.sharding.typed.scaladsl.EntityTypeKey.apply[Any](region)((ClassTag.Any: scala.reflect.ClassTag[Any])), configuration.statsTimeout, x$1)))(timeout, scheduler); context.pipeToSelf[akka.cluster.sharding.ShardRegion.ClusterShardingStats](statsFuture)(((x0$3: scala.util.Try[akka.cluster.sharding.ShardRegion.ClusterShardingStats]) => x0$3 match { case (value: akka.cluster.sharding.ShardRegion.ClusterShardingStats): scala.util.Success[akka.cluster.sharding.ShardRegion.ClusterShardingStats]((stats @ _)) => ClusterShardingMonitor.this.StatsSuccess.apply(regionGauges, stats) case (exception: Throwable): scala.util.Failure[akka.cluster.sharding.ShardRegion.ClusterShardingStats]((e 
@ _)) => ClusterShardingMonitor.this.StatsFailure.apply(region, e) })) } })); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.ClusterShardingMonitor.Protocol] } case (gauges: org.make.api.technical.ClusterShardingMonitor.ShardingGauges, stats: akka.cluster.sharding.ShardRegion.ClusterShardingStats): org.make.api.technical.ClusterShardingMonitor.StatsSuccess((regionGauges @ _), (stats @ _)) => { val allProposals: Int = stats.regions.values.map[Int](((x$2: akka.cluster.sharding.ShardRegion.ShardRegionStats) => x$2.stats.values.sum[Int](math.this.Numeric.IntIsIntegral))).sum[Int](math.this.Numeric.IntIsIntegral); regionGauges.totalActorsCount.update(allProposals.toDouble); val maybeRegion: Option[akka.cluster.sharding.ShardRegion.ShardRegionStats] = stats.regions.get(akka.cluster.typed.Cluster.apply(context.system).selfMember.address); maybeRegion.foreach[kamon.metric.Gauge](((shardStats: akka.cluster.sharding.ShardRegion.ShardRegionStats) => { regionGauges.nodeActorsCount.update(shardStats.stats.values.sum[Int](math.this.Numeric.IntIsIntegral).toDouble); regionGauges.nodeShardsCount.update(shardStats.stats.size.toDouble) })); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.ClusterShardingMonitor.Protocol] } case (region: String, throwable: Throwable): org.make.api.technical.ClusterShardingMonitor.StatsFailure((region @ _), (e @ (_: akka.pattern.AskTimeoutException))) if e.getMessage().contains("terminated") => { context.log.warn(("Unable to retrieve stats for ".+(region).+(" due to terminated actor: ").+(e.getMessage()): String)); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.ClusterShardingMonitor.Protocol] } case (region: String, throwable: Throwable): org.make.api.technical.ClusterShardingMonitor.StatsFailure((region @ _), (e @ _)) => { context.log.error(("Unable to retrieve stats for ".+(region).+(":"): String), e); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.ClusterShardingMonitor.Protocol] } 
}))
89 50734 3223 - 3263 Apply akka.actor.typed.ExtensionId.apply akka.cluster.sharding.typed.scaladsl.ClusterSharding.apply(context.system)
89 37359 3248 - 3262 Select akka.actor.typed.scaladsl.ActorContext.system context.system
90 36393 3276 - 3725 Apply scala.collection.MapOps.foreachEntry gauges.foreachEntry[Unit](((x0$2: String, x1$1: org.make.api.technical.ClusterShardingMonitor.ShardingGauges) => scala.Tuple2.apply[String, org.make.api.technical.ClusterShardingMonitor.ShardingGauges](x0$2, x1$1) match { case (_1: String, _2: org.make.api.technical.ClusterShardingMonitor.ShardingGauges): (String, org.make.api.technical.ClusterShardingMonitor.ShardingGauges)((region @ _), (regionGauges @ _)) => { val statsFuture: scala.concurrent.Future[akka.cluster.sharding.ShardRegion.ClusterShardingStats] = akka.actor.typed.scaladsl.AskPattern.Askable[akka.cluster.sharding.typed.ClusterShardingQuery](shardingInfo.shardState).ask[akka.cluster.sharding.ShardRegion.ClusterShardingStats](((x$1: akka.actor.typed.ActorRef[akka.cluster.sharding.ShardRegion.ClusterShardingStats]) => akka.cluster.sharding.typed.GetClusterShardingStats.apply(akka.cluster.sharding.typed.scaladsl.EntityTypeKey.apply[Any](region)((ClassTag.Any: scala.reflect.ClassTag[Any])), configuration.statsTimeout, x$1)))(timeout, scheduler); context.pipeToSelf[akka.cluster.sharding.ShardRegion.ClusterShardingStats](statsFuture)(((x0$3: scala.util.Try[akka.cluster.sharding.ShardRegion.ClusterShardingStats]) => x0$3 match { case (value: akka.cluster.sharding.ShardRegion.ClusterShardingStats): scala.util.Success[akka.cluster.sharding.ShardRegion.ClusterShardingStats]((stats @ _)) => ClusterShardingMonitor.this.StatsSuccess.apply(regionGauges, stats) case (exception: Throwable): scala.util.Failure[akka.cluster.sharding.ShardRegion.ClusterShardingStats]((e @ _)) => ClusterShardingMonitor.this.StatsFailure.apply(region, e) })) } }))
93 42865 3377 - 3502 ApplyToImplicitArgs akka.actor.typed.scaladsl.AskPattern.Askable.ask akka.actor.typed.scaladsl.AskPattern.Askable[akka.cluster.sharding.typed.ClusterShardingQuery](shardingInfo.shardState).ask[akka.cluster.sharding.ShardRegion.ClusterShardingStats](((x$1: akka.actor.typed.ActorRef[akka.cluster.sharding.ShardRegion.ClusterShardingStats]) => akka.cluster.sharding.typed.GetClusterShardingStats.apply(akka.cluster.sharding.typed.scaladsl.EntityTypeKey.apply[Any](region)((ClassTag.Any: scala.reflect.ClassTag[Any])), configuration.statsTimeout, x$1)))(timeout, scheduler)
95 43892 3520 - 3711 Apply akka.actor.typed.scaladsl.ActorContext.pipeToSelf context.pipeToSelf[akka.cluster.sharding.ShardRegion.ClusterShardingStats](statsFuture)(((x0$3: scala.util.Try[akka.cluster.sharding.ShardRegion.ClusterShardingStats]) => x0$3 match { case (value: akka.cluster.sharding.ShardRegion.ClusterShardingStats): scala.util.Success[akka.cluster.sharding.ShardRegion.ClusterShardingStats]((stats @ _)) => ClusterShardingMonitor.this.StatsSuccess.apply(regionGauges, stats) case (exception: Throwable): scala.util.Failure[akka.cluster.sharding.ShardRegion.ClusterShardingStats]((e @ _)) => ClusterShardingMonitor.this.StatsFailure.apply(region, e) }))
96 38436 3595 - 3628 Apply org.make.api.technical.ClusterShardingMonitor.StatsSuccess.apply ClusterShardingMonitor.this.StatsSuccess.apply(regionGauges, stats)
97 30829 3670 - 3693 Apply org.make.api.technical.ClusterShardingMonitor.StatsFailure.apply ClusterShardingMonitor.this.StatsFailure.apply(region, e)
100 49670 3738 - 3752 TypeApply akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.ClusterShardingMonitor.Protocol]
102 44960 3876 - 3876 Select scala.math.Numeric.IntIsIntegral math.this.Numeric.IntIsIntegral
102 37393 3861 - 3879 ApplyToImplicitArgs scala.collection.IterableOnceOps.sum x$2.stats.values.sum[Int](math.this.Numeric.IntIsIntegral)
102 42904 3836 - 3884 ApplyToImplicitArgs scala.collection.IterableOnceOps.sum stats.regions.values.map[Int](((x$2: akka.cluster.sharding.ShardRegion.ShardRegionStats) => x$2.stats.values.sum[Int](math.this.Numeric.IntIsIntegral))).sum[Int](math.this.Numeric.IntIsIntegral)
102 51185 3881 - 3881 Select scala.math.Numeric.IntIsIntegral math.this.Numeric.IntIsIntegral
103 38473 3934 - 3946 Select scala.Int.toDouble allProposals.toDouble
103 30583 3897 - 3947 Apply kamon.metric.Gauge.update regionGauges.totalActorsCount.update(allProposals.toDouble)
105 44681 4005 - 4019 Select akka.actor.typed.scaladsl.ActorContext.system context.system
105 35816 3997 - 4039 Select akka.cluster.Member.address akka.cluster.typed.Cluster.apply(context.system).selfMember.address
105 48896 3979 - 4040 Apply scala.collection.MapOps.get stats.regions.get(akka.cluster.typed.Cluster.apply(context.system).selfMember.address)
106 30623 4053 - 4254 Apply scala.Option.foreach maybeRegion.foreach[kamon.metric.Gauge](((shardStats: akka.cluster.sharding.ShardRegion.ShardRegionStats) => { regionGauges.nodeActorsCount.update(shardStats.stats.values.sum[Int](math.this.Numeric.IntIsIntegral).toDouble); regionGauges.nodeShardsCount.update(shardStats.stats.size.toDouble) }))
107 51223 4103 - 4167 Apply kamon.metric.Gauge.update regionGauges.nodeActorsCount.update(shardStats.stats.values.sum[Int](math.this.Numeric.IntIsIntegral).toDouble)
107 44996 4163 - 4163 Select scala.math.Numeric.IntIsIntegral math.this.Numeric.IntIsIntegral
107 37155 4139 - 4166 Select scala.Int.toDouble shardStats.stats.values.sum[Int](math.this.Numeric.IntIsIntegral).toDouble
108 34544 4182 - 4240 Apply kamon.metric.Gauge.update regionGauges.nodeShardsCount.update(shardStats.stats.size.toDouble)
108 42342 4218 - 4239 Select scala.Int.toDouble shardStats.stats.size.toDouble
110 44444 4267 - 4281 TypeApply akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.ClusterShardingMonitor.Protocol]
111 36876 4345 - 4380 Apply java.lang.String.contains e.getMessage().contains("terminated")
112 49624 4396 - 4494 Apply org.slf4j.Logger.warn context.log.warn(("Unable to retrieve stats for ".+(region).+(" due to terminated actor: ").+(e.getMessage()): String))
113 46070 4507 - 4521 TypeApply akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.ClusterShardingMonitor.Protocol]
115 37949 4576 - 4638 Apply org.slf4j.Logger.error context.log.error(("Unable to retrieve stats for ".+(region).+(":"): String), e)
116 51261 4651 - 4665 TypeApply akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.ClusterShardingMonitor.Protocol]
128 44478 4970 - 4994 Literal <nosymbol> "actor-sharding-monitor"
131 45507 5124 - 5128 Select org.make.api.technical.ClusterShardingMonitor.ShardingGauges.name ShardingGauges.this.name
131 37988 5069 - 5129 Apply kamon.metric.Tagging.withTag kamon.Kamon.gauge("sharding-total-actors").withTag("region", ShardingGauges.this.name)
131 49381 5114 - 5122 Literal <nosymbol> "region"
131 36910 5081 - 5104 Literal <nosymbol> "sharding-total-actors"
132 35030 5217 - 5221 Select org.make.api.technical.ClusterShardingMonitor.ShardingGauges.name ShardingGauges.this.name
132 51021 5175 - 5197 Literal <nosymbol> "sharding-node-actors"
132 42895 5207 - 5215 Literal <nosymbol> "region"
132 30573 5163 - 5222 Apply kamon.metric.Tagging.withTag kamon.Kamon.gauge("sharding-node-actors").withTag("region", ShardingGauges.this.name)
133 49422 5310 - 5314 Select org.make.api.technical.ClusterShardingMonitor.ShardingGauges.name ShardingGauges.this.name
133 36676 5300 - 5308 Literal <nosymbol> "region"
133 41564 5256 - 5315 Apply kamon.metric.Tagging.withTag kamon.Kamon.gauge("sharding-node-shards").withTag("region", ShardingGauges.this.name)
133 44518 5268 - 5290 Literal <nosymbol> "sharding-node-shards"