I run Spark on Amazon EMR in client mode with YARN using pyspark to process data from two input files (200 GB total).
The job combines the data (via reduceByKey), applies some maps and filters, and writes the results to S3 in Parquet format. Although the job uses DataFrames for storage, all of our actual transformations and actions are performed on RDDs.
Note: I have included a detailed account of my current configuration, and of the values I have already experimented with, after the Failures section.
Code
The step relevant to the errors we're seeing is the reduceByKey. I've included a few lines of context to show one preceding map function and the save operations that actually trigger the reduceByKey on the RDD:
# Populate UC Property Type
united_rdd = united_rdd.map(converter.convert_uc_property_type(uc_property_type_mappings))

# Reduce by listingIdSha
united_rdd = united_rdd.reduceByKey(converter.merge_listings)

# Filter by each geoId and write the output to storage
schema = convert_struct(ListingRevision)
for geo in GEO_NORMALIZATION_ENABLED_GEOS:
    regional_rdd = (united_rdd.filter(lambda (id_sha, (listing_revision, geo_id)): geo_id == geo)
                              .map(lambda (id_sha, (listing_revision, geo_id)): listing_revision))
    regional_df = regional_rdd.map(lambda obj: thrift_to_row(obj, schema)).toDF(schema)

    # Write to Disk/S3
    regional_df.write.format(output_format).mode("overwrite").save(os.path.join(output_dir, geo))

    # Write to Mongo
    (regional_df.write.format("com.mongodb.spark.sql.DefaultSource")
                .option("spark.mongodb.output.uri", mongo_uri)
                .option("collection", "{}_{}".format(geo, config.MONGO_OUTPUT_COLLECTION_SUFFIX))
                .mode("overwrite").save())
Failures
The job described above fails because executors run out of physical memory. Several executors hit this failure; here is one example as printed in the EMR step's stderr and also shown in the Spark History Server UI:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2787 in stage 3.0 failed 4 times, most recent failure: Lost task 2787.3 in stage 3.0 (TID 5792, ip-10-0-10-197.ec2.internal): ExecutorLostFailure (executor 47 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 20.0 GB of 20 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1923)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:143)
    ... 29 more
After discovering this, I dug into the individual nodes' YARN and container logs and found a YARN log message recording a spike in physical memory usage, as well as a java.lang.OutOfMemoryError in the container logs (both included below).
java.lang.OutOfMemoryError from the container logs:
17/03/28 21:41:44 WARN TransportChannelHandler: Exception in connection from ip-10-0-10-70.ec2.internal/10.0.10.70:7337
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:645)
    at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:228)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:212)
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:132)
    at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
    at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107)
    at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
YARN's report of excessive physical memory usage:
2017-03-28 21:42:48,986 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 6310 for container-id container_1490736006967_0001_01_000015: 20.3 GB of 20 GB physical memory used; 24.9 GB of 100 GB virtual memory used
Overall, I appear to run out of memory during the shuffle, despite allocating more than half of each executor's memory to off-heap overhead space and experimenting with a wide range of memory and core settings. Is there anything else I've missed that I could try? Based on a few other helpful posts I've read (e.g. ), these are the most common culprits for physical-memory problems. Could this be caused by data skew? I've experimented with measuring the partition distribution for smaller subsets of the data, and it looked fine, but I can't do that with all of this job's data because the job never finishes.
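For reference, a minimal sketch of the kind of partition/key distribution check I mean (not necessarily exactly what we ran; united_rdd stands in for the real post-reduceByKey RDD):

from operator import add

# Records per partition (rough balance check).
partition_sizes = (united_rdd
                   .mapPartitionsWithIndex(lambda idx, it: [(idx, sum(1 for _ in it))])
                   .collect())
sizes = [count for _, count in partition_sizes]
print("min/avg/max records per partition: %d / %.1f / %d"
      % (min(sizes), float(sum(sizes)) / len(sizes), max(sizes)))

# Records per key (rough skew check): show the heaviest keys.
print(united_rdd.map(lambda kv: (kv[0], 1))
                .reduceByKey(add)
                .takeOrdered(10, key=lambda kv: -kv[1]))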
Configuration
EMR spark-submit command:
spark-submit \
    --deploy-mode client \
    /home/hadoop/src/python/uc_spark/data_sources/realtytrac/assr_hist_extractor.py \
    --dataset_settings development \
    --mongo_uri <Internal Mongo URI> \
    --aws_access_key_id <S3 Access Key> \
    --aws_secret_key <S3 Secret Key> \
    --listing_normalization_server <Internal EC2 Address>:9502
Relevant Spark environment configuration:
- spark.executor.memory: 8 GB (out of 20 GB of memory available to each executor)
- spark.yarn.executor.memoryOverhead: 12 GB
- spark.executor.cores: 1 (the lowest I've tried, in the hope that it would help)
- spark.default.parallelism: 1024 (automatically configured based on the other parameters; I also tried 4099, to no avail)
I'm running on 64 m3.2xlarge machines, with 1.41 TB of memory in total.
NOTE: I've experimented with a wide range of values for all of the memory parameters except driver memory, with no luck.
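For clarity, a sketch of how the properties above map to code if set programmatically (ours actually come from the cluster/EMR configuration, so this is illustrative only):

from pyspark import SparkConf
from pyspark.sql import SparkSession

# Sketch only: the same settings expressed via SparkConf.
# spark.yarn.executor.memoryOverhead takes MiB, hence 12288 for 12 GB.
conf = (SparkConf()
        .set("spark.executor.memory", "8g")
        .set("spark.yarn.executor.memoryOverhead", "12288")
        .set("spark.executor.cores", "1")
        .set("spark.default.parallelism", "1024"))

spark = SparkSession.builder.config(conf=conf).getOrCreate()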
Update 1
I restructured my code to use a DataFrame join instead of an RDD union to merge the two input files. As soon as I did this, I made two important discoveries:
A rightOuter join, as opposed to our default leftOuter join, reduces the output size but makes the problem go away. Given that, I'm fairly confident we have a small subset of skewed data that the rightOuter join excludes. Unfortunately, I need to do more research to find out whether the lost data matters; we're still investigating.
Using DataFrames surfaced a more explicit failure in the process:
FetchFailed(BlockManagerId(80, ip-10-0-10-145.ec2.internal, 7337), shuffleId=2, mapId=35, reduceId=435, message=
org.apache.spark.shuffle.FetchFailedException: Too large frame: 3095111448
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
Caused by: java.lang.IllegalArgumentException: Too large frame: 3095111448
    at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
    at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
)
We're failing during the shuffle because a single partition receives too much data: a roughly 3 GB "frame".
I'll spend the rest of the day exploring how to de-skew our data and whether we can still do a leftOuter join.
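For reference, a rough sketch of the DataFrame join described above, plus the skew check I intend to run on the join key (source_a_df, source_b_df, and the column name listing_id_sha are placeholders for our real inputs and schema):

from pyspark.sql import functions as F

# Placeholder DataFrames/column: source_a_df, source_b_df, listing_id_sha.
# 'left_outer' is what we actually want; switching to 'right_outer' is the
# variant that currently succeeds, presumably by dropping the skewed rows.
joined_df = source_a_df.join(source_b_df, on="listing_id_sha", how="left_outer")

# Rough skew check: row counts for the heaviest join keys.
(source_a_df.groupBy("listing_id_sha")
            .count()
            .orderBy(F.desc("count"))
            .show(20))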