1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.proposal
21 
22 import akka.actor.typed
23 import akka.actor.typed.receptionist.Receptionist
24 import akka.actor.typed.scaladsl.Behaviors
25 import akka.actor.typed.{ActorRef, Behavior}
26 import org.make.api.kafka.kafkaDispatcher
27 import org.make.api.sessionhistory.SessionHistoryCoordinatorServiceComponent
28 import org.make.api.technical.{ActorSystemHelper, IdGeneratorComponent}
29 import org.make.api.technical.crm.SendMailPublisherServiceComponent
30 import org.make.api.userhistory.UserHistoryCoordinatorServiceComponent
31 import org.make.api.user.PersistentUserProposalsServiceComponent
32 import org.make.api.operation.OperationOfQuestionServiceComponent
33 
34 import scala.concurrent.duration.FiniteDuration
35 
/**
  * Parent behavior for the proposal-related actors.
  *
  * On setup it starts the [[ProposalCoordinator]], registers it with the actor
  * system receptionist, and spawns the Kafka producer and consumers that deal
  * with proposal events. Every child is watched by this supervisor.
  */
object ProposalSupervisor {

  /** Services that must be mixed into the object handed to [[apply]]. */
  type ProposalSupervisorDependencies =
    ProposalIndexerServiceComponent
      with SendMailPublisherServiceComponent
      with OperationOfQuestionServiceComponent
      with PersistentUserProposalsServiceComponent
      with UserHistoryCoordinatorServiceComponent
      with SessionHistoryCoordinatorServiceComponent
      with IdGeneratorComponent

  val name: String = "proposal"

