Why use DataFrame.select over DataFrame.rdd.map (or vice versa)?

Is there any “mechanical” difference between using select on a DataFrame to collect the information we need and map each row of the underlying RDD for the same purpose?

By "mechanical" I mean a mechanism that performs operations. Implementation details, in other words.

Which of the proposed two is better / more effective?

 df = # create dataframe ... df.select("col1", "col2", ...) 

or

 df = # create dataframe ... df.rdd.map(lambda row: (row[0], row[1], ...)) 

I am doing performance testing, so I find out which is faster, but I would like to know what are the differences in implementation and the pros / cons.

+5
source share
2 answers

In this simplified example with DataFrame.select and DataFrame.rdd.map I think the difference can be almost negligible.

After you have already uploaded your dataset and performed only projection. In the end, both had to deserialize the data from the Spark InternalRow column format in order to calculate the result for the action.

You can check what happens with the DataFrame.select on explain(extended = true) , where you will learn about physical plans (and the physical plane too).

 scala> spark.version res4: String = 2.1.0-SNAPSHOT scala> spark.range(5).select('id).explain(extended = true) == Parsed Logical Plan == 'Project [unresolvedalias('id, None)] +- Range (0, 5, step=1, splits=Some(4)) == Analyzed Logical Plan == id: bigint Project [id#17L] +- Range (0, 5, step=1, splits=Some(4)) == Optimized Logical Plan == Range (0, 5, step=1, splits=Some(4)) == Physical Plan == *Range (0, 5, step=1, splits=Some(4)) 

Compare the physical plan (i.e. SparkPlan ) with what you are doing with rdd.map (via toDebugString ) and you will find out what could be “better”.

 scala> spark.range(5).rdd.toDebugString res5: String = (4) MapPartitionsRDD[8] at rdd at <console>:24 [] | MapPartitionsRDD[7] at rdd at <console>:24 [] | MapPartitionsRDD[6] at rdd at <console>:24 [] | MapPartitionsRDD[5] at rdd at <console>:24 [] | ParallelCollectionRDD[4] at rdd at <console>:24 [] 

(again, in this contrived example, I think that there is no winner - both are most effective).

Note that the DataFrame indeed a Dataset[Row] , which uses RowEncoder to encode (that is, serialize) the data into the binary format of the InternalRow column. If you were to execute more statements in the pipeline, you could get much better performance with Dataset binding than RDD , just because there are low-level backstage plans for logical query plans and a binary column format.

There are many optimizations, and trying to defeat them can often be a waste of time. You will need to know the insides of Spark by heart in order to get better performance (and the price would certainly be readable).

There are many, and I highly recommend watching the Deep Dive talk in the Catalog Optimizer by Herman van Hovell to find out and appreciate all the optimizations.

I think this is ... "Stay away from RDD if you don't know what you are doing."

+1
source

RDD is just a line of transformation and action graphs.

The DataFrame has a logical plan that is optimized by the Catalyst logical query optimizer for internal use before performing an action.

What does this mean in your case?

If you have a DataFrame, you should use select - any additional work, such as filtering, combining, etc., will be optimized. An optimized DataFrame can be 10 times faster than a regular RDD. In other words, before executing select Spark will try to execute the query faster. This will not be done when using dataFrame.rdd.map ()

One more thing: the rdd value rdd calculated lazily by doing:

 lazy val rdd: RDD[T] = { val objectType = exprEnc.deserializer.dataType val deserialized = CatalystSerde.deserialize[T](logicalPlan) sparkSession.sessionState.executePlan(deserialized).toRdd.mapPartitions { rows => rows.map(_.get(0, objectType).asInstanceOf[T]) } } 

So, Spark will use RDD, map and cast materials. The DAG of both versions will be almost the same as in the question, so the performance will be similar. However, in more complex cases, the benefits of using datasets will be very noticeable, as Spark PMC writes on the Databricks blog, the dataset can be 100 times faster after Catalyst optimization

Keep in mind that DataFrame = Dataset [Row] and uses RDD in the background - but the RDD plot is created after optimization

Note : Spark integrates the API. Spark ML is now DataFrame oriented, the old API should not be used. Streaming moves to Structured Streaming. Therefore, even if you do not have a big performance improvement in your case, consider using DataFrames. This would be a better solution for future development and, of course, would be faster than using a simple RDD

+2
source

Source: https://habr.com/ru/post/1260350/


All Articles