1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.technical.elasticsearch
21 
22 import akka.stream.scaladsl.Flow
23 import akka.NotUsed
24 import com.sksamuel.elastic4s.Index
25 import grizzled.slf4j.Logging
26 import org.make.api.operation.{OperationOfQuestionSearchEngineComponent, OperationServiceComponent}
27 import org.make.api.question.QuestionServiceComponent
28 import org.make.core.elasticsearch.IndexationStatus
29 import org.make.core.operation.indexed.IndexedOperationOfQuestion
30 import org.make.core.operation.{OperationOfQuestion, SimpleOperation}
31 import org.make.core.question.Question
32 
33 import scala.concurrent.ExecutionContext.Implicits.global
34 import scala.concurrent.Future
35 
/**
  * Akka Streams stages that turn [[OperationOfQuestion]]s into
  * [[IndexedOperationOfQuestion]] documents and send them to the
  * operation-of-question search engine.
  *
  * Each batch is enriched with its [[Question]] and [[SimpleOperation]] before being indexed.
  */
trait OperationOfQuestionIndexationStream
    extends IndexationStream
    with OperationOfQuestionSearchEngineComponent
    with QuestionServiceComponent
    with OperationServiceComponent
    with Logging {

  object OperationOfQuestionStream {

    /**
      * Flow that batches incoming operation-of-questions through the inherited `grouped`
      * stage and indexes every batch.
      *
      * @param operationOfQuestionIndexName name of the index the documents are written to
      * @return a flow emitting one [[IndexationStatus]] per batch
      */
    def flowIndexOperationOfQuestions(
      operationOfQuestionIndexName: String
    ): Flow[OperationOfQuestion, IndexationStatus, NotUsed] =
      grouped[OperationOfQuestion].via(runIndexOperationOfQuestion(operationOfQuestionIndexName))

    /**
      * Flow that indexes already-batched operation-of-questions, using `singleAsync`
      * as the `mapAsync` parallelism.
      *
      * @param operationOfQuestionIndexName name of the index the documents are written to
      * @return a flow emitting one [[IndexationStatus]] per incoming batch
      */
    def runIndexOperationOfQuestion(
      operationOfQuestionIndexName: String
    ): Flow[Seq[OperationOfQuestion], IndexationStatus, NotUsed] =
      Flow[Seq[OperationOfQuestion]]
        .mapAsync(singleAsync) { batch =>
          executeIndexOperationOfQuestions(batch, operationOfQuestionIndexName)
        }

    /**
      * Loads the questions and operations needed by the batch, builds the indexed
      * documents and sends them to the search engine.
      *
      * @param operationOfQuestions         batch to index
      * @param operationOfQuestionIndexName name of the target index
      * @return the status returned by the search engine for this batch
      */
    private def executeIndexOperationOfQuestions(
      operationOfQuestions: Seq[OperationOfQuestion],
      operationOfQuestionIndexName: String
    ): Future[IndexationStatus] = {

      // Both lookups are created before being combined so that they run concurrently.
      val futureQuestions: Future[Seq[Question]] =
        questionService.getQuestions(operationOfQuestions.map(_.questionId))
      val futureOperations: Future[Seq[SimpleOperation]] =
        operationService.findSimple()

      val futureIndexedOperationOfQuestions: Future[Seq[IndexedOperationOfQuestion]] =
        for {
          questions  <- futureQuestions
          operations <- futureOperations
        } yield toIndexedOperationOfQuestions(operationOfQuestions, questions, operations)

      futureIndexedOperationOfQuestions.flatMap { indexedOperationOfQuestions =>
        elasticsearchOperationOfQuestionAPI
          .indexOperationOfQuestions(indexedOperationOfQuestions, Some(Index(operationOfQuestionIndexName)))
      }
    }

    /**
      * Pairs every operation-of-question with its question and operation.
      *
      * Entries whose question or operation is absent from the provided sequences cannot
      * be indexed: they are left out of the result and a warning is logged for each of
      * them instead of dropping them silently.
      */
    private def toIndexedOperationOfQuestions(
      operationOfQuestions: Seq[OperationOfQuestion],
      questions: Seq[Question],
      operations: Seq[SimpleOperation]
    ): Seq[IndexedOperationOfQuestion] =
      operationOfQuestions.flatMap { operationOfQuestion =>
        val maybeIndexed: Option[IndexedOperationOfQuestion] = for {
          question  <- questions.find(_.questionId == operationOfQuestion.questionId)
          operation <- operations.find(_.operationId == operationOfQuestion.operationId)
        } yield IndexedOperationOfQuestion
          .createFromOperationOfQuestion(operationOfQuestion, operation, question)

        if (maybeIndexed.isEmpty) {
          logger.warn(
            s"Operation of question not indexed: question ${operationOfQuestion.questionId} " +
              s"or operation ${operationOfQuestion.operationId} could not be found"
          )
        }
        maybeIndexed
      }
  }

}
Line Stmt Id Pos Tree Symbol Tests Code
48 18650 1813 - 1870 Apply org.make.api.technical.elasticsearch.OperationOfQuestionIndexationStream.OperationOfQuestionStream.runIndexOperationOfQuestion OperationOfQuestionStream.this.runIndexOperationOfQuestion(operationOfQuestionIndexName)
48 19227 1780 - 1871 Apply akka.stream.scaladsl.Flow.via OperationOfQuestionIndexationStream.this.grouped[org.make.core.operation.OperationOfQuestion].via[org.make.core.elasticsearch.IndexationStatus, akka.NotUsed](OperationOfQuestionStream.this.runIndexOperationOfQuestion(operationOfQuestionIndexName))
54 18891 2077 - 2088 Select org.make.api.technical.elasticsearch.IndexationStream.singleAsync OperationOfQuestionIndexationStream.this.singleAsync
54 19252 2028 - 2219 Apply akka.stream.scaladsl.FlowOps.mapAsync akka.stream.scaladsl.Flow.apply[Seq[org.make.core.operation.OperationOfQuestion]].mapAsync[org.make.core.elasticsearch.IndexationStatus](OperationOfQuestionIndexationStream.this.singleAsync)(((operationOfQuestions: Seq[org.make.core.operation.OperationOfQuestion]) => OperationOfQuestionStream.this.executeIndexOperationOfQuestions(operationOfQuestions, operationOfQuestionIndexName)))
55 18719 2125 - 2209 Apply org.make.api.technical.elasticsearch.OperationOfQuestionIndexationStream.OperationOfQuestionStream.executeIndexOperationOfQuestions OperationOfQuestionStream.this.executeIndexOperationOfQuestions(operationOfQuestions, operationOfQuestionIndexName)
65 18925 2523 - 2535 Select org.make.core.operation.OperationOfQuestion.questionId x$1.questionId
65 18590 2498 - 2536 Apply scala.collection.IterableOps.map operationOfQuestions.map[org.make.core.question.QuestionId](((x$1: org.make.core.operation.OperationOfQuestion) => x$1.questionId))
65 19325 2469 - 2537 Apply org.make.api.question.QuestionService.getQuestions OperationOfQuestionIndexationStream.this.questionService.getQuestions(operationOfQuestions.map[org.make.core.question.QuestionId](((x$1: org.make.core.operation.OperationOfQuestion) => x$1.questionId)))
67 18642 2621 - 2621 Select org.make.api.operation.OperationService.findSimple$default$1 qual$1.findSimple$default$1
67 19013 2604 - 2620 Select org.make.api.operation.OperationServiceComponent.operationService OperationOfQuestionIndexationStream.this.operationService
67 18872 2621 - 2621 Select org.make.api.operation.OperationService.findSimple$default$3 qual$1.findSimple$default$3
67 18743 2621 - 2621 Select org.make.api.operation.OperationService.findSimple$default$4 qual$1.findSimple$default$4
67 19290 2621 - 2621 Select org.make.api.operation.OperationService.findSimple$default$5 qual$1.findSimple$default$5
67 18913 2621 - 2621 Select org.make.api.operation.OperationService.findSimple$default$6 qual$1.findSimple$default$6
67 19194 2621 - 2621 Select org.make.api.operation.OperationService.findSimple$default$2 qual$1.findSimple$default$2
67 18593 2604 - 2633 Apply org.make.api.operation.OperationService.findSimple qual$1.findSimple(x$1, x$2, x$3, x$4, x$5, x$6)
70 19223 2705 - 2705 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
70 18877 2680 - 2798 ApplyToImplicitArgs scala.concurrent.Future.flatMap futureQuestion.flatMap[(Seq[org.make.core.question.Question], Seq[org.make.core.operation.SimpleOperation])](((questions: Seq[org.make.core.question.Question]) => futureOperation.map[(Seq[org.make.core.question.Question], Seq[org.make.core.operation.SimpleOperation])](((operations: Seq[org.make.core.operation.SimpleOperation]) => scala.Tuple2.apply[Seq[org.make.core.question.Question], Seq[org.make.core.operation.SimpleOperation]](questions, operations)))(scala.concurrent.ExecutionContext.Implicits.global)))(scala.concurrent.ExecutionContext.Implicits.global)
71 19018 2742 - 2742 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
71 18644 2731 - 2798 ApplyToImplicitArgs scala.concurrent.Future.map futureOperation.map[(Seq[org.make.core.question.Question], Seq[org.make.core.operation.SimpleOperation])](((operations: Seq[org.make.core.operation.SimpleOperation]) => scala.Tuple2.apply[Seq[org.make.core.question.Question], Seq[org.make.core.operation.SimpleOperation]](questions, operations)))(scala.concurrent.ExecutionContext.Implicits.global)
72 19157 2775 - 2798 Apply scala.Tuple2.apply scala.Tuple2.apply[Seq[org.make.core.question.Question], Seq[org.make.core.operation.SimpleOperation]](questions, operations)
74 18902 2843 - 2843 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
76 19225 2895 - 3291 Apply scala.collection.IterableOps.map operationOfQuestions.map[Option[org.make.core.operation.indexed.IndexedOperationOfQuestion]](((operationOfQuestion: org.make.core.operation.OperationOfQuestion) => questions.find(((x$2: org.make.core.question.Question) => x$2.questionId.==(operationOfQuestion.questionId))).flatMap[org.make.core.operation.indexed.IndexedOperationOfQuestion](((question: org.make.core.question.Question) => operations.find(((x$3: org.make.core.operation.SimpleOperation) => x$3.operationId.==(operationOfQuestion.operationId))).map[org.make.core.operation.indexed.IndexedOperationOfQuestion](((operation: org.make.core.operation.SimpleOperation) => org.make.core.operation.indexed.IndexedOperationOfQuestion.createFromOperationOfQuestion(operationOfQuestion, operation, question)))))))
78 19294 3005 - 3051 Apply java.lang.Object.== x$2.questionId.==(operationOfQuestion.questionId)
78 18736 3021 - 3051 Select org.make.core.operation.OperationOfQuestion.questionId operationOfQuestion.questionId
78 18688 2957 - 3279 Apply scala.Option.flatMap questions.find(((x$2: org.make.core.question.Question) => x$2.questionId.==(operationOfQuestion.questionId))).flatMap[org.make.core.operation.indexed.IndexedOperationOfQuestion](((question: org.make.core.question.Question) => operations.find(((x$3: org.make.core.operation.SimpleOperation) => x$3.operationId.==(operationOfQuestion.operationId))).map[org.make.core.operation.indexed.IndexedOperationOfQuestion](((operation: org.make.core.operation.SimpleOperation) => org.make.core.operation.indexed.IndexedOperationOfQuestion.createFromOperationOfQuestion(operationOfQuestion, operation, question)))))
79 19004 3067 - 3279 Apply scala.Option.map operations.find(((x$3: org.make.core.operation.SimpleOperation) => x$3.operationId.==(operationOfQuestion.operationId))).map[org.make.core.operation.indexed.IndexedOperationOfQuestion](((operation: org.make.core.operation.SimpleOperation) => org.make.core.operation.indexed.IndexedOperationOfQuestion.createFromOperationOfQuestion(operationOfQuestion, operation, question)))
79 18623 3096 - 3144 Apply java.lang.Object.== x$3.operationId.==(operationOfQuestion.operationId)
79 18939 3113 - 3144 Select org.make.core.operation.OperationOfQuestion.operationId operationOfQuestion.operationId
81 19159 3166 - 3279 Apply org.make.core.operation.indexed.IndexedOperationOfQuestion.createFromOperationOfQuestion org.make.core.operation.indexed.IndexedOperationOfQuestion.createFromOperationOfQuestion(operationOfQuestion, operation, question)
83 18924 3303 - 3303 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
83 19297 3304 - 3313 ApplyToImplicitArgs scala.collection.IterableOps.flatten x$4.flatten[org.make.core.operation.indexed.IndexedOperationOfQuestion](scala.Predef.$conforms[Option[org.make.core.operation.indexed.IndexedOperationOfQuestion]])
83 18651 3323 - 3323 Select scala.concurrent.ExecutionContext.Implicits.global scala.concurrent.ExecutionContext.Implicits.global
83 19210 2806 - 3502 ApplyToImplicitArgs scala.concurrent.Future.flatMap futureIndexedOperationOfQuestion.map[Seq[Option[org.make.core.operation.indexed.IndexedOperationOfQuestion]]](((x0$1: (Seq[org.make.core.question.Question], Seq[org.make.core.operation.SimpleOperation])) => x0$1 match { case (_1: Seq[org.make.core.question.Question], _2: Seq[org.make.core.operation.SimpleOperation]): (Seq[org.make.core.question.Question], Seq[org.make.core.operation.SimpleOperation])((questions @ _), (operations @ _)) => operationOfQuestions.map[Option[org.make.core.operation.indexed.IndexedOperationOfQuestion]](((operationOfQuestion: org.make.core.operation.OperationOfQuestion) => questions.find(((x$2: org.make.core.question.Question) => x$2.questionId.==(operationOfQuestion.questionId))).flatMap[org.make.core.operation.indexed.IndexedOperationOfQuestion](((question: org.make.core.question.Question) => operations.find(((x$3: org.make.core.operation.SimpleOperation) => x$3.operationId.==(operationOfQuestion.operationId))).map[org.make.core.operation.indexed.IndexedOperationOfQuestion](((operation: org.make.core.operation.SimpleOperation) => org.make.core.operation.indexed.IndexedOperationOfQuestion.createFromOperationOfQuestion(operationOfQuestion, operation, question))))))) }))(scala.concurrent.ExecutionContext.Implicits.global).map[Seq[org.make.core.operation.indexed.IndexedOperationOfQuestion]](((x$4: Seq[Option[org.make.core.operation.indexed.IndexedOperationOfQuestion]]) => x$4.flatten[org.make.core.operation.indexed.IndexedOperationOfQuestion](scala.Predef.$conforms[Option[org.make.core.operation.indexed.IndexedOperationOfQuestion]])))(scala.concurrent.ExecutionContext.Implicits.global).flatMap[org.make.core.elasticsearch.IndexationStatus](((operationOfQuestions: Seq[org.make.core.operation.indexed.IndexedOperationOfQuestion]) => OperationOfQuestionIndexationStream.this.elasticsearchOperationOfQuestionAPI.indexOperationOfQuestions(operationOfQuestions, 
scala.Some.apply[com.sksamuel.elastic4s.Index](com.sksamuel.elastic4s.Index.apply(operationOfQuestionIndexName)))))(scala.concurrent.ExecutionContext.Implicits.global)
83 18737 3306 - 3306 TypeApply scala.Predef.$conforms scala.Predef.$conforms[Option[org.make.core.operation.indexed.IndexedOperationOfQuestion]]
85 19151 3452 - 3493 Apply scala.Some.apply scala.Some.apply[com.sksamuel.elastic4s.Index](com.sksamuel.elastic4s.Index.apply(operationOfQuestionIndexName))
85 19007 3357 - 3494 Apply org.make.api.operation.OperationOfQuestionSearchEngine.indexOperationOfQuestions OperationOfQuestionIndexationStream.this.elasticsearchOperationOfQuestionAPI.indexOperationOfQuestions(operationOfQuestions, scala.Some.apply[com.sksamuel.elastic4s.Index](com.sksamuel.elastic4s.Index.apply(operationOfQuestionIndexName)))
85 18626 3457 - 3492 Apply com.sksamuel.elastic4s.Index.apply com.sksamuel.elastic4s.Index.apply(operationOfQuestionIndexName)