, . , , , , , , , OOM. - . , (, Iterator, scala.io.Source.fromX) , work-pulling . , , , (, , , , ).
. , :
import akka.actor._
import akka.routing.RoundRobinLike
import akka.routing.RoundRobinRouter
import scala.io.Source
import akka.routing.Broadcast
/**
 * Message protocol exchanged between [[FileReadMaster]] and its workers.
 */
object FileReadMaster {

  /** Asks the master to start reading the file located at `filePath`. */
  case class ProcessFile(filePath: String)

  /**
   * A chunk of raw lines handed from the master to a worker.
   *
   * @param lines the lines making up this chunk
   * @param last  flag travelling with the chunk (defaults to `false`);
   *              presumably meant to mark the final chunk of the file
   */
  case class ProcessLines(lines: List[String], last: Boolean = false)

  /**
   * A chunk of lines returned by a worker once it has processed them.
   *
   * @param lines the processed lines
   * @param last  the flag copied from the originating [[ProcessLines]]
   */
  case class LinesProcessed(lines: List[String], last: Boolean = false)

  /** Broadcast by the master to announce that workers may start pulling work. */
  case object WorkAvailable

  /** Sent by a worker to the master to request the next chunk. */
  case object GimmeeWork
}
/**
 * Master of a work-pulling file reader.
 *
 * On `ProcessFile` it opens the file lazily (line iterator), starts
 * `workersCount` workers behind a round-robin router and announces that work
 * is available. Workers then pull chunks of at most `workChunkSize` lines with
 * `GimmeeWork`, so the file is never held in memory as a whole. Once the
 * iterator is exhausted the file is closed and all workers receive a
 * `PoisonPill`; their `Terminated` notifications are counted down.
 */
class FileReadMaster extends Actor {
  import FileReadMaster._

  /** Maximum number of lines handed to a worker per `GimmeeWork` request. */
  val workChunkSize = 10

  /** Number of worker actors started for a file. */
  val workersCount = 10

  // Handle of the file currently being read, kept so it can be closed once the
  // iterator is drained or the actor stops. Only touched from this actor's own
  // message handling and lifecycle hooks, so a plain var is sufficient.
  private var openSource: Option[Source] = None

  def receive: Receive = waitingToProcess

  /** Releases the file handle if the actor is stopped (or restarted) mid-file. */
  override def postStop(): Unit = closeSource()

  /** Closes the currently open file, if any. Safe to call repeatedly. */
  private def closeSource(): Unit = {
    openSource.foreach(_.close())
    openSource = None
  }

  /** Idle state: waits for the single file to process. */
  def waitingToProcess: Receive = {
    case ProcessFile(path) =>
      // Open the file first so that no workers are spawned if it cannot be read.
      val source = Source.fromFile(path)
      openSource = Some(source)
      val it = source.getLines()

      val workers = List.fill(workersCount)(context.actorOf(Props[FileReadWorker]))
      workers.foreach(worker => context.watch(worker))
      val workersPool = context.actorOf(Props.empty.withRouter(RoundRobinRouter(routees = workers)))

      if (it.hasNext) {
        workersPool ! Broadcast(WorkAvailable)
        context.become(processing(it, workersPool, workers.size))
      } else {
        // Empty file: no chunk will ever be requested successfully, so shut the
        // workers down right away instead of leaving them (and us) waiting.
        closeSource()
        workersPool ! Broadcast(PoisonPill)
        context.become(processing(Iterator.empty, workersPool, workers.size))
      }
  }

  /**
   * Busy state while a file is being handed out.
   *
   * @param it             remaining lines of the file
   * @param workers        router in front of the worker actors
   * @param workersRunning number of workers that have not terminated yet
   */
  def processing(it: Iterator[String], workers: ActorRef, workersRunning: Int): Receive = {
    case ProcessFile(path) =>
      sender ! Status.Failure(new Exception("already processing!!!"))

    case GimmeeWork if it.hasNext =>
      // Pull up to workChunkSize lines; the final chunk may be shorter.
      val lines = List.fill(workChunkSize) {
        if (it.hasNext) Some(it.next())
        else None
      }.flatten

      val exhausted = !it.hasNext
      // `last` must be true only for the final chunk of the file.
      sender ! ProcessLines(lines, last = exhausted)

      if (exhausted) {
        closeSource()
        workers ! Broadcast(PoisonPill)
        // The underlying reader is closed now; swap in an empty iterator so the
        // `it.hasNext` guard above is never evaluated against a closed stream.
        context.become(processing(Iterator.empty, workers, workersRunning))
      }

    case GimmeeWork =>
      // No work left; the requesting worker has already been sent a PoisonPill.

    case LinesProcessed(lines, last) =>
      // Hook: consume the processed lines here.

    case Terminated(ref) if workersRunning == 1 =>
      // Hook: the last worker has stopped, i.e. all work is done.
      // NOTE(review): the actor stays in this state afterwards, so any further
      // ProcessFile is still answered with "already processing!!!" - confirm
      // whether it should return to waitingToProcess.

    case Terminated(ref) =>
      context.become(processing(it, workers, workersRunning - 1))
  }
}
/**
 * Worker side of the work-pulling protocol.
 *
 * It asks for work when told that work is available, and after finishing a
 * chunk it reports the result and immediately asks for the next chunk.
 */
class FileReadWorker extends Actor {
  import FileReadMaster._

  def receive = {
    case WorkAvailable =>
      // Work was announced: pull the first chunk from whoever announced it.
      sender ! GimmeeWork

    case ProcessLines(lines, last) =>
      val requester = sender
      // The "processing" done here is reversing every line of the chunk.
      val reversed = lines.map(line => line.reverse)
      requester ! LinesProcessed(reversed, last)
      requester ! GimmeeWork
  }
}
, . , . , . , , , , . , .
, , , , . , , .