Apache Spark: efficient use of mapPartitions in Java

In the early-release book High Performance Spark, the authors note:

To allow Spark the flexibility to spill some records to disk, it is important to write the functions you pass to mapPartitions in such a way that they do not force the entire partition to be loaded into memory (for example, by implicitly converting it to a list). Iterators have many methods on which we can write functional-style transformations, or you can construct your own custom iterator. When a transformation directly takes and returns an iterator without forcing it through another collection, we call it an iterator-to-iterator transformation.

However, the book lacks good examples using mapPartitions or similar variations of the method. The few good code examples that exist online are mostly in Scala. For example, here is Scala code using mapPartitions, written by zero323 in "How to add columns to org.apache.spark.sql.Row inside mapPartitions":

    def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)

    sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show

Unfortunately, Java does not offer anything as nice as iter.map(...) for iterators. So the question is: how can one effectively use iterator-to-iterator transformations with mapPartitions without loading an entire partition into memory as a list?

    JavaRDD<OutObj> collection = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
        ArrayList<OutObj> out = new ArrayList<>();
        while (iter.hasNext()) {
            InObj current = iter.next();
            out.add(someChange(current));
        }
        return out.iterator();
    });

This seems to be the usual pattern for mapPartitions in Java examples, but I don't see how it could be efficient when the JavaRDD has tens of thousands of records (or many more, since Spark is meant for big data). You end up building a list of all the objects in the iterator only to turn it back into an iterator, which suggests that some kind of map function would be much more efficient.

Note: while these 8 lines of code using mapPartitions could be written as 1 line using map or flatMap, I am intentionally using mapPartitions to take advantage of the fact that it operates on each partition rather than on each element of the RDD.

Any ideas please?

1 answer

One way to avoid materializing the entire partition is to convert the Iterator into a Stream and then use the Stream API (for example, map).

The question "How to convert an iterator to a stream?" offers several good ways to convert an Iterator into a Stream. Using one of the options suggested there, we end up with:

    rdd.mapPartitions((Iterator<InObj> iter) -> {
        Iterable<InObj> iterable = () -> iter;
        return StreamSupport.stream(iterable.spliterator(), false)
                .map(s -> transformRow(s)) // or whatever transformation
                .iterator();
    });

This should be an iterator-to-iterator transformation, because all the intermediate APIs used (Iterable, Stream) are evaluated lazily.
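The laziness can be checked without Spark. The following is only a sketch in plain Java: the class name LazyStreamDemo and the helper mapLazily are made up for illustration and are not part of Spark or of the code above. It wraps an iterator the same way and records which elements have been transformed so far.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.StreamSupport;

// Hypothetical demo class: shows that the Stream-based wrapper does not
// touch the source iterator until the returned iterator is pulled.
class LazyStreamDemo {

    // Same technique as in the answer: Iterator -> Iterable -> Stream -> Iterator.
    static <I, O> Iterator<O> mapLazily(Iterator<I> iter, Function<I, O> fn) {
        Iterable<I> iterable = () -> iter;
        return StreamSupport.stream(iterable.spliterator(), false)
                .map(fn)
                .iterator();
    }

    public static void main(String[] args) {
        List<Integer> transformed = new ArrayList<>();
        Iterator<Integer> source = Arrays.asList(1, 2, 3).iterator();

        Iterator<Integer> out = mapLazily(source, x -> {
            transformed.add(x);   // record that this element was processed
            return x * 10;
        });

        // Nothing has been pulled yet, so nothing has been transformed.
        System.out.println("transformed before pulling: " + transformed.size());

        while (out.hasNext()) {
            System.out.println("got " + out.next());
        }
        System.out.println("transformed after pulling: " + transformed.size());
    }
}
```

If the wrapper built a list first, all elements would already be recorded as transformed before the first call to hasNext().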

EDIT: I have not tested this myself, but the OP commented, and I quote, that "there is no increase in efficiency with Stream over the list." I do not know why that is, nor whether it holds in general, but it is worth mentioning.
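The book passage quoted in the question also mentions writing your own iterator. A minimal sketch of that alternative, which avoids the Stream machinery altogether, could look like the following. MappingIterator is a hypothetical helper name, not a Spark or JDK class, and I have not benchmarked it against the other two versions.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Function;

// Hypothetical helper: applies fn to each element only when next() is called,
// so no intermediate collection is ever built.
class MappingIterator<I, O> implements Iterator<O> {
    private final Iterator<I> source;
    private final Function<I, O> fn;

    MappingIterator(Iterator<I> source, Function<I, O> fn) {
        this.source = source;
        this.fn = fn;
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public O next() {
        // Transform one element at a time, on demand.
        return fn.apply(source.next());
    }

    public static void main(String[] args) {
        Iterator<String> out =
                new MappingIterator<>(Arrays.asList(1, 2, 3).iterator(), x -> "row-" + x);
        while (out.hasNext()) {
            System.out.println(out.next()); // prints row-1, row-2, row-3 on separate lines
        }
    }
}
```

Assuming a Spark version in which the function passed to mapPartitions returns an Iterator (as the code in the question does), the lambda body would then reduce to returning new MappingIterator<>(iter, x -> someChange(x)).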


Source: https://habr.com/ru/post/1264898/

