Spark filter (delete) based on values from another data frame
I have a "large" dataset ( huge_df ) s> 20 columns. One of the columns is the id field (generated using pyspark.sql.functions.monotonically_increasing_id() ).
Using some criteria, I generate a second data frame (filter_df) consisting of id values that I want to filter out of huge_df later.
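For concreteness, a minimal sketch of this setup might look as follows; the column names and filter criteria here are invented for illustration:

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id, col

spark = SparkSession.builder.getOrCreate()

# Stand-in for huge_df: a couple of columns plus a generated id
huge_df = spark.createDataFrame(
    [('a', 10), ('b', 20), ('c', 30)], ['name', 'value']
).withColumn('id', monotonically_increasing_id())

# Stand-in criteria: collect the ids of the rows to filter out later
filter_df = huge_df.where(col('value') > 15).select('id')
```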
I am currently using SQL syntax for this:
```
filter_df.createOrReplaceTempView('filter_view')
huge_df = huge_df.where('id NOT IN (SELECT id FROM filter_view)')
```

Question 1: Is there a way to do this using only the Python API, i.e. without having to register a TempView?
Question 2: Is there a completely different way to accomplish the same thing?
You can use a join:
```
huge_df = huge_df.join(filter_df, huge_df.id == filter_df.id, "left_outer") \
    .where(filter_df.id.isNull()) \
    .select([huge_df[c] for c in huge_df.columns])  # qualify columns to avoid ambiguity with filter_df.id
```

However, this will lead to an expensive shuffle.
The logic is simple: left outer join huge_df with filter_df on the id field and check whether filter_df.id is null. If it is null, there is no matching row in filter_df, so the row is kept.
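As a side note, Spark's DataFrame API also supports a left anti join, which expresses this null-check pattern in a single step, assuming filter_df exposes an id column as described in the question:

```
# Keeps only the rows of huge_df whose id has no match in filter_df
huge_df = huge_df.join(filter_df, on='id', how='left_anti')
```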
Here is another way to do this:
```
# Sample data
hugedf = spark.createDataFrame([[1,'a'],[2,'b'],[3,'c'],[4,'d']], schema=['k1','v1'])
fildf = spark.createDataFrame([[1,'b'],[3,'c']], schema=['k2','v2'])

from pyspark.sql.functions import col

hugedf \
    .select('k1') \
    .subtract(fildf.select('k2')) \
    .toDF('d1') \
    .join(hugedf, col('d1') == hugedf.k1) \
    .drop('d1') \
    .show()
```

The logic is simple: subtract the id values found in fildf from the id values found in hugedf, leaving only the id values that are not in fildf.
I named the subtracted column d1 purely for clarity, then joined hugedf back on d1 and dropped d1 to get the final result.
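For the sample data above, this should print the rows whose keys survived the subtraction (row order may vary across runs):

```
+---+---+
| k1| v1|
+---+---+
|  2|  b|
|  4|  d|
+---+---+
```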