Slow or incomplete saveAsParquetFile from EMR Spark to S3

I have a piece of code that creates a DataFrame and saves it to S3. The code below creates a DataFrame of 1000 rows and 100 columns filled with math.random. I run this on a cluster with 4 x r3.8xlarge worker nodes and configure plenty of memory. I have tried both the maximum number of executors and one executor per node.

 // create some random data for performance and scalability testing
 val df = sqlContext.range(0, 1000).map(x => Row.fromSeq((1 to 100).map(y => math.random)))
 df.saveAsParquetFile("s3://kirk/my_file.parquet")

My problem is that I can create a much larger DataFrame in memory than I can save to S3.

For example, I can build and query a DataFrame of 1 billion rows and 1000 columns, but a DataFrame of 100 million rows and 100 columns fails when I write it to S3 this way. I don't get helpful messages from the Spark context, but the job fails because too many tasks have failed.

Is there any configuration that would save the file more efficiently? Should I configure Spark differently for saveAsParquetFile?

This is a stack trace from an executor:

 15/09/09 18:10:26 ERROR sources.InsertIntoHadoopFsRelation: Aborting task.
 java.lang.OutOfMemoryError: Java heap space
     at parquet.column.values.dictionary.IntList.initSlab(IntList.java:87)
     at parquet.column.values.dictionary.IntList.<init>(IntList.java:83)
     at parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:85)
     at parquet.column.values.dictionary.DictionaryValuesWriter$PlainIntegerDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:549)
     at parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:88)
     at parquet.column.impl.ColumnWriterImpl.<init>(ColumnWriterImpl.java:74)
     at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:68)
     at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
     at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
     at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
     at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
     at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
     at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
     at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
     at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
     at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:83)
     at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229)
     at org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:470)
     at org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:360)
     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:172)
     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
     at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
     at org.apache.spark.scheduler.Task.run(Task.scala:70)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
     at java.lang.Thread.run(Thread.java:745)
1 answer

I think you need to repartition your DataFrame (you should have at least numberOfWorkerInstances * numberOfCoresOnEachInstance partitions) to allow concurrent writes to S3.
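A minimal sketch of that suggestion, assuming the Spark 1.x API used in the question and that `df` is already a DataFrame in the current spark-shell session. The instance and core counts are assumed values for the 4 x r3.8xlarge cluster described above (32 vCPUs per node), and `minPartitions` and `repartitioned` are names introduced here for illustration only; adjust them to your own setup.

```scala
// Assumed cluster shape: 4 worker nodes with 32 cores each (r3.8xlarge).
val numberOfWorkerInstances = 4
val numberOfCoresOnEachInstance = 32

// Lower bound on the partition count so that every core can write a file concurrently.
val minPartitions = numberOfWorkerInstances * numberOfCoresOnEachInstance

// `df` is the DataFrame from the question. repartition performs a full shuffle,
// so each task ends up writing a smaller, roughly equal share of the rows.
val repartitioned = df.repartition(minPartitions)
repartitioned.saveAsParquetFile("s3://kirk/my_file.parquet")
```

Whether this is enough to avoid the `OutOfMemoryError` shown in the stack trace depends on executor heap size and on how many tasks run per executor, so treat the partition count as a starting point rather than a guaranteed fix.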


Source: https://habr.com/ru/post/1262807/

