1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.technical.elasticsearch
21 
22 import akka.NotUsed
23 import akka.stream.scaladsl.Flow
24 import com.sksamuel.elastic4s.Index
25 import grizzled.slf4j.Logging
26 import org.make.api.post.PostSearchEngineComponent
27 import org.make.core.elasticsearch.IndexationStatus
28 import org.make.core.post.Post
29 import org.make.core.post.indexed.IndexedPost
30 
31 import scala.concurrent.Future
32 
/**
  * Stream stages that send [[Post]] entities to the post search engine.
  *
  * Relies on `grouped` and `singleAsync` inherited from [[IndexationStream]] and on
  * `elasticsearchPostAPI` provided by [[PostSearchEngineComponent]].
  */
trait PostIndexationStream extends IndexationStream with PostSearchEngineComponent with Logging {

  object PostStream {

    /**
      * Full indexation pipeline: posts are first passed through the inherited `grouped`
      * stage, then each resulting batch is indexed by [[runIndexPost]].
      *
      * @param postIndexName name of the index the posts are written to
      * @return a flow consuming posts and emitting one [[IndexationStatus]] per indexed batch
      */
    def flowIndexPosts(postIndexName: String): Flow[Post, IndexationStatus, NotUsed] = {
      val batchedPosts = grouped[Post]
      batchedPosts.via(runIndexPost(postIndexName))
    }

    /**
      * Indexes already-batched posts, using `singleAsync` as the `mapAsync` parallelism.
      *
      * @param postIndexName name of the index the posts are written to
      * @return a flow consuming batches of posts and emitting the status of each indexation call
      */
    def runIndexPost(postIndexName: String): Flow[Seq[Post], IndexationStatus, NotUsed] =
      Flow[Seq[Post]].mapAsync(singleAsync)(executeIndexPosts(_, postIndexName))

    /**
      * Converts a batch of posts to their indexed representation and submits it to the
      * search engine, targeting the index named `postIndexName`.
      */
    private def executeIndexPosts(posts: Seq[Post], postIndexName: String): Future[IndexationStatus] = {
      val documents = posts.map(entity => IndexedPost.createFromPost(entity))
      val targetIndex = Some(Index(postIndexName))
      elasticsearchPostAPI.indexPosts(documents, targetIndex)
    }

  }
}
Line Stmt Id Pos Tree Symbol Tests Code
38 18810 1338 - 1384 Apply akka.stream.scaladsl.Flow.via PostIndexationStream.this.grouped[org.make.core.post.Post].via[org.make.core.elasticsearch.IndexationStatus, akka.NotUsed](PostStream.this.runIndexPost(postIndexName))
38 19162 1356 - 1383 Apply org.make.api.technical.elasticsearch.PostIndexationStream.PostStream.runIndexPost PostStream.this.runIndexPost(postIndexName)
41 18885 1484 - 1571 Apply akka.stream.scaladsl.FlowOps.mapAsync akka.stream.scaladsl.Flow.apply[Seq[org.make.core.post.Post]].mapAsync[org.make.core.elasticsearch.IndexationStatus](PostIndexationStream.this.singleAsync)(((posts: Seq[org.make.core.post.Post]) => PostStream.this.executeIndexPosts(posts, postIndexName)))
41 19218 1531 - 1570 Apply org.make.api.technical.elasticsearch.PostIndexationStream.PostStream.executeIndexPosts PostStream.this.executeIndexPosts(posts, postIndexName)
41 19376 1509 - 1520 Select org.make.api.technical.elasticsearch.IndexationStream.singleAsync PostIndexationStream.this.singleAsync
46 18583 1784 - 1810 Apply scala.Some.apply scala.Some.apply[com.sksamuel.elastic4s.Index](com.sksamuel.elastic4s.Index.apply(postIndexName))
46 19084 1731 - 1782 Apply scala.collection.IterableOps.map posts.map[org.make.core.post.indexed.IndexedPost](((post: org.make.core.post.Post) => org.make.core.post.indexed.IndexedPost.createFromPost(post)))
46 19433 1749 - 1781 Apply org.make.core.post.indexed.IndexedPost.createFromPost org.make.core.post.indexed.IndexedPost.createFromPost(post)
46 19164 1690 - 1811 Apply org.make.api.post.PostSearchEngine.indexPosts PostIndexationStream.this.elasticsearchPostAPI.indexPosts(posts.map[org.make.core.post.indexed.IndexedPost](((post: org.make.core.post.Post) => org.make.core.post.indexed.IndexedPost.createFromPost(post))), scala.Some.apply[com.sksamuel.elastic4s.Index](com.sksamuel.elastic4s.Index.apply(postIndexName)))
46 18778 1789 - 1809 Apply com.sksamuel.elastic4s.Index.apply com.sksamuel.elastic4s.Index.apply(postIndexName)