1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.technical.elasticsearch
21 
22 import akka.NotUsed
23 import akka.stream.scaladsl.Flow
24 import scala.concurrent.duration.DurationInt
25 
/**
  * Small, reusable Akka Streams `Flow` stages shared by indexation pipelines.
  */
trait IndexationStream {
  // TODO: read these values from the configuration instead of hard-coding them
  // NOTE(review): presumably consumed as `mapAsync` parallelism levels by implementors — confirm against callers.
  val parallelism: Int = 5
  val singleAsync: Int = 1

  /**
    * Keeps only the defined elements of the stream and unwraps them.
    *
    * @tparam T type carried by the incoming options
    * @return a flow emitting the content of every `Some` and dropping every `None`
    */
  def filterIsDefined[T]: Flow[Option[T], T, NotUsed] =
    Flow[Option[T]].collect { case Some(value) => value }

  /**
    * Keeps only the empty elements of the stream, substituting each of them with `item`.
    *
    * @param item value emitted downstream for every `None` received
    * @tparam T type carried by the incoming options
    * @return a flow emitting `item` once per `None` and dropping every `Some`
    */
  def filterIsEmpty[T](item: T): Flow[Option[T], T, NotUsed] =
    Flow[Option[T]].collect { case None => item }

  /**
    * Batches incoming elements into sequences.
    *
    * A batch is bounded by a size of 10 elements and a time window of 20 milliseconds,
    * as passed to `groupedWithin`.
    *
    * @tparam T type of the elements to batch
    * @return a flow emitting the batched elements as `Seq[T]`
    */
  def grouped[T]: Flow[T, Seq[T], NotUsed] = {
    val maxBatchSize = 10
    val maxBatchDelay = 20.milliseconds
    Flow[T].groupedWithin(maxBatchSize, maxBatchDelay)
  }

}
Line Stmt Id Pos Tree Symbol Tests Code
28 19021 973 - 974 Literal <nosymbol> org.make.api.technical.elasticsearch.indexationcomponenttest 5
29 18659 995 - 996 Literal <nosymbol> org.make.api.technical.elasticsearch.indexationcomponenttest 1
31 19222 1078 - 1078 Apply org.make.api.technical.elasticsearch.IndexationStream.$anonfun.<init> new $anonfun()
31 18889 1054 - 1099 Apply akka.stream.scaladsl.FlowOps.collect akka.stream.scaladsl.Flow.apply[Option[T]].collect[T](({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[Option[T],T] with java.io.Serializable { def <init>(): <$anon: Option[T] => T> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: Option[T], B1 >: T](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[Option[T]]: Option[T]): Option[T] @unchecked) match { case (value: T): Some[T]((t @ _)) => t case (defaultCase$ @ _) => default.apply(x1) }; final def isDefinedAt(x1: Option[T]): Boolean = ((x1.asInstanceOf[Option[T]]: Option[T]): Option[T] @unchecked) match { case (value: T): Some[T]((t @ _)) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[Option[T],T]))
32 18728 1186 - 1195 Select scala.Option.isEmpty x$1.isEmpty
32 19266 1163 - 1211 Apply akka.stream.scaladsl.FlowOps.map akka.stream.scaladsl.Flow.apply[Option[T]].filter(((x$1: Option[T]) => x$1.isEmpty)).map[T](((x$2: Option[T]) => item))
33 18942 1279 - 1281 Literal <nosymbol> 10
33 18971 1257 - 1299 Apply akka.stream.scaladsl.FlowOps.groupedWithin akka.stream.scaladsl.Flow.apply[T].groupedWithin(10, scala.concurrent.duration.`package`.DurationInt(20).milliseconds)
33 18606 1283 - 1285 Literal <nosymbol> 20
33 19323 1283 - 1298 Select scala.concurrent.duration.DurationConversions.milliseconds scala.concurrent.duration.`package`.DurationInt(20).milliseconds