1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.technical.elasticsearch
21 
22 import akka.stream.Materializer
23 import akka.stream.scaladsl.{Flow, Sink, Source}
24 import akka.{Done, NotUsed}
25 import com.sksamuel.elastic4s.Index
26 import grizzled.slf4j.Logging
27 import org.make.api.organisation.OrganisationSearchEngineComponent
28 import org.make.api.proposal.ProposalSearchEngineComponent
29 import org.make.api.user.PersistentUserServiceComponent
30 import org.make.api.userhistory.{RequestUserVotedProposals, UserHistoryCoordinatorServiceComponent}
31 import org.make.core.history.HistoryActions.VoteAndQualifications
32 import org.make.core.proposal._
33 import org.make.core.proposal.indexed.{IndexedProposal, ProposalsSearchResult}
34 import org.make.core.question.QuestionId
35 import org.make.core.user.indexed.{IndexedOrganisation, ProposalsAndVotesCountsByQuestion}
36 import org.make.core.user.{User, UserId}
37 
38 import scala.concurrent.ExecutionContext.Implicits.global
39 import scala.concurrent.Future
40 import scala.concurrent.duration.DurationDouble
41 
/**
  * Akka Streams building blocks used to index organisations (modelled as [[User]]) into Elasticsearch.
  *
  * For every organisation, the stream collects the proposals it voted on and the number of its accepted
  * proposals, aggregates both per question, and indexes the resulting [[IndexedOrganisation]] documents
  * in small batches.
  */
