I have a dataset with ~5M rows x 20 columns, containing a groupID and a rowID. My goal is to check whether (some) columns contain more than a fixed fraction (say, 50%) of missing (null) values within a group. If that is the case, the entire column is set to missing (null) for that group.
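To make the intended behaviour concrete, here is a toy illustration (group IDs, row IDs and values are made up):

# hypothetical example: in group 'g1', 2 of 3 values of col1 are missing (> 50%),
# so col1 should become NULL for every row of 'g1'; in 'g2' only 1 of 2 values
# is missing (not > 50%), so col1 stays untouched there
toy = spark.createDataFrame(
    [('g1', 1, None), ('g1', 2, None), ('g1', 3, 7.0),
     ('g2', 4, 1.0),  ('g2', 5, None)],
    ['groupID', 'rowID', 'col1'])

This is the code I am currently using on the real data: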
from pyspark.sql import functions as F

df = spark.read.parquet('path/to/parquet/')

check_columns = {'col1': ..., 'col2': ..., ...}  # currently len(check_columns) = 8

for col, _ in check_columns.items():
    total = (df
             .groupBy('groupID').count()
             .toDF('groupID', 'n_total')
             )
    missing = (df
               .where(F.col(col).isNull())
               .groupBy('groupID').count()
               .toDF('groupID', 'n_missing')
               )
    # missing = missing.persist()  # PERSIST TRY 1
    # print('col {} found {} missing'.format(col, missing.count()))  # missing.count() is b/w 1k-5k
    poor_df = (total
               .join(missing, 'groupID')
               .withColumn('freq', F.col('n_missing') / F.col('n_total'))
               .where(F.col('freq') > 0.5)
               .select('groupID')
               .toDF('poor_groupID')
               )
    df = (df
          .join(poor_df, df['groupID'] == poor_df['poor_groupID'], 'left_outer')
          .withColumn(col, (F.when(F.col('poor_groupID').isNotNull(), None)
                            .otherwise(df[col])
                            )
                      )
          .select(df.columns)
          )
    stats = (missing
             .withColumnRenamed('n_missing', 'cnt')
             .collect()  # FAIL 1
             )
    # df = df.persist()  # PERSIST TRY 2

print(df.count())  # FAIL 2
I initially assigned 1G to spark.driver.memory and 4G to spark.executor.memory, eventually increasing spark.driver.memory to 10G.
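For reference, this is roughly how the configuration is passed in (a sketch: the builder call is an assumption, only the memory keys and values come from the setup described above; also note that spark.driver.memory has to be known before the driver JVM starts, so in practice it may have to go into spark-defaults.conf or the spark-submit call instead):

from pyspark import SparkConf
from pyspark.sql import SparkSession

# sketch of the configuration; only the two memory values are taken from the
# description above (driver memory started at 1g and ended up at 10g)
cfg = SparkConf()
cfg.set('spark.driver.memory', '10g')
cfg.set('spark.executor.memory', '4g')

spark = SparkSession.builder.config(conf=cfg).getOrCreate()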
Problem(s):
The loop works like a charm during the first iterations, but towards the end, around the 6th or 7th iteration, I see my processor load decreasing (using 1 instead of 6 cores). Along with this, the execution time of a single iteration increases significantly. At some point I get an OutOfMemory error:
- spark.driver.memory < 4G: fails at collect() (FAIL 1)
- 4G <= spark.driver.memory < 10G: fails at count() (FAIL 2)
FAIL 1 (abbreviated stack trace):
[...]
py4j.protocol.Py4JJavaError: An error occurred while calling o1061.collectToPython.
: java.lang.OutOfMemoryError: Java heap space
[...]
(The data that is actually collected to the driver is small: < 50k rows in total, well below 1G in size.) Looking at the Spark driver metrics (app-XXX.driver.BlockManager.memory.memUsed_MB), I can see the block manager's used memory climbing from roughly 600M to 1200M, in jumps of more than 300M. (Even so, 2G of driver memory should be more than enough for this, I would think.) The results I keep on the Python side are small as well (they just go into a dict()).
My questions:
- Why does the OutOfMemory error occur at all?
- Why do I need 10G of spark.driver.memory when the data that ends up on the driver is so small?

Some (probably less important) questions that come to mind:
- Why does the OOM happen on the driver side at all — is data from the dataframes being pulled onto the driver somewhere (b/c of the joins or the collect)?
- Why does count() run into an OOM — shouldn't the counting be done on the executor(s), with only the final number returned to the driver?
- (Or, more generally: what am I doing wrong here?)
BTW: I am running Spark 2.1.0.
Update 2017-04-28:
To investigate further, I enabled heap dumps on OutOfMemoryError:

cfg = SparkConf()
cfg.set('spark.driver.extraJavaOptions', '-XX:+HeapDumpOnOutOfMemoryError')

I ran the job with 8G of spark.driver.memory and analyzed the resulting heap dump with Eclipse MAT. The biggest memory consumers (~4G) are:
java.lang.Thread
- char (2G)
- scala.collection.IndexedSeqLike
- scala.collection.mutable.WrappedArray (1G)
- java.lang.String (1G)

org.apache.spark.sql.execution.ui.SQLListener
- org.apache.spark.sql.execution.ui.SQLExecutionUIData (various, up to 1G in size)
- java.lang.String
- ...
Based on this, I disabled the Spark UI via

cfg.set('spark.ui.enabled', 'false')

and the OOM did not occur anymore. Alternatively, to keep the UI enabled, the amount of data it retains can be reduced via

cfg.set('spark.ui.retainedJobs', '1')
cfg.set('spark.ui.retainedStages', '1')
cfg.set('spark.ui.retainedTasks', '1')
cfg.set('spark.sql.ui.retainedExecutions', '1')
cfg.set('spark.ui.retainedDeadExecutors', '1')
Update 2017-05-18:
Spark provides pyspark.sql.DataFrame.checkpoint. Like persist, it materializes the state of the dataframe, but it additionally truncates the dataframe's lineage, so the logical plan does not keep growing from one iteration to the next.
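A minimal sketch of how it could be used in the loop (the checkpoint directory is a made-up path, and where exactly to place the call is my own choice, not something prescribed anywhere):

# checkpointing requires a checkpoint directory to be set once per application
spark.sparkContext.setCheckpointDir('/tmp/spark-checkpoints')  # hypothetical path

for col, _ in check_columns.items():
    # ... same per-column logic as above, producing a new df ...
    df = df.checkpoint()  # materializes df and truncates its lineage, so the
                          # logical plan does not accumulate across iterations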