Spark Streaming context hangs on start()

Hi everyone,

I am trying to use Kinesis with Spark Streaming on Spark 1.6.0 on Databricks, and my ssc.start() call hangs.

I use the following function to create a Spark Streaming context:

    def creatingFunc(sc: SparkContext): StreamingContext = {
      // Create a StreamingContext
      val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))

      // Create a Kinesis stream
      val kinesisStream = KinesisUtils.createStream(ssc,
        kinesisAppName, kinesisStreamName, kinesisEndpointUrl,
        RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName,
        InitialPositionInStream.LATEST,
        Seconds(kinesisCheckpointIntervalSeconds),
        StorageLevel.MEMORY_AND_DISK_SER_2,
        config.awsAccessKeyId, config.awsSecretKey)

      kinesisStream.print()

      ssc.remember(Minutes(1))
      ssc.checkpoint(checkpointDir)
      ssc
    }

However, when I run the following to start the streaming context:

    // Stop any existing StreamingContext
    val stopActiveContext = true
    if (stopActiveContext) {
      StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
    }

    // Get or create a streaming context.
    val ssc = StreamingContext.getActiveOrCreate(() => main.creatingFunc(sc))

    // This starts the streaming context in the background.
    ssc.start()
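
As a diagnostic sketch only (ssc and StreamingContext are the names from the code above; this would have to run in a separate cell, since the start() call itself never returns), checking what state the context reports can show whether it ever got past initialization:

    // Sketch: from another cell, check whether the context left the INITIALIZED
    // state and whether Spark considers it the currently active context.
    import org.apache.spark.streaming.StreamingContext

    println(s"Reported state: ${ssc.getState()}")              // INITIALIZED / ACTIVE / STOPPED
    println(s"Active context: ${StreamingContext.getActive}")  // Some(...) only once start() has succeeded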

The last bit, ssc.start(), hangs indefinitely without producing any log messages. I am running this on a newly created cluster with no other notebooks attached, so there are no other streaming contexts.

Any thoughts?

Also, here are the libraries I use (from my build.sbt file):

 "org.apache.spark" % "spark-core_2.10" % "1.6.0" "org.apache.spark" % "spark-sql_2.10" % "1.6.0" "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.6.0" "org.apache.spark" % "spark-streaming_2.10" % "1.6.0" 

Edit 1: On a more recent run, I got the following error:

    java.rmi.RemoteException: java.util.concurrent.TimeoutException: Timed out retrying send to http://10.210.224.74:7070: 2 hours; nested exception is:
      java.util.concurrent.TimeoutException: Timed out retrying send to http://10.210.224.74:7070: 2 hours
      at com.databricks.backend.daemon.data.client.DbfsClient.send0(DbfsClient.scala:71)
      at com.databricks.backend.daemon.data.client.DbfsClient.sendIdempotent(DbfsClient.scala:40)
      at com.databricks.backend.daemon.data.client.DatabricksFileSystem.listStatus(DatabricksFileSystem.scala:189)
      at org.apache.spark.streaming.util.FileBasedWriteAheadLog.initializeOrRecover(FileBasedWriteAheadLog.scala:228)
      at org.apache.spark.streaming.util.FileBasedWriteAheadLog.<init>(FileBasedWriteAheadLog.scala:72)
      at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:141)
      at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:141)
      at scala.Option.getOrElse(Option.scala:120)
      at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLog(WriteAheadLogUtils.scala:140)
      at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLogForDriver(WriteAheadLogUtils.scala:98)
      at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$createWriteAheadLog$1.apply(ReceivedBlockTracker.scala:254)
      at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$createWriteAheadLog$1.apply(ReceivedBlockTracker.scala:252)
      at scala.Option.map(Option.scala:145)
      at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.createWriteAheadLog(ReceivedBlockTracker.scala:252)
      at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.<init>(ReceivedBlockTracker.scala:75)
      at org.apache.spark.streaming.scheduler.ReceiverTracker.<init>(ReceiverTracker.scala:106)
      at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:80)
      at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:610)
      at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
      at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
      at ... run in separate thread using org.apache.spark.util.ThreadUtils ...
      at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:606)
      at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
    Caused by: java.util.concurrent.TimeoutException: Timed out retrying send to http://10.210.224.74:7070: 2 hours
      at com.databricks.rpc.ReliableJettyClient.retryOnNetworkError(ReliableJettyClient.scala:138)
      at com.databricks.rpc.ReliableJettyClient.sendIdempotent(ReliableJettyClient.scala:46)
      at com.databricks.backend.daemon.data.client.DbfsClient.doSend(DbfsClient.scala:83)
      at com.databricks.backend.daemon.data.client.DbfsClient.send0(DbfsClient.scala:60)
      at com.databricks.backend.daemon.data.client.DbfsClient.sendIdempotent(DbfsClient.scala:40)
      at com.databricks.backend.daemon.data.client.DatabricksFileSystem.listStatus(DatabricksFileSystem.scala:189)
      at org.apache.spark.streaming.util.FileBasedWriteAheadLog.initializeOrRecover(FileBasedWriteAheadLog.scala:228)
      at org.apache.spark.streaming.util.FileBasedWriteAheadLog.<init>(FileBasedWriteAheadLog.scala:72)
      at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:141)
      at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:141)
      at scala.Option.getOrElse(Option.scala:120)
      at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLog(WriteAheadLogUtils.scala:140)
      at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLogForDriver(WriteAheadLogUtils.scala:98)
      at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$createWriteAheadLog$1.apply(ReceivedBlockTracker.scala:254)
      at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$createWriteAheadLog$1.apply(ReceivedBlockTracker.scala:252)
      at scala.Option.map(Option.scala:145)
      at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.createWriteAheadLog(ReceivedBlockTracker.scala:252)
      at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.<init>(ReceivedBlockTracker.scala:75)
      at org.apache.spark.streaming.scheduler.ReceiverTracker.<init>(ReceiverTracker.scala:106)
      at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:80)
      at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:610)
      at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
      at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
      at org.apache.spark.util.ThreadUtils$$anon$1.run(ThreadUtils.scala:122)
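
Reading the trace, start() appears to be blocked while FileBasedWriteAheadLog lists the checkpoint directory over DBFS. A minimal sketch of reproducing just that call from the driver, assuming the same sc and checkpointDir names used in creatingFunc above:

    // Sketch: issue the same listStatus call the write-ahead log performs on the
    // checkpoint directory, to see whether DBFS itself is slow or unreachable.
    import org.apache.hadoop.fs.Path

    val checkpointPath = new Path(checkpointDir)
    val fs = checkpointPath.getFileSystem(sc.hadoopConfiguration)
    fs.listStatus(checkpointPath).foreach(status => println(status.getPath))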