trait OrganisationIndexationStream
    extends IndexationStream
    with OrganisationSearchEngineComponent
    with PersistentUserServiceComponent
    with UserHistoryCoordinatorServiceComponent
    with ProposalSearchEngineComponent
    with Logging {
  object OrganisationStream {

    /**
      * Flow that indexes every incoming batch of organisations into `organisationIndexName`.
      *
      * @param organisationIndexName name of the Elasticsearch index receiving the documents
      * @return a flow emitting one `Done` per processed batch
      */
    def runIndexOrganisations(
      organisationIndexName: String
    )(implicit mat: Materializer): Flow[Seq[User], Done, NotUsed] =
      Flow[Seq[User]]
        .mapAsync(singleAsync)(organisations => executeIndexOrganisations(organisations, organisationIndexName))

    /**
      * Flow that first groups single organisations into batches, then indexes each batch.
      *
      * @param organisationIndexName name of the Elasticsearch index receiving the documents
      * @return a flow emitting one `Done` per processed batch
      */
    def flowIndexOrganisations(organisationIndexName: String)(implicit mat: Materializer): Flow[User, Done, NotUsed] =
      groupedOrganisations.via(runIndexOrganisations(organisationIndexName))
  }

  // Use custom `grouped` to reduce the size of every batch. Purpose: avoid a heavy load of actor commands.
  private def groupedOrganisations: Flow[User, Seq[User], NotUsed] = Flow[User].groupedWithin(20, 500.milliseconds)

  /**
    * Builds the indexed representation of each organisation and sends them to Elasticsearch in chunks.
    *
    * A failure while indexing a chunk is logged and swallowed so the remaining chunks are still processed.
    * A failure while gathering the data of an organisation is not recovered here and fails the returned future.
    *
    * @param organisations         organisations to index
    * @param organisationIndexName name of the Elasticsearch index receiving the documents
    * @return a future completed once every chunk has been handled
    */
  private def executeIndexOrganisations(organisations: Seq[User], organisationIndexName: String)(
    implicit mat: Materializer
  ): Future[Done] = {
    // Ids of the proposals the organisation voted on, as reported by the user history.
    def futureVotedProposals(organisationId: UserId): Future[Seq[ProposalId]] =
      userHistoryCoordinatorService.retrieveVotedProposals(RequestUserVotedProposals(organisationId))

    // Indexed proposals matching the given ids.
    // NOTE(review): no explicit limit is passed to `SearchQuery`; confirm the search engine's default
    // page size cannot truncate the result when an organisation voted on many proposals.
    def futureFindProposals(proposalIds: Seq[ProposalId]): Future[ProposalsSearchResult] =
      elasticsearchProposalAPI.searchProposals(
        SearchQuery(filters = Some(SearchFilters(proposal = Some(ProposalSearchFilter(proposalIds)))))
      )

    // Votes (and qualifications) cast by the organisation on the given proposals.
    def futureVotes(
      proposalIds: Seq[ProposalId],
      organisationId: UserId
    ): Future[Map[ProposalId, VoteAndQualifications]] =
      userHistoryCoordinatorService.retrieveVoteAndQualifications(organisationId, proposalIds)

    // Number of accepted proposals authored by the organisation, per question.
    def futureProposalsCountByQuestion(organisationId: UserId): Future[Map[QuestionId, Long]] =
      elasticsearchProposalAPI.countProposalsByQuestion(
        maybeQuestionIds = None,
        status = Some(Seq(ProposalStatus.Accepted)),
        maybeUserId = Some(organisationId),
        toEnrich = None,
        minVotesCount = None,
        minScore = None
      )

    /**
      * Aggregates proposals and votes counts per question.
      *
      * A question is listed when the organisation has at least one counted proposal on it, or voted on at
      * least one of the found proposals attached to it. The votes count of a question is the number of
      * distinct found proposals of that question for which a vote is recorded.
      */
    def generateProposalsAndVotesCountsByQuestion(
      proposals: ProposalsSearchResult,
      votesByProposals: Map[ProposalId, VoteAndQualifications],
      proposalsCountByQuestion: Map[QuestionId, Long]
    ): Seq[ProposalsAndVotesCountsByQuestion] = {
      // (question, proposal) pairs for each found proposal that has a recorded vote and a question.
      // `Map.contains` avoids rebuilding a key set for every proposal.
      val votedProposalsWithQuestion: Seq[(QuestionId, ProposalId)] = proposals.results.collect {
        case proposal if votesByProposals.contains(proposal.id) =>
          proposal.question.map(question => question.questionId -> proposal.id)
      }.flatten

      val questionIds: Set[QuestionId] =
        proposalsCountByQuestion.keySet ++ votedProposalsWithQuestion.map { case (questionId, _) => questionId }

      // Grouped once up front so the per-question lookup below is constant time.
      val votedProposalIdsByQuestion: Map[QuestionId, Set[ProposalId]] =
        votedProposalsWithQuestion.groupBy { case (questionId, _) => questionId }.map {
          case (questionId, pairs) => questionId -> pairs.map { case (_, proposalId) => proposalId }.toSet
        }

      questionIds.map { questionId =>
        val votesCount: Int = votedProposalIdsByQuestion.get(questionId).fold(0)(_.size)
        val proposalsCount: Int = proposalsCountByQuestion.getOrElse(questionId, 0L).toInt
        ProposalsAndVotesCountsByQuestion(questionId, proposalsCount, votesCount)
      }.toSeq
    }

    Source(organisations)
      .mapAsync(parallelism) { organisation =>
        val futureCountsByQuestion: Future[Seq[ProposalsAndVotesCountsByQuestion]] = for {
          proposalIds              <- futureVotedProposals(organisation.userId)
          proposals                <- futureFindProposals(proposalIds)
          votesByProposals         <- futureVotes(proposalIds, organisation.userId)
          proposalsCountByQuestion <- futureProposalsCountByQuestion(organisation.userId)
        } yield generateProposalsAndVotesCountsByQuestion(proposals, votesByProposals, proposalsCountByQuestion)

        futureCountsByQuestion.map { countsByQuestion =>
          IndexedOrganisation.createFromOrganisation(organisation, countsByQuestion)
        }
      }
      // Re-batch the built documents before sending them to Elasticsearch.
      .groupedWithin(20, 500.milliseconds)
      .mapAsync(singleAsync) { organisations =>
        elasticsearchOrganisationAPI
          .indexOrganisations(organisations, Some(Index(organisationIndexName)))
          .recoverWith {
            // Best effort: a failed chunk must not abort the indexation of the following ones.
            case e =>
              logger.error("Indexing of one organisation chunk failed", e)
              Future.successful(Done)
          }
      }
      .runWith(Sink.ignore)
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
54 18904 2195 - 2206 Select org.make.api.technical.elasticsearch.IndexationStream.singleAsync OrganisationIndexationStream.this.singleAsync
54 19436 2225 - 2288 ApplyToImplicitArgs org.make.api.technical.elasticsearch.OrganisationIndexationStream.executeIndexOrganisations OrganisationIndexationStream.this.executeIndexOrganisations(organisations, organisationIndexName)(mat)
54 19280 2161 - 2289 Apply akka.stream.scaladsl.FlowOps.mapAsync akka.stream.scaladsl.Flow.apply[Seq[org.make.core.user.User]].mapAsync[akka.Done](OrganisationIndexationStream.this.singleAsync)(((organisations: Seq[org.make.core.user.User]) => OrganisationIndexationStream.this.executeIndexOrganisations(organisations, organisationIndexName)(mat)))
57 18613 2416 - 2486 Apply akka.stream.scaladsl.Flow.via OrganisationIndexationStream.this.groupedOrganisations.via[akka.Done, akka.NotUsed](OrganisationStream.this.runIndexOrganisations(organisationIndexName)(mat))
57 18927 2441 - 2485 ApplyToImplicitArgs org.make.api.technical.elasticsearch.OrganisationIndexationStream.OrganisationStream.runIndexOrganisations OrganisationStream.this.runIndexOrganisations(organisationIndexName)(mat)
61 18648 2698 - 2714 Select scala.concurrent.duration.DurationConversions.milliseconds scala.concurrent.duration.`package`.DurationDouble(500.0).milliseconds
61 18992 2698 - 2701 Literal <nosymbol> 500.0
61 19156 2694 - 2696 Literal <nosymbol> 20
61 19215 2669 - 2715 Apply akka.stream.scaladsl.FlowOps.groupedWithin akka.stream.scaladsl.Flow.apply[org.make.core.user.User].groupedWithin(20, scala.concurrent.duration.`package`.DurationDouble(500.0).milliseconds)
67 19295 3007 - 3007 Select org.make.api.userhistory.RequestUserVotedProposals.apply$default$4 org.make.api.userhistory.RequestUserVotedProposals.apply$default$4
67 19428 3007 - 3007 Select org.make.api.userhistory.RequestUserVotedProposals.apply$default$3 org.make.api.userhistory.RequestUserVotedProposals.apply$default$3
67 18917 3007 - 3048 Apply org.make.api.userhistory.RequestUserVotedProposals.apply org.make.api.userhistory.RequestUserVotedProposals.apply(organisationId, org.make.api.userhistory.RequestUserVotedProposals.apply$default$2, org.make.api.userhistory.RequestUserVotedProposals.apply$default$3, org.make.api.userhistory.RequestUserVotedProposals.apply$default$4)
67 18857 3007 - 3007 Select org.make.api.userhistory.RequestUserVotedProposals.apply$default$2 org.make.api.userhistory.RequestUserVotedProposals.apply$default$2
67 18616 2954 - 3049 Apply org.make.api.userhistory.UserHistoryCoordinatorService.retrieveVotedProposals OrganisationIndexationStream.this.userHistoryCoordinatorService.retrieveVotedProposals(org.make.api.userhistory.RequestUserVotedProposals.apply(organisationId, org.make.api.userhistory.RequestUserVotedProposals.apply$default$2, org.make.api.userhistory.RequestUserVotedProposals.apply$default$3, org.make.api.userhistory.RequestUserVotedProposals.apply$default$4))
70 19414 3148 - 3300 Apply org.make.api.proposal.ProposalSearchEngine.searchProposals OrganisationIndexationStream.this.elasticsearchProposalAPI.searchProposals(org.make.core.proposal.SearchQuery.apply(scala.Some.apply[org.make.core.proposal.SearchFilters](org.make.core.proposal.SearchFilters.apply(scala.Some.apply[org.make.core.proposal.ProposalSearchFilter](org.make.core.proposal.ProposalSearchFilter.apply(proposalIds)), org.make.core.proposal.SearchFilters.apply$default$2, org.make.core.proposal.SearchFilters.apply$default$3, org.make.core.proposal.SearchFilters.apply$default$4, org.make.core.proposal.SearchFilters.apply$default$5, org.make.core.proposal.SearchFilters.apply$default$6, org.make.core.proposal.SearchFilters.apply$default$7, org.make.core.proposal.SearchFilters.apply$default$8, org.make.core.proposal.SearchFilters.apply$default$9, org.make.core.proposal.SearchFilters.apply$default$10, org.make.core.proposal.SearchFilters.apply$default$11, org.make.core.proposal.SearchFilters.apply$default$12, org.make.core.proposal.SearchFilters.apply$default$13, org.make.core.proposal.SearchFilters.apply$default$14, org.make.core.proposal.SearchFilters.apply$default$15, org.make.core.proposal.SearchFilters.apply$default$16, org.make.core.proposal.SearchFilters.apply$default$17, org.make.core.proposal.SearchFilters.apply$default$18, org.make.core.proposal.SearchFilters.apply$default$19, org.make.core.proposal.SearchFilters.apply$default$20, org.make.core.proposal.SearchFilters.apply$default$21, org.make.core.proposal.SearchFilters.apply$default$22, org.make.core.proposal.SearchFilters.apply$default$23, org.make.core.proposal.SearchFilters.apply$default$24, org.make.core.proposal.SearchFilters.apply$default$25, org.make.core.proposal.SearchFilters.apply$default$26, org.make.core.proposal.SearchFilters.apply$default$27, org.make.core.proposal.SearchFilters.apply$default$28, org.make.core.proposal.SearchFilters.apply$default$29, 
org.make.core.proposal.SearchFilters.apply$default$30, org.make.core.proposal.SearchFilters.apply$default$31)), org.make.core.proposal.SearchQuery.apply$default$2, org.make.core.proposal.SearchQuery.apply$default$3, org.make.core.proposal.SearchQuery.apply$default$4, org.make.core.proposal.SearchQuery.apply$default$5, org.make.core.proposal.SearchQuery.apply$default$6, org.make.core.proposal.SearchQuery.apply$default$7))
71 19211 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$21 org.make.core.proposal.SearchFilters.apply$default$21
71 18897 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$31 org.make.core.proposal.SearchFilters.apply$default$31
71 19277 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$15 org.make.core.proposal.SearchFilters.apply$default$15
71 19160 3255 - 3288 Apply org.make.core.proposal.ProposalSearchFilter.apply org.make.core.proposal.ProposalSearchFilter.apply(proposalIds)
71 18882 3198 - 3292 Apply org.make.core.proposal.SearchQuery.apply org.make.core.proposal.SearchQuery.apply(scala.Some.apply[org.make.core.proposal.SearchFilters](org.make.core.proposal.SearchFilters.apply(scala.Some.apply[org.make.core.proposal.ProposalSearchFilter](org.make.core.proposal.ProposalSearchFilter.apply(proposalIds)), org.make.core.proposal.SearchFilters.apply$default$2, org.make.core.proposal.SearchFilters.apply$default$3, org.make.core.proposal.SearchFilters.apply$default$4, org.make.core.proposal.SearchFilters.apply$default$5, org.make.core.proposal.SearchFilters.apply$default$6, org.make.core.proposal.SearchFilters.apply$default$7, org.make.core.proposal.SearchFilters.apply$default$8, org.make.core.proposal.SearchFilters.apply$default$9, org.make.core.proposal.SearchFilters.apply$default$10, org.make.core.proposal.SearchFilters.apply$default$11, org.make.core.proposal.SearchFilters.apply$default$12, org.make.core.proposal.SearchFilters.apply$default$13, org.make.core.proposal.SearchFilters.apply$default$14, org.make.core.proposal.SearchFilters.apply$default$15, org.make.core.proposal.SearchFilters.apply$default$16, org.make.core.proposal.SearchFilters.apply$default$17, org.make.core.proposal.SearchFilters.apply$default$18, org.make.core.proposal.SearchFilters.apply$default$19, org.make.core.proposal.SearchFilters.apply$default$20, org.make.core.proposal.SearchFilters.apply$default$21, org.make.core.proposal.SearchFilters.apply$default$22, org.make.core.proposal.SearchFilters.apply$default$23, org.make.core.proposal.SearchFilters.apply$default$24, org.make.core.proposal.SearchFilters.apply$default$25, org.make.core.proposal.SearchFilters.apply$default$26, org.make.core.proposal.SearchFilters.apply$default$27, org.make.core.proposal.SearchFilters.apply$default$28, org.make.core.proposal.SearchFilters.apply$default$29, org.make.core.proposal.SearchFilters.apply$default$30, org.make.core.proposal.SearchFilters.apply$default$31)), 
org.make.core.proposal.SearchQuery.apply$default$2, org.make.core.proposal.SearchQuery.apply$default$3, org.make.core.proposal.SearchQuery.apply$default$4, org.make.core.proposal.SearchQuery.apply$default$5, org.make.core.proposal.SearchQuery.apply$default$6, org.make.core.proposal.SearchQuery.apply$default$7)
71 19199 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$3 org.make.core.proposal.SearchFilters.apply$default$3
71 19109 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$24 org.make.core.proposal.SearchFilters.apply$default$24
71 18894 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$22 org.make.core.proposal.SearchFilters.apply$default$22
71 19244 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$12 org.make.core.proposal.SearchFilters.apply$default$12
71 18834 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$19 org.make.core.proposal.SearchFilters.apply$default$19
71 18843 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$10 org.make.core.proposal.SearchFilters.apply$default$10
71 19139 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$9 org.make.core.proposal.SearchFilters.apply$default$9
71 19175 3198 - 3198 Select org.make.core.proposal.SearchQuery.apply$default$4 org.make.core.proposal.SearchQuery.apply$default$4
71 19276 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$6 org.make.core.proposal.SearchFilters.apply$default$6
71 18597 3198 - 3198 Select org.make.core.proposal.SearchQuery.apply$default$3 org.make.core.proposal.SearchQuery.apply$default$3
71 18654 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$29 org.make.core.proposal.SearchFilters.apply$default$29
71 19201 3198 - 3198 Select org.make.core.proposal.SearchQuery.apply$default$7 org.make.core.proposal.SearchQuery.apply$default$7
71 18645 3198 - 3198 Select org.make.core.proposal.SearchQuery.apply$default$6 org.make.core.proposal.SearchQuery.apply$default$6
71 18950 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$25 org.make.core.proposal.SearchFilters.apply$default$25
71 18612 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$17 org.make.core.proposal.SearchFilters.apply$default$17
71 19141 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$18 org.make.core.proposal.SearchFilters.apply$default$18
71 18836 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$28 org.make.core.proposal.SearchFilters.apply$default$28
71 18944 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$16 org.make.core.proposal.SearchFilters.apply$default$16
71 18665 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$11 org.make.core.proposal.SearchFilters.apply$default$11
71 18919 3198 - 3198 Select org.make.core.proposal.SearchQuery.apply$default$2 org.make.core.proposal.SearchQuery.apply$default$2
71 18686 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$2 org.make.core.proposal.SearchFilters.apply$default$2
71 19197 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$30 org.make.core.proposal.SearchFilters.apply$default$30
71 18964 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$7 org.make.core.proposal.SearchFilters.apply$default$7
71 18668 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$20 org.make.core.proposal.SearchFilters.apply$default$20
71 18859 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$4 org.make.core.proposal.SearchFilters.apply$default$4
71 18635 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$8 org.make.core.proposal.SearchFilters.apply$default$8
71 18617 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$26 org.make.core.proposal.SearchFilters.apply$default$26
71 19173 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$27 org.make.core.proposal.SearchFilters.apply$default$27
71 18823 3250 - 3289 Apply scala.Some.apply scala.Some.apply[org.make.core.proposal.ProposalSearchFilter](org.make.core.proposal.ProposalSearchFilter.apply(proposalIds))
71 18862 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$13 org.make.core.proposal.SearchFilters.apply$default$13
71 18808 3198 - 3198 Select org.make.core.proposal.SearchQuery.apply$default$5 org.make.core.proposal.SearchQuery.apply$default$5
71 19418 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$14 org.make.core.proposal.SearchFilters.apply$default$14
71 19421 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$23 org.make.core.proposal.SearchFilters.apply$default$23
71 19412 3225 - 3290 Apply org.make.core.proposal.SearchFilters.apply org.make.core.proposal.SearchFilters.apply(scala.Some.apply[org.make.core.proposal.ProposalSearchFilter](org.make.core.proposal.ProposalSearchFilter.apply(proposalIds)), org.make.core.proposal.SearchFilters.apply$default$2, org.make.core.proposal.SearchFilters.apply$default$3, org.make.core.proposal.SearchFilters.apply$default$4, org.make.core.proposal.SearchFilters.apply$default$5, org.make.core.proposal.SearchFilters.apply$default$6, org.make.core.proposal.SearchFilters.apply$default$7, org.make.core.proposal.SearchFilters.apply$default$8, org.make.core.proposal.SearchFilters.apply$default$9, org.make.core.proposal.SearchFilters.apply$default$10, org.make.core.proposal.SearchFilters.apply$default$11, org.make.core.proposal.SearchFilters.apply$default$12, org.make.core.proposal.SearchFilters.apply$default$13, org.make.core.proposal.SearchFilters.apply$default$14, org.make.core.proposal.SearchFilters.apply$default$15, org.make.core.proposal.SearchFilters.apply$default$16, org.make.core.proposal.SearchFilters.apply$default$17, org.make.core.proposal.SearchFilters.apply$default$18, org.make.core.proposal.SearchFilters.apply$default$19, org.make.core.proposal.SearchFilters.apply$default$20, org.make.core.proposal.SearchFilters.apply$default$21, org.make.core.proposal.SearchFilters.apply$default$22, org.make.core.proposal.SearchFilters.apply$default$23, org.make.core.proposal.SearchFilters.apply$default$24, org.make.core.proposal.SearchFilters.apply$default$25, org.make.core.proposal.SearchFilters.apply$default$26, org.make.core.proposal.SearchFilters.apply$default$27, org.make.core.proposal.SearchFilters.apply$default$28, org.make.core.proposal.SearchFilters.apply$default$29, org.make.core.proposal.SearchFilters.apply$default$30, org.make.core.proposal.SearchFilters.apply$default$31)
71 19430 3225 - 3225 Select org.make.core.proposal.SearchFilters.apply$default$5 org.make.core.proposal.SearchFilters.apply$default$5
71 19092 3220 - 3291 Apply scala.Some.apply scala.Some.apply[org.make.core.proposal.SearchFilters](org.make.core.proposal.SearchFilters.apply(scala.Some.apply[org.make.core.proposal.ProposalSearchFilter](org.make.core.proposal.ProposalSearchFilter.apply(proposalIds)), org.make.core.proposal.SearchFilters.apply$default$2, org.make.core.proposal.SearchFilters.apply$default$3, org.make.core.proposal.SearchFilters.apply$default$4, org.make.core.proposal.SearchFilters.apply$default$5, org.make.core.proposal.SearchFilters.apply$default$6, org.make.core.proposal.SearchFilters.apply$default$7, org.make.core.proposal.SearchFilters.apply$default$8, org.make.core.proposal.SearchFilters.apply$default$9, org.make.core.proposal.SearchFilters.apply$default$10, org.make.core.proposal.SearchFilters.apply$default$11, org.make.core.proposal.SearchFilters.apply$default$12, org.make.core.proposal.SearchFilters.apply$default$13, org.make.core.proposal.SearchFilters.apply$default$14, org.make.core.proposal.SearchFilters.apply$default$15, org.make.core.proposal.SearchFilters.apply$default$16, org.make.core.proposal.SearchFilters.apply$default$17, org.make.core.proposal.SearchFilters.apply$default$18, org.make.core.proposal.SearchFilters.apply$default$19, org.make.core.proposal.SearchFilters.apply$default$20, org.make.core.proposal.SearchFilters.apply$default$21, org.make.core.proposal.SearchFilters.apply$default$22, org.make.core.proposal.SearchFilters.apply$default$23, org.make.core.proposal.SearchFilters.apply$default$24, org.make.core.proposal.SearchFilters.apply$default$25, org.make.core.proposal.SearchFilters.apply$default$26, org.make.core.proposal.SearchFilters.apply$default$27, org.make.core.proposal.SearchFilters.apply$default$28, org.make.core.proposal.SearchFilters.apply$default$29, org.make.core.proposal.SearchFilters.apply$default$30, org.make.core.proposal.SearchFilters.apply$default$31))
78 19083 3450 - 3538 Apply org.make.api.userhistory.UserHistoryCoordinatorService.retrieveVoteAndQualifications OrganisationIndexationStream.this.userHistoryCoordinatorService.retrieveVoteAndQualifications(organisationId, proposalIds)
81 19087 3642 - 3909 Apply org.make.api.proposal.ProposalSearchEngine.countProposalsByQuestion OrganisationIndexationStream.this.elasticsearchProposalAPI.countProposalsByQuestion(scala.None, scala.Some.apply[Seq[org.make.core.proposal.ProposalStatus.Accepted.type]](scala.`package`.Seq.apply[org.make.core.proposal.ProposalStatus.Accepted.type](org.make.core.proposal.ProposalStatus.Accepted)), scala.Some.apply[org.make.core.user.UserId](organisationId), scala.None, scala.None, scala.None)
82 18945 3720 - 3724 Select scala.None scala.None
83 18812 3743 - 3777 Apply scala.Some.apply scala.Some.apply[Seq[org.make.core.proposal.ProposalStatus.Accepted.type]](scala.`package`.Seq.apply[org.make.core.proposal.ProposalStatus.Accepted.type](org.make.core.proposal.ProposalStatus.Accepted))
83 19180 3748 - 3776 Apply scala.collection.SeqFactory.Delegate.apply scala.`package`.Seq.apply[org.make.core.proposal.ProposalStatus.Accepted.type](org.make.core.proposal.ProposalStatus.Accepted)
83 18599 3752 - 3775 Select org.make.core.proposal.ProposalStatus.Accepted org.make.core.proposal.ProposalStatus.Accepted
84 19401 3801 - 3821 Apply scala.Some.apply scala.Some.apply[org.make.core.user.UserId](organisationId)
85 19232 3842 - 3846 Select scala.None scala.None
86 18886 3872 - 3876 Select scala.None scala.None
87 19448 3897 - 3901 Select scala.None scala.None
95 19390 4272 - 4272 Apply org.make.api.technical.elasticsearch.OrganisationIndexationStream.$anonfun.<init> new $anonfun()
95 19452 4211 - 4403 Apply scala.collection.SetOps.++ proposalsCountByQuestion.keySet.++(proposals.results.collect[Option[org.make.core.question.QuestionId]](({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[org.make.core.proposal.indexed.IndexedProposal,Option[org.make.core.question.QuestionId]] with java.io.Serializable { def <init>(): <$anon: org.make.core.proposal.indexed.IndexedProposal => Option[org.make.core.question.QuestionId]> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: org.make.core.proposal.indexed.IndexedProposal, B1 >: Option[org.make.core.question.QuestionId]](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[org.make.core.proposal.indexed.IndexedProposal]: org.make.core.proposal.indexed.IndexedProposal): org.make.core.proposal.indexed.IndexedProposal @unchecked) match { case (proposal @ _) if votesByProposals.keys.toSet[org.make.core.proposal.ProposalId].contains(proposal.id) => proposal.question.map[org.make.core.question.QuestionId](((x$1: org.make.core.proposal.indexed.IndexedProposalQuestion) => x$1.questionId)) case (defaultCase$ @ _) => default.apply(x1) }; final def isDefinedAt(x1: org.make.core.proposal.indexed.IndexedProposal): Boolean = ((x1.asInstanceOf[org.make.core.proposal.indexed.IndexedProposal]: org.make.core.proposal.indexed.IndexedProposal): org.make.core.proposal.indexed.IndexedProposal @unchecked) match { case (proposal @ _) if votesByProposals.keys.toSet[org.make.core.proposal.ProposalId].contains(proposal.id) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[org.make.core.proposal.indexed.IndexedProposal,Option[org.make.core.question.QuestionId]])).flatten[org.make.core.question.QuestionId](scala.Predef.$conforms[Option[org.make.core.question.QuestionId]]))
96 18628 4299 - 4348 Apply scala.collection.SetOps.contains votesByProposals.keys.toSet[org.make.core.proposal.ProposalId].contains(proposal.id)
96 18813 4352 - 4387 Apply scala.Option.map proposal.question.map[org.make.core.question.QuestionId](((x$1: org.make.core.proposal.indexed.IndexedProposalQuestion) => x$1.questionId))
96 18947 4336 - 4347 Select org.make.core.proposal.indexed.IndexedProposal.id proposal.id
96 19183 4374 - 4386 Select org.make.core.proposal.indexed.IndexedProposalQuestion.questionId x$1.questionId
97 19234 4396 - 4396 TypeApply scala.Predef.$conforms scala.Predef.$conforms[Option[org.make.core.question.QuestionId]]
97 18874 4246 - 4403 ApplyToImplicitArgs scala.collection.IterableOps.flatten proposals.results.collect[Option[org.make.core.question.QuestionId]](({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[org.make.core.proposal.indexed.IndexedProposal,Option[org.make.core.question.QuestionId]] with java.io.Serializable { def <init>(): <$anon: org.make.core.proposal.indexed.IndexedProposal => Option[org.make.core.question.QuestionId]> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: org.make.core.proposal.indexed.IndexedProposal, B1 >: Option[org.make.core.question.QuestionId]](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[org.make.core.proposal.indexed.IndexedProposal]: org.make.core.proposal.indexed.IndexedProposal): org.make.core.proposal.indexed.IndexedProposal @unchecked) match { case (proposal @ _) if votesByProposals.keys.toSet[org.make.core.proposal.ProposalId].contains(proposal.id) => proposal.question.map[org.make.core.question.QuestionId](((x$1: org.make.core.proposal.indexed.IndexedProposalQuestion) => x$1.questionId)) case (defaultCase$ @ _) => default.apply(x1) }; final def isDefinedAt(x1: org.make.core.proposal.indexed.IndexedProposal): Boolean = ((x1.asInstanceOf[org.make.core.proposal.indexed.IndexedProposal]: org.make.core.proposal.indexed.IndexedProposal): org.make.core.proposal.indexed.IndexedProposal @unchecked) match { case (proposal @ _) if votesByProposals.keys.toSet[org.make.core.proposal.ProposalId].contains(proposal.id) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[org.make.core.proposal.indexed.IndexedProposal,Option[org.make.core.question.QuestionId]])).flatten[org.make.core.question.QuestionId](scala.Predef.$conforms[Option[org.make.core.question.QuestionId]])
101 18630 4514 - 4585 Apply scala.collection.IterableOps.filter proposals.results.filter(((x$2: org.make.core.proposal.indexed.IndexedProposal) => x$2.question.exists(((x$3: org.make.core.proposal.indexed.IndexedProposalQuestion) => x$3.questionId.==(questionId)))))
101 18791 4539 - 4584 Apply scala.Option.exists x$2.question.exists(((x$3: org.make.core.proposal.indexed.IndexedProposalQuestion) => x$3.questionId.==(questionId)))
101 19111 4557 - 4583 Apply java.lang.Object.== x$3.questionId.==(questionId)
102 19392 4641 - 4641 Apply org.make.api.technical.elasticsearch.OrganisationIndexationStream.$anonfun.<init> new $anonfun()
103 18848 4681 - 4732 Apply scala.collection.IterableOnceOps.exists votedProposalsOfQuestion.exists(((x$4: org.make.core.proposal.indexed.IndexedProposal) => x$4.id.==(proposalId)))
103 19161 4713 - 4731 Apply java.lang.Object.== x$4.id.==(proposalId)
104 19217 4616 - 4756 Select scala.collection.IterableOnceOps.size votesByProposals.collect[org.make.core.history.HistoryActions.VoteAndQualifications](({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[(org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications),org.make.core.history.HistoryActions.VoteAndQualifications] with java.io.Serializable { def <init>(): <$anon: ((org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications)) => org.make.core.history.HistoryActions.VoteAndQualifications> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: (org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications), B1 >: org.make.core.history.HistoryActions.VoteAndQualifications](x2: A1, default: A1 => B1): B1 = ((x2.asInstanceOf[(org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications)]: (org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications)): (org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications) @unchecked) match { case (_1: org.make.core.proposal.ProposalId, _2: org.make.core.history.HistoryActions.VoteAndQualifications): (org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications)((proposalId @ _), (votes @ _)) if votedProposalsOfQuestion.exists(((x$4: org.make.core.proposal.indexed.IndexedProposal) => x$4.id.==(proposalId))) => votes case (defaultCase$ @ _) => default.apply(x2) }; final def isDefinedAt(x2: (org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications)): Boolean = ((x2.asInstanceOf[(org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications)]: (org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications)): 
(org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications) @unchecked) match { case (_1: org.make.core.proposal.ProposalId, _2: org.make.core.history.HistoryActions.VoteAndQualifications): (org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications)((proposalId @ _), (votes @ _)) if votedProposalsOfQuestion.exists(((x$4: org.make.core.proposal.indexed.IndexedProposal) => x$4.id.==(proposalId))) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[(org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications),org.make.core.history.HistoryActions.VoteAndQualifications])).size
105 19438 4791 - 4847 Select scala.Long.toInt proposalsCountByQuestion.getOrElse[Long](questionId, 0L).toInt
105 18880 4838 - 4840 Literal <nosymbol> 0L
106 19101 4856 - 4929 Apply org.make.core.user.indexed.ProposalsAndVotesCountsByQuestion.apply org.make.core.user.indexed.ProposalsAndVotesCountsByQuestion.apply(questionId, proposalsCount, votesCount)
107 18793 4411 - 4943 Select scala.collection.IterableOnceOps.toSeq questionIds.map[org.make.core.user.indexed.ProposalsAndVotesCountsByQuestion](((questionId: org.make.core.question.QuestionId) => { val votedProposalsOfQuestion: Seq[org.make.core.proposal.indexed.IndexedProposal] = proposals.results.filter(((x$2: org.make.core.proposal.indexed.IndexedProposal) => x$2.question.exists(((x$3: org.make.core.proposal.indexed.IndexedProposalQuestion) => x$3.questionId.==(questionId))))); val votesCount: Int = votesByProposals.collect[org.make.core.history.HistoryActions.VoteAndQualifications](({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[(org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications),org.make.core.history.HistoryActions.VoteAndQualifications] with java.io.Serializable { def <init>(): <$anon: ((org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications)) => org.make.core.history.HistoryActions.VoteAndQualifications> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: (org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications), B1 >: org.make.core.history.HistoryActions.VoteAndQualifications](x2: A1, default: A1 => B1): B1 = ((x2.asInstanceOf[(org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications)]: (org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications)): (org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications) @unchecked) match { case (_1: org.make.core.proposal.ProposalId, _2: org.make.core.history.HistoryActions.VoteAndQualifications): (org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications)((proposalId @ _), (votes @ _)) if votedProposalsOfQuestion.exists(((x$4: org.make.core.proposal.indexed.IndexedProposal) 
=> x$4.id.==(proposalId))) => votes case (defaultCase$ @ _) => default.apply(x2) }; final def isDefinedAt(x2: (org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications)): Boolean = ((x2.asInstanceOf[(org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications)]: (org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications)): (org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications) @unchecked) match { case (_1: org.make.core.proposal.ProposalId, _2: org.make.core.history.HistoryActions.VoteAndQualifications): (org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications)((proposalId @ _), (votes @ _)) if votedProposalsOfQuestion.exists(((x$4: org.make.core.proposal.indexed.IndexedProposal) => x$4.id.==(proposalId))) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[(org.make.core.proposal.ProposalId, org.make.core.history.HistoryActions.VoteAndQualifications),org.make.core.history.HistoryActions.VoteAndQualifications])).size; val proposalsCount: Int = proposalsCountByQuestion.getOrElse[Long](questionId, 0L).toInt; org.make.core.user.indexed.ProposalsAndVotesCountsByQuestion.apply(questionId, proposalsCount, votesCount) })).toSeq
133 18582 4955 - 6130 ApplyToImplicitArgs akka.stream.scaladsl.Source.runWith akka.stream.scaladsl.Source.apply[org.make.core.user.User](organisations).mapAsync[org.make.core.user.indexed.IndexedOrganisation](OrganisationIndexationStream.this.parallelism)(((organisation: org.make.core.user.User) => { val futureCountsByQuestion: scala.concurrent.Future[Seq[org.make.core.user.indexed.ProposalsAndVotesCountsByQuestion]] = futureVotedProposals(organisation.userId).flatMap[Seq[org.make.core.user.indexed.ProposalsAndVotesCountsByQuestion]](((proposalIds: Seq[org.make.core.proposal.ProposalId]) => futureFindProposals(proposalIds).flatMap[Seq[org.make.core.user.indexed.ProposalsAndVotesCountsByQuestion]](((proposals: org.make.core.proposal.indexed.ProposalsSearchResult) => futureVotes(proposalIds, organisation.userId).flatMap[Seq[org.make.core.user.indexed.ProposalsAndVotesCountsByQuestion]](((votesByProposals: Map[org.make.core.proposal.ProposalId,org.make.core.history.HistoryActions.VoteAndQualifications]) => futureProposalsCountByQuestion(organisation.userId).map[Seq[org.make.core.user.indexed.ProposalsAndVotesCountsByQuestion]](((proposalsCountByQuestion: Map[org.make.core.question.QuestionId,Long]) => generateProposalsAndVotesCountsByQuestion(proposals, votesByProposals, proposalsCountByQuestion)))(scala.concurrent.ExecutionContext.Implicits.global)))(scala.concurrent.ExecutionContext.Implicits.global)))(scala.concurrent.ExecutionContext.Implicits.global)))(scala.concurrent.ExecutionContext.Implicits.global); futureCountsByQuestion.map[org.make.core.user.indexed.IndexedOrganisation](((countsByQuestion: Seq[org.make.core.user.indexed.ProposalsAndVotesCountsByQuestion]) => org.make.core.user.indexed.IndexedOrganisation.createFromOrganisation(organisation, countsByQuestion)))(scala.concurrent.ExecutionContext.Implicits.global) })).groupedWithin(20, 
scala.concurrent.duration.`package`.DurationDouble(500.0).milliseconds).mapAsync[akka.Done](OrganisationIndexationStream.this.singleAsync)(((organisations: Seq[org.make.core.user.indexed.IndexedOrganisation]) => OrganisationIndexationStream.this.elasticsearchOrganisationAPI.indexOrganisations(organisations, scala.Some.apply[com.sksamuel.elastic4s.Index](com.sksamuel.elastic4s.Index.apply(organisationIndexName))).recoverWith[akka.Done](({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[Throwable,scala.concurrent.Future[akka.Done.type]] with java.io.Serializable { def <init>(): <$anon: Throwable => scala.concurrent.Future[akka.Done.type]> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: Throwable, B1 >: scala.concurrent.Future[akka.Done.type]](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match { case (e @ _) => { OrganisationIndexationStream.this.logger.error("Indexing of one organisation chunk failed", e); scala.concurrent.Future.successful[akka.Done.type](akka.Done) } case (defaultCase$ @ _) => default.apply(x1) }; final def isDefinedAt(x1: Throwable): Boolean = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match { case (e @ _) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[Throwable,scala.concurrent.Future[akka.Done.type]]))(scala.concurrent.ExecutionContext.Implicits.global))).runWith[scala.concurrent.Future[akka.Done]](akka.stream.scaladsl.Sink.ignore)(mat)