Spark DataSet Filter Performance

I experimented with several ways of filtering a typed Dataset, and it turns out the performance can differ dramatically.

The Dataset is built from a 1.6 GB CSV file with 33 columns and 4,226,047 rows. It is created by loading the CSV data and mapping it to a case class:

val df = spark.read.csv(csvFile).as[FireIncident] 
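For reference, here is a minimal, self-contained sketch of that setup. The case class fields (other than UnitID), the file name, and the header option are assumptions; the real class has 33 fields.

 import org.apache.spark.sql.SparkSession

 // Hypothetical, trimmed-down stand-in for the 33-column case class;
 // only UnitID is actually referenced in this question.
 case class FireIncident(IncidentNumber: Option[String],
                         UnitID: Option[String],
                         CallType: Option[String])

 val spark = SparkSession.builder.appName("fire-incidents").getOrCreate()
 import spark.implicits._

 val csvFile = "Fire_Incidents.csv" // placeholder path
 // With header=true all columns are read as strings, which lines up with the
 // Option[String] fields; without it the columns come back as _c0, _c1, ...
 // and will not match the case class by name.
 val df = spark.read
   .option("header", "true")
   .csv(csvFile)
   .as[FireIncident]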

The filter on UnitID = 'B02' should return 47,980 rows. I tested three approaches, shown below:

1) Use a typed column (~500 ms on the local machine)

 df.where($"UnitID" === "B02").count() 
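One way to see what this variant does is to print the physical plan; the predicate stays an ordinary Catalyst column expression (the exact plan text depends on the Spark version):

 // The comparison shows up as a plain column predicate in the plan, so Spark
 // only needs to look at the UnitID column.
 df.where($"UnitID" === "B02").explain()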

2) Register a temporary view and use a SQL query (about the same as option 1)

 df.createOrReplaceTempView("FireIncidentsSF")
 spark.sql("SELECT * FROM FireIncidentsSF WHERE UnitID='B02'").count()

3) Access the field through the strongly typed case class (14,987 ms, i.e. about 30 times slower)

 df.filter(_.UnitID.orNull == "B02").count() 

I tested the same filter again using the Python API on the same dataset; it took 17,046 ms, comparable to approach 3) of the Scala API.

 df.filter(df['UnitID'] == 'B02').count() 

Can someone shed some light on why 3) and the Python API are executed differently from the first two options?

2 answers

This is because of step 3 here.

In the first two, Spark doesn't need to deserialize the whole Java/Scala object: it just looks at the one column and moves on.

In the third, since you are using a lambda function, Spark cannot tell that you only want the one field, so it pulls all 33 fields out of memory for each row just so you can check that single field.
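A sketch of how to verify this with explain() (the exact operators printed depend on the Spark version, and the projection shown second is just one possible workaround, not something the original question used):

 // The lambda is opaque to Catalyst, so the plan has to materialize the full
 // FireIncident object for every row before the predicate can run. Compare
 // this output with the plan of the column-based filter from option 1.
 df.filter(_.UnitID.orNull == "B02").explain()

 // Possible workaround: project the single column first, then apply the
 // typed predicate, so only UnitID is deserialized per row.
 df.select($"UnitID".as[String]).filter(_ == "B02").count()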

I'm not sure why the fourth (the Python test) is so slow; it looks like it should work the same way as the first.


When you use the Python API, your code first has to be handed over to the JVM, where Spark actually executes, and that bridging adds overhead. With the Scala API your code already runs natively on the JVM, so the entire Python-to-JVM step is cut out.



