1 /* 2 * Make.org Core API 3 * Copyright (C) 2018 Make.org 4 * 5 * This program is free software: you can redistribute it and/or modify 6 * it under the terms of the GNU Affero General Public License as 7 * published by the Free Software Foundation, either version 3 of the 8 * License, or (at your option) any later version. 9 * 10 * This program is distributed in the hope that it will be useful, 11 * but WITHOUT ANY WARRANTY; without even the implied warranty of 12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13 * GNU Affero General Public License for more details. 14 * 15 * You should have received a copy of the GNU Affero General Public License 16 * along with this program. If not, see <https://www.gnu.org/licenses/>. 17 * 18 */ 19 20 package org.make.api.technical 21 22 import akka.NotUsed 23 import akka.stream.scaladsl.Source 24 25 import scala.concurrent.{ExecutionContext, Future} 26 27 object StreamUtils { 28 def asyncPageToPageSource[T]( 29 pageFunc: Int => Future[Seq[T]] 30 )(implicit executionContext: ExecutionContext): Source[Seq[T], NotUsed] = { 31 Source.unfoldAsync(0) { offset => 32 val futureResults: Future[Seq[T]] = pageFunc(offset) 33 futureResults.map { results => 34 if (results.isEmpty) { 35 None 36 } else { 37 Some((offset + results.size, results)) 38 } 39 } 40 41 } 42 } 43 }
| Line | Stmt Id | Pos | Tree | Symbol | Tests | Code |
|---|---|---|---|---|---|---|
| 31 | 369 | 1056 - 1322 | Apply | akka.stream.scaladsl.Source.unfoldAsync | org.make.api.technical.streamutilstest,org.make.api.sessionhistory.sessionhistorycoordinatortest,org.make.api.technical.crm.crmservicecomponenttest | akka.stream.scaladsl.Source.unfoldAsync[Int, Seq[T]](0)(((offset: Int) => { val futureResults: scala.concurrent.Future[Seq[T]] = pageFunc.apply(offset); futureResults.map[Option[(Int, Seq[T])]](((results: Seq[T]) => if (results.isEmpty) scala.None else scala.Some.apply[(Int, Seq[T])](scala.Tuple2.apply[Int, Seq[T]](offset.+(results.size), results))))(executionContext) })) |
| 31 | 384 | 1075 - 1076 | Literal | <nosymbol> | org.make.api.technical.streamutilstest,org.make.api.sessionhistory.sessionhistorycoordinatortest,org.make.api.technical.crm.crmservicecomponenttest | 0 |
| 32 | 360 | 1132 - 1148 | Apply | scala.Function1.apply | pageFunc.apply(offset) | |
| 33 | 334 | 1155 - 1315 | ApplyToImplicitArgs | scala.concurrent.Future.map | futureResults.map[Option[(Int, Seq[T])]](((results: Seq[T]) => if (results.isEmpty) scala.None else scala.Some.apply[(Int, Seq[T])](scala.Tuple2.apply[Int, Seq[T]](offset.+(results.size), results))))(executionContext) |