After reading Jamie Allen's Effective Acca, I will try to apply his Cameo sentence.
Slideshare: http://www.slideshare.net/shinolajla/effective-akka-scalaio
Github: https://github.com/jamie-allen/effective_akka
I think that what I created will work, but does not seem to be the best approach based on Jamie's comments in his conversations. I will update / edit back to this post what I implemented (or try).
Consolidated Actor (Cameo Actor):
object SummaryResponseHandler { case object DbRetrievalTimeout def props(mongoDb: ActorRef, redisDb: ActorRef, originalSender: ActorRef): Props = { Props(new SummaryResponseHandler(mongoDb, redisDb, originalSender)) } } class SummaryResponseHandler(mongoDb: ActorRef, redisDb: ActorRef, originalSender: ActorRef) extends Actor with ActorLogging { import SummaryResponseHandler._ var mongoSummary, redisSummary: Option[String] = None def receive = LoggingReceive { case MongoSummary(summary) => log.debug(s"Received mongo summary: $summary") mongoSummary = summary collectSummaries case RedisSummary(summary) => log.debug(s"Received redis summary: $summary") redisSummary = summary collectSummaries case DbRetrievalTimeout => log.debug("Timeout occurred") sendResponseAndShutdown(DbRetrievalTimeout) } def collectSummaries = (mongoSummary, redisSummary) match { case (Some(m), Some(r)) => log.debug(s"Values received for both databases") timeoutMessager.cancel sendResponseAndShutdown(DataSetSummary(mongoSummary, redisSummary)) case _ => } def sendResponseAndShutdown(response: Any) = { originalSender ! response log.debug("Stopping context capturing actor") context.stop(self) } import context.dispatcher val timeoutMessager = context.system.scheduler.scheduleOnce( 250 milliseconds, self, DbRetrievalTimeout) } class SummaryRetriever(mongoDb: ActorRef, redisDb: ActorRef) extends Actor with ActorLogging { def receive = { case GetSummary(dataSet) => log.debug("received dataSet") val originalSender = sender val handler = context.actorOf(SummaryResponseHandler.props(mongoDb,redisDb, originalSender), "cameo-message-handler") mongoDb.tell(GetSummary(dataSet), handler) redisDb.tell(GetSummary(dataSet), handler) case _ => log.debug(s"Unknown result $GetSummary(datset)") } }
General:
case class GetSummary(dataSet: String) case class DataSetSummary( mongo: Option[String], redis: Option[String] ) case class MongoSummary( summary: Option[String] ) case class RedisSummary( summary: Option[String] ) trait MongoProxy extends Actor with ActorLogging trait RedisProxy extends Actor with ActorLogging
Mock Stubs:
class MongoProxyStub extends RedisProxy { val summaryData = Map[String, String]( "dataset1" -> "MongoData1", "dataset2" -> "MongoData2") def receive = LoggingReceive { case GetSummary(dataSet: String) => log.debug(s"Received GetSummary for ID: $dataSet") summaryData.get(dataSet) match { case Some(data) => sender ! MongoSummary(Some(data)) case None => sender ! MongoSummary(Some("")) } } } class RedisProxyStub extends MongoProxy{ val summaryData = Map[String, String]( "dataset1" -> "RedisData1", "dataset2" -> "RedisData2") def receive = LoggingReceive { case GetSummary(dataSet: String) => log.debug(s"Received GetSummary for ID: $dataSet") summaryData.get(dataSet) match { case Some(data) => sender ! RedisSummary(Some(data)) case None => sender ! RedisSummary(Some("")) } } }
Download (you should use the test, but just want to run it from the download):
object Boot extends App{ val system = ActorSystem("DbSystem") val redisProxy = system.actorOf(Props[RedisProxyStub], "cameo-success-mongo") val mongoProxy = system.actorOf(Props[MongoProxyStub], "cameo-success-redis") val summaryRetrieverActor = system.actorOf(Props(new SummaryRetriever(redisProxy, mongoProxy)), "cameo-retriever1") implicit val timeout = Timeout(5 seconds) val future = summaryRetrieverActor ? GetSummary("dataset1") val result = Await.result(future, timeout.duration).asInstanceOf[DataSetSummary] println(Some(result.mongo).x) println(result.redis) system.shutdown() }
Application Configuration:
akka.loglevel = "DEBUG" akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] akka.actor.debug.autoreceive = on akka.actor.debug.lifecycle = on akka.actor.debug.receive = on akka.actor.debug.event-stream = on