1 /* 2 * Make.org Core API 3 * Copyright (C) 2018 Make.org 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 * 18 */ 19 20 package org.make.api.proposal 21 22 import akka.stream._ 23 import akka.stream.scaladsl.{Keep, RestartFlow, Sink, Source, SourceQueueWithComplete} 24 import org.make.api.technical.ActorSystemComponent 25 import org.make.api.technical.elasticsearch.{ElasticsearchConfigurationComponent, ProposalIndexationStream} 26 import org.make.core.proposal.ProposalId 27 28 import scala.concurrent.ExecutionContext.Implicits.global 29 import scala.concurrent.Future 30 import scala.concurrent.duration.DurationInt 31 32 trait ProposalIndexerServiceComponent { 33 def proposalIndexerService: ProposalIndexerService 34 } 35 36 trait ProposalIndexerService { 37 def offer(proposalId: ProposalId): Future[Unit] 38 } 39 40 trait DefaultProposalIndexerServiceComponent 41 extends ProposalIndexerServiceComponent 42 with ElasticsearchConfigurationComponent 43 with ProposalIndexationStream { 44 this: ActorSystemComponent => 45 46 override lazy val proposalIndexerService: DefaultProposalIndexerService = new DefaultProposalIndexerService 47 48 class DefaultProposalIndexerService extends ProposalIndexerService { 49 lazy val bufferSize: Int = 50 elasticsearchConfiguration.entityBufferSize * elasticsearchConfiguration.entityBulkSize 51 val backoff: RestartSettings = RestartSettings(minBackoff = 1.second, maxBackoff = 20.seconds, randomFactor = 0.2) 52 val dispatcher: String = "make-api.elasticSearch.dispatcher" 53 lazy val proposalIndexationQueue: SourceQueueWithComplete[ProposalId] = 54 Source 55 .queue[ProposalId](bufferSize = bufferSize, OverflowStrategy.backpressure) 56 .via(RestartFlow.withBackoff(backoff) { () => 57 ProposalStream.indexOrUpdateFlow 58 }) 59 .withAttributes(ActorAttributes.dispatcher(dispatcher)) 60 .toMat(Sink.ignore)(Keep.left) 61 .run() 62 63 override def offer(proposalId: ProposalId): Future[Unit] = { 64 proposalIndexationQueue.offer(proposalId).flatMap { 65 case QueueOfferResult.Enqueued => Future.unit 66 case QueueOfferResult.Dropped => 67 Future.failed(QueueOfferException(s"Item with id ${proposalId.value} dropped from indexation queue")) 68 case QueueOfferResult.QueueClosed => 69 Future.failed(QueueOfferException("Proposal indexation queue closed. You might want to restart it.")) 70 case QueueOfferResult.Failure(ex) => Future.failed(ex) 71 } 72 } 73 } 74 } 75 76 final case class QueueOfferException(message: String) extends Exception(message)
| Line | Stmt Id | Pos | Tree | Symbol | Tests | Code |
|---|---|---|---|---|---|---|
| 51 | 18996 | 1997 - 2007 | Select | scala.concurrent.duration.DurationConversions.seconds | scala.concurrent.duration.`package`.DurationInt(20).seconds | |
| 51 | 19375 | 1945 - 2028 | Apply | akka.stream.RestartSettings.apply | akka.stream.RestartSettings.apply(scala.concurrent.duration.`package`.DurationInt(1).second, scala.concurrent.duration.`package`.DurationInt(20).seconds, 0.2) | |
| 51 | 19318 | 1997 - 1999 | Literal | <nosymbol> | 20 | |
| 51 | 19099 | 1974 - 1975 | Literal | <nosymbol> | 1 | |
| 51 | 18788 | 1974 - 1982 | Select | scala.concurrent.duration.DurationConversions.second | scala.concurrent.duration.`package`.DurationInt(1).second | |
| 51 | 18805 | 2024 - 2027 | Literal | <nosymbol> | 0.2 | |
| 52 | 19037 | 2058 - 2093 | Literal | <nosymbol> | "make-api.elasticSearch.dispatcher" | |
| 64 | 18842 | 2614 - 2614 | Select | scala.concurrent.ExecutionContext.Implicits.global | scala.concurrent.ExecutionContext.Implicits.global | |
| 64 | 19359 | 2564 - 3050 | ApplyToImplicitArgs | scala.concurrent.Future.flatMap | DefaultProposalIndexerService.this.proposalIndexationQueue.offer(proposalId).flatMap[Unit](((x0$1: akka.stream.QueueOfferResult) => x0$1 match { case akka.stream.QueueOfferResult.Enqueued => scala.concurrent.Future.unit case akka.stream.QueueOfferResult.Dropped => scala.concurrent.Future.failed[Nothing](QueueOfferException.apply(("Item with id ".+(proposalId.value).+(" dropped from indexation queue"): String))) case akka.stream.QueueOfferResult.QueueClosed => scala.concurrent.Future.failed[Nothing](QueueOfferException.apply("Proposal indexation queue closed. You might want to restart it.")) case (cause: Throwable): akka.stream.QueueOfferResult.Failure((ex @ _)) => scala.concurrent.Future.failed[Nothing](ex) }))(scala.concurrent.ExecutionContext.Implicits.global) | |
| 65 | 18712 | 2658 - 2669 | Select | scala.concurrent.Future.unit | scala.concurrent.Future.unit | |
| 67 | 19081 | 2721 - 2822 | Apply | scala.concurrent.Future.failed | scala.concurrent.Future.failed[Nothing](QueueOfferException.apply(("Item with id ".+(proposalId.value).+(" dropped from indexation queue"): String))) | |
| 67 | 19302 | 2735 - 2821 | Apply | org.make.api.proposal.QueueOfferException.apply | QueueOfferException.apply(("Item with id ".+(proposalId.value).+(" dropped from indexation queue"): String)) | |
| 69 | 19321 | 2878 - 2979 | Apply | scala.concurrent.Future.failed | scala.concurrent.Future.failed[Nothing](QueueOfferException.apply("Proposal indexation queue closed. You might want to restart it.")) | |
| 69 | 18752 | 2892 - 2978 | Apply | org.make.api.proposal.QueueOfferException.apply | QueueOfferException.apply("Proposal indexation queue closed. You might want to restart it.") | |
| 70 | 18997 | 3025 - 3042 | Apply | scala.concurrent.Future.failed | scala.concurrent.Future.failed[Nothing](ex) |