On Spark 2.0, consider the following example, run in the spark-shell (so sc and spark are already defined):
case class my(x: Int)
val rdd = sc.parallelize(0.until(10000), 1000).map { x => my(x) } // 10000 rows spread over 1000 partitions
val df1 = spark.createDataFrame(rdd)
val df2 = df1.limit(1)
df1.map { r => r.getAs[Int](0) }.first
df2.map { r => r.getAs[Int](0) }.first // Much slower than the previous line
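To make the difference concrete, a crude timing harness can wrap the two calls. This is only a sketch: the time helper below is not part of Spark, just a wall-clock measurement around an arbitrary expression.

def time[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  println(s"$label took ${(System.nanoTime() - start) / 1e6} ms")
  result
}

time("df1 first") { df1.map { r => r.getAs[Int](0) }.first }
time("df2 first") { df2.map { r => r.getAs[Int](0) }.first } // expected to be much slower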
Dataset.first is implemented as Dataset.limit(1).collect, so the work done by each call can be seen by explaining the corresponding limit(1) plan:
scala> df1.map { r => r.getAs[Int](0) }.limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [input[0, int, true] AS value#124]
   +- *MapElements <function1>, obj#123: int
      +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#122: org.apache.spark.sql.Row
         +- Scan ExistingRDD[x#74]
scala> df2.map { r => r.getAs[Int](0) }.limit(1).explain
== Physical Plan ==
CollectLimit 1
+- *SerializeFromObject [input[0, int, true] AS value#131]
   +- *MapElements <function1>, obj#130: int
      +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#129: org.apache.spark.sql.Row
         +- *GlobalLimit 1
            +- Exchange SinglePartition
               +- *LocalLimit 1
                  +- Scan ExistingRDD[x#74]
In the first plan, CollectLimitExec sits directly on top of the scan. When it collects its result it does not launch a job over every partition at once: it first scans a single partition to try to satisfy the limit of 1 row, and only if that is not enough does it scan more partitions in later rounds, growing the number of partitions scanned each round. Since the very first partition already contains rows, the first query only ever computes one partition and returns quickly.
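Roughly, that collection strategy behaves like the sketch below. This is illustrative Scala, not Spark's source: takeIncrementally, scanPartitions and the default scale-up factor of 4 are assumptions standing in for Spark's internal take logic.

// Illustrative sketch of an incremental take: scan one partition first,
// and widen the scan only when the limit has not been reached yet.
def takeIncrementally[T](
    limit: Int,
    numPartitions: Int,
    scanPartitions: Range => Seq[T], // stand-in for "run a job over these partitions"
    scaleUpFactor: Int = 4): Seq[T] = {
  var collected = Seq.empty[T]
  var scanned = 0
  var batch = 1 // the first round scans a single partition
  while (collected.size < limit && scanned < numPartitions) {
    val upTo = math.min(scanned + batch, numPartitions)
    collected = collected ++ scanPartitions(scanned until upTo).take(limit - collected.size)
    scanned = upTo
    batch *= scaleUpFactor // scan more partitions next round only if still short of the limit
  }
  collected
}

With 1000 partitions that each contain rows, such a loop stops after the very first round, which is why the df1 query stays cheap.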
The second plan also ends in CollectLimitExec, but its input is no longer a plain scan. LocalLimit 1 takes one row from each of the 1000 partitions, Exchange SinglePartition then shuffles those rows into a single partition, and GlobalLimit 1 keeps one of them before CollectLimitExec finally collects that single row. Because the Exchange forces every partition to be evaluated, the second query has to run tasks over all 1000 partitions, which is why it is so much slower.
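One way to observe this from the shell is to count how many source rows actually get materialized in each case. The rebuild of the example below with a LongAccumulator is a sketch, and the exact counts depend on Spark's internals, but the df2-style query should touch roughly one row per partition (about 1000) while the df1-style query only touches the first partition.

val touched = sc.longAccumulator("source rows materialized")
val countedRdd = sc.parallelize(0.until(10000), 1000).map { x => touched.add(1); my(x) }
val cdf1 = spark.createDataFrame(countedRdd)
val cdf2 = cdf1.limit(1)

touched.reset()
cdf1.map { r => r.getAs[Int](0) }.first
println(touched.value) // small: only the first partition is evaluated

touched.reset()
cdf2.map { r => r.getAs[Int](0) }.first
println(touched.value) // roughly one row per partition: all 1000 partitions are evaluated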