Avoid the performance impact of single-partition mode in Spark window functions

My question is triggered by the use case of calculating the differences between consecutive rows in a Spark DataFrame.

For example, I have:

 >>> df.show()
 +-----+----------+
 |index|      col1|
 +-----+----------+
 |  0.0|0.58734024|
 |  1.0|0.67304325|
 |  2.0|0.85154736|
 |  3.0| 0.5449719|
 +-----+----------+

If I decide to calculate them using Window functions, then I can do it like this:

 >>> winSpec = Window.partitionBy(df.index >= 0).orderBy(df.index.asc())
 >>> import pyspark.sql.functions as f
 >>> df.withColumn('diffs_col1', f.lag(df.col1, -1).over(winSpec) - df.col1).show()
 +-----+----------+-----------+
 |index|      col1| diffs_col1|
 +-----+----------+-----------+
 |  0.0|0.58734024|0.085703015|
 |  1.0|0.67304325| 0.17850411|
 |  2.0|0.85154736|-0.30657548|
 |  3.0| 0.5449719|       null|
 +-----+----------+-----------+

Question: I explicitly grouped the data frame into a single partition (the predicate index >= 0 is true for every row). What is the performance impact of this, if any, why does it happen, and how can I avoid it? Because when I do not specify a partition at all, I get the following warning:

 16/12/24 13:52:27 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. 
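For reference, a window spec without partitionBy, which is what triggers this warning, would look roughly like this (a sketch reusing the df above; winSpec_nopart is just a name chosen for illustration):

 >>> winSpec_nopart = Window.orderBy(df.index.asc())  # no partitionBy at all
 >>> df.withColumn('diffs_col1', f.lag(df.col1, -1).over(winSpec_nopart) - df.col1).show()
 # emits the WindowExec warning above and moves all rows to a single partition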
1 answer

In practice, the performance impact will be almost the same as if you omitted the partitionBy clause altogether. All records will be shuffled to a single partition, sorted locally and iterated over sequentially, one by one.
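One quick way to confirm this (a sketch using the window spec from the question) is to look at the physical plan, which contains an Exchange step either way:

 # Sketch: with partitionBy(df.index >= 0) every row gets the same key (true),
 # so the hash exchange still sends everything to one partition; without any
 # partitionBy the plan uses a single-partition exchange instead.
 df.withColumn('diffs_col1', f.lag(df.col1, -1).over(winSpec) - df.col1).explain()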

The only difference is the total number of partitions created. Let's illustrate that with an example using a simple data set with 10 partitions and 1000 records:

 df = spark.range(0, 1000, 1, 10).toDF("index").withColumn("col1", f.randn(42)) 
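As a quick sanity check (a sketch), the input starts out with 10 partitions before any window function is applied:

 df.rdd.getNumPartitions()
 10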

If you define a frame without a partition by clause

 w_unpart = Window.orderBy(f.col("index").asc()) 

and use it with lag

 df_lag_unpart = df.withColumn(
     "diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")
 )

there will be only one partition:

 df_lag_unpart.rdd.glom().map(len).collect() 
 [1000] 

Compared to that, a frame definition with a dummy partitioning key (simplified a bit compared to your code):

 w_part = Window.partitionBy(f.lit(0)).orderBy(f.col("index").asc()) 

will use the number of partitions equal to spark.sql.shuffle.partitions :

 spark.conf.set("spark.sql.shuffle.partitions", 11)

 df_lag_part = df.withColumn(
     "diffs_col1", f.lag("col1", 1).over(w_part) - f.col("col1")
 )

 df_lag_part.rdd.glom().count()
 11 

with only one non-empty partition:

 df_lag_part.rdd.glom().filter(lambda x: x).count() 
 1 

Unfortunately, there is no universal solution that can be used to address this problem in PySpark. It is simply an inherent mechanism of the implementation combined with the distributed processing model.

Since the index column is sequential, you can create an artificial partitioning key with a fixed number of records per block:

 rec_per_block = df.count() // int(spark.conf.get("spark.sql.shuffle.partitions"))

 df_with_block = df.withColumn(
     "block", (f.col("index") / rec_per_block).cast("int")
 )
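To see what this key looks like (a sketch: with 1000 records and 11 shuffle partitions, rec_per_block is 90, so blocks 0-10 should hold 90 records each and a twelfth block the remaining 10):

 df_with_block.groupBy("block").count().orderBy("block").show()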

and use the block column to define the frame specification:

 w_with_block = Window.partitionBy("block").orderBy("index")

 df_lag_with_block = df_with_block.withColumn(
     "diffs_col1", f.lag("col1", 1).over(w_with_block) - f.col("col1")
 )

This will use the expected number of partitions:

 df_lag_with_block.rdd.glom().count() 
 11 

with approximately uniform distribution of data (we cannot avoid hash collisions):

 df_lag_with_block.rdd.glom().map(len).collect() 
 [0, 180, 0, 90, 90, 0, 90, 90, 100, 90, 270] 
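The same distribution can be read without going through the RDD API (a sketch using spark_partition_id; empty partitions simply don't appear in the output):

 df_lag_with_block.groupBy(f.spark_partition_id().alias("pid")).count().show()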

but with a number of gaps on the block boundaries:

 df_lag_with_block.where(f.col("diffs_col1").isNull()).count() 
 12 

Since the boundaries are easy to compute:

 from itertools import chain

 boundary_idxs = sorted(chain.from_iterable(
     # Here we depend on sequential identifiers
     # This could be generalized to any monotonically increasing
     # id by taking min and max per block
     (idx - 1, idx) for idx in
     df_lag_with_block.groupBy("block").min("index")
         .drop("block").rdd.flatMap(lambda x: x)
         .collect()))[2:]  # The first boundary doesn't carry useful information
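For this particular dataset the per-block minima are 0, 90, 180, ..., 990, so the list should start with the pairs around the first two block boundaries (a sketch):

 boundary_idxs[:4]
 [89, 90, 179, 180]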

you can always select the affected rows:

 missing = df_with_block.where(f.col("index").isin(boundary_idxs)) 

and fill these in separately:

 # We use a window without partitions here. Since the number of records
 # will be small this won't be a performance issue,
 # but it will generate the "Moving all data to a single partition" warning
 missing_with_lag = missing.withColumn(
     "diffs_col1", f.lag("col1", 1).over(w_unpart) - f.col("col1")
 ).select("index", f.col("diffs_col1").alias("diffs_fill"))

and join :

 combined = (df_lag_with_block
     .join(missing_with_lag, ["index"], "leftouter")
     .withColumn("diffs_col1", f.coalesce("diffs_col1", "diffs_fill")))

to get the desired result:

 mismatched = combined.join(df_lag_unpart, ["index"], "outer").where(
     combined["diffs_col1"] != df_lag_unpart["diffs_col1"]
 )
 assert mismatched.count() == 0

Source: https://habr.com/ru/post/1013494/

