1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.organisation
21 
22 import akka.Done
23 import akka.actor.typed.{ActorSystem, Behavior}
24 import akka.util.Timeout
25 import grizzled.slf4j.Logging
26 import org.make.api.technical.KafkaConsumerBehavior
27 import org.make.api.technical.KafkaConsumerBehavior.Protocol
28 import org.make.api.user.UserProducerBehavior
29 import org.make.api.userhistory.{OrganisationRegisteredEvent, OrganisationUpdatedEvent, UserEvent, UserEventWrapper}
30 import org.make.core.user.UserId
31 import org.make.core.user.indexed.IndexedOrganisation
32 
33 import scala.concurrent.ExecutionContext.Implicits.global
34 import scala.concurrent.Future
35 import scala.concurrent.duration.DurationInt
36 
/**
  * Kafka consumer behavior that mirrors organisation events into the organisation search engine.
  *
  * It reads [[UserEventWrapper]] messages from the topic identified by `UserProducerBehavior.topicKey`.
  * Organisation registration and update events trigger a re-index of the organisation concerned;
  * every other event is handed to `doNothing`.
  *
  * @param organisationService          used to load the organisation referenced by an event
  * @param elasticsearchOrganisationAPI search engine in which organisations are looked up, indexed and updated
  */
class OrganisationConsumerBehavior(
  organisationService: OrganisationService,
  elasticsearchOrganisationAPI: OrganisationSearchEngine
) extends KafkaConsumerBehavior[UserEventWrapper]
    with Logging {

  override protected val topicKey: String = UserProducerBehavior.topicKey

  // NOTE(review): not referenced explicitly in this class; presumably required implicitly by
  // something inherited or kept for historical reasons -- confirm before removing.
  implicit val timeout: Timeout = Timeout(5.seconds)

  /**
    * Dispatches an incoming message according to the type of the event it wraps.
    *
    * @param message wrapper whose `event` decides which handler runs
    * @param system  actor system supplied by the consumer infrastructure (unused here)
    * @return the future of the selected handler
    */
  override def handleMessage(message: UserEventWrapper)(system: ActorSystem[_]): Future[_] = {
    val received = message.event
    received match {
      case registered: OrganisationRegisteredEvent => onCreateOrUpdate(registered)
      case updated: OrganisationUpdatedEvent       => onCreateOrUpdate(updated)
      case other                                   => doNothing(other)
    }
  }

  /**
    * Loads the organisation identified by the event's `userId` and pushes it to the search engine.
    *
    * @param event event carrying the id of the organisation to (re)index
    * @return a future completed with `Done` once the search engine call has finished; failed if the
    *         organisation cannot be found
    */
  def onCreateOrUpdate(event: UserEvent): Future[Done] = {
    val shaped: Future[IndexedOrganisation] = retrieveAndShapeOrganisation(event.userId)
    shaped.flatMap(indexOrUpdate)
  }

  /**
    * Creates the organisation document when the search engine has none for this id, otherwise updates it.
    *
    * On update, the `countsByQuestion` already stored in the search engine is carried over onto the
    * new document instead of the value held by `organisation`.
    *
    * @param organisation document to write
    * @return a future completed with `Done` once the write has finished
    */
  def indexOrUpdate(organisation: IndexedOrganisation): Future[Done] = {
    debug(s"Indexing $organisation")
    val lookup = elasticsearchOrganisationAPI.findOrganisationById(organisation.organisationId)
    lookup.flatMap {
      case Some(found) =>
        // Keep the counts that are already indexed; the freshly built document does not override them.
        val merged = organisation.copy(countsByQuestion = found.countsByQuestion)
        elasticsearchOrganisationAPI.updateOrganisation(merged)
      case None =>
        elasticsearchOrganisationAPI.indexOrganisation(organisation)
    }
  }

  /**
    * Fetches the organisation from the organisation service and converts it to its indexed form.
    *
    * @param id identifier of the organisation to load
    * @return the indexed representation, or a future failed with `IllegalArgumentException` when
    *         the service returns no organisation for `id`
    */
  private def retrieveAndShapeOrganisation(id: UserId): Future[IndexedOrganisation] = {
    organisationService.getOrganisation(id).flatMap { maybeOrganisation =>
      maybeOrganisation.fold[Future[IndexedOrganisation]](
        Future.failed(new IllegalArgumentException(s"Organisation ${id.value} doesn't exist"))
      )(organisation => Future.successful(IndexedOrganisation.createFromOrganisation(organisation)))
    }
  }

  override val groupId = "organisation-consumer"
}
79 
/** Factory for the organisation consumer behavior. */
object OrganisationConsumerBehavior {

