Large task size for a simple program

I am trying to run a simple program using Spark:

import org.apache.spark.{SparkContext, SparkConf}

object LargeTaskTest {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("DataTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val dat = (1 to 10000000).toList
    val data = sc.parallelize(dat).cache()
    for(i <- 1 to 100){
      println(data.reduce(_ + _))
    }
  }   
}

After each iteration, the following warning message appears:

WARN TaskSetManager: Stage 0 contains a task with a very large size (9767 KB). The maximum recommended task size is 100 KB.

Increasing the data size increases the reported task size. This tells me that the driver is shipping the "dat" object to all executors, but I can't for the life of me figure out why, since the only operation on my RDD is reduce, which basically has no closure. Any ideas?

2 answers

RDD operations run on the cluster, but the collection you pass to Spark's parallelize first has to get there: it is serialized on the driver and shipped to the executors as part of the tasks. Instead, generate the bulk of the data on the executors themselves, e.g. with flatMap, so that only a small seed collection needs to be sent. For example:

import org.apache.spark.{SparkContext, SparkConf}

object LargeTaskTest extends App {

  val conf = new SparkConf().setAppName("DataTest").setMaster("local[*]")
  val sc = new SparkContext(conf)
  val dat = (0 to 99).toList
  val data = sc.parallelize(dat).cache().flatMap(i => (1 to 1000000).map(j => j * 100 + i))
  println(data.count()) //100000000
  println(data.reduce(_ + _))
  sc.stop()
}

EDIT:

Digging a bit deeper, I found out how and why this happens. parallelize creates a ParallelCollectionRDD:

def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L730

ParallelCollectionRDD creates a number of partitions equal to numSlices:

  override def getPartitions: Array[Partition] = {
    val slices = ParallelCollectionRDD.slice(data, numSlices).toArray
    slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
  }

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L96

numSlices defaults to sc.defaultParallelism, which on my machine is 4. So even when split, each partition holds a very large list that has to be shipped to an executor.
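
As a rough illustration (the partition counts in the comments assume a 4-core machine running local[*], and SliceSizeCheck is just a throwaway name):

import org.apache.spark.{SparkContext, SparkConf}

object SliceSizeCheck extends App {
  val conf = new SparkConf().setAppName("SliceSizeCheck").setMaster("local[*]")
  val sc = new SparkContext(conf)

  val dat = (1 to 10000000).toList

  // With the default numSlices (= sc.defaultParallelism), the 10 million Ints are
  // spread over only a few partitions, so each task carries megabytes of data.
  println(sc.defaultParallelism)                      // e.g. 4 on a 4-core machine
  println(sc.parallelize(dat).partitions.length)      // same number by default

  // Asking for more slices shrinks each task's payload, but the full list still
  // has to be serialized on the driver and shipped out once an action runs.
  println(sc.parallelize(dat, 400).partitions.length) // 400

  sc.stop()
}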

SparkContext.parallelize carries the note @note Parallelize acts lazily, and ParallelCollectionRDD carries the comment:

// TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
// cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
// instead.

So the problem shows up when you call reduce, because that is the point at which the partitions are sent to the executors, but the root cause is calling parallelize on a huge list in the first place. Generating the large data set inside the executors is the better approach, IMHO.
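
For completeness, here is one way the original loop could be rewritten along those lines (my sketch, assuming local[*] and the same 1 to 10,000,000 range; like the original, the Int sum overflows, but the point here is the task size):

import org.apache.spark.{SparkContext, SparkConf}

object LargeTaskTestFixed extends App {
  val conf = new SparkConf().setAppName("DataTest").setMaster("local[*]")
  val sc = new SparkContext(conf)

  // Only 100 seed integers are shipped with the tasks; each seed is expanded to
  // 100,000 values on the executors, reproducing the original 1 to 10,000,000 range.
  val data = sc.parallelize(0 until 100)
    .flatMap(i => (1 to 100000).map(j => i * 100000 + j))
    .cache()

  for (_ <- 1 to 100) {
    println(data.reduce(_ + _)) // tasks stay tiny, so the "very large size" warning goes away
  }
  sc.stop()
}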

+5

The reduce function sends all of the data to a single node. When you run sc.parallelize, the data is split into 100 partitions by default. To take advantage of the already-distributed data, use something like this:

data.map(el=> el%100 -> el).reduceByKey(_+_)

Or you can do the reduce at the partition level:

data.mapPartitions(p => Iterator(p.reduce(_ + _))).reduce(_ + _)

Or simply use sum :)
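
For example (a minimal sketch, assuming data is the RDD[Int] from the question):

// sum() comes from DoubleRDDFunctions via the implicit conversion for numeric RDDs;
// it sums each partition locally and combines the partial sums on the driver,
// returning a Double (which also sidesteps the Int overflow of reduce(_ + _)).
val total: Double = data.sum()
println(total)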

-2

Source: https://habr.com/ru/post/1617231/

