What is the most efficient sorting method in PySpark?

I have been analyzing on-time performance records for U.S. domestic flights since 2015. I need to group by tail number and store, for each tail number, a date-sorted list of all its flights in a database, to be retrieved by my application. I'm not sure which of two options for achieving this is best.

    # Load the parquet file
    on_time_dataframe = sqlContext.read.parquet('../data/on_time_performance.parquet')

    # Filter down to the fields we need to identify and link to a flight
    flights = on_time_dataframe.rdd.map(
        lambda x: (x.Carrier, x.FlightDate, x.FlightNum, x.Origin, x.Dest, x.TailNum)
    )

I can achieve this by sorting inside the reduce step:

    # Group flights by tail number, sorted by date, then flight number, then origin/dest
    flights_per_airplane = flights\
        .map(lambda nameTuple: (nameTuple[5], [nameTuple]))\
        .reduceByKey(lambda a, b: sorted(a + b, key=lambda x: (x[1], x[2], x[3], x[4])))

Or I can do the sort in a subsequent map step:

    # Do same in a map step, more efficient or does pySpark know how to optimize the above?
    flights_per_airplane = flights\
        .map(lambda nameTuple: (nameTuple[5], [nameTuple]))\
        .reduceByKey(lambda a, b: a + b)\
        .map(lambda tuple: (
            tuple[0],
            sorted(tuple[1], key=lambda x: (x[1], x[2], x[3], x[4])))
        )

Doing this in a reduce seems really inefficient, but in fact both are very slow. sorted() looks like the way to do this in the PySpark docs, so I wonder whether PySpark handles this efficiently internally. Which option is the most efficient, or the best choice for some other reason?

My code is also available in a gist here: https://gist.github.com/rjurney/af27f70c76dc6c6ae05c465271331ade

If you are curious about the data, it is from the Bureau of Transportation Statistics, here: http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time

1 answer

Unfortunately, both approaches are wrong before you even start sorting, and there is no efficient and simple way to do this in Spark. Still, the first one is significantly worse than the second.

Why are both approaches wrong? Because this is just another groupByKey, and that is simply an expensive operation. There are some ways to try to improve things (especially by avoiding map-side reduction), but at the end of the day you have to pay the price of a full shuffle, and if you don't see any failures, it is probably not worth the fuss.

However, the second approach is much better algorithmically *. If you want to keep the structure sorted all the way through, as in the first attempt, you should use dedicated tools (aggregateByKey with bisect.insort would be a good choice), but there is really nothing to gain here.
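
For illustration only, here is a minimal sketch of what the aggregateByKey with bisect.insort idea could look like. The helper names (sort_key, insert_sorted, merge_sorted) and the sample records are hypothetical, and using heapq.merge as the combiner is an assumption, not something stated above. The Spark wiring is shown as a comment; the helpers are exercised locally so the snippet runs without a cluster.

```python
import bisect
from functools import reduce
from heapq import merge

def sort_key(flight):
    # (FlightDate, FlightNum, Origin, Dest), the same ordering as in the question
    return flight[1:5]

def insert_sorted(acc, flight):
    # seqFunc: insert one flight into an already sorted list of (key, flight) pairs
    bisect.insort(acc, (sort_key(flight), flight))
    return acc

def merge_sorted(left, right):
    # combFunc: merge two sorted partial lists in a single linear pass
    return list(merge(left, right))

# With a live SparkContext this could be wired up roughly as:
# flights_per_airplane = (flights
#     .keyBy(lambda x: x[5])
#     .aggregateByKey([], insert_sorted, merge_sorted)
#     .mapValues(lambda pairs: [flight for _, flight in pairs]))

# Local check with hypothetical records for a single tail number:
# (Carrier, FlightDate, FlightNum, Origin, Dest, TailNum)
sample = [
    ("AA", "2015-01-03", "12", "JFK", "LAX", "N123AA"),
    ("AA", "2015-01-01", "7", "LAX", "JFK", "N123AA"),
    ("AA", "2015-01-02", "9", "JFK", "ORD", "N123AA"),
    ("AA", "2015-01-01", "3", "ORD", "LAX", "N123AA"),
]
left = reduce(insert_sorted, sample[:2], [])    # stands in for one partition
right = reduce(insert_sorted, sample[2:], [])   # stands in for another partition
result = [flight for _, flight in merge_sorted(left, right)]
```

Each insertion still shifts list elements, so this keeps the data sorted throughout but does not remove the cost of the shuffle.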

If grouped output is a hard requirement, the best thing you can do is keyBy, groupByKey and sort. This will not improve performance over the second solution, but arguably improves readability:

    (flights
        .keyBy(lambda x: x[5])
        .groupByKey()
        .mapValues(lambda vs: sorted(vs, key=lambda x: x[1:5])))

* Even if you assume the best-case scenario for Timsort, the first approach is N times O(N), while the second is O(N log N) in the worst case.
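
The difference can be seen in a small local simulation. This is only a sketch: a plain left fold with functools.reduce stands in for reduceByKey (the real merge order in Spark depends on partitioning), and the sample records and the counting_sorted helper are hypothetical.

```python
from functools import reduce

# Hypothetical records for one tail number:
# (Carrier, FlightDate, FlightNum, Origin, Dest, TailNum)
flights = [
    ("AA", "2015-01-03", "12", "JFK", "LAX", "N123AA"),
    ("AA", "2015-01-01", "7", "LAX", "JFK", "N123AA"),
    ("AA", "2015-01-02", "9", "JFK", "ORD", "N123AA"),
    ("AA", "2015-01-01", "3", "ORD", "LAX", "N123AA"),
]

sort_calls = {"reduce_sort": 0, "sort_once": 0}

def counting_sorted(records, label):
    # Wrapper around sorted() that counts how often a full sort is triggered
    sort_calls[label] += 1
    return sorted(records, key=lambda x: x[1:5])

singletons = [[f] for f in flights]

# First approach: re-sort the merged list on every reduce call
reduce_sort = reduce(lambda a, b: counting_sorted(a + b, "reduce_sort"), singletons)

# Second approach: concatenate everything first, then sort once
sort_once = counting_sorted(reduce(lambda a, b: a + b, singletons), "sort_once")

print(reduce_sort == sort_once)  # both approaches yield the same ordering
print(sort_calls)
```

With N records under one key, the fold triggers N - 1 sorts over ever-growing lists, while the second approach sorts exactly once.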


Source: https://habr.com/ru/post/1246308/

