1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.technical.job
21 
22 import cats.implicits._
23 import akka.actor.typed.ActorRef
24 import akka.actor.typed.scaladsl.AskPattern._
25 import akka.util.Timeout
26 import org.make.api.technical.job.JobActor.Protocol.Command._
27 import org.make.api.technical.job.JobActor.Protocol.Response._
28 import org.make.api.technical.job.JobReportingActor.JobReportingActorFacade
29 import org.make.api.technical.{ActorSystemComponent, IdGeneratorComponent, SpawnActorServiceComponent, TimeSettings}
30 import org.make.core.job.Job
31 import org.make.core.job.Job.{JobId, Progress}
32 
33 import scala.concurrent.ExecutionContext.Implicits.global
34 import scala.concurrent.Future
35 import scala.concurrent.duration.FiniteDuration
36 
37 trait JobCoordinatorComponent {
38   def jobCoordinator: ActorRef[JobActor.Protocol.Command]
39 }
40 
41 trait JobCoordinatorService {
42   def start(id: JobId, heartRate: FiniteDuration = Job.defaultHeartRate)(
43     work: JobReportingActorFacade => Future[Unit]
44   ): Future[JobAcceptance]
45   def heartbeat(id: JobId): Future[Unit]
46   def report(id: JobId, value: Progress): Future[Unit]
47   def finish(id: JobId, outcome: Option[Throwable]): Future[Unit]
48   def get(id: JobId): Future[Option[Job]]
49 }
50 
51 trait JobCoordinatorServiceComponent {
52   def jobCoordinatorService: JobCoordinatorService
53 }
54 
55 trait DefaultJobCoordinatorServiceComponent extends JobCoordinatorServiceComponent {
56   self: ActorSystemComponent with IdGeneratorComponent with JobCoordinatorComponent with SpawnActorServiceComponent =>
57 
58   override lazy val jobCoordinatorService: JobCoordinatorService = new DefaultJobCoordinatorService
59 
60   class DefaultJobCoordinatorService extends JobCoordinatorService {
61 
62     implicit val timeout: Timeout = TimeSettings.defaultTimeout
63 
64     override def start(id: JobId, heartRate: FiniteDuration)(
65       work: JobReportingActorFacade => Future[Unit]
66     ): Future[JobAcceptance] = {
67       (jobCoordinator ? (Start(id, _))).flatMap {
68         case acceptance @ JobAcceptance(true) =>
69           spawnActorService
70             .spawn(
71               behavior = JobReportingActor(id, work, jobCoordinatorService, heartRate),
72               name = s"JobReportingActor-${id.value}"
73             )
74             .map(_ => acceptance)
75         case acceptance => Future.successful(acceptance)
76       }
77     }
78 
79     override def heartbeat(id: JobId): Future[Unit] = {
80       (jobCoordinator ? (Heartbeat(id, _))).void
81     }
82 
83     override def report(id: JobId, progress: Progress): Future[Unit] = {
84       (jobCoordinator ? (Report(id, progress, _))).void
85     }
86 
87     override def finish(id: JobId, outcome: Option[Throwable]): Future[Unit] = {
88       (jobCoordinator ? (Finish(id, outcome, _))).void
89     }
90 
91     override def get(id: JobId): Future[Option[Job]] = {
92       (jobCoordinator ? (Get(id, _))).map(_.value)
93     }
94 
95   }
96 
97 }
Line Stmt Id Pos Tree Symbol Tests Code
62 10870 2428 - 2455 Select org.make.api.technical.TimeSettings.defaultTimeout org.make.api.technical.job.jobcoordinatorservicetest org.make.api.technical.TimeSettings.defaultTimeout
67 17363 2652 - 2652 Select scala.concurrent.ExecutionContext.Implicits.global org.make.api.technical.job.jobcoordinatorservicetest scala.concurrent.ExecutionContext.Implicits.global
67 17001 2611 - 2625 Select org.make.api.technical.job.JobCoordinatorComponent.jobCoordinator org.make.api.technical.job.jobcoordinatorservicetest DefaultJobCoordinatorServiceComponent.this.jobCoordinator
67 15167 2629 - 2641 Apply org.make.api.technical.job.JobActor.Protocol.Command.Start.apply org.make.api.technical.job.jobcoordinatorservicetest org.make.api.technical.job.JobActor.Protocol.Command.Start.apply(id, x$1)
67 17449 2626 - 2626 Select org.make.api.technical.ActorSystemComponent.actorSystem org.make.api.technical.job.jobcoordinatorservicetest DefaultJobCoordinatorServiceComponent.this.actorSystem
67 11664 2626 - 2626 Select org.make.api.technical.job.DefaultJobCoordinatorServiceComponent.DefaultJobCoordinatorService.timeout org.make.api.technical.job.jobcoordinatorservicetest DefaultJobCoordinatorService.this.timeout
67 13889 2626 - 2626 ApplyToImplicitArgs akka.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem org.make.api.technical.job.jobcoordinatorservicetest akka.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem(DefaultJobCoordinatorServiceComponent.this.actorSystem)
67 13904 2611 - 3005 ApplyToImplicitArgs scala.concurrent.Future.flatMap org.make.api.technical.job.jobcoordinatorservicetest akka.actor.typed.scaladsl.AskPattern.Askable[org.make.api.technical.job.JobActor.Protocol.Command](DefaultJobCoordinatorServiceComponent.this.jobCoordinator).?[org.make.api.technical.job.JobActor.Protocol.Response.JobAcceptance](((x$1: akka.actor.typed.ActorRef[org.make.api.technical.job.JobActor.Protocol.Response.JobAcceptance]) => org.make.api.technical.job.JobActor.Protocol.Command.Start.apply(id, x$1)))(DefaultJobCoordinatorService.this.timeout, akka.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem(DefaultJobCoordinatorServiceComponent.this.actorSystem)).flatMap[org.make.api.technical.job.JobActor.Protocol.Response.JobAcceptance](((x0$1: org.make.api.technical.job.JobActor.Protocol.Response.JobAcceptance) => x0$1 match { case (acceptance @ (isAccepted: Boolean): org.make.api.technical.job.JobActor.Protocol.Response.JobAcceptance(true)) => { <artifact> val qual$1: org.make.api.technical.SpawnActorService = DefaultJobCoordinatorServiceComponent.this.spawnActorService; <artifact> val x$1: akka.actor.typed.Behavior[org.make.api.technical.job.JobReportingActor.Protocol.Command] @scala.reflect.internal.annotations.uncheckedBounds = JobReportingActor.apply(id, work, DefaultJobCoordinatorServiceComponent.this.jobCoordinatorService, heartRate); <artifact> val x$2: String = ("JobReportingActor-".+(id.value): String); <artifact> val x$3: akka.actor.typed.Props = qual$1.spawn$default$3[Nothing]; qual$1.spawn[org.make.api.technical.job.JobReportingActor.Protocol.Command](x$1, x$2, x$3) }.map[org.make.api.technical.job.JobActor.Protocol.Response.JobAcceptance](((x$2: akka.actor.typed.ActorRef[org.make.api.technical.job.JobReportingActor.Protocol.Command]) => acceptance))(scala.concurrent.ExecutionContext.Implicits.global) case (acceptance @ _) => scala.concurrent.Future.successful[org.make.api.technical.job.JobActor.Protocol.Response.JobAcceptance](acceptance) }))(scala.concurrent.ExecutionContext.Implicits.global)
69 11878 2713 - 2730 Select org.make.api.technical.SpawnActorServiceComponent.spawnActorService DefaultJobCoordinatorServiceComponent.this.spawnActorService
70 10885 2713 - 2906 Apply org.make.api.technical.SpawnActorService.spawn qual$1.spawn[org.make.api.technical.job.JobReportingActor.Protocol.Command](x$1, x$2, x$3)
70 14639 2744 - 2744 TypeApply org.make.api.technical.SpawnActorService.spawn$default$3 qual$1.spawn$default$3[Nothing]
71 18246 2776 - 2837 Apply org.make.api.technical.job.JobReportingActor.apply JobReportingActor.apply(id, work, DefaultJobCoordinatorServiceComponent.this.jobCoordinatorService, heartRate)
74 16881 2923 - 2923 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
74 14874 2713 - 2940 ApplyToImplicitArgs scala.concurrent.Future.map { <artifact> val qual$1: org.make.api.technical.SpawnActorService = DefaultJobCoordinatorServiceComponent.this.spawnActorService; <artifact> val x$1: akka.actor.typed.Behavior[org.make.api.technical.job.JobReportingActor.Protocol.Command] @scala.reflect.internal.annotations.uncheckedBounds = JobReportingActor.apply(id, work, DefaultJobCoordinatorServiceComponent.this.jobCoordinatorService, heartRate); <artifact> val x$2: String = ("JobReportingActor-".+(id.value): String); <artifact> val x$3: akka.actor.typed.Props = qual$1.spawn$default$3[Nothing]; qual$1.spawn[org.make.api.technical.job.JobReportingActor.Protocol.Command](x$1, x$2, x$3) }.map[org.make.api.technical.job.JobActor.Protocol.Response.JobAcceptance](((x$2: akka.actor.typed.ActorRef[org.make.api.technical.job.JobReportingActor.Protocol.Command]) => acceptance))(scala.concurrent.ExecutionContext.Implicits.global)
75 11678 2968 - 2997 Apply scala.concurrent.Future.successful scala.concurrent.Future.successful[org.make.api.technical.job.JobActor.Protocol.Response.JobAcceptance](acceptance)
80 11289 3091 - 3091 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
80 16642 3091 - 3091 ApplyToImplicitArgs akka.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem akka.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem(DefaultJobCoordinatorServiceComponent.this.actorSystem)
80 17841 3094 - 3110 Apply org.make.api.technical.job.JobActor.Protocol.Command.Heartbeat.apply org.make.api.technical.job.JobActor.Protocol.Command.Heartbeat.apply(id, x$3)
80 10196 3076 - 3090 Select org.make.api.technical.job.JobCoordinatorComponent.jobCoordinator DefaultJobCoordinatorServiceComponent.this.jobCoordinator
80 10791 3091 - 3091 Select org.make.api.technical.ActorSystemComponent.actorSystem DefaultJobCoordinatorServiceComponent.this.actorSystem
80 14894 3076 - 3111 ApplyToImplicitArgs akka.actor.typed.scaladsl.AskPattern.Askable.? akka.actor.typed.scaladsl.AskPattern.Askable[org.make.api.technical.job.JobActor.Protocol.Command](DefaultJobCoordinatorServiceComponent.this.jobCoordinator).?[org.make.api.technical.job.JobActor.Protocol.Response.Process](((x$3: akka.actor.typed.ActorRef[org.make.api.technical.job.JobActor.Protocol.Response.Process]) => org.make.api.technical.job.JobActor.Protocol.Command.Heartbeat.apply(id, x$3)))(DefaultJobCoordinatorService.this.timeout, akka.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem(DefaultJobCoordinatorServiceComponent.this.actorSystem))
80 17370 3091 - 3091 ApplyToImplicitArgs cats.instances.FutureInstances.catsStdInstancesForFuture cats.implicits.catsStdInstancesForFuture(scala.concurrent.ExecutionContext.Implicits.global)
80 14430 3091 - 3091 Select org.make.api.technical.job.DefaultJobCoordinatorServiceComponent.DefaultJobCoordinatorService.timeout DefaultJobCoordinatorService.this.timeout
80 13793 3076 - 3117 Select cats.Functor.Ops.void cats.implicits.toFunctorOps[scala.concurrent.Future, org.make.api.technical.job.JobActor.Protocol.Response.Process](akka.actor.typed.scaladsl.AskPattern.Askable[org.make.api.technical.job.JobActor.Protocol.Command](DefaultJobCoordinatorServiceComponent.this.jobCoordinator).?[org.make.api.technical.job.JobActor.Protocol.Response.Process](((x$3: akka.actor.typed.ActorRef[org.make.api.technical.job.JobActor.Protocol.Response.Process]) => org.make.api.technical.job.JobActor.Protocol.Command.Heartbeat.apply(id, x$3)))(DefaultJobCoordinatorService.this.timeout, akka.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem(DefaultJobCoordinatorServiceComponent.this.actorSystem)))(cats.implicits.catsStdInstancesForFuture(scala.concurrent.ExecutionContext.Implicits.global)).void
84 15379 3205 - 3247 ApplyToImplicitArgs akka.actor.typed.scaladsl.AskPattern.Askable.? akka.actor.typed.scaladsl.AskPattern.Askable[org.make.api.technical.job.JobActor.Protocol.Command](DefaultJobCoordinatorServiceComponent.this.jobCoordinator).?[org.make.api.technical.job.JobActor.Protocol.Response.Process](((x$4: akka.actor.typed.ActorRef[org.make.api.technical.job.JobActor.Protocol.Response.Process]) => org.make.api.technical.job.JobActor.Protocol.Command.Report.apply(id, progress, x$4)))(DefaultJobCoordinatorService.this.timeout, akka.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem(DefaultJobCoordinatorServiceComponent.this.actorSystem))
84 14186 3205 - 3253 Select cats.Functor.Ops.void cats.implicits.toFunctorOps[scala.concurrent.Future, org.make.api.technical.job.JobActor.Protocol.Response.Process](akka.actor.typed.scaladsl.AskPattern.Askable[org.make.api.technical.job.JobActor.Protocol.Command](DefaultJobCoordinatorServiceComponent.this.jobCoordinator).?[org.make.api.technical.job.JobActor.Protocol.Response.Process](((x$4: akka.actor.typed.ActorRef[org.make.api.technical.job.JobActor.Protocol.Response.Process]) => org.make.api.technical.job.JobActor.Protocol.Command.Report.apply(id, progress, x$4)))(DefaultJobCoordinatorService.this.timeout, akka.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem(DefaultJobCoordinatorServiceComponent.this.actorSystem)))(cats.implicits.catsStdInstancesForFuture(scala.concurrent.ExecutionContext.Implicits.global)).void
84 17861 3223 - 3246 Apply org.make.api.technical.job.JobActor.Protocol.Command.Report.apply org.make.api.technical.job.JobActor.Protocol.Command.Report.apply(id, progress, x$4)
84 10076 3205 - 3219 Select org.make.api.technical.job.JobCoordinatorComponent.jobCoordinator DefaultJobCoordinatorServiceComponent.this.jobCoordinator
84 11661 3220 - 3220 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
84 16661 3220 - 3220 ApplyToImplicitArgs akka.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem akka.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem(DefaultJobCoordinatorServiceComponent.this.actorSystem)
84 10799 3220 - 3220 Select org.make.api.technical.ActorSystemComponent.actorSystem DefaultJobCoordinatorServiceComponent.this.actorSystem
84 14621 3220 - 3220 Select org.make.api.technical.job.DefaultJobCoordinatorServiceComponent.DefaultJobCoordinatorService.timeout DefaultJobCoordinatorService.this.timeout
84 17564 3220 - 3220 ApplyToImplicitArgs cats.instances.FutureInstances.catsStdInstancesForFuture cats.implicits.catsStdInstancesForFuture(scala.concurrent.ExecutionContext.Implicits.global)
88 18341 3367 - 3389 Apply org.make.api.technical.job.JobActor.Protocol.Command.Finish.apply org.make.api.technical.job.JobActor.Protocol.Command.Finish.apply(id, outcome, x$5)
88 13098 3349 - 3390 ApplyToImplicitArgs akka.actor.typed.scaladsl.AskPattern.Askable.? akka.actor.typed.scaladsl.AskPattern.Askable[org.make.api.technical.job.JobActor.Protocol.Command](DefaultJobCoordinatorServiceComponent.this.jobCoordinator).?[org.make.api.technical.job.JobActor.Protocol.Response.Process](((x$5: akka.actor.typed.ActorRef[org.make.api.technical.job.JobActor.Protocol.Response.Process]) => org.make.api.technical.job.JobActor.Protocol.Command.Finish.apply(id, outcome, x$5)))(DefaultJobCoordinatorService.this.timeout, akka.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem(DefaultJobCoordinatorServiceComponent.this.actorSystem))
88 17578 3364 - 3364 ApplyToImplicitArgs cats.instances.FutureInstances.catsStdInstancesForFuture cats.implicits.catsStdInstancesForFuture(scala.concurrent.ExecutionContext.Implicits.global)
88 13896 3349 - 3396 Select cats.Functor.Ops.void cats.implicits.toFunctorOps[scala.concurrent.Future, org.make.api.technical.job.JobActor.Protocol.Response.Process](akka.actor.typed.scaladsl.AskPattern.Askable[org.make.api.technical.job.JobActor.Protocol.Command](DefaultJobCoordinatorServiceComponent.this.jobCoordinator).?[org.make.api.technical.job.JobActor.Protocol.Response.Process](((x$5: akka.actor.typed.ActorRef[org.make.api.technical.job.JobActor.Protocol.Response.Process]) => org.make.api.technical.job.JobActor.Protocol.Command.Finish.apply(id, outcome, x$5)))(DefaultJobCoordinatorService.this.timeout, akka.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem(DefaultJobCoordinatorServiceComponent.this.actorSystem)))(cats.implicits.catsStdInstancesForFuture(scala.concurrent.ExecutionContext.Implicits.global)).void
88 17155 3364 - 3364 ApplyToImplicitArgs akka.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem akka.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem(DefaultJobCoordinatorServiceComponent.this.actorSystem)
88 10094 3349 - 3363 Select org.make.api.technical.job.JobCoordinatorComponent.jobCoordinator DefaultJobCoordinatorServiceComponent.this.jobCoordinator
88 11568 3364 - 3364 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
88 14636 3364 - 3364 Select org.make.api.technical.job.DefaultJobCoordinatorServiceComponent.DefaultJobCoordinatorService.timeout DefaultJobCoordinatorService.this.timeout
88 11008 3364 - 3364 Select org.make.api.technical.ActorSystemComponent.actorSystem DefaultJobCoordinatorServiceComponent.this.actorSystem
92 16890 3483 - 3483 ApplyToImplicitArgs akka.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem org.make.api.technical.job.jobcoordinatorservicetest akka.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem(DefaultJobCoordinatorServiceComponent.this.actorSystem)
92 18142 3486 - 3496 Apply org.make.api.technical.job.JobActor.Protocol.Command.Get.apply org.make.api.technical.job.jobcoordinatorservicetest org.make.api.technical.job.JobActor.Protocol.Command.Get.apply(id, x$6)
92 11027 3483 - 3483 Select org.make.api.technical.ActorSystemComponent.actorSystem org.make.api.technical.job.jobcoordinatorservicetest DefaultJobCoordinatorServiceComponent.this.actorSystem
92 13584 3503 - 3510 Select org.make.api.technical.job.JobActor.Protocol.Response.State.value x$7.value
92 14532 3483 - 3483 Select org.make.api.technical.job.DefaultJobCoordinatorServiceComponent.DefaultJobCoordinatorService.timeout org.make.api.technical.job.jobcoordinatorservicetest DefaultJobCoordinatorService.this.timeout
92 11580 3502 - 3502 Select scala.concurrent.ExecutionContext.Implicits.global org.make.api.technical.job.jobcoordinatorservicetest scala.concurrent.ExecutionContext.Implicits.global
92 10593 3468 - 3482 Select org.make.api.technical.job.JobCoordinatorComponent.jobCoordinator org.make.api.technical.job.jobcoordinatorservicetest DefaultJobCoordinatorServiceComponent.this.jobCoordinator
92 17594 3468 - 3511 ApplyToImplicitArgs scala.concurrent.Future.map org.make.api.technical.job.jobcoordinatorservicetest akka.actor.typed.scaladsl.AskPattern.Askable[org.make.api.technical.job.JobActor.Protocol.Command](DefaultJobCoordinatorServiceComponent.this.jobCoordinator).?[org.make.api.technical.job.JobActor.Protocol.Response.State](((x$6: akka.actor.typed.ActorRef[org.make.api.technical.job.JobActor.Protocol.Response.State]) => org.make.api.technical.job.JobActor.Protocol.Command.Get.apply(id, x$6)))(DefaultJobCoordinatorService.this.timeout, akka.actor.typed.scaladsl.AskPattern.schedulerFromActorSystem(DefaultJobCoordinatorServiceComponent.this.actorSystem)).map[Option[org.make.core.job.Job]](((x$7: org.make.api.technical.job.JobActor.Protocol.Response.State) => x$7.value))(scala.concurrent.ExecutionContext.Implicits.global)