Consolidating Concurrent Database Queries with Akka

I want to be able to make parallel queries to multiple data repositories and consolidate the results. I am trying to understand whether my approach is really valid or whether there is a better way to solve this problem. I am definitely new to Akka / Spray / Scala and really want to better understand how to build these components properly. Any suggestions / tips would be greatly appreciated. I am still trying to wrap my head around using actors and futures for this type of implementation.

Spray Service:

/**
 * HTTP service that answers `summary/<dataset>/<timeslice>` by asking the
 * database master actor for a consolidated summary and rendering the outcome
 * as plain text.
 */
trait DemoService extends HttpService with Actor with ActorLogging {

  // Picked up implicitly by every `?` (ask) in this trait.
  implicit val timeout: Timeout = Timeout(5.seconds)

  // NOTE(review): this child is a MongoMasterActor but is registered under the
  // name "redisactor" - confirm whether the name is intentional.
  val mongoMasterActor = context.actorOf(Props[MongoMasterActor], "redisactor")
  val dbMaster = context.actorOf(Props[DbMasterActor], "dbactor")

  val messageApiRouting =
    path("summary" / Segment / Segment) { (dataset, timeslice) =>
      val pendingSummary = getDbResponses(dbMaster, dataset, timeslice)
      onComplete(pendingSummary) {
        case Success(summary) => complete(s"The result was $summary")
        case Failure(cause) => complete(s"An error occurred: ${cause.getMessage}")
      }
    }

  /** Logs the request and builds the "summary" query message shared by both ask helpers. */
  private def summaryQuery(dataset: String, timeslice: String): DbMessage = {
    log.debug(s"dataset: $dataset timeslice: $timeslice")
    DbMessage("summary", s"$dataset$timeslice")
  }

  /**
   * Asks the given actor for the summary of a dataset / timeslice pair.
   *
   * @param mongoActor actor that receives the `DbMessage` and is expected to
   *                   reply with a `DbMessageResponse`
   * @param dataset    the dataset for which the summary has been requested
   * @param timeslice  the timeslice (Month, Week, Day, etc.) for which the
   *                   summary has been requested
   * @return the reply, failed if it is not a `DbMessageResponse` or the ask times out
   */
  def getSummary(mongoActor: ActorRef, dataset: String, timeslice: String): Future[DbMessageResponse] = {
    val reply = mongoActor ? summaryQuery(dataset, timeslice)
    reply.mapTo[DbMessageResponse]
  }

  /**
   * Asks the given actor for the consolidated summary of a dataset / timeslice pair.
   *
   * @param dbActor   actor that receives the `DbMessage` and is expected to
   *                  reply with a `SummaryResponse`
   * @param dataset   the dataset for which the summary has been requested
   * @param timeslice the timeslice for which the summary has been requested
   * @return the reply, failed if it is not a `SummaryResponse` or the ask times out
   */
  def getDbResponses(dbActor: ActorRef, dataset: String, timeslice: String): Future[SummaryResponse] = {
    val reply = dbActor ? summaryQuery(dataset, timeslice)
    reply.mapTo[SummaryResponse]
  }

  /** Joins the `response` of the Mongo summary with the `response` of the Redis summary, in that order. */
  def getSummaryPayload(mongoSummary: DbMessageResponse, redisSummary: DbMessageResponse): String = {
    val mongoPart = mongoSummary.response
    val redisPart = redisSummary.response
    mongoPart + redisPart
  }
}

