I have recently been stress-testing our Spark Streaming application. The test pushes around 20,000 messages per second into Kafka, with message sizes between 200 bytes and 1 KB, and Spark Streaming reads batches every 4 seconds.
Our cluster runs Spark 1.6.1 with the standalone cluster manager, and our code is written in Scala 2.10.6.
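For reference, here is a minimal sketch of how the streaming job is wired up. The 4-second batch interval is the one described above; the app name, broker list, and topic name are placeholders and may differ from our actual code:

    import kafka.serializer.StringDecoder

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Sketch only: app, broker, and topic names are placeholders.
    object StressTestJob {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("kafka-stress-test")
        // 4-second batch interval, as described above.
        val ssc = new StreamingContext(conf, Seconds(4))

        val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
        val topics = Set("stress-test-topic")

        // Direct Kafka stream; each message is 200 bytes - 1 KB, ~20,000 msg/s in the test.
        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topics)

        // ... transformations, stateful operations and output actions go here ...

        ssc.start()
        ssc.awaitTermination()
      }
    }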
After about 15-20 hours of running, one of the executors performing the checkpoint (checkpointing runs every 40 seconds) gets stuck with the following stack trace and never completes:
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:170)
java.net.SocketInputStream.read(SocketInputStream.java:141)
sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
sun.security.ssl.InputRecord.read(InputRecord.java:532)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1375)
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1403)
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1387)
org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:533)
org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:401)
org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:177)
org.apache.http.impl.conn.AbstractPoolEntry.open(AbstractPoolEntry.java)
org.apache.http.impl.conn.AbstractPooledConnAdapter.open(AbstractPooledConnAdapter.java:131)
org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:610)
org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:445)
org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:326)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:277)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1038)
org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2250)
org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2179)
org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120)
org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575)
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java)
sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:497)
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
org.apache.hadoop.fs.s3native.$Proxy18.retrieveMetadata(Unknown Source)
org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:472)
org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1424)
org.apache.spark.rdd.ReliableCheckpointRDD$.writePartitionToCheckpointFile(ReliableCheckpointRDD.scala:168)
org.apache.spark.rdd.ReliableCheckpointRDD$$anonfun$writeRDDToCheckpointDirectory$1.apply(ReliableCheckpointRDD.scala:136)
org.apache.spark.rdd.ReliableCheckpointRDD$$anonfun$writeRDDToCheckpointDirectory$1.apply(ReliableCheckpointRDD.scala:136)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
org.apache.spark.scheduler.Task.run(Task.scala:89)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
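The bottom frames show the checkpoint write (ReliableCheckpointRDD.writePartitionToCheckpointFile calling FileSystem.exists) going through NativeS3FileSystem and jets3t, i.e. the checkpoint directory lives on S3 behind the s3n scheme, and the task hangs on the SSL handshake of that S3 request. A rough sketch of the checkpoint wiring this implies, continuing the sketch above (bucket name and credential handling are placeholders, not our exact code):

    // Continuing the sketch above; bucket name and credential lookup are placeholders.
    val hadoopConf = ssc.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
    hadoopConf.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))

    // Metadata checkpointing goes to S3 through the s3n scheme,
    // which is what puts NativeS3FileSystem/jets3t on the task's call stack.
    ssc.checkpoint("s3n://my-bucket/spark/checkpoints")

    // Data checkpointing on the DStream every 40 seconds (10x the 4-second batch interval).
    stream.checkpoint(Seconds(40))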
Meanwhile, the driver thread streaming-job-executor-0 is stuck with the following stack trace:
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:502)
org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:612)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)
org.apache.spark.rdd.ReliableCheckpointRDD$.writeRDDToCheckpointDirectory(ReliableCheckpointRDD.scala:135)
org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint(ReliableRDDCheckpointData.scala:58)
org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:74)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply$mcV$sp(RDD.scala:1682)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1679)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1679)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1678)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1$$anonfun$apply$mcV$sp$1.apply(RDD.scala:1684)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1$$anonfun$apply$mcV$sp$1.apply(RDD.scala:1684)
scala.collection.immutable.List.foreach(List.scala:318)
Any idea what could be causing this, or how we can prevent it?