Spark: Size exceeds Integer.MAX_VALUE When merging two large DFs

People,

I encounter this problem while I try to combine 2 large data frames (100 GB + each) in spark mode with one key identifier per line.

I am using Spark 1.6 for EMR, and here is what I do:

val df1 = sqlContext.read.json("hdfs:///df1/") val df2 = sqlContext.read.json("hdfs:///df2/") // clean up and filter steps later df1.registerTempTable("df1") df2.registerTempTable("df2") val df3 = sql("select df1.*, df2.col1 from df1 left join df2 on df1.col3 = df2.col4") df3.write.json("hdfs:///df3/") 

This is basically the essence of what I am doing, among other cleaning and filtering steps between them, in order to eventually join df1 and df2.

The error I see is:

 java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:860) at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127) at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:503) at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:420) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:625) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44) at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 

Configuration and links:
I use 13 node 60 GB each cluster with installed executors and driver memory with overhead, respectively. Things I tried to configure:

  • spark.sql.broadcastTimeout
  • spark.sql.shuffle.partitions

I also tried using a large cluster, but did not help. This link says that if the Shuffle partition is larger than 2 GB, this error occurs. But I tried to increase the number of partitions to a very high value, but no luck.

I suspect this may be due to lazy loading. When I do 10 operations on DF, they are performed only in the last step. I tried adding .persist() at different storage levels for DF, but that fails. I also tried dropping temporary tables by emptying all previous DFs for cleaning.

However, the code works if I break it into two parts - writing the last temporary data (2 frames of data) to disk, exiting. Restart to join only two DFs.

I used to get this error:

 Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [300 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin.doExecute(BroadcastHashOuterJoin.scala:113) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) at org.apache.spark.sql.DataFrame.toJSON(DataFrame.scala:1724) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

But when I adjusted spark.sql.broadcastTimeout , I started getting the first error.

Thank any help in this case. If necessary, I can add additional information.

+5
source share
1 answer

In sparking, there is a shuffle block larger than 2 GB. This is because, Spark stores locks as ByteBuffer . Here is how you distribute it:

ByteBuffer.allocate(int capacity)

Like, ByteBuffers are limited to Integer.MAX_SIZE (2 GB), so shuffle blocks! The solution is to increase the number of partitions either with spark.sql.shuffle.partitions in SparkSQL, or with rdd.partition() or rdd.colease() for rdd, so that the size of each partition is <= 2GB.

You mentioned that you tried to increase the number of partitions, but still failed. Can you check the partition size> 2 GB. Just make sure that the number of partitions indicated is enough to make each block size <2 GB

+4
source

Source: https://habr.com/ru/post/1260344/


All Articles