1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.technical.crm
21 
22 import akka.actor.typed.Behavior
23 import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
24 import grizzled.slf4j.Logging
25 import org.make.api.technical.ActorProtocol
26 import org.make.api.technical.crm.BasicCrmResponse.ManageContactsWithCsvResponse
27 import org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command
28 
29 import scala.concurrent.duration.{DurationInt, FiniteDuration}
30 import scala.concurrent.{ExecutionContext, Future, Promise}
31 import scala.util.{Failure, Success}
32 
/**
  * Typed actor behavior that polls the CRM for the status of CSV import jobs.
  *
  * On every [[CrmSynchroCsvMonitor.Protocol.Command.Tick]] one job id is taken from the
  * queue and its status is requested through `CrmClient.monitorCsvImport`. The given
  * promise is completed once a tick finds neither queued job ids nor in-flight calls.
  */
object CrmSynchroCsvMonitor extends Logging {

  /**
    * Builds the monitoring behavior.
    *
    * @param crmClient        client used to query the status of an import job
    * @param promise          completed with `()` when a tick finds no queued job and no pending call
    * @param tickInterval     delay between two `Tick` messages
    * @param jobIds           ids of the jobs to monitor
    * @param executionContext execution context required by `crmClient.monitorCsvImport`
    * @return the initial behavior: a fixed-delay `Tick` timer is started and the actor is `running`
    */
  @SuppressWarnings(Array("org.wartremover.warts.Recursion"))
  def apply(crmClient: CrmClient, promise: Promise[Unit], tickInterval: FiniteDuration)(
    jobIds: Seq[Long]
  )(implicit executionContext: ExecutionContext): Behavior[Protocol.Command] = {

    // Normal state: each tick triggers (at most) one status check.
    // `jobIds` are the jobs still to check, `pendingCalls` the jobs whose check is in flight.
    def running(jobIds: Seq[Long], pendingCalls: Seq[Long]): Behavior[Protocol.Command] = {
      Behaviors.setup { implicit context =>
        Behaviors.receiveMessage {
          case Command.Tick           => handleTick(jobIds, pendingCalls)
          case Command.QuotaAvailable => Behaviors.same
          case Command.CrmCallSucceeded(jobId, response) =>
            handleSuccessfulResponse(jobId, response)(jobIds, pendingCalls)(running)
          case Command.CrmCallFailed(jobId, e) =>
            handleErrorResponse(jobId, e)(jobIds, pendingCalls)(running)
        }
      }
    }

    // Quota-exceeded state: ticks are ignored until `QuotaAvailable` is received.
    // Responses of calls that were already in flight are still processed and keep the actor
    // blocked. Note that every call to `blocked` (including such a re-entry) starts the
    // one-hour `QuotaAvailable` timer again.
    def blocked(jobIds: Seq[Long], pendingCalls: Seq[Long]): Behavior[Protocol.Command] = {
      Behaviors.withTimers { timers =>
        timers.startSingleTimer(Command.QuotaAvailable, 1.hour)
        Behaviors.receiveMessage {
          case Command.Tick => Behaviors.same
          case Command.QuotaAvailable =>
            logger.info("Job checker becomes available again")
            running(jobIds, pendingCalls)
          case Command.CrmCallSucceeded(jobId, response) =>
            handleSuccessfulResponse(jobId, response)(jobIds, pendingCalls)(blocked)
          case Command.CrmCallFailed(jobId, e) =>
            handleErrorResponse(jobId, e)(jobIds, pendingCalls)(blocked)
        }
      }
    }

    // Handles one tick while running:
    //  - nothing queued and nothing in flight: complete the promise and stop;
    //  - nothing queued but calls in flight: wait for their responses;
    //  - otherwise: query the first queued job and move it to the pending calls.
    def handleTick(jobIds: Seq[Long], pendingCalls: Seq[Long])(
      implicit context: ActorContext[Command]
    ): Behavior[Command] = {
      (jobIds, pendingCalls) match {
        case (Seq(), Seq()) =>
          promise.success(())
          Behaviors.stopped
        case (Seq(), _) =>
          Behaviors.same
        case (toMonitor +: tail, _) =>
          val futureCall: Future[ManageContactsWithCsvResponse] = crmClient.monitorCsvImport(toMonitor)
          // The outcome of the call comes back to this actor as a regular message.
          context.pipeToSelf(futureCall) {
            case Success(response) => Command.CrmCallSucceeded(toMonitor, response)
            case Failure(e)        => Command.CrmCallFailed(toMonitor, e)
          }
          running(tail, pendingCalls :+ toMonitor)
      }

    }

    // Handles a failed status check. In both cases the job is put back at the end of the
    // queue and removed from the pending calls. A `QuotaExceeded` error always switches to
    // `blocked`; any other error is logged and the state chosen by `next` is kept.
    def handleErrorResponse(jobId: Long, error: Throwable)(jobIds: Seq[Long], pendingCalls: Seq[Long])(
      next: (Seq[Long], Seq[Long]) => Behavior[Command]
    ): Behavior[Command] =
      error match {
        case QuotaExceeded(_, message) =>
          logger.warn(s"Job checker is becoming blocked for an hour, received message is $message")
          blocked(jobIds :+ jobId, pendingCalls.filterNot(_ == jobId))
        case other =>
          logger.error(s"Error when checking status for job $jobId", other)
          next(jobIds :+ jobId, pendingCalls.filterNot(_ == jobId))
      }

    // Handles a successful status check. The job is finished when every entry of the response
    // has the status "Completed" or "Abort"; it is then dropped. Otherwise it is queued again
    // for a later check. In both cases it is removed from the pending calls.
    def handleSuccessfulResponse(jobId: Long, response: BasicCrmResponse.ManageContactsWithCsvResponse)(
      jobIds: Seq[Long],
      pendingCalls: Seq[Long]
    )(next: (Seq[Long], Seq[Long]) => Behavior[Command]): Behavior[Command] = {
      val remainingCalls = pendingCalls.filterNot(_ == jobId)
      if (response.data.forall(job => job.status == "Completed" || job.status == "Abort")) {
        logger.info(s"Job $jobId completed successfully: $response")
        // `exists`, not `forall`: a single entry with errors is enough to report them, and an
        // empty response must not be reported as erroneous.
        if (response.data.exists(job => job.errorCount > 0)) {
          logger.error(
            s"Job $jobId has errors: $response, " +
              s"file should be at https://api.mailjet.com/v3/DATA/Batchjob/$jobId/CSVError/text:csv"
          )
        }
        next(jobIds, remainingCalls)
      } else {
        next(jobIds :+ jobId, remainingCalls)
      }
    }

    Behaviors.withTimers { timers =>
      timers.startTimerWithFixedDelay(Command.Tick, tickInterval)
      running(jobIds, Seq.empty)
    }
  }