Akka Actor / Future mock db queries:

 /**
  * Actor that answers "summary" [[DbMessage]]s by running the (mock) Mongo and
  * Redis lookups concurrently and replying with a combined `SummaryResponse`.
  *
  * On success the requester receives the `SummaryResponse`. On failure, or
  * for an unsupported query, it receives an `akka.actor.Status.Failure`, so a
  * caller using `?` sees a failed future right away instead of an ask timeout.
  */
 class DbMasterActor extends Actor with ActorLogging {

   //TODO: Need to add routing to the config to limit instances
   val summaryActor = context.actorOf(Props(new SummaryActor), "summaryactor")

   def receive = {
     case msg: DbMessage =>
       // Capture the sender in a local val. The future below completes on
       // another thread, possibly after this actor has started handling the
       // next message, so neither `sender` nor a mutable field may be read
       // from inside the callback.
       val replyTo = sender
       msg.query match {
         case "summary" =>
           getDbResults().onComplete {
             case Success(result) => replyTo ! result
             case Failure(ex) =>
               log.error(ex, "Summary query failed: {}", ex.getMessage)
               replyTo ! akka.actor.Status.Failure(ex)
           }
         case unsupported =>
           // Previously this fell through the inner match and threw a MatchError.
           log.error("Received unsupported query: {}", unsupported)
           replyTo ! akka.actor.Status.Failure(
             new IllegalArgumentException(s"Unsupported query: $unsupported"))
       }

     //If not match log an error
     case unknown => log.error("Received unknown message: {}", unknown)
   }

   /**
    * Runs the two mock lookups and combines their results.
    *
    * Both futures are created before the for-comprehension so they run
    * concurrently; the `Thread.sleep` calls stand in for real database latency.
    *
    * @return the combined response, or a failed future if either lookup fails
    */
   def getDbResults(): Future[SummaryResponse] = {
     log.debug("hitting db results")
     val mongoResult = Future { Thread.sleep(500); "Mongo" }
     val redisResult = Future { Thread.sleep(800); "redis" }
     for {
       mResult <- mongoResult
       rResult <- redisResult
     } yield SummaryResponse(mResult, rResult)
   }
 }
+2
source share
1 answer

After reading Jamie Allen's Effective Akka, I am going to try applying his "Cameo" pattern suggestion.

Slideshare: http://www.slideshare.net/shinolajla/effective-akka-scalaio

Github: https://github.com/jamie-allen/effective_akka

I think that what I created will work, but it does not seem to be the best approach based on Jamie's comments in his talks. I will update / edit this post with what I have implemented (or tried).

Consolidated Actor (Cameo Actor):

 /** Companion holding the timeout message and the `Props` factory for the handler. */
 object SummaryResponseHandler {

   /** Sent to the handler itself when the timer fires, and relayed to the original sender. */
   case object DbRetrievalTimeout

   /** Builds the `Props` for one per-request [[SummaryResponseHandler]]. */
   def props(mongoDb: ActorRef, redisDb: ActorRef, originalSender: ActorRef): Props =
     Props(new SummaryResponseHandler(mongoDb, redisDb, originalSender))
 }

 /**
  * Short-lived per-request actor that collects one `MongoSummary` and one
  * `RedisSummary`, replies to `originalSender` with a `DataSetSummary` once
  * both hold a value, and then stops itself. If the timer fires first, it
  * replies with `DbRetrievalTimeout` instead.
  *
  * @param mongoDb        Mongo proxy (not used by the handler itself)
  * @param redisDb        Redis proxy (not used by the handler itself)
  * @param originalSender actor that receives the final response
  */
 class SummaryResponseHandler(mongoDb: ActorRef, redisDb: ActorRef, originalSender: ActorRef)
   extends Actor with ActorLogging {

   import SummaryResponseHandler._
   import context.dispatcher

   // Partial results; only touched from `receive`, so plain vars are safe here.
   var mongoSummary, redisSummary: Option[String] = None

   // Started at construction time; cancelled once both summaries have arrived.
   val timeoutMessager = context.system.scheduler.scheduleOnce(
     250.milliseconds, self, DbRetrievalTimeout)

   def receive = LoggingReceive {
     case MongoSummary(summary) =>
       log.debug(s"Received mongo summary: $summary")
       mongoSummary = summary
       collectSummaries
     case RedisSummary(summary) =>
       log.debug(s"Received redis summary: $summary")
       redisSummary = summary
       collectSummaries
     case DbRetrievalTimeout =>
       log.debug("Timeout occurred")
       sendResponseAndShutdown(DbRetrievalTimeout)
   }

   /** Replies and stops once both summaries hold a value; otherwise keeps waiting. */
   def collectSummaries: Unit = (mongoSummary, redisSummary) match {
     case (Some(_), Some(_)) =>
       log.debug(s"Values received for both databases")
       timeoutMessager.cancel()
       sendResponseAndShutdown(DataSetSummary(mongoSummary, redisSummary))
     case _ =>
   }

   /** Sends `response` to the original sender and stops this actor. */
   def sendResponseAndShutdown(response: Any) = {
     originalSender ! response
     log.debug("Stopping context capturing actor")
     context.stop(self)
   }
 }

 /**
  * Entry point for summary requests: for each `GetSummary` it spawns a
  * [[SummaryResponseHandler]] and sends the request to both proxies with the
  * handler as sender, so their replies go to the handler.
  *
  * @param mongoDb proxy that is expected to answer with a `MongoSummary`
  * @param redisDb proxy that is expected to answer with a `RedisSummary`
  */
 class SummaryRetriever(mongoDb: ActorRef, redisDb: ActorRef) extends Actor with ActorLogging {

   def receive = {
     case request @ GetSummary(_) =>
       log.debug("received dataSet")
       val originalSender = sender
       // The child is deliberately left unnamed so Akka generates a unique
       // name. With a fixed name, a second request arriving while an earlier
       // handler was still alive failed with InvalidActorNameException.
       val handler = context.actorOf(
         SummaryResponseHandler.props(mongoDb, redisDb, originalSender))
       mongoDb.tell(request, handler)
       redisDb.tell(request, handler)
     case other => log.debug(s"Unknown message $other")
   }
 }

