1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.proposal
21 
22 import cats.implicits._
23 import akka.actor.typed.{ActorSystem, Behavior}
24 import akka.util.Timeout
25 import grizzled.slf4j.Logging
26 import org.make.api.proposal.PublishedProposalEvent._
27 import org.make.api.technical.KafkaConsumerBehavior
28 import org.make.api.operation.OperationOfQuestionService
29 
30 import scala.concurrent.ExecutionContext.Implicits.global
31 import scala.concurrent.Future
32 import scala.concurrent.duration.DurationInt
33 
/**
  * Kafka consumer handling [[ProposalEventWrapper]] messages read from the topic
  * identified by `ProposalKafkaProducerBehavior.topicKey`.
  *
  * Depending on the concrete event type, a message is either ignored or results in the
  * proposal id being offered to the [[ProposalIndexerService]].
  *
  * @param proposalIndexerService     service to which the ids of impacted proposals are offered
  * @param operationOfQuestionService service used to increment the proposals count of a question
  *                                   when a new proposal is proposed
  */
class ProposalConsumerBehavior(
  proposalIndexerService: ProposalIndexerService,
  operationOfQuestionService: OperationOfQuestionService
) extends KafkaConsumerBehavior[ProposalEventWrapper]
    with Logging {

  override protected val topicKey: String = ProposalKafkaProducerBehavior.topicKey
  override val groupId: String = "proposal-consumer"

  // NOTE(review): not referenced by the code visible in this class; presumably picked up
  // implicitly by inherited or called code -- confirm before removing.
  implicit val timeout: Timeout = Timeout(5.seconds)

  /**
    * Dispatches a consumed message to the handler matching its concrete event type.
    *
    * @param message wrapper whose `event` field selects the handler
    * @param system  actor system supplied by the caller; not used by this implementation
    * @return the future produced by the selected handler
    */
  override def handleMessage(message: ProposalEventWrapper)(system: ActorSystem[_]): Future[_] = {
    message.event match {
      case event: ProposalViewed               => doNothing(event)
      case event: ReindexProposal              => onCreateOrUpdate(event)
      case event: ProposalUpdated              => onCreateOrUpdate(event)
      case event: ProposalReportedNotice       => doNothing(event)
      case event: ProposalVotesVerifiedUpdated => onCreateOrUpdate(event)
      case event: ProposalVotesUpdated         => onCreateOrUpdate(event)
      case event: ProposalProposed             => onCreate(event)
      case event: ProposalAccepted             => onCreateOrUpdate(event)
      case event: ProposalRefused              => onCreateOrUpdate(event)
      case event: ProposalPostponed            => onCreateOrUpdate(event)
      case event: ProposalVoted                => onCreateOrUpdate(event)
      case event: ProposalUnvoted              => onCreateOrUpdate(event)
      case event: ProposalQualified            => onCreateOrUpdate(event)
      case event: ProposalUnqualified          => onCreateOrUpdate(event)
      case event: ProposalPatched              => onCreateOrUpdate(event)
      case event: ProposalAddedToOperation     => onCreateOrUpdate(event)
      case event: ProposalRemovedFromOperation => onCreateOrUpdate(event)
      case event: ProposalLocked               => doNothing(event)
      case event: ProposalAnonymized           => onCreateOrUpdate(event)
      case event: SimilarProposalsAdded        => doNothing(event)
      case event: ProposalKeywordsSet          => onCreateOrUpdate(event)
    }

  }

  /**
    * Handles a newly proposed proposal: when the event carries a question, increments that
    * question's proposals count, then offers the proposal for indexation.
    *
    * The two steps are chained with `>>`, so a failed increment fails the returned future
    * (the recovery in [[onCreateOrUpdate]] only covers the indexation offer).
    *
    * @param event the proposal creation event
    * @return a future completing once both steps are done
    */
  def onCreate(event: ProposalProposed): Future[Unit] = {
    val proposalsCountUpdated: Future[Unit] =
      event.question.fold(Future.unit)(operationOfQuestionService.incrementProposalsCount(_).void)

    proposalsCountUpdated >> onCreateOrUpdate(event)
  }

  /**
    * Offers the proposal referenced by the event to the indexer service.
    *
    * A failure of the offer is logged (with the proposal id and the full exception) and then
    * swallowed, so the returned future completes successfully in that case.
    *
    * @param event any proposal event; only its `id` is used
    * @return a future completing once the offer has succeeded or its failure has been logged
    */
  def onCreateOrUpdate(event: ProposalEvent): Future[Unit] = {
    proposalIndexerService.offer(event.id).recover {
      case ex =>
        // Pass the throwable to the logger so the stack trace is not lost.
        error(s"Error presenting proposal to indexation queue (proposal ${event.id}): ${ex.getMessage}", ex)
    }
  }

}
84 
/** Factory for the behavior wrapping a [[ProposalConsumerBehavior]]. */
object ProposalConsumerBehavior {

  /** Name passed to `createBehavior` when building the consumer behavior. */
  val name: String = "proposal-consumer"