  /**
    * Builds the supervisor behavior.
    *
    * @param dependencies services handed to the coordinator and to the Kafka consumers
    * @param lockDuration forwarded unchanged to [[ProposalCoordinator]]
    * @return a behavior that does all of its work during setup and then becomes
    *         `Behaviors.empty`
    */
  def apply(dependencies: ProposalSupervisorDependencies, lockDuration: FiniteDuration): Behavior[Nothing] = {

    Behaviors.setup[Nothing] { context =>
      val proposalCoordinator: ActorRef[ProposalCommand] = ProposalCoordinator(
        system = context.system,
        sessionHistoryCoordinatorService = dependencies.sessionHistoryCoordinatorService,
        lockDuration = lockDuration,
        idGenerator = dependencies.idGenerator
      )
      context.watch(proposalCoordinator)
      // Publish the coordinator under its service key so it can be looked up through the receptionist.
      context.system.receptionist ! Receptionist.Register(ProposalCoordinator.Key, proposalCoordinator)

      // Single place where Kafka workers are started, so they all share the same
      // backoff wrapper, the same dispatcher configuration and are all watched.
      def spawnKafkaWorker[T](behavior: Behavior[T], workerName: String): Unit =
        context.watch(
          context.spawn(
            ActorSystemHelper.superviseWithBackoff(behavior),
            workerName,
            typed.Props.empty.withDispatcherFromConfig(kafkaDispatcher)
          )
        )

      spawnKafkaWorker(ProposalKafkaProducerBehavior(), ProposalKafkaProducerBehavior.name)

      spawnKafkaWorker(
        ProposalUserHistoryConsumerBehavior(dependencies.userHistoryCoordinatorService),
        ProposalUserHistoryConsumerBehavior.name
      )

      spawnKafkaWorker(
        ProposalEmailConsumerActor(dependencies.sendMailPublisherService, dependencies.persistentUserProposalsService),
        ProposalEmailConsumerActor.name
      )

      spawnKafkaWorker(
        ProposalConsumerBehavior(dependencies.proposalIndexerService, dependencies.operationOfQuestionService),
        ProposalConsumerBehavior.name
      )

      // NOTE(review): no Terminated signal is handled here; presumably a watched child
      // stopping takes this supervisor down through Akka's death pact — confirm this is intended.
      Behaviors.empty
    }
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
47 33513 1854 - 1864 Literal <nosymbol> "proposal"
50 37712 1981 - 3845 Apply akka.actor.typed.scaladsl.Behaviors.setup akka.actor.typed.scaladsl.Behaviors.setup[Nothing](((context: akka.actor.typed.scaladsl.ActorContext[Nothing]) => { val proposalCoordinator: akka.actor.typed.ActorRef[org.make.api.proposal.ProposalCommand] = ProposalCoordinator.apply(context.system, dependencies.sessionHistoryCoordinatorService, lockDuration, dependencies.idGenerator); context.watch[org.make.api.proposal.ProposalCommand](proposalCoordinator); typed.this.ActorRef.ActorRefOps[akka.actor.typed.receptionist.Receptionist.Command](context.system.receptionist).!(akka.actor.typed.receptionist.Receptionist.Register.apply[org.make.api.proposal.ProposalCommand](ProposalCoordinator.Key, proposalCoordinator)); context.watch[org.make.api.proposal.PublishedProposalEvent](context.spawn[org.make.api.proposal.PublishedProposalEvent](org.make.api.technical.ActorSystemHelper.superviseWithBackoff[org.make.api.proposal.PublishedProposalEvent](ProposalKafkaProducerBehavior.apply()), ProposalKafkaProducerBehavior.name, akka.actor.typed.Props.empty.withDispatcherFromConfig(org.make.api.kafka.kafkaDispatcher))); context.watch[org.make.api.technical.KafkaConsumerBehavior.Protocol](context.spawn[org.make.api.technical.KafkaConsumerBehavior.Protocol](org.make.api.technical.ActorSystemHelper.superviseWithBackoff[org.make.api.technical.KafkaConsumerBehavior.Protocol](ProposalUserHistoryConsumerBehavior.apply(dependencies.userHistoryCoordinatorService)), ProposalUserHistoryConsumerBehavior.name, akka.actor.typed.Props.empty.withDispatcherFromConfig(org.make.api.kafka.kafkaDispatcher))); context.watch[org.make.api.technical.KafkaConsumerBehavior.Protocol](context.spawn[org.make.api.technical.KafkaConsumerBehavior.Protocol](org.make.api.technical.ActorSystemHelper.superviseWithBackoff[org.make.api.technical.KafkaConsumerBehavior.Protocol](ProposalEmailConsumerActor.apply(dependencies.sendMailPublisherService, dependencies.persistentUserProposalsService)), 
ProposalEmailConsumerActor.name, akka.actor.typed.Props.empty.withDispatcherFromConfig(org.make.api.kafka.kafkaDispatcher))); context.watch[org.make.api.technical.KafkaConsumerBehavior.Protocol](context.spawn[org.make.api.technical.KafkaConsumerBehavior.Protocol](org.make.api.technical.ActorSystemHelper.superviseWithBackoff[org.make.api.technical.KafkaConsumerBehavior.Protocol](ProposalConsumerBehavior.apply(dependencies.proposalIndexerService, dependencies.operationOfQuestionService)), ProposalConsumerBehavior.name, akka.actor.typed.Props.empty.withDispatcherFromConfig(org.make.api.kafka.kafkaDispatcher))); akka.actor.typed.scaladsl.Behaviors.empty[Nothing] }))
51 47631 2078 - 2313 Apply org.make.api.proposal.ProposalCoordinator.apply ProposalCoordinator.apply(context.system, dependencies.sessionHistoryCoordinatorService, lockDuration, dependencies.idGenerator)
52 47292 2116 - 2130 Select akka.actor.typed.scaladsl.ActorContext.system context.system
53 38410 2175 - 2220 Select org.make.api.sessionhistory.SessionHistoryCoordinatorServiceComponent.sessionHistoryCoordinatorService dependencies.sessionHistoryCoordinatorService
55 30797 2281 - 2305 Select org.make.api.technical.IdGeneratorComponent.idGenerator dependencies.idGenerator
57 40039 2320 - 2354 Apply akka.actor.typed.scaladsl.ActorContext.watch context.watch[org.make.api.proposal.ProposalCommand](proposalCoordinator)
58 33264 2361 - 2458 Apply akka.actor.typed.ActorRef.ActorRefOps.! typed.this.ActorRef.ActorRefOps[akka.actor.typed.receptionist.Receptionist.Command](context.system.receptionist).!(akka.actor.typed.receptionist.Receptionist.Register.apply[org.make.api.proposal.ProposalCommand](ProposalCoordinator.Key, proposalCoordinator))
58 31899 2361 - 2388 Select akka.actor.typed.ActorSystem.receptionist context.system.receptionist
58 44928 2413 - 2436 Select org.make.api.proposal.ProposalCoordinator.Key ProposalCoordinator.Key
58 38130 2391 - 2458 Apply akka.actor.typed.receptionist.Receptionist.Register.apply akka.actor.typed.receptionist.Receptionist.Register.apply[org.make.api.proposal.ProposalCommand](ProposalCoordinator.Key, proposalCoordinator)
60 44966 2466 - 2720 Apply akka.actor.typed.scaladsl.ActorContext.watch context.watch[org.make.api.proposal.PublishedProposalEvent](context.spawn[org.make.api.proposal.PublishedProposalEvent](org.make.api.technical.ActorSystemHelper.superviseWithBackoff[org.make.api.proposal.PublishedProposalEvent](ProposalKafkaProducerBehavior.apply()), ProposalKafkaProducerBehavior.name, akka.actor.typed.Props.empty.withDispatcherFromConfig(org.make.api.kafka.kafkaDispatcher)))
61 32972 2489 - 2712 Apply akka.actor.typed.scaladsl.ActorContext.spawn context.spawn[org.make.api.proposal.PublishedProposalEvent](org.make.api.technical.ActorSystemHelper.superviseWithBackoff[org.make.api.proposal.PublishedProposalEvent](ProposalKafkaProducerBehavior.apply()), ProposalKafkaProducerBehavior.name, akka.actor.typed.Props.empty.withDispatcherFromConfig(org.make.api.kafka.kafkaDispatcher))
62 47330 2553 - 2584 Apply org.make.api.proposal.ProposalKafkaProducerBehavior.apply ProposalKafkaProducerBehavior.apply()
62 38442 2514 - 2585 Apply org.make.api.technical.ActorSystemHelper.superviseWithBackoff org.make.api.technical.ActorSystemHelper.superviseWithBackoff[org.make.api.proposal.PublishedProposalEvent](ProposalKafkaProducerBehavior.apply())
63 31332 2597 - 2631 Select org.make.api.proposal.ProposalKafkaProducerBehavior.name ProposalKafkaProducerBehavior.name
64 40854 2643 - 2702 Apply akka.actor.typed.Props.withDispatcherFromConfig akka.actor.typed.Props.empty.withDispatcherFromConfig(org.make.api.kafka.kafkaDispatcher)
64 44652 2686 - 2701 Select org.make.api.kafka.kafkaDispatcher org.make.api.kafka.kafkaDispatcher
68 33007 2728 - 3049 Apply akka.actor.typed.scaladsl.ActorContext.watch context.watch[org.make.api.technical.KafkaConsumerBehavior.Protocol](context.spawn[org.make.api.technical.KafkaConsumerBehavior.Protocol](org.make.api.technical.ActorSystemHelper.superviseWithBackoff[org.make.api.technical.KafkaConsumerBehavior.Protocol](ProposalUserHistoryConsumerBehavior.apply(dependencies.userHistoryCoordinatorService)), ProposalUserHistoryConsumerBehavior.name, akka.actor.typed.Props.empty.withDispatcherFromConfig(org.make.api.kafka.kafkaDispatcher)))
69 39992 2751 - 3041 Apply akka.actor.typed.scaladsl.ActorContext.spawn context.spawn[org.make.api.technical.KafkaConsumerBehavior.Protocol](org.make.api.technical.ActorSystemHelper.superviseWithBackoff[org.make.api.technical.KafkaConsumerBehavior.Protocol](ProposalUserHistoryConsumerBehavior.apply(dependencies.userHistoryCoordinatorService)), ProposalUserHistoryConsumerBehavior.name, akka.actor.typed.Props.empty.withDispatcherFromConfig(org.make.api.kafka.kafkaDispatcher))
71 47367 2776 - 2908 Apply org.make.api.technical.ActorSystemHelper.superviseWithBackoff org.make.api.technical.ActorSystemHelper.superviseWithBackoff[org.make.api.technical.KafkaConsumerBehavior.Protocol](ProposalUserHistoryConsumerBehavior.apply(dependencies.userHistoryCoordinatorService))
71 37880 2864 - 2906 Select org.make.api.userhistory.UserHistoryCoordinatorServiceComponent.userHistoryCoordinatorService dependencies.userHistoryCoordinatorService
71 34018 2828 - 2907 Apply org.make.api.proposal.ProposalUserHistoryConsumerBehavior.apply ProposalUserHistoryConsumerBehavior.apply(dependencies.userHistoryCoordinatorService)
72 39508 2920 - 2960 Select org.make.api.proposal.ProposalUserHistoryConsumerBehavior.name ProposalUserHistoryConsumerBehavior.name
73 31371 3015 - 3030 Select org.make.api.kafka.kafkaDispatcher org.make.api.kafka.kafkaDispatcher
73 44411 2972 - 3031 Apply akka.actor.typed.Props.withDispatcherFromConfig akka.actor.typed.Props.empty.withDispatcherFromConfig(org.make.api.kafka.kafkaDispatcher)
77 32153 3057 - 3453 Apply akka.actor.typed.scaladsl.ActorContext.watch context.watch[org.make.api.technical.KafkaConsumerBehavior.Protocol](context.spawn[org.make.api.technical.KafkaConsumerBehavior.Protocol](org.make.api.technical.ActorSystemHelper.superviseWithBackoff[org.make.api.technical.KafkaConsumerBehavior.Protocol](ProposalEmailConsumerActor.apply(dependencies.sendMailPublisherService, dependencies.persistentUserProposalsService)), ProposalEmailConsumerActor.name, akka.actor.typed.Props.empty.withDispatcherFromConfig(org.make.api.kafka.kafkaDispatcher)))
78 40029 3080 - 3445 Apply akka.actor.typed.scaladsl.ActorContext.spawn context.spawn[org.make.api.technical.KafkaConsumerBehavior.Protocol](org.make.api.technical.ActorSystemHelper.superviseWithBackoff[org.make.api.technical.KafkaConsumerBehavior.Protocol](ProposalEmailConsumerActor.apply(dependencies.sendMailPublisherService, dependencies.persistentUserProposalsService)), ProposalEmailConsumerActor.name, akka.actor.typed.Props.empty.withDispatcherFromConfig(org.make.api.kafka.kafkaDispatcher))
79 46517 3105 - 3321 Apply org.make.api.technical.ActorSystemHelper.superviseWithBackoff org.make.api.technical.ActorSystemHelper.superviseWithBackoff[org.make.api.technical.KafkaConsumerBehavior.Protocol](ProposalEmailConsumerActor.apply(dependencies.sendMailPublisherService, dependencies.persistentUserProposalsService))
80 34055 3157 - 3309 Apply org.make.api.proposal.ProposalEmailConsumerActor.apply ProposalEmailConsumerActor.apply(dependencies.sendMailPublisherService, dependencies.persistentUserProposalsService)
81 45501 3199 - 3236 Select org.make.api.technical.crm.SendMailPublisherServiceComponent.sendMailPublisherService dependencies.sendMailPublisherService
82 37919 3252 - 3295 Select org.make.api.user.PersistentUserProposalsServiceComponent.persistentUserProposalsService dependencies.persistentUserProposalsService
85 39546 3333 - 3364 Select org.make.api.proposal.ProposalEmailConsumerActor.name ProposalEmailConsumerActor.name
86 44449 3376 - 3435 Apply akka.actor.typed.Props.withDispatcherFromConfig akka.actor.typed.Props.empty.withDispatcherFromConfig(org.make.api.kafka.kafkaDispatcher)
86 31121 3419 - 3434 Select org.make.api.kafka.kafkaDispatcher org.make.api.kafka.kafkaDispatcher
90 32188 3461 - 3816 Apply akka.actor.typed.scaladsl.ActorContext.watch context.watch[org.make.api.technical.KafkaConsumerBehavior.Protocol](context.spawn[org.make.api.technical.KafkaConsumerBehavior.Protocol](org.make.api.technical.ActorSystemHelper.superviseWithBackoff[org.make.api.technical.KafkaConsumerBehavior.Protocol](ProposalConsumerBehavior.apply(dependencies.proposalIndexerService, dependencies.operationOfQuestionService)), ProposalConsumerBehavior.name, akka.actor.typed.Props.empty.withDispatcherFromConfig(org.make.api.kafka.kafkaDispatcher)))
91 39790 3484 - 3808 Apply akka.actor.typed.scaladsl.ActorContext.spawn context.spawn[org.make.api.technical.KafkaConsumerBehavior.Protocol](org.make.api.technical.ActorSystemHelper.superviseWithBackoff[org.make.api.technical.KafkaConsumerBehavior.Protocol](ProposalConsumerBehavior.apply(dependencies.proposalIndexerService, dependencies.operationOfQuestionService)), ProposalConsumerBehavior.name, akka.actor.typed.Props.empty.withDispatcherFromConfig(org.make.api.kafka.kafkaDispatcher))
92 46555 3509 - 3675 Apply org.make.api.technical.ActorSystemHelper.superviseWithBackoff org.make.api.technical.ActorSystemHelper.superviseWithBackoff[org.make.api.technical.KafkaConsumerBehavior.Protocol](ProposalConsumerBehavior.apply(dependencies.proposalIndexerService, dependencies.operationOfQuestionService))
93 50451 3561 - 3663 Apply org.make.api.proposal.ProposalConsumerBehavior.apply ProposalConsumerBehavior.apply(dependencies.proposalIndexerService, dependencies.operationOfQuestionService)
93 37954 3623 - 3662 Select org.make.api.operation.OperationOfQuestionServiceComponent.operationOfQuestionService dependencies.operationOfQuestionService
93 45538 3586 - 3621 Select org.make.api.proposal.ProposalIndexerServiceComponent.proposalIndexerService dependencies.proposalIndexerService
95 39460 3687 - 3716 Select org.make.api.proposal.ProposalConsumerBehavior.name ProposalConsumerBehavior.name
96 44484 3728 - 3798 Apply akka.actor.typed.Props.withDispatcherFromConfig akka.actor.typed.Props.empty.withDispatcherFromConfig(org.make.api.kafka.kafkaDispatcher)
96 31156 3782 - 3797 Select org.make.api.kafka.kafkaDispatcher org.make.api.kafka.kafkaDispatcher
100 45995 3824 - 3839 TypeApply akka.actor.typed.scaladsl.Behaviors.empty akka.actor.typed.scaladsl.Behaviors.empty[Nothing]