1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.technical
21 
22 import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
23 import akka.actor.typed.scaladsl.AskPattern.Askable
24 import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
25 import akka.actor.typed.{ActorRef, ActorSystem, BackoffSupervisorStrategy, Behavior, Scheduler, SupervisorStrategy}
26 import akka.util.Timeout
27 
28 import scala.concurrent.duration.DurationInt
29 import scala.concurrent.Future
30 
31 object ActorSystemHelper {
32   implicit class RichActorSystem(val self: ActorSystem[_]) extends AnyVal {
33     import self.executionContext
34 
35     def findRefByKey[T](key: ServiceKey[T])(implicit timeout: Timeout, scheduler: Scheduler): Future[ActorRef[T]] = {
36       (self.receptionist ? Receptionist.Find(key)).flatMap {
37         case key.Listing(services) =>
38           services.find(_.path.address.hasLocalScope) match {
39             case Some(ref) => Future.successful(ref)
40             case None =>
41               Future.failed(new IllegalStateException(s"Receptionist unable to find actor ref for key: $key"))
42           }
43       }
44 
45     }
46   }
47 
48   private val maxRestarts = 50
49   val DefaultFallbackStrategy: BackoffSupervisorStrategy =
50     SupervisorStrategy
51       .restartWithBackoff(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2)
52       .withMaxRestarts(maxRestarts)
53 
54   def superviseWithBackoff[T](behavior: Behavior[T]): Behavior[T] = {
55     Behaviors.supervise(behavior).onFailure(DefaultFallbackStrategy)
56   }
57 
58   implicit def contextToSystem(implicit context: ActorContext[_]): ActorSystem[_] = context.system
59 }
Line Stmt Id Pos Tree Symbol Tests Code
36 364 1485 - 1485 Select akka.actor.typed.ActorSystem.executionContext RichActorSystem.this.self.executionContext
36 340 1433 - 1795 ApplyToImplicitArgs scala.concurrent.Future.flatMap akka.actor.typed.scaladsl.AskPattern.Askable[akka.actor.typed.receptionist.Receptionist.Command](RichActorSystem.this.self.receptionist).?[akka.actor.typed.receptionist.Receptionist.Listing](akka.actor.typed.receptionist.Receptionist.Find.apply[T](key))(timeout, scheduler).flatMap[akka.actor.typed.ActorRef[T]](((x0$1: akka.actor.typed.receptionist.Receptionist.Listing) => x0$1 match { case key.Listing.unapply(<unapply-selector>) <unapply> ((services @ _)) => services.find(((x$1: akka.actor.typed.ActorRef[T]) => x$1.path.address.hasLocalScope)) match { case (value: akka.actor.typed.ActorRef[T]): Some[akka.actor.typed.ActorRef[T]]((ref @ _)) => scala.concurrent.Future.successful[akka.actor.typed.ActorRef[T]](ref) case scala.None => scala.concurrent.Future.failed[Nothing](new java.lang.IllegalStateException(("Receptionist unable to find actor ref for key: ".+(key): String))) } }))(RichActorSystem.this.self.executionContext)
36 378 1433 - 1450 Select akka.actor.typed.ActorSystem.receptionist RichActorSystem.this.self.receptionist
36 354 1453 - 1475 Apply akka.actor.typed.receptionist.Receptionist.Find.apply akka.actor.typed.receptionist.Receptionist.Find.apply[T](key)
38 386 1535 - 1578 Apply scala.collection.IterableOnceOps.find services.find(((x$1: akka.actor.typed.ActorRef[T]) => x$1.path.address.hasLocalScope))
38 347 1549 - 1577 Select akka.actor.Address.hasLocalScope x$1.path.address.hasLocalScope
39 359 1617 - 1639 Apply scala.concurrent.Future.successful scala.concurrent.Future.successful[akka.actor.typed.ActorRef[T]](ref)
41 336 1693 - 1774 Apply java.lang.IllegalStateException.<init> new java.lang.IllegalStateException(("Receptionist unable to find actor ref for key: ".+(key): String))
41 374 1679 - 1775 Apply scala.concurrent.Future.failed scala.concurrent.Future.failed[Nothing](new java.lang.IllegalStateException(("Receptionist unable to find actor ref for key: ".+(key): String)))
48 379 1836 - 1838 Literal <nosymbol> 50
51 382 1984 - 1986 Literal <nosymbol> 30
51 355 1960 - 1961 Literal <nosymbol> 3
51 358 1984 - 1994 Select scala.concurrent.duration.DurationConversions.seconds scala.concurrent.duration.`package`.DurationInt(30).seconds
51 348 1960 - 1969 Select scala.concurrent.duration.DurationConversions.seconds scala.concurrent.duration.`package`.DurationInt(3).seconds
51 335 2011 - 2014 Literal <nosymbol> 0.2
52 373 2039 - 2050 Select org.make.api.technical.ActorSystemHelper.maxRestarts ActorSystemHelper.this.maxRestarts
52 367 1902 - 2051 Apply akka.actor.typed.BackoffSupervisorStrategy.withMaxRestarts akka.actor.typed.SupervisorStrategy.restartWithBackoff(scala.concurrent.duration.`package`.DurationInt(3).seconds, scala.concurrent.duration.`package`.DurationInt(30).seconds, 0.2).withMaxRestarts(ActorSystemHelper.this.maxRestarts)
55 376 2127 - 2191 ApplyToImplicitArgs akka.actor.typed.scaladsl.Behaviors.Supervise.onFailure akka.actor.typed.scaladsl.Behaviors.supervise[T](behavior).onFailure[Nothing](ActorSystemHelper.this.DefaultFallbackStrategy)((ClassTag.Nothing: scala.reflect.ClassTag[Nothing]))
55 345 2167 - 2190 Select org.make.api.technical.ActorSystemHelper.DefaultFallbackStrategy ActorSystemHelper.this.DefaultFallbackStrategy
58 356 2281 - 2295 Select akka.actor.typed.scaladsl.ActorContext.system context.system