1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.idea
21 
22 import akka.Done
23 import akka.actor.typed.{ActorSystem, Behavior}
24 import akka.util.Timeout
25 import grizzled.slf4j.Logging
26 import org.make.api.technical.KafkaConsumerBehavior
27 import org.make.api.technical.KafkaConsumerBehavior.Protocol
28 import org.make.core.idea.IdeaId
29 import org.make.core.idea.indexed.IndexedIdea
30 
31 import scala.concurrent.ExecutionContext.Implicits.global
32 import scala.concurrent.Future
33 import scala.concurrent.duration.DurationInt
34 
35 class IdeaConsumerBehavior(ideaService: IdeaService, elasticsearchIdeaAPI: IdeaSearchEngine)
36     extends KafkaConsumerBehavior[IdeaEventWrapper]
37     with Logging {
38 
39   override protected val topicKey: String = IdeaProducerBehavior.topicKey
40   implicit val timeout: Timeout = Timeout(5.seconds)
41 
42   override def handleMessage(message: IdeaEventWrapper)(system: ActorSystem[_]): Future[_] = {
43     message.event match {
44       case event: IdeaCreatedEvent => onCreateOrUpdate(event)
45       case event: IdeaUpdatedEvent => onCreateOrUpdate(event)
46     }
47   }
48 
49   def onCreateOrUpdate(event: IdeaEvent): Future[Done] = {
50     retrieveAndShapeIdea(event.ideaId).flatMap(indexOrUpdate)
51   }
52 
53   def indexOrUpdate(idea: IndexedIdea): Future[Done] = {
54     debug(s"Indexing $idea")
55     elasticsearchIdeaAPI
56       .findIdeaById(idea.ideaId)
57       .flatMap {
58         case None        => elasticsearchIdeaAPI.indexIdea(idea)
59         case Some(found) => elasticsearchIdeaAPI.updateIdea(idea.copy(proposalsCount = found.proposalsCount))
60       }
61   }
62 
63   private def retrieveAndShapeIdea(id: IdeaId): Future[IndexedIdea] = {
64     ideaService.fetchOne(id).flatMap {
65       case None       => Future.failed(new IllegalArgumentException(s"Idea ${id.value} doesn't exist"))
66       case Some(idea) => Future.successful(IndexedIdea.createFromIdea(idea, proposalsCount = 0))
67     }
68   }
69 
70   override val groupId = "idea-consumer"
71 }
72 
73 object IdeaConsumerBehavior {
74   def apply(ideaService: IdeaService, elasticsearchIdeaAPI: IdeaSearchEngine): Behavior[Protocol] =
75     new IdeaConsumerBehavior(ideaService, elasticsearchIdeaAPI).createBehavior(name)
76   val name: String = "idea-consumer"
77 }
Line Stmt Id Pos Tree Symbol Tests Code
39 6779 1429 - 1458 Select org.make.api.idea.IdeaProducerBehavior.topicKey IdeaProducerBehavior.topicKey
40 5962 1501 - 1502 Literal <nosymbol> 5
40 6904 1493 - 1511 Apply akka.util.Timeout.apply akka.util.Timeout.apply(scala.concurrent.duration.`package`.DurationInt(5).seconds)
40 7451 1501 - 1510 Select scala.concurrent.duration.DurationConversions.seconds scala.concurrent.duration.`package`.DurationInt(5).seconds
43 6069 1612 - 1625 Select org.make.api.idea.IdeaEventWrapper.event message.event
44 7488 1672 - 1695 Apply org.make.api.idea.IdeaConsumerBehavior.onCreateOrUpdate IdeaConsumerBehavior.this.onCreateOrUpdate(event)
45 6695 1734 - 1757 Apply org.make.api.idea.IdeaConsumerBehavior.onCreateOrUpdate IdeaConsumerBehavior.this.onCreateOrUpdate(event)
50 6310 1853 - 1865 Select org.make.api.idea.IdeaEvent.ideaId event.ideaId
50 6767 1874 - 1874 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
50 7597 1875 - 1888 Apply org.make.api.idea.IdeaConsumerBehavior.indexOrUpdate IdeaConsumerBehavior.this.indexOrUpdate(idea)
50 5967 1832 - 1889 ApplyToImplicitArgs scala.concurrent.Future.flatMap IdeaConsumerBehavior.this.retrieveAndShapeIdea(event.ideaId).flatMap[akka.Done](((idea: org.make.core.idea.indexed.IndexedIdea) => IdeaConsumerBehavior.this.indexOrUpdate(idea)))(scala.concurrent.ExecutionContext.Implicits.global)
54 7459 1956 - 1980 Apply grizzled.slf4j.Logging.debug IdeaConsumerBehavior.this.debug(("Indexing ".+(idea): String))
56 7004 2026 - 2037 Select org.make.core.idea.indexed.IndexedIdea.ideaId idea.ideaId
57 7629 2054 - 2054 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
57 6854 1985 - 2238 ApplyToImplicitArgs scala.concurrent.Future.flatMap IdeaConsumerBehavior.this.elasticsearchIdeaAPI.findIdeaById(idea.ideaId).flatMap[akka.Done](((x0$1: Option[org.make.core.idea.indexed.IndexedIdea]) => x0$1 match { case scala.None => IdeaConsumerBehavior.this.elasticsearchIdeaAPI.indexIdea(idea, IdeaConsumerBehavior.this.elasticsearchIdeaAPI.indexIdea$default$2) case (value: org.make.core.idea.indexed.IndexedIdea): Some[org.make.core.idea.indexed.IndexedIdea]((found @ _)) => IdeaConsumerBehavior.this.elasticsearchIdeaAPI.updateIdea({ <artifact> val x$1: Int = found.proposalsCount; <artifact> val x$2: org.make.core.idea.IdeaId = idea.copy$default$1; <artifact> val x$3: String = idea.copy$default$2; <artifact> val x$4: Option[org.make.core.question.QuestionId] @scala.reflect.internal.annotations.uncheckedBounds = idea.copy$default$3; <artifact> val x$5: Option[org.make.core.operation.OperationId] @scala.reflect.internal.annotations.uncheckedBounds = idea.copy$default$4; <artifact> val x$6: org.make.core.idea.IdeaStatus = idea.copy$default$5; <artifact> val x$7: java.time.ZonedDateTime = idea.copy$default$6; <artifact> val x$8: Option[java.time.ZonedDateTime] @scala.reflect.internal.annotations.uncheckedBounds = idea.copy$default$7; idea.copy(x$2, x$3, x$4, x$5, x$6, x$7, x$8, x$1) }, IdeaConsumerBehavior.this.elasticsearchIdeaAPI.updateIdea$default$2) }))(scala.concurrent.ExecutionContext.Implicits.global)
58 7464 2084 - 2120 Apply org.make.api.idea.IdeaSearchEngine.indexIdea IdeaConsumerBehavior.this.elasticsearchIdeaAPI.indexIdea(idea, IdeaConsumerBehavior.this.elasticsearchIdeaAPI.indexIdea$default$2)
58 6135 2105 - 2105 Select org.make.api.idea.IdeaSearchEngine.indexIdea$default$2 IdeaConsumerBehavior.this.elasticsearchIdeaAPI.indexIdea$default$2
59 7557 2181 - 2229 Apply org.make.core.idea.indexed.IndexedIdea.copy idea.copy(x$2, x$3, x$4, x$5, x$6, x$7, x$8, x$1)
59 7416 2186 - 2186 Select org.make.core.idea.indexed.IndexedIdea.copy$default$5 idea.copy$default$5
59 6734 2170 - 2170 Select org.make.api.idea.IdeaSearchEngine.updateIdea$default$2 IdeaConsumerBehavior.this.elasticsearchIdeaAPI.updateIdea$default$2
59 6097 2186 - 2186 Select org.make.core.idea.indexed.IndexedIdea.copy$default$7 idea.copy$default$7
59 6728 2208 - 2228 Select org.make.core.idea.indexed.IndexedIdea.proposalsCount found.proposalsCount
59 5974 2186 - 2186 Select org.make.core.idea.indexed.IndexedIdea.copy$default$4 idea.copy$default$4
59 5791 2186 - 2186 Select org.make.core.idea.indexed.IndexedIdea.copy$default$1 idea.copy$default$1
59 6849 2186 - 2186 Select org.make.core.idea.indexed.IndexedIdea.copy$default$3 idea.copy$default$3
59 7010 2186 - 2186 Select org.make.core.idea.indexed.IndexedIdea.copy$default$6 idea.copy$default$6
59 7626 2186 - 2186 Select org.make.core.idea.indexed.IndexedIdea.copy$default$2 idea.copy$default$2
59 5885 2149 - 2230 Apply org.make.api.idea.IdeaSearchEngine.updateIdea IdeaConsumerBehavior.this.elasticsearchIdeaAPI.updateIdea({ <artifact> val x$1: Int = found.proposalsCount; <artifact> val x$2: org.make.core.idea.IdeaId = idea.copy$default$1; <artifact> val x$3: String = idea.copy$default$2; <artifact> val x$4: Option[org.make.core.question.QuestionId] @scala.reflect.internal.annotations.uncheckedBounds = idea.copy$default$3; <artifact> val x$5: Option[org.make.core.operation.OperationId] @scala.reflect.internal.annotations.uncheckedBounds = idea.copy$default$4; <artifact> val x$6: org.make.core.idea.IdeaStatus = idea.copy$default$5; <artifact> val x$7: java.time.ZonedDateTime = idea.copy$default$6; <artifact> val x$8: Option[java.time.ZonedDateTime] @scala.reflect.internal.annotations.uncheckedBounds = idea.copy$default$7; idea.copy(x$2, x$3, x$4, x$5, x$6, x$7, x$8, x$1) }, IdeaConsumerBehavior.this.elasticsearchIdeaAPI.updateIdea$default$2)
64 7561 2353 - 2353 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
64 6663 2320 - 2561 ApplyToImplicitArgs scala.concurrent.Future.flatMap IdeaConsumerBehavior.this.ideaService.fetchOne(id).flatMap[org.make.core.idea.indexed.IndexedIdea](((x0$1: Option[org.make.core.idea.Idea]) => x0$1 match { case scala.None => scala.concurrent.Future.failed[Nothing](new scala.`package`.IllegalArgumentException(("Idea ".+(id.value).+(" doesn\'t exist"): String))) case (value: org.make.core.idea.Idea): Some[org.make.core.idea.Idea]((idea @ _)) => scala.concurrent.Future.successful[org.make.core.idea.indexed.IndexedIdea](org.make.core.idea.indexed.IndexedIdea.createFromIdea(idea, 0)) }))(scala.concurrent.ExecutionContext.Implicits.global)
65 7423 2380 - 2458 Apply scala.concurrent.Future.failed scala.concurrent.Future.failed[Nothing](new scala.`package`.IllegalArgumentException(("Idea ".+(id.value).+(" doesn\'t exist"): String)))
65 5957 2394 - 2457 Apply java.lang.IllegalArgumentException.<init> new scala.`package`.IllegalArgumentException(("Idea ".+(id.value).+(" doesn\'t exist"): String))
66 6943 2502 - 2554 Apply org.make.core.idea.indexed.IndexedIdea.createFromIdea org.make.core.idea.indexed.IndexedIdea.createFromIdea(idea, 0)
66 6101 2484 - 2555 Apply scala.concurrent.Future.successful scala.concurrent.Future.successful[org.make.core.idea.indexed.IndexedIdea](org.make.core.idea.indexed.IndexedIdea.createFromIdea(idea, 0))
70 5892 2592 - 2607 Literal <nosymbol> "idea-consumer"
75 6812 2745 - 2825 Apply org.make.api.technical.KafkaConsumerBehavior.createBehavior new IdeaConsumerBehavior(ideaService, elasticsearchIdeaAPI).createBehavior(IdeaConsumerBehavior.this.name)
75 7618 2820 - 2824 Select org.make.api.idea.IdeaConsumerBehavior.name IdeaConsumerBehavior.this.name
76 5966 2847 - 2862 Literal <nosymbol> "idea-consumer"