1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.technical
21 
22 import akka.actor.Address
23 import akka.actor.typed.Behavior
24 import akka.actor.typed.scaladsl.Behaviors
25 import akka.actor.typed.scaladsl.adapter.TypedActorSystemOps
26 import akka.cluster.Cluster
27 import org.make.constructr.coordination.Coordination
28 
29 import scala.concurrent.duration.DurationInt
30 import scala.util.{Failure, Success}
31 
/**
  * Actor that periodically compares the members of the Akka cluster with the
  * node addresses returned by the coordination service, and downs every cluster
  * member whose address is not part of that set.
  */
object MakeDowningActor {
  val name: String = "MakeDowningActor"

  /** Messages understood by the downing actor. */
  sealed trait Protocol

  /** Periodic tick that triggers a new lookup of the nodes known to coordination. */
  case object AutoDown extends Protocol

  /** Successful outcome of a coordination lookup: the addresses it returned. */
  final case class MembersReceived(members: Set[Address]) extends Protocol

  /** Failed outcome of a coordination lookup, carrying the failure cause. */
  final case class CoordinationError(e: Throwable) extends Protocol

  /**
    * Builds the actor behavior.
    *
    * A timer sends [[AutoDown]] to the actor at a fixed rate of 10 seconds. Each tick
    * asks coordination for its nodes and pipes the outcome back to the actor as either
    * [[MembersReceived]] or [[CoordinationError]].
    *
    * @return the behavior handling the [[Protocol]] messages
    */
  def apply(): Behavior[Protocol] =
    Behaviors.setup { context =>
      val coordination = Coordination(context.system.name, context.system.toClassic)

      // Asks coordination for its nodes; the result comes back to this actor as a message.
      def requestNodes(): Unit =
        context.pipeToSelf(coordination.getNodes()) { outcome =>
          outcome.fold[Protocol](CoordinationError.apply, MembersReceived.apply)
        }

      // Downs every current cluster member whose address is absent from `registered`.
      // NOTE(review): an empty `registered` set makes every member eligible for downing
      // (presumably including this node) — confirm this is the intended behavior.
      def downUnregisteredMembers(registered: Set[Address]): Unit = {
        val cluster = Cluster(context.system)
        cluster.state.members.iterator
          .map(_.uniqueAddress.address)
          .filterNot(registered.contains)
          .foreach { address =>
            context.log.warn(s"Downing node $address since it is no longer present in coordination")
            cluster.down(address)
          }
      }

      Behaviors.withTimers { timers =>
        timers.startTimerAtFixedRate(AutoDown, 10.seconds)

        Behaviors.receiveMessage { message =>
          message match {
            case AutoDown                    => requestNodes()
            case MembersReceived(registered) => downUnregisteredMembers(registered)
            case CoordinationError(cause)    => context.log.error("Error while retrieving nodes", cause)
          }
          // The actor never changes behavior, whatever message it handled.
          Behaviors.same
        }
      }
    }

}
Line Stmt Id Pos Tree Symbol Tests Code
33 42849 1152 - 1170 Literal <nosymbol> "MakeDowningActor"
41 36625 1422 - 2603 Apply akka.actor.typed.scaladsl.Behaviors.setup akka.actor.typed.scaladsl.Behaviors.setup[org.make.api.technical.MakeDowningActor.Protocol](((context: akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.MakeDowningActor.Protocol]) => { val coordination: org.make.constructr.coordination.Coordination = org.make.constructr.coordination.Coordination.apply(context.system.name, akka.actor.typed.scaladsl.adapter.`package`.TypedActorSystemOps(context.system).toClassic); akka.actor.typed.scaladsl.Behaviors.withTimers[org.make.api.technical.MakeDowningActor.Protocol](((timers: akka.actor.typed.scaladsl.TimerScheduler[org.make.api.technical.MakeDowningActor.Protocol]) => { timers.startTimerAtFixedRate(MakeDowningActor.this.AutoDown, scala.concurrent.duration.`package`.DurationInt(10).seconds); akka.actor.typed.scaladsl.Behaviors.receiveMessage[org.make.api.technical.MakeDowningActor.Protocol](((x0$1: org.make.api.technical.MakeDowningActor.Protocol) => x0$1 match { case MakeDowningActor.this.AutoDown => { context.pipeToSelf[Set[akka.actor.Address]](coordination.getNodes())(((x0$2: scala.util.Try[Set[akka.actor.Address]]) => x0$2 match { case (value: Set[akka.actor.Address]): scala.util.Success[Set[akka.actor.Address]]((nodes @ _)) => MakeDowningActor.this.MembersReceived.apply(nodes) case (exception: Throwable): scala.util.Failure[Set[akka.actor.Address]]((e @ _)) => MakeDowningActor.this.CoordinationError.apply(e) })); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.MakeDowningActor.Protocol] } case (members: Set[akka.actor.Address]): org.make.api.technical.MakeDowningActor.MembersReceived((nodes @ _)) => { val cluster: akka.cluster.Cluster = akka.cluster.Cluster.apply(context.system); val members: scala.collection.immutable.SortedSet[akka.cluster.Member] = cluster.state.members; members.foreach[Unit](((member: akka.cluster.Member) => if (nodes.contains(member.uniqueAddress.address).unary_!) 
{ context.log.warn(("Downing node ".+(member.uniqueAddress.address.toString).+(" since it is no longer present in coordination"): String)); cluster.down(member.uniqueAddress.address) } else ())); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.MakeDowningActor.Protocol] } case (e: Throwable): org.make.api.technical.MakeDowningActor.CoordinationError((e @ _)) => { context.log.error("Error while retrieving nodes", e); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.MakeDowningActor.Protocol] } })) })) }))
42 34986 1489 - 1508 Select akka.actor.typed.ActorSystem.name context.system.name
42 36786 1476 - 1535 Apply org.make.constructr.coordination.Coordination.apply org.make.constructr.coordination.Coordination.apply(context.system.name, akka.actor.typed.scaladsl.adapter.`package`.TypedActorSystemOps(context.system).toClassic)
42 43881 1510 - 1534 Select akka.actor.typed.scaladsl.adapter.TypedActorSystemOps.toClassic akka.actor.typed.scaladsl.adapter.`package`.TypedActorSystemOps(context.system).toClassic
42 47757 1510 - 1524 Select akka.actor.typed.scaladsl.ActorContext.system context.system
43 40483 1542 - 2597 Apply akka.actor.typed.scaladsl.Behaviors.withTimers akka.actor.typed.scaladsl.Behaviors.withTimers[org.make.api.technical.MakeDowningActor.Protocol](((timers: akka.actor.typed.scaladsl.TimerScheduler[org.make.api.technical.MakeDowningActor.Protocol]) => { timers.startTimerAtFixedRate(MakeDowningActor.this.AutoDown, scala.concurrent.duration.`package`.DurationInt(10).seconds); akka.actor.typed.scaladsl.Behaviors.receiveMessage[org.make.api.technical.MakeDowningActor.Protocol](((x0$1: org.make.api.technical.MakeDowningActor.Protocol) => x0$1 match { case MakeDowningActor.this.AutoDown => { context.pipeToSelf[Set[akka.actor.Address]](coordination.getNodes())(((x0$2: scala.util.Try[Set[akka.actor.Address]]) => x0$2 match { case (value: Set[akka.actor.Address]): scala.util.Success[Set[akka.actor.Address]]((nodes @ _)) => MakeDowningActor.this.MembersReceived.apply(nodes) case (exception: Throwable): scala.util.Failure[Set[akka.actor.Address]]((e @ _)) => MakeDowningActor.this.CoordinationError.apply(e) })); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.MakeDowningActor.Protocol] } case (members: Set[akka.actor.Address]): org.make.api.technical.MakeDowningActor.MembersReceived((nodes @ _)) => { val cluster: akka.cluster.Cluster = akka.cluster.Cluster.apply(context.system); val members: scala.collection.immutable.SortedSet[akka.cluster.Member] = cluster.state.members; members.foreach[Unit](((member: akka.cluster.Member) => if (nodes.contains(member.uniqueAddress.address).unary_!) 
{ context.log.warn(("Downing node ".+(member.uniqueAddress.address.toString).+(" since it is no longer present in coordination"): String)); cluster.down(member.uniqueAddress.address) } else ())); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.MakeDowningActor.Protocol] } case (e: Throwable): org.make.api.technical.MakeDowningActor.CoordinationError((e @ _)) => { context.log.error("Error while retrieving nodes", e); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.MakeDowningActor.Protocol] } })) }))
44 50415 1583 - 1633 Apply akka.actor.typed.scaladsl.TimerScheduler.startTimerAtFixedRate timers.startTimerAtFixedRate(MakeDowningActor.this.AutoDown, scala.concurrent.duration.`package`.DurationInt(10).seconds)
44 40987 1622 - 1624 Literal <nosymbol> 10
44 49375 1612 - 1620 Select org.make.api.technical.MakeDowningActor.AutoDown MakeDowningActor.this.AutoDown
44 33396 1622 - 1632 Select scala.concurrent.duration.DurationConversions.seconds scala.concurrent.duration.`package`.DurationInt(10).seconds
46 48292 1643 - 2589 Apply akka.actor.typed.scaladsl.Behaviors.receiveMessage akka.actor.typed.scaladsl.Behaviors.receiveMessage[org.make.api.technical.MakeDowningActor.Protocol](((x0$1: org.make.api.technical.MakeDowningActor.Protocol) => x0$1 match { case MakeDowningActor.this.AutoDown => { context.pipeToSelf[Set[akka.actor.Address]](coordination.getNodes())(((x0$2: scala.util.Try[Set[akka.actor.Address]]) => x0$2 match { case (value: Set[akka.actor.Address]): scala.util.Success[Set[akka.actor.Address]]((nodes @ _)) => MakeDowningActor.this.MembersReceived.apply(nodes) case (exception: Throwable): scala.util.Failure[Set[akka.actor.Address]]((e @ _)) => MakeDowningActor.this.CoordinationError.apply(e) })); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.MakeDowningActor.Protocol] } case (members: Set[akka.actor.Address]): org.make.api.technical.MakeDowningActor.MembersReceived((nodes @ _)) => { val cluster: akka.cluster.Cluster = akka.cluster.Cluster.apply(context.system); val members: scala.collection.immutable.SortedSet[akka.cluster.Member] = cluster.state.members; members.foreach[Unit](((member: akka.cluster.Member) => if (nodes.contains(member.uniqueAddress.address).unary_!) { context.log.warn(("Downing node ".+(member.uniqueAddress.address.toString).+(" since it is no longer present in coordination"): String)); cluster.down(member.uniqueAddress.address) } else ())); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.MakeDowningActor.Protocol] } case (e: Throwable): org.make.api.technical.MakeDowningActor.CoordinationError((e @ _)) => { context.log.error("Error while retrieving nodes", e); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.MakeDowningActor.Protocol] } }))
48 43315 1728 - 1751 Apply org.make.constructr.coordination.Coordination.getNodes coordination.getNodes()
48 43625 1709 - 1886 Apply akka.actor.typed.scaladsl.ActorContext.pipeToSelf context.pipeToSelf[Set[akka.actor.Address]](coordination.getNodes())(((x0$2: scala.util.Try[Set[akka.actor.Address]]) => x0$2 match { case (value: Set[akka.actor.Address]): scala.util.Success[Set[akka.actor.Address]]((nodes @ _)) => MakeDowningActor.this.MembersReceived.apply(nodes) case (exception: Throwable): scala.util.Failure[Set[akka.actor.Address]]((e @ _)) => MakeDowningActor.this.CoordinationError.apply(e) }))
49 35023 1792 - 1814 Apply org.make.api.technical.MakeDowningActor.MembersReceived.apply MakeDowningActor.this.MembersReceived.apply(nodes)
50 47793 1852 - 1872 Apply org.make.api.technical.MakeDowningActor.CoordinationError.apply MakeDowningActor.this.CoordinationError.apply(e)
52 36828 1899 - 1913 TypeApply akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.MakeDowningActor.Protocol]
54 49838 1989 - 2003 Select akka.actor.typed.scaladsl.ActorContext.system context.system
54 41028 1981 - 2004 Apply akka.actor.ExtensionId.apply akka.cluster.Cluster.apply(context.system)
55 33432 2031 - 2052 Select akka.cluster.ClusterEvent.CurrentClusterState.members cluster.state.members
56 33181 2065 - 2421 Apply scala.collection.IterableOnceOps.foreach members.foreach[Unit](((member: akka.cluster.Member) => if (nodes.contains(member.uniqueAddress.address).unary_!) { context.log.warn(("Downing node ".+(member.uniqueAddress.address.toString).+(" since it is no longer present in coordination"): String)); cluster.down(member.uniqueAddress.address) } else ()))
57 43347 2111 - 2156 Select scala.Boolean.unary_! nodes.contains(member.uniqueAddress.address).unary_!
57 41756 2107 - 2107 Block <nosymbol> ()
57 50928 2127 - 2155 Select akka.cluster.UniqueAddress.address member.uniqueAddress.address
57 49872 2107 - 2107 Literal <nosymbol> ()
57 36587 2158 - 2407 Block <nosymbol> { context.log.warn(("Downing node ".+(member.uniqueAddress.address.toString).+(" since it is no longer present in coordination"): String)); cluster.down(member.uniqueAddress.address) }
58 34938 2176 - 2332 Apply org.slf4j.Logger.warn context.log.warn(("Downing node ".+(member.uniqueAddress.address.toString).+(" since it is no longer present in coordination"): String))
61 47547 2362 - 2390 Select akka.cluster.UniqueAddress.address member.uniqueAddress.address
61 43663 2349 - 2391 Apply akka.cluster.Cluster.down cluster.down(member.uniqueAddress.address)
64 50962 2434 - 2448 TypeApply akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.MakeDowningActor.Protocol]
66 43107 2500 - 2552 Apply org.slf4j.Logger.error context.log.error("Error while retrieving nodes", e)
67 34976 2565 - 2579 TypeApply akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.MakeDowningActor.Protocol]