Towards limiting the big RDD

I am reading many images and I would like to work on a tiny subset of them for development. As a result, I am trying to understand how this could happen:

In [1]: d = sqlContext.read.parquet('foo')
In [2]: d.map(lambda x: x.photo_id).first()
Out[2]: u'28605'

In [3]: d.limit(1).map(lambda x: x.photo_id)
Out[3]: PythonRDD[31] at RDD at PythonRDD.scala:43

In [4]: d.limit(1).map(lambda x: x.photo_id).first()
// still running...

..so what's going on? I expected limit() to run much faster than what we had in [2], but that is not the case*.

Below I will describe my understanding, and please correct me, because obviously I am missing something:

  • d is an RDD of pairs (I know this from the schema), and with the map function I am saying:

    i) Take each pair (which will be called x) and give me back its photo_id attribute.

    ii) That results in a new (anonymous) RDD, on which we apply first(), which I am not sure how it works$, but it should give me the first element of that anonymous RDD.

  • In [3] we limit the RDD d to 1, which means that even though d has many elements, only 1 is used and mapped to a new RDD. Out[3] returns the RDD created by that mapping.

  • In [4] I would expect it to follow the logic of [3] and simply print the one and only element of the limited RDD...

However, after checking the monitor, [4] seems to process the whole dataset, while the others do not, so it seems that either I am not using limit() correctly, or that it is not what I am looking for:

(screenshot of the Spark monitoring UI omitted)


Edit:

tiny_d = d.limit(1).map(lambda x: x.photo_id)
tiny_d.map(lambda x: x.photo_id).first()

The first line gives a PipelinedRDD which, as described here, does not actually perform any action, just a transformation.

However, the second line also processes the whole dataset (in fact, the number of tasks is now as many as before, plus one!).
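
A minimal sketch of that transformation-vs-action distinction (in Scala, matching the answer below, and assuming a shell where sc is already available; the names are only illustrative):

val nums   = sc.parallelize(1 to 1000, 10)  // toy RDD with 10 partitions
val mapped = nums.map(_ * 2)                // transformation only: returns immediately, no job is launched
val head   = mapped.first()                 // action: a job runs now, scanning partitions only until a row is found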


* [2] executed instantly, while [4] is still running after more than 3 hours.

$ I could not find it in the documentation, probably because of the name.

Answer:

Based on your code, here is a simpler test case on Spark 2.0:

case class my (x: Int)
val rdd = sc.parallelize(0.until(10000), 1000).map { x => my(x) }
val df1 = spark.createDataFrame(rdd)
val df2 = df1.limit(1)
df1.map { r => r.getAs[Int](0) }.first
df2.map { r => r.getAs[Int](0) }.first // Much slower than the previous line

Actually, Dataset.first is equivalent to Dataset.limit(1).collect, so check the physical plan of the two cases:

scala> df1.map { r => r.getAs[Int](0) }.limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [input[0, int, true] AS value#124]
   +- *MapElements <function1>, obj#123: int
      +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#122: org.apache.spark.sql.Row
         +- Scan ExistingRDD[x#74]

scala> df2.map { r => r.getAs[Int](0) }.limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [input[0, int, true] AS value#131]
   +- *MapElements <function1>, obj#130: int
      +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#129: org.apache.spark.sql.Row
         +- *GlobalLimit 1
            +- Exchange SinglePartition
               +- *LocalLimit 1
                  +- Scan ExistingRDD[x#74]

For the first case, it is related to an optimisation in the CollectLimitExec physical operator. That is, it first fetches the first partition to get the limit number of rows, 1 in this case; if that is not satisfied, it fetches more partitions until the desired limit is reached. So, generally, if the first partition is not empty, only the first partition is computed and fetched. The other partitions are not even computed.

However, in the second case the optimisation in CollectLimitExec does not help, because the preceding limit operation involves a shuffle. All partitions are computed, LocalLimit(1) runs on each partition to produce 1 row, and then all partitions are shuffled into a single partition. CollectLimitExec then fetches 1 row from the resulting single partition.
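
As a practical takeaway from the two plans above (just a sketch reusing the same df1 and df2, not a general rule): keeping the limit after the map preserves the CollectLimit short-circuit, whereas limiting first places GlobalLimit/Exchange/LocalLimit below the map and forces every partition to be evaluated.

// First plan: CollectLimit sits directly over the scan, no Exchange,
// so partitions are fetched only until the single row is found.
val fast = df1.map { r => r.getAs[Int](0) }.limit(1).collect()

// Second plan: GlobalLimit 1 / Exchange SinglePartition / LocalLimit 1 sit below
// the map, so every partition is computed before the single-row shuffle.
val slow = df2.map { r => r.getAs[Int](0) }.limit(1).collect()

Each call returns a one-element Array[Int]; only the amount of work differs.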


Source: https://habr.com/ru/post/1649838/

