1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.technical
21 
22 import akka.NotUsed
23 import akka.stream.scaladsl.Source
24 
25 import scala.concurrent.{ExecutionContext, Future}
26 
27 object StreamUtils {
28   def asyncPageToPageSource[T](
29     pageFunc: Int => Future[Seq[T]]
30   )(implicit executionContext: ExecutionContext): Source[Seq[T], NotUsed] = {
31     Source.unfoldAsync(0) { offset =>
32       val futureResults: Future[Seq[T]] = pageFunc(offset)
33       futureResults.map { results =>
34         if (results.isEmpty) {
35           None
36         } else {
37           Some((offset + results.size, results))
38         }
39       }
40 
41     }
42   }
43 }
Line Stmt Id Pos Tree Symbol Tests Code
31 369 1056 - 1322 Apply akka.stream.scaladsl.Source.unfoldAsync org.make.api.technical.streamutilstest,org.make.api.sessionhistory.sessionhistorycoordinatortest,org.make.api.technical.crm.crmservicecomponenttest akka.stream.scaladsl.Source.unfoldAsync[Int, Seq[T]](0)(((offset: Int) => { val futureResults: scala.concurrent.Future[Seq[T]] = pageFunc.apply(offset); futureResults.map[Option[(Int, Seq[T])]](((results: Seq[T]) => if (results.isEmpty) scala.None else scala.Some.apply[(Int, Seq[T])](scala.Tuple2.apply[Int, Seq[T]](offset.+(results.size), results))))(executionContext) }))
31 384 1075 - 1076 Literal <nosymbol> org.make.api.technical.streamutilstest,org.make.api.sessionhistory.sessionhistorycoordinatortest,org.make.api.technical.crm.crmservicecomponenttest 0
32 360 1132 - 1148 Apply scala.Function1.apply pageFunc.apply(offset)
33 334 1155 - 1315 ApplyToImplicitArgs scala.concurrent.Future.map futureResults.map[Option[(Int, Seq[T])]](((results: Seq[T]) => if (results.isEmpty) scala.None else scala.Some.apply[(Int, Seq[T])](scala.Tuple2.apply[Int, Seq[T]](offset.+(results.size), results))))(executionContext)