Spark mapPartitions vs transient lazy val

I was wondering what distinguishes the functionality of Spark's mapPartitions from a transient lazy val.
Since each partition generally runs on a different node, an instance of the transient lazy val will be created on each node (assuming it lives in an object).

For instance:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    class NotSerializable(v: Int) {
      def foo(a: Int) = ???
    }

    object OnePerPartition {
      @transient lazy val obj: NotSerializable = new NotSerializable(10)
    }

    object Test extends App {
      val conf = new SparkConf().setMaster("local[2]").setAppName("test")
      val sc = new SparkContext(conf)

      val rdd: RDD[Int] = sc.parallelize(1 to 100000)
      rdd.map(OnePerPartition.obj.foo)

      // ---------- VS ----------

      rdd.mapPartitions(itr => {
        val obj = new NotSerializable(10)
        itr.map(obj.foo)
      })
    }

One may ask why you would want this... I would like to create a general container abstraction to run my logic over any collection implementation (RDD, List, Scalding Pipe, etc.), as sketched below.
They all have the concept of map, but mapPartitions is unique to Spark.
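
A minimal sketch of the kind of abstraction described here, assuming a typeclass with a single map operation (the names Mappable, Pipeline, and runLogic are hypothetical, not from any library):

    import scala.language.higherKinds

    // A typeclass capturing the one operation all the collections share.
    trait Mappable[F[_]] {
      def map[A, B](fa: F[A])(f: A => B): F[B]
    }

    object Mappable {
      // Instance for plain Scala lists; an RDD instance would wrap rdd.map(f).
      implicit val listMappable: Mappable[List] = new Mappable[List] {
        def map[A, B](fa: List[A])(f: A => B): List[B] = fa.map(f)
      }
    }

    object Pipeline {
      // Generic logic that works for any F with a Mappable instance.
      def runLogic[F[_], A, B](fa: F[A])(f: A => B)(implicit M: Mappable[F]): F[B] =
        M.map(fa)(f)
    }

    // Pipeline.runLogic(List(1, 2, 3))(_ + 1) == List(2, 3, 4)

mapPartitions has no counterpart in this interface, which is exactly the gap the question raises.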

1 answer

First of all, you do not need transient lazy here. Using the object wrapper is enough to make this work, and you can write it as:

    object OnePerExecutor {
      val obj: NotSerializable = new NotSerializable(10)
    }
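
For reference, a hedged usage sketch: the closure below only references the singleton by name, so nothing needs to be serialized, and obj is initialized lazily on each executor JVM the first time the object is loaded there:

    // The lambda captures no instance; OnePerExecutor.obj is resolved
    // on the executor, where the singleton is initialized on first use.
    rdd.map(x => OnePerExecutor.obj.foo(x))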

There is a fundamental difference between the object wrapper and initializing NotSerializable inside mapPartitions. This:

    rdd.mapPartitions(iter => {
      val ns = new NotSerializable(1)
      ???
    })

creates a single NotSerializable instance for each partition.
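
One hedged way to observe this in local mode (where driver and executors share a single JVM) is to count constructor calls; InstanceCounter and CountingNotSerializable are hypothetical helpers added for illustration, not part of the question's code:

    import java.util.concurrent.atomic.AtomicInteger

    object InstanceCounter {
      val created = new AtomicInteger(0)
    }

    class CountingNotSerializable(v: Int) {
      InstanceCounter.created.incrementAndGet() // one tick per construction
      def foo(a: Int): Int = a + v
    }

    // After forcing the RDD, e.g.:
    //   rdd.mapPartitions { iter =>
    //     val ns = new CountingNotSerializable(1)
    //     iter.map(ns.foo)
    //   }.count()
    // InstanceCounter.created.get() should equal the number of partitions
    // (2 by default for local[2]).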

The object wrapper, on the other hand, creates a single NotSerializable instance per executor JVM. As a result, this instance:

  • can be used to process multiple partitions,
  • can be accessed concurrently by multiple executor threads,
  • outlives the function call in which it is used.

This means that it must be thread-safe, and any method calls should be free of side effects.
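
To illustrate the thread-safety point, a minimal sketch of what shared state in the singleton implies (the callCount field is an assumption added for illustration):

    import java.util.concurrent.atomic.AtomicInteger

    object OnePerExecutor {
      val obj: NotSerializable = new NotSerializable(10)

      // Multiple tasks on the same executor may touch this concurrently,
      // so any shared mutable state needs a thread-safe primitive:
      val callCount = new AtomicInteger(0) // safe under concurrent access
      // var callCount = 0                 // would race across tasks
    }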


Source: https://habr.com/ru/post/1260239/

