AWS connection timeout when running Spark job on EMR

I am trying to run a simple Spark job on an Amazon EMR cluster. The cluster has 5 m4.2xlarge instances (1 master, 4 slaves), each with 16 vCPUs and 32 GB of memory.

This is my code:

    import org.apache.spark.SparkConf
    import org.apache.spark.graphx.{Edge, Graph, GraphXUtils, VertexId}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession

    def main(args: Array[String]): Unit = {
      val sparkConfig = new SparkConf()
        .set("hive.exec.dynamic.partition", "true")
        .set("hive.exec.dynamic.partition.mode", "nonstrict")
        .set("hive.s3.max-client-retries", "50")
        .set("hive.s3.max-error-retries", "50")
        .set("hive.s3.max-connections", "100")
        .set("hive.s3.connect-timeout", "5m")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrationRequired", "true")
        .set("spark.kryo.classesToRegister", "org.apache.spark.graphx.impl.VertexAttributeBlock")
        .set("spark.broadcast.compress", "true")

      val spark = SparkSession.builder()
        .appName("Spark Hive Example")
        .enableHiveSupport()
        .config(sparkConfig)
        .getOrCreate()

      // Register the GraphX classes with Kryo for serialization
      GraphXUtils.registerKryoClasses(sparkConfig)

      // Edges: (src, dst, label) read from table1
      val res = spark.sql("SELECT col1, col2, col3 FROM table1 limit 10000")
      val edgesRDD = res.rdd.map(row =>
        Edge(row.getString(0).hashCode, row.getString(1).hashCode, row(2).asInstanceOf[String]))

      // Vertices: (id, name) read from table2
      val res_two = spark.sql("SELECT col1 FROM table2 where col1 is not NULL and col1 != '' limit 100000")
      val vertexRDD: RDD[(VertexId, String)] = res_two.rdd.map(row =>
        (row.getString(0).hashCode, row(0).asInstanceOf[String]))

      val graph = Graph(vertexRDD, edgesRDD)
      val connectedComponents = graph.connectedComponents().vertices
    }

Both table1 and table2 are external Hive tables backed by S3. When I run this program, my job fails with the following error:

    Job aborted due to stage failure: Task 827 in stage 0.0 failed 4 times, most recent failure: Lost task 827.3 in stage 0.0 (TID 921, xxx.internal, executor 3): com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1069)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1035)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4169)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4116)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1237)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:24)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:10)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:82)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
        at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
        at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
        at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
        at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
        at com.sun.proxy.$Proxy35.retrieveMetadata(Unknown Source)
        at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:768)
        at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:1194)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:773)
        at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:166)
        at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.extractMetaInfoFromFooter(ReaderImpl.java:355)
        at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.<init>(ReaderImpl.java:316)
        at org.apache.hadoop.hive.ql.io.orc.OrcFile.createReader(OrcFile.java:237)
        at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getReader(OrcInputFormat.java:1204)
        at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getRecordReader(OrcInputFormat.java:1113)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:246)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:245)
        at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:203)
        at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:94)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
        at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:286)
        at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:263)
        at sun.reflect.GeneratedMethodAccessor19.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy37.get(Unknown Source)
        at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
        at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:184)
        at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
        at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
        at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1190)
        at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
        ... 59 more

I am not sure whether this comes from Hadoop or from reading Hive, but I saw a similar problem here, so I added the following options to my spark-submit command:

    --conf "spark.driver.extraJavaOptions=-Djavax.net.ssl.sessionCacheSize=1000 -Djavax.net.ssl.sessionCacheTimeout=60"
    --conf "spark.executor.extraJavaOptions=-Djavax.net.ssl.sessionCacheSize=1000 -Djavax.net.ssl.sessionCacheTimeout=60"

Still not working. Does anyone know what is going on?

2 answers

TL;DR: the property to set is fs.s3.maxConnections, in the emrfs-site.xml configuration file. It defaults to 50. We got exactly the same error/stack trace as you, so I set it to 5000, which fixed the problem and had no ill effects.
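
For reference, one way to apply this when creating the cluster is through an EMR configuration classification. The classification name and JSON shape below are the standard EMR mechanism; 5000 is simply the value that worked for us:

    [
      {
        "Classification": "emrfs-site",
        "Properties": {
          "fs.s3.maxConnections": "5000"
        }
      }
    ]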

From what I can tell, the root cause is InputFormat implementations that do not use try...finally correctly, so connections are not closed when exceptions are thrown. Notably, older versions of Hive, including the v1.2.1 that Spark is compiled against, exhibit this bug. Hive 2.x massively refactors OrcInputFormat, though I have not yet confirmed whether the bug is fixed there, nor do I know if/when/how you can compile Spark against Hive 2.x. See the sketch below for what "closing on the exception path" means in practice.
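
The snippet below is purely illustrative, not the actual Hive/ORC code; the point is that without the finally block, an exception while reading leaks the pooled S3 connection, and enough leaks exhaust the pool:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Illustrative sketch only -- not the Hive/ORC source. A reader that skips
    // the finally block never returns the connection to the pool when reading
    // throws, and the pool eventually runs dry.
    def readFirstByte(path: Path, conf: Configuration): Int = {
      val fs: FileSystem = path.getFileSystem(conf)
      val in = fs.open(path)
      try {
        in.read()   // any work against the stream that can throw
      } finally {
        in.close()  // always release the underlying pooled connection
      }
    }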

The workaround is to increase the connection pool size, as suggested in the other answer, but both the property and the file it lives in are quite different from those of the "classic" S3 filesystems (s3/s3a/s3n). Of course, none of this is documented anywhere, and I had to decompile the emrfs jar to tease it out...
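
If you would rather not reconfigure the cluster, it may also be possible to pass the same property through the job's Hadoop configuration using Spark's spark.hadoop.* prefix. This is an assumption on my part -- I have not verified that EMRFS picks the value up from there rather than only from emrfs-site.xml -- and the class/jar names below are placeholders:

    spark-submit \
      --conf spark.hadoop.fs.s3.maxConnections=5000 \
      --class com.example.YourJob your-job.jar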


I do not use EMRFS, but I know that the other Spark/Hadoop S3 clients use a pool of HTTP connections for their requests to S3, and "timeout waiting for pool" messages invariably mean the pool is not large enough. See if you can find out which EMRFS options increase the pool size. You will need at least one connection for every worker thread running in your process, and I would double that in the hope that EMRFS parallelizes the loading of blocks the way the s3a client does. A rough sizing sketch follows.
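
As a rough illustration of that sizing rule (my own back-of-the-envelope numbers, not anything EMRFS documents -- the executor layout is an assumption):

    // One pooled S3 connection per concurrently running task in the executor JVM,
    // doubled in case EMRFS fetches several blocks in parallel (as s3a does).
    val coresPerExecutor = 16                  // e.g. one fat executor per m4.2xlarge worker
    val minConnections   = coresPerExecutor    // at least one per task thread
    val suggestedPool    = minConnections * 2  // headroom for parallel block reads
    println(s"fs.s3.maxConnections should be at least $suggestedPool")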


Source: https://habr.com/ru/post/1271377/

