As of Spark 1.6+, the only API that supports setting location preferences for partitions is at RDD creation:
```scala
/** Distribute a local Scala collection to form an RDD, with one or more
  * location preferences (hostnames of Spark nodes) for each object.
  * Create a new partition for each collection item. */
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T]
```
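For context, here is a minimal sketch of how I use it (the hostnames and app name are made up, and in practice I run against a real cluster rather than `local[2]`):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("locality-sketch").setMaster("local[2]"))

// One partition per element; the scheduler will *prefer* (not guarantee)
// running the corresponding task on the listed host.
val pinned = sc.makeRDD(Seq(
  ("a", Seq("host-1.example.com")),
  ("b", Seq("host-2.example.com"))
))

// Each partition reports the hints we supplied.
pinned.partitions.foreach(p => println(pinned.preferredLocations(p)))
```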
This is very useful in some cases (for example, when RDD.compute() has access to some node-local resources, not just HDFS). However, this is the only place where such a setting is exposed, and it is quickly discarded after the first shuffle, where each downstream partition inherits its preferredLocation from its largest parent:
```scala
// snippet from org.apache.spark.rdd.ShuffledRDD.scala
override protected def getPreferredLocations(partition: Partition): Seq[String] = {
  val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  tracker.getPreferredLocationsForShuffle(dep, partition.index)
}
```
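A quick way to see the hints being dropped, continuing the sketch above (my own illustration, not from the Spark docs):

```scala
// Before the shuffle: the hints we supplied to makeRDD.
pinned.partitions.foreach(p => println(pinned.preferredLocations(p)))
// e.g. List(host-1.example.com) / List(host-2.example.com)

// After a shuffle: locations come from the MapOutputTracker
// (wherever the map output actually landed), not from our hints.
val shuffled = pinned.map(x => (x, 1)).reduceByKey(_ + _)
shuffled.count() // materialize, so map output statistics exist
shuffled.partitions.foreach(p => println(shuffled.preferredLocations(p)))
```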
and in cogroup/join (where each downstream partition will use the first parent that has an explicit Partitioner).
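A small illustration of that rule (my own sketch, relying on the default partitioner resolution):

```scala
import org.apache.spark.HashPartitioner

// `left` has an explicit partitioner and `right` does not, so the
// cogrouped RDD adopts `left`'s partitioner: `left` is consumed through
// a narrow (non-shuffle) dependency and effectively stays where it is,
// while `right` is shuffled over to it.
val left  = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(new HashPartitioner(4))
val right = sc.parallelize(Seq(1 -> "x", 3 -> "y"))

val grouped = left.cogroup(right)
println(grouped.partitioner) // Some(org.apache.spark.HashPartitioner@...)
```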
I am wondering whether this design is intentional, or whether there is a better solution for colocation. What do you think would be the best way to specify preferredLocation for shuffled/cogrouped/joined RDDs? Do I have to write my own RDD subclasses to achieve this?
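In case it helps frame the question, the custom-RDD route I have in mind would be something like this pass-through wrapper (an untested sketch; the class name and constructor are mine):

```scala
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Pass-through wrapper: same data and partitions as the parent,
// but with user-supplied location hints per partition index.
class PreferredLocationsRDD[T: ClassTag](
    parent: RDD[T],
    hints: Int => Seq[String]
) extends RDD[T](parent) {

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    firstParent[T].iterator(split, context)

  override protected def getPreferredLocations(split: Partition): Seq[String] =
    hints(split.index)
}
```

The obvious limitation is that by the time this wrapper runs, the shuffle feeding `parent` has already placed its output, so the hint only moves the computation, not the data.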
Thanks a lot for your insight.
UPDATE: I proposed a possible solution that, to my surprise, does not work:
In Apache Spark cogroup, how to make sure 1 RDD of >2 operands is not moved?
so I deleted the answer. If you have something that works, you are welcome to share it here; otherwise we will have to wait for https://issues.apache.org/jira/browse/SPARK-18078