Parallel data processing that exceeds memory size

Is there an easy way to use concurrent scala collections without loading the complete collection into memory?

For example, I have a large collection, and I would like to perform a certain operation (folding) in parallel only on a small fragment that is stored in memory than on another fragment and so on, and finally recombine the results from all the chunks.

I know that actors can be used, but it would be very nice to use steam collections.

I wrote a solution, but this is not nice:

def split[A](list: Iterable[A], chunkSize: Int): Iterable[Iterable[A]] = { new Iterator[Iterable[A]] { var rest = list def hasNext = !rest.isEmpty def next = { val chunk = rest.take(chunkSize) rest = rest.drop(chunkSize) chunk } }.toIterable } def foldPar[A](acc: A)(list: Iterable[A], chunkSize: Int, combine: ((A, A) => A)): A = { val chunks: Iterable[Iterable[A]] = split(list, chunkSize) def combineChunk: ((A,Iterable[A]) => A) = { case (res, entries) => entries.par.fold(res)(combine) } chunks.foldLeft(acc)(combineChunk) } val chunkSize = 10000000 val x = 1 to chunkSize*10 def sum: ((Int,Int) => Int) = {case (acc,n) => acc + n } foldPar(0)(x,chunkSize,sum) 
+6
source share
1 answer

Your idea is very neat, and it is a pity that there is no such function (AFAIK).

I just rephrased your idea into a slightly shorter code. Firstly, I believe that for parallel folding it is useful to use the concept of monoid - it is a structure with an associative operation and a zero element. Associativity is important because we do not know the order in which we combine the result, which is calculated in parallel. And the zero element is important, so that we can break the calculations into blocks and start adding each of zero. This is nothing new, it's just what fold expects for Scala collections.

 // The function defined by Monoid apply must be associative // and zero its identity element. trait Monoid[A] extends Function2[A,A,A] { val zero: A } 

Further, Scala Iterator already has a useful grouped(Int): GroupedIterator[Seq[A]] method grouped(Int): GroupedIterator[Seq[A]] , which slices an iterator in a fixed-size sequence. This is very similar to your split . This allows us to cut the input data into blocks of a fixed size, and then apply the methods of the parallel Scala collection to them:

 def parFold[A](c: Iterator[A], blockSize: Int)(implicit monoid: Monoid[A]): A = c.grouped(blockSize).map(_.par.fold(monoid.zero)(monoid)) .fold(monoid.zero)(monoid); 

We add each block using a parallel collection structure, and then (without any parallelization) combine the intermediate results.

Example:

 // Example: object SumMonoid extends Monoid[Long] { override val zero: Long = 0; override def apply(x: Long, y: Long) = x + y; } val it = Iterator.range(1, 10000001).map(_.toLong) println(parFold(it, 100000)(SumMonoid)); 
+4
source

Source: https://habr.com/ru/post/948434/


All Articles