In a cogroup transformation, e.g. rdd1.cogroup(rdd2, ...), I assumed that Spark only shuffles/moves rdd2 and keeps rdd1's partitioning and in-memory storage in place if:
- RDD1 has an explicit partitioner
- RDD1 is cached.
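For context, a HashPartitioner assigns a key to partition nonNegativeMod(key.hashCode, numPartitions). Here is a minimal pure-Scala sketch of that placement rule (no SparkContext needed; nonNegativeMod mirrors Spark's internal Utils.nonNegativeMod helper):

```scala
// Mirrors HashPartitioner.getPartition: a key goes to
// nonNegativeMod(key.hashCode, numPartitions).
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw  // % can be negative in Scala/Java
}

def partitionOf(key: Any, numPartitions: Int): Int =
  nonNegativeMod(key.hashCode, numPartitions)

// Keys 1 to 10 over 4 partitions, as in the test program below:
(1 to 10).foreach(k => println(s"key $k -> partition ${partitionOf(k, 4)}"))
```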
In my other projects, most of the shuffle behavior seemed consistent with this assumption. So yesterday I wrote a short Scala program to verify it once and for all:
import org.apache.spark.HashPartitioner

// sc is the SparkContext
val rdd1 = sc.parallelize(1 to 10, 4).map(v => v -> v)
  .partitionBy(new HashPartitioner(4))
rdd1.persist().count()

val rdd2 = sc.parallelize(1 to 10, 4).map(v => (11 - v) -> v)

val cogrouped = rdd1.cogroup(rdd2).map { v =>
  v._2._1.head -> v._2._2.head
}

val zipped = cogrouped.zipPartitions(rdd1, rdd2) { (itr1, itr2, itr3) =>
  itr1.zipAll(itr2.map(_._2), 0 -> 0, 0)
    .zipAll(itr3.map(_._2), (0 -> 0) -> 0, 0)
    .map { v => (v._1._1._1, v._1._1._2, v._1._2, v._2) }
}

zipped.collect().foreach(println)
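Since the zipAll chain is dense, here is what it does on plain Scala iterators, using hypothetical small inputs in place of the three per-partition iterators (no Spark needed):

```scala
// Hypothetical small inputs standing in for the three per-partition iterators:
val itr1 = Iterator((4, 7), (8, 3))           // cogrouped key -> value pairs
val itr2 = Iterator((1, 1), (5, 5), (9, 9))   // rdd1 pairs; only values are kept
val itr3 = Iterator(4, 8)                     // rdd2 values

// zipAll pads the shorter side with the supplied default (0 -> 0, or 0),
// so the three sequences are aligned positionally into 4-tuples.
val aligned = itr1.zipAll(itr2.map(_._2), 0 -> 0, 0)
  .zipAll(itr3, (0 -> 0) -> 0, 0)
  .map { v => (v._1._1._1, v._1._1._2, v._1._2, v._2) }
  .toList

println(aligned)  // List((4,7,1,4), (8,3,5,8), (0,0,9,0))
```

Each 4-tuple holds (cogrouped key, cogrouped value, rdd1 value, rdd2 value) at the same position within a partition, which is why column equality is a valid test.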
If rdd1 doesn't move, the first column of zipped should have the same values as the third column. So I ran the program, and oops:
(4,7,4,1)
(8,3,8,2)
(1,10,1,3)
(9,2,5,4)
(5,6,9,5)
(6,5,2,6)
(10,1,6,7)
(2,9,10,0)
(3,8,3,8)
(7,4,7,9)
(0,0,0,10)
The assumption is incorrect: Spark apparently did some internal optimization and decided that regenerating rdd1's partitions was faster than keeping them in the cache.
So the question is: if my requirement that RDD1 not be moved (and stay in the cache) stems from reasons other than speed (e.g. resource locality), or if Spark's internal optimization is sometimes not preferable, is there any way to explicitly instruct the framework not to move one operand in all cogroup-like operations? That also includes join, outer join, and groupWith.
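For what it's worth, cogroup and join do have overloads that accept an explicit Partitioner, so you can at least pin the output partitioning to rdd1's. A sketch under the setup above (this fixes the result's partitioning; whether Spark reuses rdd1's cached blocks or recomputes them is still the scheduler's call, which is exactly the behavior in question):

```scala
import org.apache.spark.HashPartitioner

// Reuse rdd1's own partitioner if it has one (it does, after partitionBy);
// fall back to an equivalent HashPartitioner otherwise.
val part = rdd1.partitioner.getOrElse(new HashPartitioner(4))

// Both operands are grouped under rdd1's partitioning:
val cogroupedPinned = rdd1.cogroup(rdd2, part)

// join accepts a Partitioner the same way:
val joinedPinned = rdd1.join(rdd2, part)
```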
Thanks a lot for your help. Right now I'm using a broadcast join as a not-so-scalable workaround, and it won't be long before my cluster blows up. I'm hoping for a solution consistent with the principles of distributed computing.