1 /*
2  *  Make.org Core API
3  *  Copyright (C) 2018 Make.org
4  *
5  * This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU Affero General Public License as
7  *  published by the Free Software Foundation, either version 3 of the
8  *  License, or (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU Affero General Public License for more details.
14  *
15  *  You should have received a copy of the GNU Affero General Public License
16  *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
17  *
18  */
19 
20 package org.make.api.sequence
21 
22 import akka.actor.typed.{ActorRef, Behavior}
23 import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer}
24 import cats.data.{NonEmptyList => Nel}
25 import eu.timepit.refined.auto.autoUnwrap
26 import org.make.api.sequence.SequenceCacheActor._
27 import org.make.api.sequence.SequenceCacheManager.CacheKey
28 import org.make.api.technical.ActorProtocol
29 import org.make.api.technical.sequence.SequenceCacheConfiguration
30 import org.make.core.proposal.indexed.IndexedProposal
31 
32 import scala.util.{Failure, Success}
33 import scala.concurrent.Future
34 import scala.concurrent.duration.Deadline
35 
36 object SequenceCacheActor {
37 
38   def apply(
39     reloadProposals: () => Future[Nel[IndexedProposal]],
40     config: SequenceCacheConfiguration
41   ): Behavior[Protocol] = {
42     Behaviors.withStash(1000) { buffer =>
43       Behaviors.setup { context =>
44         fetchProposals(reloadProposals, context)()
45         new SequenceCacheActor(context, buffer, config, fetchProposals(reloadProposals, context)).init()
46       }
47     }
48   }
49 
50   def fetchProposals(reloadProposals: () => Future[Nel[IndexedProposal]], context: ActorContext[Protocol]): () => Unit =
51     () => {
52       context.pipeToSelf(reloadProposals()) {
53         case Success(proposals) => SetProposalsPoolSuccess(proposals)
54         case Failure(e)         => SetProposalsPoolFailure(e)
55       }
56     }
57 
58   sealed trait Protocol extends ActorProtocol
59 
60   sealed trait Command extends Protocol
61   sealed trait Response extends Protocol
62 
63   final case class GetProposal(replyTo: ActorRef[IndexedProposal]) extends Command
64   final case class SetProposalsPoolSuccess(proposalsPool: Nel[IndexedProposal]) extends Command
65   final case class SetProposalsPoolFailure(error: Throwable) extends Command
66   case object Expire extends Command
67 
68   def name(key: CacheKey): String = s"${key.questionId.value}-${key.kind}"
69 }
70 
71 class SequenceCacheActor private (
72   context: ActorContext[Protocol],
73   buffer: StashBuffer[Protocol],
74   config: SequenceCacheConfiguration,
75   fetchProposals: () => Unit
76 ) {
77 
78   private val untilRefresh: Int = config.proposalsPoolSize * config.cacheRefreshCycles
79 
80   private def newDeadline: Deadline = Deadline.now + config.inactivityTimeout
81 
82   def init(): Behavior[Protocol] = {
83     Behaviors.receiveMessage {
84       case SetProposalsPoolSuccess(proposalsPool) =>
85         buffer.unstashAll(cache(untilRefresh, proposalsPool, proposalsPool.iterator, newDeadline))
86       case SetProposalsPoolFailure(e) =>
87         context.log.error("Refreshing cache failed", e)
88         Behaviors.stopped
89       case cmd =>
90         buffer.stash(cmd)
91         Behaviors.same
92     }
93   }
94 
95   @SuppressWarnings(Array("org.wartremover.warts.Recursion"))
96   def cache(
97     counter: Int,
98     proposalsPool: Nel[IndexedProposal],
99     iterator: Iterator[IndexedProposal],
100     deadline: Deadline
101   ): Behavior[Protocol] = {
102     Behaviors.receiveMessage {
103       case GetProposal(replyTo) =>
104         if (counter == 0) {
105           fetchProposals()
106         }
107         if (iterator.hasNext) {
108           replyTo ! iterator.next()
109           cache(counter - 1, proposalsPool, iterator, newDeadline)
110         } else {
111           val it = proposalsPool.iterator
112           replyTo ! it.next()
113           cache(counter - 1, proposalsPool, it, newDeadline)
114         }
115       case Expire =>
116         if (deadline.isOverdue()) {
117           Behaviors.stopped
118         } else {
119           Behaviors.same
120         }
121       case SetProposalsPoolSuccess(proposalsPool) =>
122         cache(untilRefresh, proposalsPool, proposalsPool.iterator, newDeadline)
123       case SetProposalsPoolFailure(e) =>
124         context.log.error("Refreshing cache failed", e)
125         Behaviors.stopped
126     }
127   }
128 }
Line Stmt Id Pos Tree Symbol Tests Code
42 29122 1529 - 1771 Apply akka.actor.typed.scaladsl.Behaviors.withStash org.make.api.sequence.sequencecacheactortest akka.actor.typed.scaladsl.Behaviors.withStash[org.make.api.sequence.SequenceCacheActor.Protocol](1000)(((buffer: akka.actor.typed.scaladsl.StashBuffer[org.make.api.sequence.SequenceCacheActor.Protocol]) => akka.actor.typed.scaladsl.Behaviors.setup[org.make.api.sequence.SequenceCacheActor.Protocol](((context: akka.actor.typed.scaladsl.ActorContext[org.make.api.sequence.SequenceCacheActor.Protocol]) => { SequenceCacheActor.this.fetchProposals(reloadProposals, context).apply(); new SequenceCacheActor(context, buffer, config, SequenceCacheActor.this.fetchProposals(reloadProposals, context)).init() }))))
42 29813 1549 - 1553 Literal <nosymbol> org.make.api.sequence.sequencecacheactortest 1000
43 29604 1573 - 1765 Apply akka.actor.typed.scaladsl.Behaviors.setup akka.actor.typed.scaladsl.Behaviors.setup[org.make.api.sequence.SequenceCacheActor.Protocol](((context: akka.actor.typed.scaladsl.ActorContext[org.make.api.sequence.SequenceCacheActor.Protocol]) => { SequenceCacheActor.this.fetchProposals(reloadProposals, context).apply(); new SequenceCacheActor(context, buffer, config, SequenceCacheActor.this.fetchProposals(reloadProposals, context)).init() }))
44 29055 1610 - 1652 Apply scala.Function0.apply SequenceCacheActor.this.fetchProposals(reloadProposals, context).apply()
45 30357 1661 - 1757 Apply org.make.api.sequence.SequenceCacheActor.init new SequenceCacheActor(context, buffer, config, SequenceCacheActor.this.fetchProposals(reloadProposals, context)).init()
52 28314 1935 - 1952 Apply scala.Function0.apply reloadProposals.apply()
52 28465 1916 - 2095 Apply akka.actor.typed.scaladsl.ActorContext.pipeToSelf context.pipeToSelf[cats.data.NonEmptyList[org.make.core.proposal.indexed.IndexedProposal]](reloadProposals.apply())(((x0$1: scala.util.Try[cats.data.NonEmptyList[org.make.core.proposal.indexed.IndexedProposal]]) => x0$1 match { case (value: cats.data.NonEmptyList[org.make.core.proposal.indexed.IndexedProposal]): scala.util.Success[cats.data.NonEmptyList[org.make.core.proposal.indexed.IndexedProposal]]((proposals @ _)) => SequenceCacheActor.this.SetProposalsPoolSuccess.apply(proposals) case (exception: Throwable): scala.util.Failure[cats.data.NonEmptyList[org.make.core.proposal.indexed.IndexedProposal]]((e @ _)) => SequenceCacheActor.this.SetProposalsPoolFailure.apply(e) }))
53 29756 1991 - 2025 Apply org.make.api.sequence.SequenceCacheActor.SetProposalsPoolSuccess.apply SequenceCacheActor.this.SetProposalsPoolSuccess.apply(proposals)
54 28881 2061 - 2087 Apply org.make.api.sequence.SequenceCacheActor.SetProposalsPoolFailure.apply SequenceCacheActor.this.SetProposalsPoolFailure.apply(e)
78 29549 2813 - 2865 Apply scala.Int.* eu.timepit.refined.auto.autoUnwrap[eu.timepit.refined.api.Refined, Int, eu.timepit.refined.numeric.Positive](SequenceCacheActor.this.config.proposalsPoolSize)(api.this.RefType.refinedRefType).*(SequenceCacheActor.this.config.cacheRefreshCycles)
78 29818 2813 - 2837 Select org.make.api.technical.sequence.SequenceCacheConfiguration.proposalsPoolSize SequenceCacheActor.this.config.proposalsPoolSize
78 30362 2840 - 2865 Select org.make.api.technical.sequence.SequenceCacheConfiguration.cacheRefreshCycles SequenceCacheActor.this.config.cacheRefreshCycles
78 29022 2820 - 2820 Select eu.timepit.refined.api.RefType.refinedRefType api.this.RefType.refinedRefType
80 29108 2920 - 2944 Select org.make.api.technical.sequence.SequenceCacheConfiguration.inactivityTimeout SequenceCacheActor.this.config.inactivityTimeout
80 28321 2905 - 2944 Apply scala.concurrent.duration.Deadline.+ scala.concurrent.duration.Deadline.now.+(SequenceCacheActor.this.config.inactivityTimeout)
83 29643 2987 - 3361 Apply akka.actor.typed.scaladsl.Behaviors.receiveMessage akka.actor.typed.scaladsl.Behaviors.receiveMessage[org.make.api.sequence.SequenceCacheActor.Protocol](((x0$1: org.make.api.sequence.SequenceCacheActor.Protocol) => x0$1 match { case (proposalsPool: cats.data.NonEmptyList[org.make.core.proposal.indexed.IndexedProposal]): org.make.api.sequence.SequenceCacheActor.SetProposalsPoolSuccess((proposalsPool @ _)) => SequenceCacheActor.this.buffer.unstashAll(SequenceCacheActor.this.cache(SequenceCacheActor.this.untilRefresh, proposalsPool, proposalsPool.iterator, SequenceCacheActor.this.newDeadline)) case (error: Throwable): org.make.api.sequence.SequenceCacheActor.SetProposalsPoolFailure((e @ _)) => { SequenceCacheActor.this.context.log.error("Refreshing cache failed", e); akka.actor.typed.scaladsl.Behaviors.stopped[org.make.api.sequence.SequenceCacheActor.Protocol] } case (cmd @ _) => { SequenceCacheActor.this.buffer.stash(cmd); akka.actor.typed.scaladsl.Behaviors.same[org.make.api.sequence.SequenceCacheActor.Protocol] } }))
85 28497 3152 - 3163 Select org.make.api.sequence.SequenceCacheActor.newDeadline SequenceCacheActor.this.newDeadline
85 29029 3075 - 3165 Apply akka.actor.typed.scaladsl.StashBuffer.unstashAll SequenceCacheActor.this.buffer.unstashAll(SequenceCacheActor.this.cache(SequenceCacheActor.this.untilRefresh, proposalsPool, proposalsPool.iterator, SequenceCacheActor.this.newDeadline))
85 28857 3128 - 3150 Select cats.data.NonEmptyList.iterator proposalsPool.iterator
85 29793 3093 - 3164 Apply org.make.api.sequence.SequenceCacheActor.cache SequenceCacheActor.this.cache(SequenceCacheActor.this.untilRefresh, proposalsPool, proposalsPool.iterator, SequenceCacheActor.this.newDeadline)
85 29640 3099 - 3111 Select org.make.api.sequence.SequenceCacheActor.untilRefresh SequenceCacheActor.this.untilRefresh
87 30370 3215 - 3262 Apply org.slf4j.Logger.error SequenceCacheActor.this.context.log.error("Refreshing cache failed", e)
88 29533 3271 - 3288 TypeApply akka.actor.typed.scaladsl.Behaviors.stopped akka.actor.typed.scaladsl.Behaviors.stopped[org.make.api.sequence.SequenceCacheActor.Protocol]
90 29191 3315 - 3332 Apply akka.actor.typed.scaladsl.StashBuffer.stash SequenceCacheActor.this.buffer.stash(cmd)
91 28283 3341 - 3355 TypeApply akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[org.make.api.sequence.SequenceCacheActor.Protocol]
102 28983 3597 - 4417 Apply akka.actor.typed.scaladsl.Behaviors.receiveMessage akka.actor.typed.scaladsl.Behaviors.receiveMessage[org.make.api.sequence.SequenceCacheActor.Protocol](((x0$1: org.make.api.sequence.SequenceCacheActor.Protocol) => x0$1 match { case (replyTo: akka.actor.typed.ActorRef[org.make.core.proposal.indexed.IndexedProposal]): org.make.api.sequence.SequenceCacheActor.GetProposal((replyTo @ _)) => { if (counter.==(0)) SequenceCacheActor.this.fetchProposals.apply() else (); if (iterator.hasNext) { typed.this.ActorRef.ActorRefOps[org.make.core.proposal.indexed.IndexedProposal](replyTo).!(iterator.next()); SequenceCacheActor.this.cache(counter.-(1), proposalsPool, iterator, SequenceCacheActor.this.newDeadline) } else { val it: Iterator[org.make.core.proposal.indexed.IndexedProposal] = proposalsPool.iterator; typed.this.ActorRef.ActorRefOps[org.make.core.proposal.indexed.IndexedProposal](replyTo).!(it.next()); SequenceCacheActor.this.cache(counter.-(1), proposalsPool, it, SequenceCacheActor.this.newDeadline) } } case org.make.api.sequence.SequenceCacheActor.Expire => if (deadline.isOverdue()) akka.actor.typed.scaladsl.Behaviors.stopped[org.make.api.sequence.SequenceCacheActor.Protocol] else akka.actor.typed.scaladsl.Behaviors.same[org.make.api.sequence.SequenceCacheActor.Protocol] case (proposalsPool: cats.data.NonEmptyList[org.make.core.proposal.indexed.IndexedProposal]): org.make.api.sequence.SequenceCacheActor.SetProposalsPoolSuccess((proposalsPool @ _)) => SequenceCacheActor.this.cache(SequenceCacheActor.this.untilRefresh, proposalsPool, proposalsPool.iterator, SequenceCacheActor.this.newDeadline) case (error: Throwable): org.make.api.sequence.SequenceCacheActor.SetProposalsPoolFailure((e @ _)) => { SequenceCacheActor.this.context.log.error("Refreshing cache failed", e); akka.actor.typed.scaladsl.Behaviors.stopped[org.make.api.sequence.SequenceCacheActor.Protocol] } }))
104 28951 3667 - 3667 Literal <nosymbol> ()
104 30329 3667 - 3667 Block <nosymbol> ()
104 28862 3671 - 3683 Apply scala.Int.== counter.==(0)
105 29892 3697 - 3713 Block scala.Function0.apply SequenceCacheActor.this.fetchProposals.apply()
105 30286 3697 - 3713 Apply scala.Function0.apply SequenceCacheActor.this.fetchProposals.apply()
107 29602 3736 - 3752 Select scala.collection.Iterator.hasNext iterator.hasNext
107 29856 3754 - 3868 Block <nosymbol> { typed.this.ActorRef.ActorRefOps[org.make.core.proposal.indexed.IndexedProposal](replyTo).!(iterator.next()); SequenceCacheActor.this.cache(counter.-(1), proposalsPool, iterator, SequenceCacheActor.this.newDeadline) }
108 28387 3766 - 3791 Apply akka.actor.typed.ActorRef.ActorRefOps.! typed.this.ActorRef.ActorRefOps[org.make.core.proposal.indexed.IndexedProposal](replyTo).!(iterator.next())
108 29150 3776 - 3791 Apply scala.collection.Iterator.next iterator.next()
109 28822 3846 - 3857 Select org.make.api.sequence.SequenceCacheActor.newDeadline SequenceCacheActor.this.newDeadline
109 30290 3802 - 3858 Apply org.make.api.sequence.SequenceCacheActor.cache SequenceCacheActor.this.cache(counter.-(1), proposalsPool, iterator, SequenceCacheActor.this.newDeadline)
109 29646 3808 - 3819 Apply scala.Int.- counter.-(1)
110 28832 3874 - 4018 Block <nosymbol> { val it: Iterator[org.make.core.proposal.indexed.IndexedProposal] = proposalsPool.iterator; typed.this.ActorRef.ActorRefOps[org.make.core.proposal.indexed.IndexedProposal](replyTo).!(it.next()); SequenceCacheActor.this.cache(counter.-(1), proposalsPool, it, SequenceCacheActor.this.newDeadline) }
111 29019 3895 - 3917 Select cats.data.NonEmptyList.iterator proposalsPool.iterator
112 30333 3938 - 3947 Apply scala.collection.Iterator.next it.next()
112 29570 3928 - 3947 Apply akka.actor.typed.ActorRef.ActorRefOps.! typed.this.ActorRef.ActorRefOps[org.make.core.proposal.indexed.IndexedProposal](replyTo).!(it.next())
113 29731 3958 - 4008 Apply org.make.api.sequence.SequenceCacheActor.cache SequenceCacheActor.this.cache(counter.-(1), proposalsPool, it, SequenceCacheActor.this.newDeadline)
113 28791 3964 - 3975 Apply scala.Int.- counter.-(1)
113 28318 3996 - 4007 Select org.make.api.sequence.SequenceCacheActor.newDeadline SequenceCacheActor.this.newDeadline
116 30252 4052 - 4072 Apply scala.concurrent.duration.Deadline.isOverdue deadline.isOverdue()
117 29863 4086 - 4103 TypeApply akka.actor.typed.scaladsl.Behaviors.stopped akka.actor.typed.scaladsl.Behaviors.stopped[org.make.api.sequence.SequenceCacheActor.Protocol]
117 29025 4086 - 4103 Block akka.actor.typed.scaladsl.Behaviors.stopped akka.actor.typed.scaladsl.Behaviors.stopped[org.make.api.sequence.SequenceCacheActor.Protocol]
119 29573 4131 - 4145 Block akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[org.make.api.sequence.SequenceCacheActor.Protocol]
119 30405 4131 - 4145 TypeApply akka.actor.typed.scaladsl.Behaviors.same akka.actor.typed.scaladsl.Behaviors.same[org.make.api.sequence.SequenceCacheActor.Protocol]
122 28814 4217 - 4288 Apply org.make.api.sequence.SequenceCacheActor.cache SequenceCacheActor.this.cache(SequenceCacheActor.this.untilRefresh, proposalsPool, proposalsPool.iterator, SequenceCacheActor.this.newDeadline)
122 29732 4276 - 4287 Select org.make.api.sequence.SequenceCacheActor.newDeadline SequenceCacheActor.this.newDeadline
122 28280 4252 - 4274 Select cats.data.NonEmptyList.iterator proposalsPool.iterator
122 28752 4223 - 4235 Select org.make.api.sequence.SequenceCacheActor.untilRefresh SequenceCacheActor.this.untilRefresh
124 30259 4338 - 4385 Apply org.slf4j.Logger.error SequenceCacheActor.this.context.log.error("Refreshing cache failed", e)
125 29796 4394 - 4411 TypeApply akka.actor.typed.scaladsl.Behaviors.stopped akka.actor.typed.scaladsl.Behaviors.stopped[org.make.api.sequence.SequenceCacheActor.Protocol]