Scala Parallel Stream Memory Consumption

I wrote a Scala application (2.9.1-1) that needs to process several million rows from a database query. I convert the ResultSet to a Stream using the technique shown in the answer to one of my previous questions:

    class Record(...)

    val resultSet = statement.executeQuery(...)

    new Iterator[Record] {
      def hasNext = resultSet.next()
      def next = new Record(resultSet.getString(1), resultSet.getInt(2), ...)
    }.toStream.foreach { record => ... }

and it worked very well.

The body of the foreach closure is very CPU intensive. As a testament to the practicality of functional programming, if I add .par before the foreach, the closures run in parallel with no other effort except making sure that the body of the closure is thread safe (it is written in a functional style with no mutable data, apart from printing to a thread-safe log).

However, I am worried about memory consumption. Does .par cause the entire result set to be loaded into RAM, or does the parallel operation load only as many rows as there are active threads? I have allocated 4 GB to the JVM (64-bit, with -Xmx4g), but in the future I will run it on even more rows and I worry that I will eventually run out of memory.

Is there a better pattern for this kind of parallel processing in a functional style? I have been showing this application to my colleagues as an example of the value of functional programming and multi-core machines.

2 answers

If you look at the scaladoc for Stream, you will notice that par is defined in the Parallelizable trait. If you then look at the source code of that trait, you will see that it takes every element of the original collection and puts it into a combiner, so every row ends up loaded into a ParSeq:

    def par: ParRepr = {
      val cb = parCombiner
      for (x <- seq) cb += x
      cb.result
    }

    /** The default `par` implementation uses the combiner provided by this method
     *  to create a new parallel collection.
     *
     *  @return  a combiner for the parallel collection of type `ParRepr`
     */
    protected[this] def parCombiner: Combiner[A, ParRepr]
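To make the memory consequence concrete, here is a simplified stand-in for that loop. It is not the library code: `EagerCopyDemo` and `rowsReadBeforeProcessing` are made-up names, and a plain ArrayBuffer plays the role of the combiner. It only shows that appending every element to a builder reads the whole lazy source before any element can be processed.

```scala
import scala.collection.mutable.ArrayBuffer

object EagerCopyDemo {
  // Hypothetical helper: counts how many rows have been read from a lazy
  // source by the time the copy loop (the stand-in for `par`) has finished.
  def rowsReadBeforeProcessing(total: Int): Int = {
    var read = 0
    // Lazy source: an element is produced only when the iterator is advanced.
    val rows = Iterator.tabulate(total) { i => read += 1; i }

    // Same shape as `for (x <- seq) cb += x` in Parallelizable.par.
    val buffer = ArrayBuffer.empty[Int]
    for (x <- rows) buffer += x

    // No element has been processed yet, but all of them are now in memory.
    read
  }
}
```

With a result set of several million rows, the same pattern means every Record is held on the heap at once.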

A possible solution is to parallelize your computation explicitly, for instance with actors. The example in the Akka documentation may be useful in your context.
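If actors feel like too much machinery, one alternative (a different technique from the one suggested above) is to pull rows from the iterator in fixed-size batches and process each batch in parallel before reading the next. The sketch below assumes Scala 2.10 or later, where scala.concurrent.Future is in the standard library; `BatchedParallel`, `foreachBatched` and `batchSize` are hypothetical names chosen for illustration.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object BatchedParallel {
  // Hypothetical helper: applies `f` to every row in parallel while holding
  // at most `batchSize` rows in memory at any one time.
  def foreachBatched[A](rows: Iterator[A], batchSize: Int)(f: A => Unit): Unit =
    rows.grouped(batchSize).foreach { batch =>
      // Start one task per row of the current batch on the global pool...
      val tasks = batch.map(row => Future(f(row)))
      // ...and block until the whole batch is done before reading more rows.
      Await.result(Future.sequence(tasks), Duration.Inf)
    }
}
```

Usage would look like `BatchedParallel.foreachBatched(iterator, 1000) { record => ... }`, called on the Iterator[Record] directly instead of on `.toStream`. The trade-off is that each batch waits for its slowest row, so a larger `batchSize` keeps the cores busier at the cost of more rows in memory.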


The newer Akka Streams library is the fix you are looking for:

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Source, Sink}

    def iterFromQuery(): Iterator[Record] = {
      val resultSet = statement.executeQuery(...)
      new Iterator[Record] {
        def hasNext = resultSet.next()
        def next = new Record(...)
      }
    }

    def cpuIntensiveFunction(record: Record) = { ... }

    implicit val actorSystem = ActorSystem()
    implicit val materializer = ActorMaterializer()
    implicit val execContext = actorSystem.dispatcher

    val poolSize = 10 // number of Records in memory at once

    // Akka 2.4+: Source.fromIterator takes a function that creates the iterator
    val stream = Source.fromIterator(() => iterFromQuery())
      .runWith(Sink.foreachParallel(poolSize)(cpuIntensiveFunction))

    // Akka 2.4+: terminate() replaces the deprecated shutdown()
    stream onComplete { _ => actorSystem.terminate() }

Source: https://habr.com/ru/post/911402/

