1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.proposal
21 
22 import akka.stream._
23 import akka.stream.scaladsl.{Keep, RestartFlow, Sink, Source, SourceQueueWithComplete}
24 import org.make.api.technical.ActorSystemComponent
25 import org.make.api.technical.elasticsearch.{ElasticsearchConfigurationComponent, ProposalIndexationStream}
26 import org.make.core.proposal.ProposalId
27 
28 import scala.concurrent.ExecutionContext.Implicits.global
29 import scala.concurrent.Future
30 import scala.concurrent.duration.DurationInt
31 
32 trait ProposalIndexerServiceComponent {
33   def proposalIndexerService: ProposalIndexerService
34 }
35 
36 trait ProposalIndexerService {
37   def offer(proposalId: ProposalId): Future[Unit]
38 }
39 
40 trait DefaultProposalIndexerServiceComponent
41     extends ProposalIndexerServiceComponent
42     with ElasticsearchConfigurationComponent
43     with ProposalIndexationStream {
44   this: ActorSystemComponent =>
45 
46   override lazy val proposalIndexerService: DefaultProposalIndexerService = new DefaultProposalIndexerService
47 
48   class DefaultProposalIndexerService extends ProposalIndexerService {
49     lazy val bufferSize: Int =
50       elasticsearchConfiguration.entityBufferSize * elasticsearchConfiguration.entityBulkSize
51     val backoff: RestartSettings = RestartSettings(minBackoff = 1.second, maxBackoff = 20.seconds, randomFactor = 0.2)
52     val dispatcher: String = "make-api.elasticSearch.dispatcher"
53     lazy val proposalIndexationQueue: SourceQueueWithComplete[ProposalId] =
54       Source
55         .queue[ProposalId](bufferSize = bufferSize, OverflowStrategy.backpressure)
56         .via(RestartFlow.withBackoff(backoff) { () =>
57           ProposalStream.indexOrUpdateFlow
58         })
59         .withAttributes(ActorAttributes.dispatcher(dispatcher))
60         .toMat(Sink.ignore)(Keep.left)
61         .run()
62 
63     override def offer(proposalId: ProposalId): Future[Unit] = {
64       proposalIndexationQueue.offer(proposalId).flatMap {
65         case QueueOfferResult.Enqueued => Future.unit
66         case QueueOfferResult.Dropped =>
67           Future.failed(QueueOfferException(s"Item with id ${proposalId.value} dropped from indexation queue"))
68         case QueueOfferResult.QueueClosed =>
69           Future.failed(QueueOfferException("Proposal indexation queue closed. You might want to restart it."))
70         case QueueOfferResult.Failure(ex) => Future.failed(ex)
71       }
72     }
73   }
74 }
75 
76 final case class QueueOfferException(message: String) extends Exception(message)
Line Stmt Id Pos Tree Symbol Tests Code
51 18996 1997 - 2007 Select scala.concurrent.duration.DurationConversions.seconds scala.concurrent.duration.`package`.DurationInt(20).seconds
51 19375 1945 - 2028 Apply akka.stream.RestartSettings.apply akka.stream.RestartSettings.apply(scala.concurrent.duration.`package`.DurationInt(1).second, scala.concurrent.duration.`package`.DurationInt(20).seconds, 0.2)
51 19318 1997 - 1999 Literal <nosymbol> 20
51 19099 1974 - 1975 Literal <nosymbol> 1
51 18788 1974 - 1982 Select scala.concurrent.duration.DurationConversions.second scala.concurrent.duration.`package`.DurationInt(1).second
51 18805 2024 - 2027 Literal <nosymbol> 0.2
52 19037 2058 - 2093 Literal <nosymbol> "make-api.elasticSearch.dispatcher"
64 18842 2614 - 2614 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
64 19359 2564 - 3050 ApplyToImplicitArgs scala.concurrent.Future.flatMap DefaultProposalIndexerService.this.proposalIndexationQueue.offer(proposalId).flatMap[Unit](((x0$1: akka.stream.QueueOfferResult) => x0$1 match { case akka.stream.QueueOfferResult.Enqueued => scala.concurrent.Future.unit case akka.stream.QueueOfferResult.Dropped => scala.concurrent.Future.failed[Nothing](QueueOfferException.apply(("Item with id ".+(proposalId.value).+(" dropped from indexation queue"): String))) case akka.stream.QueueOfferResult.QueueClosed => scala.concurrent.Future.failed[Nothing](QueueOfferException.apply("Proposal indexation queue closed. You might want to restart it.")) case (cause: Throwable): akka.stream.QueueOfferResult.Failure((ex @ _)) => scala.concurrent.Future.failed[Nothing](ex) }))(scala.concurrent.ExecutionContext.Implicits.global)
65 18712 2658 - 2669 Select scala.concurrent.Future.unit scala.concurrent.Future.unit
67 19081 2721 - 2822 Apply scala.concurrent.Future.failed scala.concurrent.Future.failed[Nothing](QueueOfferException.apply(("Item with id ".+(proposalId.value).+(" dropped from indexation queue"): String)))
67 19302 2735 - 2821 Apply org.make.api.proposal.QueueOfferException.apply QueueOfferException.apply(("Item with id ".+(proposalId.value).+(" dropped from indexation queue"): String))
69 19321 2878 - 2979 Apply scala.concurrent.Future.failed scala.concurrent.Future.failed[Nothing](QueueOfferException.apply("Proposal indexation queue closed. You might want to restart it."))
69 18752 2892 - 2978 Apply org.make.api.proposal.QueueOfferException.apply QueueOfferException.apply("Proposal indexation queue closed. You might want to restart it.")
70 18997 3025 - 3042 Apply scala.concurrent.Future.failed scala.concurrent.Future.failed[Nothing](ex)