In an Apache Spark cogroup, how do I make sure that one RDD of the 2+ operands is not moved?

In a cogroup transformation, e.g. RDD1.cogroup(RDD2, ...), I assumed that Spark only shuffles/moves RDD2 and keeps RDD1's partitioning and in-memory storage intact if:

  • RDD1 has an explicit partitioner
  • RDD1 is cached.

In my other projects, most of the shuffling behavior I observed seemed consistent with this assumption. So yesterday I wrote a short Scala program to prove it once and for all:

 // sc is the SparkContext
 import org.apache.spark.HashPartitioner

 // rdd1: explicitly partitioned, then cached
 val rdd1 = sc.parallelize(1 to 10, 4)
   .map(v => v -> v)
   .partitionBy(new HashPartitioner(4))
 rdd1.persist().count()

 // rdd2: same values under reversed keys, no partitioner
 val rdd2 = sc.parallelize(1 to 10, 4).map(v => (11 - v) -> v)

 val cogrouped = rdd1.cogroup(rdd2).map { v =>
   v._2._1.head -> v._2._2.head
 }

 // zip the co-grouped result position by position against rdd1 and rdd2
 val zipped = cogrouped.zipPartitions(rdd1, rdd2) { (itr1, itr2, itr3) =>
   itr1.zipAll(itr2.map(_._2), 0 -> 0, 0)
     .zipAll(itr3.map(_._2), (0 -> 0) -> 0, 0)
     .map { v => (v._1._1._1, v._1._1._2, v._1._2, v._2) }
 }

 zipped.collect().foreach(println)

If rdd1 does not move, the first column of zipped should have the same values as the third column. So I ran the program and, oops:

 (4,7,4,1)
 (8,3,8,2)
 (1,10,1,3)
 (9,2,5,4)
 (5,6,9,5)
 (6,5,2,6)
 (10,1,6,7)
 (2,9,10,0)
 (3,8,3,8)
 (7,4,7,9)
 (0,0,0,10)

The assumption is incorrect. Spark probably applied some internal optimization and decided that regenerating the rdd1 partitions was much faster than taking them from the cache.

So the question is: if my requirement that RDD1 not be moved (and be kept in the cache) exists for reasons other than speed (e.g. resource locality), or if in some cases Spark's internal optimization is not preferable, is there any way to explicitly instruct the framework not to move one operand in all cogroup-like operations? This also includes join, outer join, and groupWith.

Thank you very much for your help. For now I am using a broadcast join as a not-so-scalable workaround, but it won't be long before it brings down my cluster. I am hoping for a solution consistent with the principles of distributed computing.
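The broadcast workaround I have in mind is roughly the following sketch (it assumes rdd2 is small enough to collect onto the driver, which is exactly why it does not scale; the val names are just for illustration):

 // Map-side (broadcast) join: rdd1 never moves, because rdd2's data is
 // shipped to every executor instead of shuffling either side.
 val rdd2Map = sc.broadcast(rdd2.collect().toMap)   // must fit in memory
 val joined = rdd1.mapPartitions({ iter =>
   iter.flatMap { case (k, v1) =>
     rdd2Map.value.get(k).map(v2 => k -> (v1, v2))  // keep matching keys only
   }
 }, preservesPartitioning = true)                   // rdd1's partitioner is kept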

1 answer

If rdd1 does not move, the first column of zipped should have the same values as the third column

This assumption is simply incorrect. Creating a CoGroupedRDD involves not only shuffling but also building the internal structures required to match the corresponding records. Internally, Spark uses its own ExternalAppendOnlyMap, which is backed by a custom open hash table implementation (AppendOnlyMap) that gives no ordering guarantees.
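In other words, the records can stay in the same partitions and still come out in a different order within each partition, so a positional zipPartitions comparison proves nothing. An order-independent way to check the placement is to compare partition indices per key, as in this sketch (using the rdd1 and rdd2 from the question; the val names are illustrative):

 // Tag each key with the index of the partition it lives in, then join by
 // key: if the partitioner is reused, the indices always match, no matter
 // how records are ordered inside each partition.
 val cg = rdd1.cogroup(rdd2)    // no trailing .map, so the partitioner survives
 val idx1  = rdd1.mapPartitionsWithIndex((i, it) => it.map { case (k, _) => k -> i })
 val idxCg = cg.mapPartitionsWithIndex((i, it) => it.map { case (k, _) => k -> i })
 idx1.join(idxCg).values.filter { case (i, j) => i != j }.count()   // 0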

If you check the debug string:

 zipped.toDebugString 
 (4) ZippedPartitionsRDD3[8] at zipPartitions at <console>:36 []
  |  MapPartitionsRDD[7] at map at <console>:31 []
  |  MapPartitionsRDD[6] at cogroup at <console>:31 []
  |  CoGroupedRDD[5] at cogroup at <console>:31 []
  |  ShuffledRDD[2] at partitionBy at <console>:27 []
  |      CachedPartitions: 4; MemorySize: 512.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
  +-(4) MapPartitionsRDD[1] at map at <console>:26 []
  |  ParallelCollectionRDD[0] at parallelize at <console>:26 []
  +-(4) MapPartitionsRDD[4] at map at <console>:29 []
  |  ParallelCollectionRDD[3] at parallelize at <console>:29 []
  |  ShuffledRDD[2] at partitionBy at <console>:27 []
  |      CachedPartitions: 4; MemorySize: 512.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
  +-(4) MapPartitionsRDD[1]...

you will see that Spark does indeed use CachedPartitions to compute the zipped RDD. If you also skip the map transformations, which remove the partitioner, you will see that cogroup reuses the partitioner provided by rdd1:

 rdd1.cogroup(rdd2).partitioner == rdd1.partitioner 
 Boolean = true 
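Note that it is the trailing map in the question's code that discards this partitioner, since map may change the keys; mapValues keeps it, as this small sketch shows:

 rdd1.cogroup(rdd2).mapValues(_._1.headOption).partitioner == rdd1.partitioner   // true
 rdd1.cogroup(rdd2).map(identity).partitioner                                    // None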

Source: https://habr.com/ru/post/1013364/
