People,
I encounter this problem while I try to combine 2 large data frames (100 GB + each) in spark mode with one key identifier per line.
I am using Spark 1.6 for EMR, and here is what I do:
val df1 = sqlContext.read.json("hdfs:///df1/") val df2 = sqlContext.read.json("hdfs:///df2/") // clean up and filter steps later df1.registerTempTable("df1") df2.registerTempTable("df2") val df3 = sql("select df1.*, df2.col1 from df1 left join df2 on df1.col3 = df2.col4") df3.write.json("hdfs:///df3/")
This is basically the essence of what I am doing, among other cleaning and filtering steps between them, in order to eventually join df1 and df2.
The error I see is:
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:860) at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127) at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:503) at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:420) at org.apache.spark.storage.BlockManager.get(BlockManager.scala:625) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44) at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
Configuration and links:
I use 13 node 60 GB each cluster with installed executors and driver memory with overhead, respectively. Things I tried to configure:
spark.sql.broadcastTimeoutspark.sql.shuffle.partitions
I also tried using a large cluster, but did not help. This link says that if the Shuffle partition is larger than 2 GB, this error occurs. But I tried to increase the number of partitions to a very high value, but no luck.
I suspect this may be due to lazy loading. When I do 10 operations on DF, they are performed only in the last step. I tried adding .persist() at different storage levels for DF, but that fails. I also tried dropping temporary tables by emptying all previous DFs for cleaning.
However, the code works if I break it into two parts - writing the last temporary data (2 frames of data) to disk, exiting. Restart to join only two DFs.
I used to get this error:
Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [300 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin.doExecute(BroadcastHashOuterJoin.scala:113) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) at org.apache.spark.sql.DataFrame.toJSON(DataFrame.scala:1724) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
But when I adjusted spark.sql.broadcastTimeout , I started getting the first error.
Thank any help in this case. If necessary, I can add additional information.