1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.technical.elasticsearch
21 
22 import akka.stream.scaladsl.Flow
23 import akka.{Done, NotUsed}
24 import com.sksamuel.elastic4s.Index
25 import grizzled.slf4j.Logging
26 import org.make.api.idea.IdeaSearchEngineComponent
27 import org.make.api.proposal.ProposalSearchEngineComponent
28 import org.make.core.idea.indexed.IndexedIdea
29 import org.make.core.idea.{Idea, IdeaId}
30 
31 import scala.concurrent.ExecutionContext.Implicits.global
32 import scala.concurrent.Future
33 
/**
  * Akka Streams building blocks that index ideas into Elasticsearch.
  *
  * Every idea is indexed together with the number of proposals attached to it, as returned by
  * `elasticsearchProposalAPI.countProposalsByIdea`.
  */
trait IdeaIndexationStream
    extends IndexationStream
    with IdeaSearchEngineComponent
    with ProposalSearchEngineComponent
    with Logging {
  object IdeaStream {

    /**
      * Flow that indexes each incoming batch of ideas and emits one `Done` per batch.
      *
      * At most `parallelism` batches are in flight at the same time (`mapAsync`).
      *
      * @param ideaIndexName name of the Elasticsearch index the ideas are written to
      */
    def runIndexIdeas(ideaIndexName: String): Flow[Seq[Idea], Done, NotUsed] =
      Flow[Seq[Idea]].mapAsync(parallelism)(ideas => executeIndexIdeas(ideas, ideaIndexName))

    /**
      * Flow that accepts single ideas, turns them into batches with the inherited `grouped` flow and
      * sends every batch through [[runIndexIdeas]].
      *
      * @param ideaIndexName name of the Elasticsearch index the ideas are written to
      */
    def flowIndexIdeas(ideaIndexName: String): Flow[Idea, Done, NotUsed] =
      grouped[Idea].via(runIndexIdeas(ideaIndexName))

  }

  /**
    * Indexes one batch of ideas, each enriched with its proposal count.
    *
    * A failure of either asynchronous step is logged and replaced by `Done`, so a failed batch does
    * not fail the returned future.
    *
    * @param ideas         ideas to index; an empty batch completes immediately without any request
    * @param ideaIndexName name of the Elasticsearch index the ideas are written to
    * @return a future completed with `Done` once the batch has been handled
    */
  private def executeIndexIdeas(ideas: Seq[Idea], ideaIndexName: String): Future[Done] = {
    if (ideas.isEmpty) {
      // Nothing to index: skip the count query and the indexation request altogether.
      Future.successful(Done)
    } else {
      elasticsearchProposalAPI
        .countProposalsByIdea(ideas.map(_.ideaId))
        .flatMap { countProposalsByIdea =>
          // Counts are Long while the indexed idea takes an Int: clamp instead of letting toInt wrap around.
          // Ideas missing from the result are considered to have no proposal.
          def proposalsCount(ideaId: IdeaId): Int =
            countProposalsByIdea.getOrElse(ideaId, 0L).min(Int.MaxValue.toLong).toInt

          elasticsearchIdeaAPI
            .indexIdeas(
              ideas.map(idea => IndexedIdea.createFromIdea(idea, proposalsCount(idea.ideaId))),
              Some(Index(ideaIndexName))
            )
        }
        .recover {
          // Best effort: report the failure and carry on with the next batch.
          case e =>
            logger.error("Indexing ideas failed", e)
            Done
        }
    }
  }
}
Line Stmt Id Pos Tree Symbol Tests Code
41 19304 1487 - 1498 Select org.make.api.technical.elasticsearch.IndexationStream.parallelism IdeaIndexationStream.this.parallelism
41 18786 1462 - 1549 Apply akka.stream.scaladsl.FlowOps.mapAsync akka.stream.scaladsl.Flow.apply[Seq[org.make.core.idea.Idea]].mapAsync[akka.Done](IdeaIndexationStream.this.parallelism)(((ideas: Seq[org.make.core.idea.Idea]) => IdeaIndexationStream.this.executeIndexIdeas(ideas, ideaIndexName)))
41 19120 1509 - 1548 Apply org.make.api.technical.elasticsearch.IdeaIndexationStream.executeIndexIdeas IdeaIndexationStream.this.executeIndexIdeas(ideas, ideaIndexName)
44 19310 1650 - 1678 Apply org.make.api.technical.elasticsearch.IdeaIndexationStream.IdeaStream.runIndexIdeas IdeaStream.this.runIndexIdeas(ideaIndexName)
44 19011 1632 - 1679 Apply akka.stream.scaladsl.Flow.via IdeaIndexationStream.this.grouped[org.make.core.idea.Idea].via[akka.Done, akka.NotUsed](IdeaStream.this.runIndexIdeas(ideaIndexName))
50 18637 1844 - 1852 Select org.make.core.idea.Idea.ideaId x$1.ideaId
50 19368 1834 - 1853 Apply scala.collection.IterableOps.map ideas.map[org.make.core.idea.IdeaId](((x$1: org.make.core.idea.Idea) => x$1.ideaId))
51 19071 1870 - 1870 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
52 18720 1946 - 1994 Select scala.Long.toInt countProposalsByIdea.getOrElse[Long](ideaId, 0L).toInt
52 19069 1985 - 1987 Literal <nosymbol> 0L
54 19373 2003 - 2191 Apply org.make.api.idea.IdeaSearchEngine.indexIdeas IdeaIndexationStream.this.elasticsearchIdeaAPI.indexIdeas(ideas.map[org.make.core.idea.indexed.IndexedIdea](((idea: org.make.core.idea.Idea) => org.make.core.idea.indexed.IndexedIdea.createFromIdea(idea, proposalsCount(idea.ideaId)))), scala.Some.apply[com.sksamuel.elastic4s.Index](com.sksamuel.elastic4s.Index.apply(ideaIndexName)))
55 19286 2125 - 2136 Select org.make.core.idea.Idea.ideaId idea.ideaId
55 18768 2077 - 2138 Apply org.make.core.idea.indexed.IndexedIdea.createFromIdea org.make.core.idea.indexed.IndexedIdea.createFromIdea(idea, proposalsCount(idea.ideaId))
55 19345 2059 - 2139 Apply scala.collection.IterableOps.map ideas.map[org.make.core.idea.indexed.IndexedIdea](((idea: org.make.core.idea.Idea) => org.make.core.idea.indexed.IndexedIdea.createFromIdea(idea, proposalsCount(idea.ideaId))))
55 19122 2110 - 2137 Apply org.make.api.technical.elasticsearch.IdeaIndexationStream.proposalsCount proposalsCount(idea.ideaId)
56 19014 2158 - 2178 Apply com.sksamuel.elastic4s.Index.apply com.sksamuel.elastic4s.Index.apply(ideaIndexName)
56 18677 2153 - 2179 Apply scala.Some.apply scala.Some.apply[com.sksamuel.elastic4s.Index](com.sksamuel.elastic4s.Index.apply(ideaIndexName))
59 18771 2219 - 2219 Apply org.make.api.technical.elasticsearch.IdeaIndexationStream.$anonfun.<init> new $anonfun()
59 18986 1781 - 2331 ApplyToImplicitArgs scala.concurrent.Future.recoverWith IdeaIndexationStream.this.elasticsearchProposalAPI.countProposalsByIdea(ideas.map[org.make.core.idea.IdeaId](((x$1: org.make.core.idea.Idea) => x$1.ideaId))).flatMap[akka.Done](((countProposalsByIdea: Map[org.make.core.idea.IdeaId,Long]) => { def proposalsCount(ideaId: org.make.core.idea.IdeaId): Int = countProposalsByIdea.getOrElse[Long](ideaId, 0L).toInt; IdeaIndexationStream.this.elasticsearchIdeaAPI.indexIdeas(ideas.map[org.make.core.idea.indexed.IndexedIdea](((idea: org.make.core.idea.Idea) => org.make.core.idea.indexed.IndexedIdea.createFromIdea(idea, proposalsCount(idea.ideaId)))), scala.Some.apply[com.sksamuel.elastic4s.Index](com.sksamuel.elastic4s.Index.apply(ideaIndexName))) }))(scala.concurrent.ExecutionContext.Implicits.global).recoverWith[akka.Done](({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[Throwable,scala.concurrent.Future[akka.Done]] with java.io.Serializable { def <init>(): <$anon: Throwable => scala.concurrent.Future[akka.Done]> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: Throwable, B1 >: scala.concurrent.Future[akka.Done]](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match { case (e @ _) => { IdeaIndexationStream.this.logger.error("Indexing ideas failed", e); scala.concurrent.Future.successful[akka.Done.type](akka.Done) } case (defaultCase$ @ _) => default.apply(x1) }; final def isDefinedAt(x1: Throwable): Boolean = ((x1.asInstanceOf[Throwable]: Throwable): Throwable @unchecked) match { case (e @ _) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[Throwable,scala.concurrent.Future[akka.Done]]))(scala.concurrent.ExecutionContext.Implicits.global)
59 19347 2219 - 2219 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
61 18709 2249 - 2289 Apply grizzled.slf4j.Logger.error IdeaIndexationStream.this.logger.error("Indexing ideas failed", e)
62 19093 2300 - 2323 Apply scala.concurrent.Future.successful scala.concurrent.Future.successful[akka.Done.type](akka.Done)
62 19289 2318 - 2322 Select akka.Done akka.Done