/*
 *  Make.org Core API
 *  Copyright (C) 2018 Make.org
 *
 * This program is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU Affero General Public License as
 *  published by the Free Software Foundation, either version 3 of the
 *  License, or (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU Affero General Public License for more details.
 *
 *  You should have received a copy of the GNU Affero General Public License
 *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
 *
 */

package org.make.api.technical.elasticsearch

import akka.NotUsed
import akka.stream.scaladsl.Flow
import scala.concurrent.duration.DurationInt

/**
  * Reusable Akka Streams flow fragments and tuning constants, exposed as a mixin.
  */
trait IndexationStream {
  //TODO: Load these values from conf
  // NOTE(review): presumably consumed as parallelism levels by the streams built in
  // implementing classes; their usage is not visible here, confirm against callers.
  val parallelism: Int = 5
  val singleAsync: Int = 1

  /**
    * Flow that unwraps defined options and drops empty ones.
    *
    * @tparam T type of the wrapped element
    * @return a flow emitting the content of every `Some` it receives; `None` elements are skipped
    */
  def filterIsDefined[T]: Flow[Option[T], T, NotUsed] =
    Flow[Option[T]].collect {
      case Some(value) => value
    }

  /**
    * Flow that reacts only to empty options, substituting a fixed element for each of them.
    *
    * @param item element emitted downstream for every `None` received
    * @tparam T type of the emitted element
    * @return a flow emitting `item` once per empty option; defined options are dropped
    */
  def filterIsEmpty[T](item: T): Flow[Option[T], T, NotUsed] = {
    val emptyOnly = Flow[Option[T]].filter(maybeValue => maybeValue.isEmpty)
    emptyOnly.map(_ => item)
  }

  /**
    * Flow that batches incoming elements with `groupedWithin`, using a maximum of
    * 10 elements per batch and a 20 millisecond time window.
    *
    * @tparam T type of the batched elements
    * @return a flow emitting sequences of the received elements
    */
  def grouped[T]: Flow[T, Seq[T], NotUsed] = {
    val maxBatchSize = 10
    val maxBatchDelay = 20.milliseconds
    Flow[T].groupedWithin(maxBatchSize, maxBatchDelay)
  }

}
| Line | Stmt Id | Pos | Tree | Symbol | Tests | Code |
|---|---|---|---|---|---|---|
| 28 | 19021 | 973 - 974 | Literal | <nosymbol> | org.make.api.technical.elasticsearch.indexationcomponenttest | 5 |
| 29 | 18659 | 995 - 996 | Literal | <nosymbol> | org.make.api.technical.elasticsearch.indexationcomponenttest | 1 |
| 31 | 19222 | 1078 - 1078 | Apply | org.make.api.technical.elasticsearch.IndexationStream.$anonfun.<init> | new $anonfun() | |
| 31 | 18889 | 1054 - 1099 | Apply | akka.stream.scaladsl.FlowOps.collect | akka.stream.scaladsl.Flow.apply[Option[T]].collect[T](({ @SerialVersionUID(value = 0) final <synthetic> class $anonfun extends scala.runtime.AbstractPartialFunction[Option[T],T] with java.io.Serializable { def <init>(): <$anon: Option[T] => T> = { $anonfun.super.<init>(); () }; final override def applyOrElse[A1 <: Option[T], B1 >: T](x1: A1, default: A1 => B1): B1 = ((x1.asInstanceOf[Option[T]]: Option[T]): Option[T] @unchecked) match { case (value: T): Some[T]((t @ _)) => t case (defaultCase$ @ _) => default.apply(x1) }; final def isDefinedAt(x1: Option[T]): Boolean = ((x1.asInstanceOf[Option[T]]: Option[T]): Option[T] @unchecked) match { case (value: T): Some[T]((t @ _)) => true case (defaultCase$ @ _) => false } }; new $anonfun() }: PartialFunction[Option[T],T])) | |
| 32 | 18728 | 1186 - 1195 | Select | scala.Option.isEmpty | x$1.isEmpty | |
| 32 | 19266 | 1163 - 1211 | Apply | akka.stream.scaladsl.FlowOps.map | akka.stream.scaladsl.Flow.apply[Option[T]].filter(((x$1: Option[T]) => x$1.isEmpty)).map[T](((x$2: Option[T]) => item)) | |
| 33 | 18942 | 1279 - 1281 | Literal | <nosymbol> | 10 | |
| 33 | 18971 | 1257 - 1299 | Apply | akka.stream.scaladsl.FlowOps.groupedWithin | akka.stream.scaladsl.Flow.apply[T].groupedWithin(10, scala.concurrent.duration.`package`.DurationInt(20).milliseconds) | |
| 33 | 18606 | 1283 - 1285 | Literal | <nosymbol> | 20 | |
| 33 | 19323 | 1283 - 1298 | Select | scala.concurrent.duration.DurationConversions.milliseconds | scala.concurrent.duration.`package`.DurationInt(20).milliseconds |