Apache Spark OutOfMemoryError (HeapSpace)

I have a dataset with ~5M rows x 20 columns, containing a groupID and a rowID. My goal is to check whether (some) columns contain more than a fixed fraction (say, 50%) of missing (null) values within a group. If that is the case, the entire column is set to missing (null) for that group.

import pyspark.sql.functions as F

df = spark.read.parquet('path/to/parquet/')
check_columns = {'col1': ..., 'col2': ..., ...}  # currently len(check_columns) = 8

for col, _ in check_columns.items():
    total = (df
             .groupBy('groupID').count()
             .toDF('groupID', 'n_total')
             )

    missing = (df
               .where(F.col(col).isNull())
               .groupBy('groupID').count()
               .toDF('groupID', 'n_missing')
               )
    # count_missing = count_missing.persist()  # PERSIST TRY 1
    # print('col {} found {} missing'.format(col, missing.count()))  # missing.count() is b/w 1k-5k

    poor_df = (total
               .join(missing, 'groupID')
               .withColumn('freq', F.col('n_missing') / F.col('n_total'))
               .where(F.col('freq') > 0.5)
               .select('groupID')
               .toDF('poor_groupID')
               )

    df = (df
          .join(poor_df, df['groupID'] == poor_df['poor_groupID'], 'left_outer')
          .withColumn(col, (F.when(F.col('poor_groupID').isNotNull(), None)
                            .otherwise(df[col])
                            )
                      )
          .select(df.columns)
          )

    stats = (missing
             .withColumnRenamed('n_missing', 'cnt')
             .collect()  # FAIL 1
             )

    # df = df.persist()  # PERSIST TRY 2

print(df.count())  # FAIL 2

I initially assigned 1G to spark.driver.memory and 4G to spark.executor.memory, eventually increasing spark.driver.memory to 10G.
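
For reference, a minimal sketch (not my original setup) of how such values can be wired up when building the session; the app name below is made up, and spark.driver.memory only takes effect if it is set before the driver JVM starts (e.g. via spark-submit --driver-memory or spark-defaults.conf):

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = (SparkConf()
        .set('spark.driver.memory', '10g')     # driver heap; must be set before the driver JVM starts
        .set('spark.executor.memory', '4g'))   # executor heap

spark = (SparkSession.builder
         .config(conf=conf)
         .appName('column-null-check')         # illustrative name only
         .getOrCreate())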

Problem(s): The loop works like a charm during the first iterations, but towards the end, around the 6th or 7th iteration, I see my CPU load drop (using 1 instead of 6 cores). Along with that, the execution time for a single iteration increases significantly. At some point, I get an OutOfMemory error:

  • spark.driver.memory < 4G: fails at collect() (FAIL 1)
  • 4G <= spark.driver.memory < 10G: fails at count() (FAIL 2)

FAIL 1 (excerpt from the stack trace):

[...]
py4j.protocol.Py4JJavaError: An error occurred while calling o1061.collectToPython.
: java.lang.OutOfMemoryError: Java heap space
[...]

(The data being collected is small: fewer than 50k rows and well under 1G.) Spark's driver metrics (app-XXX.driver.BlockManager.memory.memUsed_MB) report between 600M and 1200M of memory used, while more than 300M remain free. (So even 2G of driver memory should have been enough, but it is not.)

Apart from that, I hardly keep any data on the driver (just a dict(), see above).

My questions:

  • Why do I run into an OutOfMemory error at all?
  • Why do I need 10G of spark.driver.memory, given how little data ends up on the driver?

And a few (more general) questions, mostly to improve my understanding:

  • When the OOM happens at collect() (FAIL 1), is that purely a driver-side problem (b/c the collected rows must fit into driver memory)?
  • When the OOM happens at count() (FAIL 2), does it occur on the driver or on the executor(s), and how can I tell?
  • Are there rules of thumb (or best practices) for dimensioning driver and executor memory?

BTW: this is Spark 2.1.0.

Update 2017-04-28

To investigate further, I enabled a heap dump on OutOfMemoryError for the driver:

from pyspark import SparkConf

cfg = SparkConf()
cfg.set('spark.driver.extraJavaOptions', '-XX:+HeapDumpOnOutOfMemoryError')
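
A variant that also pins down where the dump file is written (the path is only an example):

cfg.set('spark.driver.extraJavaOptions',
        '-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/spark-driver-oom.hprof')  # example path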

Running with 8G of spark.driver.memory, I analyzed the resulting heap dump with Eclipse MAT. It shows two large chunks (each ~4G):

java.lang.Thread
    - char[] (2G)
    - scala.collection.IndexedSeqLike
        - scala.collection.mutable.WrappedArray (1G)
    - java.lang.String (1G)

org.apache.spark.sql.execution.ui.SQLListener
    - org.apache.spark.sql.execution.ui.SQLExecutionUIData 
      (several instances of up to 1G in size)
        - java.lang.String
    - ...

Based on this, I tried turning off the UI entirely:

cfg.set('spark.ui.enabled', 'false')

This made the UI unavailable, but did not make the OOM go away. I also tried to have the UI retain less history:

cfg.set('spark.ui.retainedJobs', '1')
cfg.set('spark.ui.retainedStages', '1')
cfg.set('spark.ui.retainedTasks', '1')
cfg.set('spark.sql.ui.retainedExecutions', '1')
cfg.set('spark.ui.retainedDeadExecutors', '1')

which did not help either.

Update 2017-05-18

In the meantime I learned about Spark's pyspark.sql.DataFrame.checkpoint. It is like persist, but it also discards the dataframe's lineage. Thus it circumvents the problem described above.
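
A minimal sketch of how checkpointing could be dropped into the loop above; the checkpoint directory is an arbitrary example, and on a real cluster it should point to reliable storage such as HDFS:

spark.sparkContext.setCheckpointDir('/tmp/spark-checkpoints')  # example path

for col, _ in check_columns.items():
    # ... same per-column logic as above ...
    df = df.checkpoint()  # materializes df and cuts its lineage, keeping the query plan small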


Source: https://habr.com/ru/post/1675721/