  /** Name passed to `createBehavior` when the behavior is built by [[apply]]. */
  val name: String = "organisation-consumer"

  /**
    * Builds the behavior of a new [[OrganisationConsumerBehavior]].
    *
    * @param organisationService          service handed to the consumer to load organisations
    * @param elasticsearchOrganisationAPI search engine handed to the consumer for index writes
    * @return the behavior produced by the consumer's `createBehavior(name)`
    */
  def apply(
    organisationService: OrganisationService,
    elasticsearchOrganisationAPI: OrganisationSearchEngine
  ): Behavior[Protocol] = {
    val consumer = new OrganisationConsumerBehavior(organisationService, elasticsearchOrganisationAPI)
    consumer.createBehavior(name)
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
43 6083 1650 - 1679 Select org.make.api.user.UserProducerBehavior.topicKey org.make.api.user.UserProducerBehavior.topicKey
45 5881 1715 - 1733 Apply akka.util.Timeout.apply akka.util.Timeout.apply(scala.concurrent.duration.`package`.DurationInt(5).seconds)
45 7465 1723 - 1724 Literal <nosymbol> 5
45 6630 1723 - 1732 Select scala.concurrent.duration.DurationConversions.seconds scala.concurrent.duration.`package`.DurationInt(5).seconds
48 7184 1834 - 1847 Select org.make.api.userhistory.UserEventWrapper.event message.event
49 6895 1905 - 1928 Apply org.make.api.organisation.OrganisationConsumerBehavior.onCreateOrUpdate OrganisationConsumerBehavior.this.onCreateOrUpdate(event)
50 5936 1978 - 2001 Apply org.make.api.organisation.OrganisationConsumerBehavior.onCreateOrUpdate OrganisationConsumerBehavior.this.onCreateOrUpdate(event)
51 7332 2051 - 2067 Apply org.make.api.technical.KafkaConsumerBehavior.doNothing OrganisationConsumerBehavior.this.doNothing(event)
56 6588 2171 - 2183 Select org.make.api.userhistory.UserPersistentEvent.userId event.userId
56 6187 2193 - 2206 Apply org.make.api.organisation.OrganisationConsumerBehavior.indexOrUpdate OrganisationConsumerBehavior.this.indexOrUpdate(organisation)
56 6633 2142 - 2207 ApplyToImplicitArgs scala.concurrent.Future.flatMap OrganisationConsumerBehavior.this.retrieveAndShapeOrganisation(event.userId).flatMap[akka.Done](((organisation: org.make.core.user.indexed.IndexedOrganisation) => OrganisationConsumerBehavior.this.indexOrUpdate(organisation)))(scala.concurrent.ExecutionContext.Implicits.global)
56 7587 2192 - 2192 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
60 5890 2290 - 2322 Apply grizzled.slf4j.Logging.debug OrganisationConsumerBehavior.this.debug(("Indexing ".+(organisation): String))
62 7297 2384 - 2411 Select org.make.core.user.indexed.IndexedOrganisation.organisationId organisation.organisationId
63 7197 2327 - 2667 ApplyToImplicitArgs scala.concurrent.Future.flatMap OrganisationConsumerBehavior.this.elasticsearchOrganisationAPI.findOrganisationById(organisation.organisationId).flatMap[akka.Done](((x0$1: Option[org.make.core.user.indexed.IndexedOrganisation]) => x0$1 match { case scala.None => OrganisationConsumerBehavior.this.elasticsearchOrganisationAPI.indexOrganisation(organisation, OrganisationConsumerBehavior.this.elasticsearchOrganisationAPI.indexOrganisation$default$2) case (value: org.make.core.user.indexed.IndexedOrganisation): Some[org.make.core.user.indexed.IndexedOrganisation]((found @ _)) => OrganisationConsumerBehavior.this.elasticsearchOrganisationAPI.updateOrganisation({ <artifact> val x$1: Seq[org.make.core.user.indexed.ProposalsAndVotesCountsByQuestion] @scala.reflect.internal.annotations.uncheckedBounds = found.countsByQuestion; <artifact> val x$2: org.make.core.user.UserId = organisation.copy$default$1; <artifact> val x$3: Option[String] @scala.reflect.internal.annotations.uncheckedBounds = organisation.copy$default$2; <artifact> val x$4: Option[String] @scala.reflect.internal.annotations.uncheckedBounds = organisation.copy$default$3; <artifact> val x$5: Option[String] @scala.reflect.internal.annotations.uncheckedBounds = organisation.copy$default$4; <artifact> val x$6: Option[String] @scala.reflect.internal.annotations.uncheckedBounds = organisation.copy$default$5; <artifact> val x$7: Boolean = organisation.copy$default$6; <artifact> val x$8: Int = organisation.copy$default$7; <artifact> val x$9: Int = organisation.copy$default$8; <artifact> val x$10: org.make.core.reference.Country = organisation.copy$default$9; <artifact> val x$11: Option[String] @scala.reflect.internal.annotations.uncheckedBounds = organisation.copy$default$10; organisation.copy(x$2, x$3, x$4, x$5, x$6, x$7, x$8, x$9, x$10, x$11, x$1) }, OrganisationConsumerBehavior.this.elasticsearchOrganisationAPI.updateOrganisation$default$2) 
}))(scala.concurrent.ExecutionContext.Implicits.global)
63 5856 2428 - 2428 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
64 6052 2451 - 2511 Apply org.make.api.organisation.OrganisationSearchEngine.indexOrganisation OrganisationConsumerBehavior.this.elasticsearchOrganisationAPI.indexOrganisation(organisation, OrganisationConsumerBehavior.this.elasticsearchOrganisationAPI.indexOrganisation$default$2)
64 6810 2480 - 2480 Select org.make.api.organisation.OrganisationSearchEngine.indexOrganisation$default$2 OrganisationConsumerBehavior.this.elasticsearchOrganisationAPI.indexOrganisation$default$2
66 7392 2635 - 2657 Select org.make.core.user.indexed.IndexedOrganisation.countsByQuestion found.countsByQuestion
66 6554 2611 - 2611 Select org.make.core.user.indexed.IndexedOrganisation.copy$default$10 organisation.copy$default$10
66 5851 2611 - 2611 Select org.make.core.user.indexed.IndexedOrganisation.copy$default$5 organisation.copy$default$5
66 7493 2579 - 2579 Select org.make.api.organisation.OrganisationSearchEngine.updateOrganisation$default$2 OrganisationConsumerBehavior.this.elasticsearchOrganisationAPI.updateOrganisation$default$2
66 6081 2611 - 2611 Select org.make.core.user.indexed.IndexedOrganisation.copy$default$2 organisation.copy$default$2
66 6781 2611 - 2611 Select org.make.core.user.indexed.IndexedOrganisation.copy$default$7 organisation.copy$default$7
66 7301 2611 - 2611 Select org.make.core.user.indexed.IndexedOrganisation.copy$default$6 organisation.copy$default$6
66 6628 2550 - 2659 Apply org.make.api.organisation.OrganisationSearchEngine.updateOrganisation OrganisationConsumerBehavior.this.elasticsearchOrganisationAPI.updateOrganisation({ <artifact> val x$1: Seq[org.make.core.user.indexed.ProposalsAndVotesCountsByQuestion] @scala.reflect.internal.annotations.uncheckedBounds = found.countsByQuestion; <artifact> val x$2: org.make.core.user.UserId = organisation.copy$default$1; <artifact> val x$3: Option[String] @scala.reflect.internal.annotations.uncheckedBounds = organisation.copy$default$2; <artifact> val x$4: Option[String] @scala.reflect.internal.annotations.uncheckedBounds = organisation.copy$default$3; <artifact> val x$5: Option[String] @scala.reflect.internal.annotations.uncheckedBounds = organisation.copy$default$4; <artifact> val x$6: Option[String] @scala.reflect.internal.annotations.uncheckedBounds = organisation.copy$default$5; <artifact> val x$7: Boolean = organisation.copy$default$6; <artifact> val x$8: Int = organisation.copy$default$7; <artifact> val x$9: Int = organisation.copy$default$8; <artifact> val x$10: org.make.core.reference.Country = organisation.copy$default$9; <artifact> val x$11: Option[String] @scala.reflect.internal.annotations.uncheckedBounds = organisation.copy$default$10; organisation.copy(x$2, x$3, x$4, x$5, x$6, x$7, x$8, x$9, x$10, x$11, x$1) }, OrganisationConsumerBehavior.this.elasticsearchOrganisationAPI.updateOrganisation$default$2)
66 6595 2611 - 2611 Select org.make.core.user.indexed.IndexedOrganisation.copy$default$1 organisation.copy$default$1
66 6006 2611 - 2611 Select org.make.core.user.indexed.IndexedOrganisation.copy$default$8 organisation.copy$default$8
66 7396 2611 - 2611 Select org.make.core.user.indexed.IndexedOrganisation.copy$default$9 organisation.copy$default$9
66 6741 2611 - 2611 Select org.make.core.user.indexed.IndexedOrganisation.copy$default$4 organisation.copy$default$4
66 7527 2611 - 2611 Select org.make.core.user.indexed.IndexedOrganisation.copy$default$3 organisation.copy$default$3
66 5706 2598 - 2658 Apply org.make.core.user.indexed.IndexedOrganisation.copy organisation.copy(x$2, x$3, x$4, x$5, x$6, x$7, x$8, x$9, x$10, x$11, x$1)
71 6631 2765 - 3049 ApplyToImplicitArgs scala.concurrent.Future.flatMap OrganisationConsumerBehavior.this.organisationService.getOrganisation(id).flatMap[org.make.core.user.indexed.IndexedOrganisation](((x0$1: Option[org.make.core.user.User]) => x0$1 match { case scala.None => scala.concurrent.Future.failed[Nothing](new scala.`package`.IllegalArgumentException(("Organisation ".+(id.value).+(" doesn\'t exist"): String))) case (value: org.make.core.user.User): Some[org.make.core.user.User]((organisation @ _)) => scala.concurrent.Future.successful[org.make.core.user.indexed.IndexedOrganisation](org.make.core.user.indexed.IndexedOrganisation.createFromOrganisation(organisation, org.make.core.user.indexed.IndexedOrganisation.createFromOrganisation$default$2)) }))(scala.concurrent.ExecutionContext.Implicits.global)
71 7495 2813 - 2813 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
72 6785 2862 - 2933 Apply java.lang.IllegalArgumentException.<init> new scala.`package`.IllegalArgumentException(("Organisation ".+(id.value).+(" doesn\'t exist"): String))
72 6009 2848 - 2934 Apply scala.concurrent.Future.failed scala.concurrent.Future.failed[Nothing](new scala.`package`.IllegalArgumentException(("Organisation ".+(id.value).+(" doesn\'t exist"): String)))
73 7330 3006 - 3006 Select org.make.core.user.indexed.IndexedOrganisation.createFromOrganisation$default$2 org.make.core.user.indexed.IndexedOrganisation.createFromOrganisation$default$2
73 5667 2968 - 3043 Apply scala.concurrent.Future.successful scala.concurrent.Future.successful[org.make.core.user.indexed.IndexedOrganisation](org.make.core.user.indexed.IndexedOrganisation.createFromOrganisation(organisation, org.make.core.user.indexed.IndexedOrganisation.createFromOrganisation$default$2))
73 6558 2986 - 3042 Apply org.make.core.user.indexed.IndexedOrganisation.createFromOrganisation org.make.core.user.indexed.IndexedOrganisation.createFromOrganisation(organisation, org.make.core.user.indexed.IndexedOrganisation.createFromOrganisation$default$2)
77 5795 3080 - 3103 Literal <nosymbol> "organisation-consumer"
86 6769 3293 - 3404 Apply org.make.api.technical.KafkaConsumerBehavior.createBehavior new OrganisationConsumerBehavior(organisationService, elasticsearchOrganisationAPI).createBehavior(OrganisationConsumerBehavior.this.name)
86 7202 3399 - 3403 Select org.make.api.organisation.OrganisationConsumerBehavior.name OrganisationConsumerBehavior.this.name
88 6051 3427 - 3450 Literal <nosymbol> "organisation-consumer"