  /**
    * Instantiates a [[ProposalConsumerBehavior]] and returns the behavior it creates under [[name]].
    *
    * @param proposalIndexerService     forwarded to the consumer's constructor
    * @param operationOfQuestionService forwarded to the consumer's constructor
    * @return the behavior produced by the consumer's `createBehavior`
    */
  def apply(
    proposalIndexerService: ProposalIndexerService,
    operationOfQuestionService: OperationOfQuestionService
  ): Behavior[KafkaConsumerBehavior.Protocol] = {
    val consumer = new ProposalConsumerBehavior(proposalIndexerService, operationOfQuestionService)
    consumer.createBehavior(name)
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
40 6738 1459 - 1497 Select org.make.api.proposal.ProposalKafkaProducerBehavior.topicKey ProposalKafkaProducerBehavior.topicKey
41 5798 1523 - 1542 Literal <nosymbol> "proposal-consumer"
43 6387 1586 - 1595 Select scala.concurrent.duration.DurationConversions.seconds scala.concurrent.duration.`package`.DurationInt(5).seconds
43 7259 1586 - 1587 Literal <nosymbol> 5
43 6053 1578 - 1596 Apply akka.util.Timeout.apply akka.util.Timeout.apply(scala.concurrent.duration.`package`.DurationInt(5).seconds)
46 7428 1701 - 1714 Select org.make.api.proposal.ProposalEventWrapper.event message.event
47 6551 1773 - 1789 Apply org.make.api.technical.KafkaConsumerBehavior.doNothing ProposalConsumerBehavior.this.doNothing(event)
48 5747 1840 - 1863 Apply org.make.api.proposal.ProposalConsumerBehavior.onCreateOrUpdate ProposalConsumerBehavior.this.onCreateOrUpdate(event)
49 7574 1914 - 1937 Apply org.make.api.proposal.ProposalConsumerBehavior.onCreateOrUpdate ProposalConsumerBehavior.this.onCreateOrUpdate(event)
50 6742 1988 - 2004 Apply org.make.api.technical.KafkaConsumerBehavior.doNothing ProposalConsumerBehavior.this.doNothing(event)
51 5901 2055 - 2078 Apply org.make.api.proposal.ProposalConsumerBehavior.onCreateOrUpdate ProposalConsumerBehavior.this.onCreateOrUpdate(event)
52 7266 2129 - 2152 Apply org.make.api.proposal.ProposalConsumerBehavior.onCreateOrUpdate ProposalConsumerBehavior.this.onCreateOrUpdate(event)
53 6476 2203 - 2218 Apply org.make.api.proposal.ProposalConsumerBehavior.onCreate ProposalConsumerBehavior.this.onCreate(event)
54 6039 2269 - 2292 Apply org.make.api.proposal.ProposalConsumerBehavior.onCreateOrUpdate ProposalConsumerBehavior.this.onCreateOrUpdate(event)
55 7431 2343 - 2366 Apply org.make.api.proposal.ProposalConsumerBehavior.onCreateOrUpdate ProposalConsumerBehavior.this.onCreateOrUpdate(event)
56 6523 2417 - 2440 Apply org.make.api.proposal.ProposalConsumerBehavior.onCreateOrUpdate ProposalConsumerBehavior.this.onCreateOrUpdate(event)
57 5751 2491 - 2514 Apply org.make.api.proposal.ProposalConsumerBehavior.onCreateOrUpdate ProposalConsumerBehavior.this.onCreateOrUpdate(event)
58 7136 2565 - 2588 Apply org.make.api.proposal.ProposalConsumerBehavior.onCreateOrUpdate ProposalConsumerBehavior.this.onCreateOrUpdate(event)
59 6720 2639 - 2662 Apply org.make.api.proposal.ProposalConsumerBehavior.onCreateOrUpdate ProposalConsumerBehavior.this.onCreateOrUpdate(event)
60 5837 2713 - 2736 Apply org.make.api.proposal.ProposalConsumerBehavior.onCreateOrUpdate ProposalConsumerBehavior.this.onCreateOrUpdate(event)
61 7271 2787 - 2810 Apply org.make.api.proposal.ProposalConsumerBehavior.onCreateOrUpdate ProposalConsumerBehavior.this.onCreateOrUpdate(event)
62 6377 2861 - 2884 Apply org.make.api.proposal.ProposalConsumerBehavior.onCreateOrUpdate ProposalConsumerBehavior.this.onCreateOrUpdate(event)
63 5977 2935 - 2958 Apply org.make.api.proposal.ProposalConsumerBehavior.onCreateOrUpdate ProposalConsumerBehavior.this.onCreateOrUpdate(event)
64 7400 3009 - 3025 Apply org.make.api.technical.KafkaConsumerBehavior.doNothing ProposalConsumerBehavior.this.doNothing(event)
65 6526 3076 - 3099 Apply org.make.api.proposal.ProposalConsumerBehavior.onCreateOrUpdate ProposalConsumerBehavior.this.onCreateOrUpdate(event)
66 5755 3150 - 3166 Apply org.make.api.technical.KafkaConsumerBehavior.doNothing ProposalConsumerBehavior.this.doNothing(event)
67 7092 3217 - 3240 Apply org.make.api.proposal.ProposalConsumerBehavior.onCreateOrUpdate ProposalConsumerBehavior.this.onCreateOrUpdate(event)
73 7234 3396 - 3396 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
73 5897 3406 - 3406 ApplyToImplicitArgs cats.instances.FutureInstances.catsStdInstancesForFuture cats.implicits.catsStdInstancesForFuture(scala.concurrent.ExecutionContext.Implicits.global)
73 6383 3396 - 3396 ApplyToImplicitArgs cats.instances.FutureInstances.catsStdInstancesForFuture cats.implicits.catsStdInstancesForFuture(scala.concurrent.ExecutionContext.Implicits.global)
73 6637 3406 - 3406 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
73 5650 3345 - 3345 ApplyToImplicitArgs cats.instances.FutureInstances.catsStdInstancesForFuture cats.implicits.catsStdInstancesForFuture(scala.concurrent.ExecutionContext.Implicits.global)
73 7426 3313 - 3405 Apply scala.Option.fold event.question.fold[scala.concurrent.Future[Unit]](scala.concurrent.Future.unit)(((x$1: org.make.core.question.QuestionId) => cats.implicits.toFunctorOps[scala.concurrent.Future, org.make.core.operation.OperationOfQuestion](ProposalConsumerBehavior.this.operationOfQuestionService.incrementProposalsCount(x$1))(cats.implicits.catsStdInstancesForFuture(scala.concurrent.ExecutionContext.Implicits.global)).void))
73 6529 3345 - 3345 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
73 5826 3346 - 3399 Apply org.make.api.operation.OperationOfQuestionService.incrementProposalsCount ProposalConsumerBehavior.this.operationOfQuestionService.incrementProposalsCount(x$1)
73 7237 3313 - 3438 ApplyToImplicitArgs cats.syntax.FlatMapOps.>> cats.implicits.catsSyntaxFlatMapOps[scala.concurrent.Future, Unit](event.question.fold[scala.concurrent.Future[Unit]](scala.concurrent.Future.unit)(((x$1: org.make.core.question.QuestionId) => cats.implicits.toFunctorOps[scala.concurrent.Future, org.make.core.operation.OperationOfQuestion](ProposalConsumerBehavior.this.operationOfQuestionService.incrementProposalsCount(x$1))(cats.implicits.catsStdInstancesForFuture(scala.concurrent.ExecutionContext.Implicits.global)).void)))(cats.implicits.catsStdInstancesForFuture(scala.concurrent.ExecutionContext.Implicits.global)).>>[Unit](ProposalConsumerBehavior.this.onCreateOrUpdate(event))(cats.implicits.catsStdInstancesForFuture(scala.concurrent.ExecutionContext.Implicits.global))
73 6675 3333 - 3344 Select scala.concurrent.Future.unit scala.concurrent.Future.unit
73 5939 3346 - 3404 Select cats.Functor.Ops.void cats.implicits.toFunctorOps[scala.concurrent.Future, org.make.core.operation.OperationOfQuestion](ProposalConsumerBehavior.this.operationOfQuestionService.incrementProposalsCount(x$1))(cats.implicits.catsStdInstancesForFuture(scala.concurrent.ExecutionContext.Implicits.global)).void
74 7104 3415 - 3438 Apply org.make.api.proposal.ProposalConsumerBehavior.onCreateOrUpdate ProposalConsumerBehavior.this.onCreateOrUpdate(event)
77 5655 3507 - 3660 ApplyToImplicitArgs scala.concurrent.Future.recover ProposalConsumerBehavior.this.proposalIndexerService.offer(event.id).recover[Unit](({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[Throwable,Unit] with java.io.Serializable { def <init>(): <$anon: Throwable => Unit> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: Throwable, B1 >: Unit](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match { case (ex @ _) => ProposalConsumerBehavior.this.error(("Error presenting proposal to indexation queue: ".+(ex.getMessage()): String)) case (defaultCase$ @ _) => default.apply(x1) }; final def isDefinedAt(x1: Throwable): Boolean = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match { case (ex @ _) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[Throwable,Unit]))(scala.concurrent.ExecutionContext.Implicits.global)
77 7395 3554 - 3554 Apply org.make.api.proposal.ProposalConsumerBehavior.$anonfun.<init> new $anonfun()
77 6353 3536 - 3544 Select org.make.api.proposal.ProposalEvent.id event.id
77 6598 3554 - 3554 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
79 7794 3581 - 3654 Apply grizzled.slf4j.Logging.error ProposalConsumerBehavior.this.error(("Error presenting proposal to indexation queue: ".+(ex.getMessage()): String))
91 7056 3976 - 3980 Select org.make.api.proposal.ProposalConsumerBehavior.name ProposalConsumerBehavior.this.name
91 6718 3880 - 3981 Apply org.make.api.technical.KafkaConsumerBehavior.createBehavior new ProposalConsumerBehavior(proposalIndexerService, operationOfQuestionService).createBehavior(ProposalConsumerBehavior.this.name)
93 5902 4004 - 4023 Literal <nosymbol> "proposal-consumer"