General:

 /** Request for the summary of a single data set. */
 case class GetSummary(dataSet: String)

 /** Combined reply carrying the (optional) summary from each store. */
 case class DataSetSummary(mongo: Option[String], redis: Option[String])

 /** Reply carrying the (optional) Mongo summary. */
 case class MongoSummary(summary: Option[String])

 /** Reply carrying the (optional) Redis summary. */
 case class RedisSummary(summary: Option[String])

 /** Marker trait for actors fronting Mongo. */
 trait MongoProxy extends Actor with ActorLogging

 /** Marker trait for actors fronting Redis. */
 trait RedisProxy extends Actor with ActorLogging

Mock Stubs:

 /**
  * In-memory stand-in for the Mongo proxy: answers `GetSummary` with a
  * `MongoSummary` taken from a fixed map, or with an empty string when the
  * data set is unknown.
  */
 class MongoProxyStub extends MongoProxy {

   val summaryData = Map[String, String](
     "dataset1" -> "MongoData1",
     "dataset2" -> "MongoData2")

   def receive = LoggingReceive {
     case GetSummary(dataSet: String) =>
       log.debug(s"Received GetSummary for ID: $dataSet")
       // Unknown data sets still answer with Some("") rather than None.
       sender ! MongoSummary(Some(summaryData.getOrElse(dataSet, "")))
   }
 }

 /**
  * In-memory stand-in for the Redis proxy: answers `GetSummary` with a
  * `RedisSummary` taken from a fixed map, or with an empty string when the
  * data set is unknown.
  */
 class RedisProxyStub extends RedisProxy {

   val summaryData = Map[String, String](
     "dataset1" -> "RedisData1",
     "dataset2" -> "RedisData2")

   def receive = LoggingReceive {
     case GetSummary(dataSet: String) =>
       log.debug(s"Received GetSummary for ID: $dataSet")
       // Unknown data sets still answer with Some("") rather than None.
       sender ! RedisSummary(Some(summaryData.getOrElse(dataSet, "")))
   }
 }

Boot (you should really use a test, but I just wanted to run it from Boot):

 /**
  * Manual smoke run: wires the two stub proxies into a `SummaryRetriever`,
  * asks for the summary of "dataset1", prints the reply and shuts the actor
  * system down.
  */
 object Boot extends App {
   val system = ActorSystem("DbSystem")

   // Each stub is bound to the matching name and passed in the
   // (mongoDb, redisDb) order that SummaryRetriever declares.
   val mongoProxy = system.actorOf(Props[MongoProxyStub], "cameo-success-mongo")
   val redisProxy = system.actorOf(Props[RedisProxyStub], "cameo-success-redis")
   val summaryRetrieverActor =
     system.actorOf(Props(new SummaryRetriever(mongoProxy, redisProxy)), "cameo-retriever1")

   implicit val timeout = Timeout(5 seconds)

   val future = summaryRetrieverActor ? GetSummary("dataset1")

   // try/finally: the actor system keeps the JVM alive, so it must be shut
   // down even when Await throws (e.g. on an ask timeout).
   try {
     // Match instead of casting: the handler may also reply with its timeout
     // message, which would make asInstanceOf[DataSetSummary] throw.
     Await.result(future, timeout.duration) match {
       case DataSetSummary(mongo, redis) =>
         println(mongo)
         println(redis)
       case other =>
         println(s"Unexpected response: $other")
     }
   } finally {
     system.shutdown()
   }
 }

Application Configuration:

 # Akka logging setup: DEBUG level through the SLF4J event handler.
 akka {
   loglevel = "DEBUG"
   event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]

   # All actor debug switches turned on.
   actor.debug {
     autoreceive = on
     lifecycle = on
     receive = on
     event-stream = on
   }
 }
0
source

Source: https://habr.com/ru/post/987998/


All Articles