  /** Root of the messages understood by the monitor. */
  sealed abstract class Protocol extends ActorProtocol

  object Protocol {

    /** Messages handled by the monitor behavior. */
    sealed abstract class Command extends Protocol

    object Command {

      /** Periodic message, scheduled every `tickInterval`, that triggers a status check. */
      case object Tick extends Command

      /** Sent by the single timer started in the blocked state; switches the monitor back to running. */
      case object QuotaAvailable extends Command

      /** Result of a successful status check for `jobId`. */
      final case class CrmCallSucceeded(jobId: Long, response: ManageContactsWithCsvResponse) extends Command

      /** Failure of the status check for `jobId`. */
      final case class CrmCallFailed(jobId: Long, error: Throwable) extends Command
    }
  }

}
Line Stmt Id Pos Tree Symbol Tests Code
41 23315 1662 - 2150 Apply akka.actor.typed.scaladsl.Behaviors.setup akka.actor.typed.scaladsl.Behaviors.setup[org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command](((implicit context: akka.actor.typed.scaladsl.ActorContext[org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command]) => akka.actor.typed.scaladsl.Behaviors.receiveMessage[org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command](((x0$1: org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command) => x0$1 match { case org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.Tick => handleTick(jobIds, pendingCalls)(context) case org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.QuotaAvailable => akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command] case (jobId: Long, response: org.make.api.technical.crm.BasicCrmResponse.ManageContactsWithCsvResponse): org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.CrmCallSucceeded((jobId @ _), (response @ _)) => handleSuccessfulResponse(jobId, response)(jobIds, pendingCalls)(((jobIds: Seq[Long], pendingCalls: Seq[Long]) => running(jobIds, pendingCalls))) case (jobId: Long, error: Throwable): org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.CrmCallFailed((jobId @ _), (e @ _)) => handleErrorResponse(jobId, e)(jobIds, pendingCalls)(((jobIds: Seq[Long], pendingCalls: Seq[Long]) => running(jobIds, pendingCalls))) }))))
42 24429 1708 - 2142 Apply akka.actor.typed.scaladsl.Behaviors.receiveMessage akka.actor.typed.scaladsl.Behaviors.receiveMessage[org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command](((x0$1: org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command) => x0$1 match { case org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.Tick => handleTick(jobIds, pendingCalls)(context) case org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.QuotaAvailable => akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command] case (jobId: Long, response: org.make.api.technical.crm.BasicCrmResponse.ManageContactsWithCsvResponse): org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.CrmCallSucceeded((jobId @ _), (response @ _)) => handleSuccessfulResponse(jobId, response)(jobIds, pendingCalls)(((jobIds: Seq[Long], pendingCalls: Seq[Long]) => running(jobIds, pendingCalls))) case (jobId: Long, error: Throwable): org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.CrmCallFailed((jobId @ _), (e @ _)) => handleErrorResponse(jobId, e)(jobIds, pendingCalls)(((jobIds: Seq[Long], pendingCalls: Seq[Long]) => running(jobIds, pendingCalls))) }))
43 24594 1776 - 1808 ApplyToImplicitArgs org.make.api.technical.crm.CrmSynchroCsvMonitor.handleTick handleTick(jobIds, pendingCalls)(context)
44 22541 1850 - 1864 TypeApply akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command]
46 24904 1937 - 2009 Apply org.make.api.technical.crm.CrmSynchroCsvMonitor.handleSuccessfulResponse handleSuccessfulResponse(jobId, response)(jobIds, pendingCalls)(((jobIds: Seq[Long], pendingCalls: Seq[Long]) => running(jobIds, pendingCalls)))
46 27430 2001 - 2008 Apply org.make.api.technical.crm.CrmSynchroCsvMonitor.running running(jobIds, pendingCalls)
48 26475 2072 - 2132 Apply org.make.api.technical.crm.CrmSynchroCsvMonitor.handleErrorResponse handleErrorResponse(jobId, e)(jobIds, pendingCalls)(((jobIds: Seq[Long], pendingCalls: Seq[Long]) => running(jobIds, pendingCalls)))
48 22637 2124 - 2131 Apply org.make.api.technical.crm.CrmSynchroCsvMonitor.running running(jobIds, pendingCalls)
54 26334 2256 - 2865 Apply akka.actor.typed.scaladsl.Behaviors.withTimers akka.actor.typed.scaladsl.Behaviors.withTimers[org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command](((timers: akka.actor.typed.scaladsl.TimerScheduler[org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command]) => { timers.startSingleTimer(org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.QuotaAvailable, scala.concurrent.duration.`package`.DurationInt(1).hour); akka.actor.typed.scaladsl.Behaviors.receiveMessage[org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command](((x0$1: org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command) => x0$1 match { case org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.Tick => akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command] case org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.QuotaAvailable => { CrmSynchroCsvMonitor.this.logger.info("Job checker becomes available again"); running(jobIds, pendingCalls) } case (jobId: Long, response: org.make.api.technical.crm.BasicCrmResponse.ManageContactsWithCsvResponse): org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.CrmCallSucceeded((jobId @ _), (response @ _)) => handleSuccessfulResponse(jobId, response)(jobIds, pendingCalls)(((jobIds: Seq[Long], pendingCalls: Seq[Long]) => blocked(jobIds, pendingCalls))) case (jobId: Long, error: Throwable): org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.CrmCallFailed((jobId @ _), (e @ _)) => handleErrorResponse(jobId, e)(jobIds, pendingCalls)(((jobIds: Seq[Long], pendingCalls: Seq[Long]) => blocked(jobIds, pendingCalls))) })) }))
55 26780 2321 - 2343 Select org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.QuotaAvailable org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.QuotaAvailable
55 24522 2345 - 2346 Literal <nosymbol> 1
55 27445 2297 - 2352 Apply akka.actor.typed.scaladsl.TimerScheduler.startSingleTimer timers.startSingleTimer(org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.QuotaAvailable, scala.concurrent.duration.`package`.DurationInt(1).hour)
55 22552 2345 - 2351 Select scala.concurrent.duration.DurationConversions.hour scala.concurrent.duration.`package`.DurationInt(1).hour
56 22474 2361 - 2857 Apply akka.actor.typed.scaladsl.Behaviors.receiveMessage akka.actor.typed.scaladsl.Behaviors.receiveMessage[org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command](((x0$1: org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command) => x0$1 match { case org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.Tick => akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command] case org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.QuotaAvailable => { CrmSynchroCsvMonitor.this.logger.info("Job checker becomes available again"); running(jobIds, pendingCalls) } case (jobId: Long, response: org.make.api.technical.crm.BasicCrmResponse.ManageContactsWithCsvResponse): org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.CrmCallSucceeded((jobId @ _), (response @ _)) => handleSuccessfulResponse(jobId, response)(jobIds, pendingCalls)(((jobIds: Seq[Long], pendingCalls: Seq[Long]) => blocked(jobIds, pendingCalls))) case (jobId: Long, error: Throwable): org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.CrmCallFailed((jobId @ _), (e @ _)) => handleErrorResponse(jobId, e)(jobIds, pendingCalls)(((jobIds: Seq[Long], pendingCalls: Seq[Long]) => blocked(jobIds, pendingCalls))) }))
57 25033 2419 - 2433 TypeApply akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command]
59 22648 2487 - 2537 Apply grizzled.slf4j.Logger.info CrmSynchroCsvMonitor.this.logger.info("Job checker becomes available again")
60 26614 2550 - 2579 Apply org.make.api.technical.crm.CrmSynchroCsvMonitor.running running(jobIds, pendingCalls)
62 24442 2716 - 2723 Apply org.make.api.technical.crm.CrmSynchroCsvMonitor.blocked blocked(jobIds, pendingCalls)
62 23326 2652 - 2724 Apply org.make.api.technical.crm.CrmSynchroCsvMonitor.handleSuccessfulResponse handleSuccessfulResponse(jobId, response)(jobIds, pendingCalls)(((jobIds: Seq[Long], pendingCalls: Seq[Long]) => blocked(jobIds, pendingCalls)))
64 24531 2787 - 2847 Apply org.make.api.technical.crm.CrmSynchroCsvMonitor.handleErrorResponse handleErrorResponse(jobId, e)(jobIds, pendingCalls)(((jobIds: Seq[Long], pendingCalls: Seq[Long]) => blocked(jobIds, pendingCalls)))
64 26916 2839 - 2846 Apply org.make.api.technical.crm.CrmSynchroCsvMonitor.blocked blocked(jobIds, pendingCalls)
74 25045 3090 - 3108 Apply scala.concurrent.Promise.success promise.success(())
75 22791 3119 - 3136 TypeApply akka.actor.typed.scaladsl.Behaviors.stopped akka.actor.typed.scaladsl.Behaviors.stopped[org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command]
77 26625 3174 - 3188 TypeApply akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command]
79 24455 3294 - 3331 ApplyToImplicitArgs org.make.api.technical.crm.CrmClient.monitorCsvImport crmClient.monitorCsvImport(toMonitor, crmClient.monitorCsvImport$default$2)(executionContext)
80 24883 3342 - 3544 Apply akka.actor.typed.scaladsl.ActorContext.pipeToSelf context.pipeToSelf[org.make.api.technical.crm.BasicCrmResponse.ManageContactsWithCsvResponse](futureCall)(((x0$1: scala.util.Try[org.make.api.technical.crm.BasicCrmResponse.ManageContactsWithCsvResponse]) => x0$1 match { case (value: org.make.api.technical.crm.BasicCrmResponse.ManageContactsWithCsvResponse): scala.util.Success[org.make.api.technical.crm.BasicCrmResponse.ManageContactsWithCsvResponse]((response @ _)) => org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.CrmCallSucceeded.apply(toMonitor, response) case (exception: Throwable): scala.util.Failure[org.make.api.technical.crm.BasicCrmResponse.ManageContactsWithCsvResponse]((e @ _)) => org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.CrmCallFailed.apply(toMonitor, e) }))
81 23086 3413 - 3458 Apply org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.CrmCallSucceeded.apply org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.CrmCallSucceeded.apply(toMonitor, response)
82 26931 3497 - 3532 Apply org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.CrmCallFailed.apply org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.CrmCallFailed.apply(toMonitor, e)
84 26342 3555 - 3595 Apply org.make.api.technical.crm.CrmSynchroCsvMonitor.running running(tail, pendingCalls.:+[Long](toMonitor))
84 22487 3569 - 3594 Apply scala.collection.SeqOps.:+ pendingCalls.:+[Long](toMonitor)
94 24972 3871 - 3960 Apply grizzled.slf4j.Logger.warn CrmSynchroCsvMonitor.this.logger.warn(("Job checker is becoming blocked for an hour, received message is ".+(message): String))
95 24373 3996 - 4030 Apply scala.collection.IterableOps.filterNot pendingCalls.filterNot(((x$1: Long) => x$1.==(jobId)))
95 26472 4019 - 4029 Apply scala.Long.== x$1.==(jobId)
95 23095 3971 - 4031 Apply org.make.api.technical.crm.CrmSynchroCsvMonitor.blocked blocked(jobIds.:+[Long](jobId), pendingCalls.filterNot(((x$1: Long) => x$1.==(jobId))))
95 22798 3979 - 3994 Apply scala.collection.SeqOps.:+ jobIds.:+[Long](jobId)
97 26856 4064 - 4129 Apply grizzled.slf4j.Logger.error CrmSynchroCsvMonitor.this.logger.error(("Error when checking status for job ".+(jobId): String), other)
98 26104 4162 - 4196 Apply scala.collection.IterableOps.filterNot pendingCalls.filterNot(((x$2: Long) => x$2.==(jobId)))
98 22323 4185 - 4195 Apply scala.Long.== x$2.==(jobId)
98 24981 4140 - 4197 Apply scala.Function2.apply next.apply(jobIds.:+[Long](jobId), pendingCalls.filterNot(((x$2: Long) => x$2.==(jobId))))
98 24897 4145 - 4160 Apply scala.collection.SeqOps.:+ jobIds.:+[Long](jobId)
105 22267 4551 - 4958 Block <nosymbol> { CrmSynchroCsvMonitor.this.logger.info(("Job ".+(jobId).+(" completed successfully: ").+(response): String)); if (response.data.forall(((response: org.make.api.technical.crm.CsvImportResponse) => response.errorCount.>(0)))) CrmSynchroCsvMonitor.this.logger.error(("Job ".+(jobId).+(" has errors: ").+(response).+(", "): String).+(("file should be at https://api.mailjet.com/v3/DATA/Batchjob/".+(jobId).+("/CSVError/text:csv"): String))) else (); next.apply(jobIds, pendingCalls.filterNot(((x$3: Long) => x$3.==(jobId)))) }
105 22946 4507 - 4518 Literal <nosymbol> "Completed"
105 28182 4455 - 4549 Apply scala.collection.IterableOnceOps.forall response.data.forall(((response: org.make.api.technical.crm.CsvImportResponse) => response.status.==("Completed").||(response.status.==("Abort"))))
105 24210 4488 - 4548 Apply scala.Boolean.|| response.status.==("Completed").||(response.status.==("Abort"))
105 26402 4522 - 4548 Apply java.lang.Object.== response.status.==("Abort")
106 26868 4561 - 4621 Apply grizzled.slf4j.Logger.info CrmSynchroCsvMonitor.this.logger.info(("Job ".+(jobId).+(" completed successfully: ").+(response): String))
107 26411 4630 - 4630 Literal <nosymbol> ()
107 24362 4630 - 4630 Block <nosymbol> ()
107 24526 4667 - 4690 Apply scala.Int.> response.errorCount.>(0)
107 22259 4634 - 4691 Apply scala.collection.IterableOnceOps.forall response.data.forall(((response: org.make.api.technical.crm.CsvImportResponse) => response.errorCount.>(0)))
108 22958 4705 - 4883 Block grizzled.slf4j.Logger.error CrmSynchroCsvMonitor.this.logger.error(("Job ".+(jobId).+(" has errors: ").+(response).+(", "): String).+(("file should be at https://api.mailjet.com/v3/DATA/Batchjob/".+(jobId).+("/CSVError/text:csv"): String)))
108 25123 4705 - 4883 Apply grizzled.slf4j.Logger.error CrmSynchroCsvMonitor.this.logger.error(("Job ".+(jobId).+(" has errors: ").+(response).+(", "): String).+(("file should be at https://api.mailjet.com/v3/DATA/Batchjob/".+(jobId).+("/CSVError/text:csv"): String)))
109 26330 4731 - 4871 Apply java.lang.String.+ ("Job ".+(jobId).+(" has errors: ").+(response).+(", "): String).+(("file should be at https://api.mailjet.com/v3/DATA/Batchjob/".+(jobId).+("/CSVError/text:csv"): String))
113 24833 4902 - 4950 Apply scala.Function2.apply next.apply(jobIds, pendingCalls.filterNot(((x$3: Long) => x$3.==(jobId))))
113 28190 4938 - 4948 Apply scala.Long.== x$3.==(jobId)
113 26838 4915 - 4949 Apply scala.collection.IterableOps.filterNot pendingCalls.filterNot(((x$3: Long) => x$3.==(jobId)))
115 22796 4996 - 5030 Apply scala.collection.IterableOps.filterNot pendingCalls.filterNot(((x$4: Long) => x$4.==(jobId)))
115 26259 4979 - 4994 Apply scala.collection.SeqOps.:+ jobIds.:+[Long](jobId)
115 24369 4974 - 5031 Block scala.Function2.apply next.apply(jobIds.:+[Long](jobId), pendingCalls.filterNot(((x$4: Long) => x$4.==(jobId))))
115 25137 5019 - 5029 Apply scala.Long.== x$4.==(jobId)
115 26712 4974 - 5031 Apply scala.Function2.apply next.apply(jobIds.:+[Long](jobId), pendingCalls.filterNot(((x$4: Long) => x$4.==(jobId))))
118 26269 5045 - 5182 Apply akka.actor.typed.scaladsl.Behaviors.withTimers akka.actor.typed.scaladsl.Behaviors.withTimers[org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command](((timers: akka.actor.typed.scaladsl.TimerScheduler[org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command]) => { timers.startTimerWithFixedDelay(org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.Tick, tickInterval); running(jobIds, scala.`package`.Seq.empty[Nothing]) }))
119 28123 5116 - 5128 Select org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.Tick org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.Tick
119 26851 5084 - 5143 Apply akka.actor.typed.scaladsl.TimerScheduler.startTimerWithFixedDelay timers.startTimerWithFixedDelay(org.make.api.technical.crm.CrmSynchroCsvMonitor.Protocol.Command.Tick, tickInterval)
120 24671 5166 - 5175 TypeApply scala.collection.SeqFactory.Delegate.empty scala.`package`.Seq.empty[Nothing]
120 22413 5150 - 5176 Apply org.make.api.technical.crm.CrmSynchroCsvMonitor.running running(jobIds, scala.`package`.Seq.empty[Nothing])