Hi everyone,
I am trying to use Kinesis with Spark Streaming on Spark 1.6.0 through Databricks, and my ssc.start() command is hanging.
I use the following function to create a Spark Streaming context:
def creatingFunc(sc: SparkContext): StreamingContext = {
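(The body is omitted above. For context, it follows the standard KinesisUtils pattern for Spark 1.6; the sketch below is only illustrative, with placeholder values for the app name, stream name, endpoint, region, batch interval, and checkpoint path rather than my real ones.)

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

def creatingFunc(sc: SparkContext): StreamingContext = {
  // Batch interval is a placeholder.
  val ssc = new StreamingContext(sc, Seconds(10))

  // Placeholder checkpoint directory; my real code points at a DBFS path,
  // which is what the write-ahead log in the stack trace further down is listing.
  ssc.checkpoint("dbfs:/placeholder/checkpoint")

  // Placeholder application/stream/endpoint/region values.
  val stream = KinesisUtils.createStream(
    ssc,
    "my-app",                                  // Kinesis application (DynamoDB table) name
    "my-stream",                               // Kinesis stream name
    "https://kinesis.us-east-1.amazonaws.com", // endpoint URL
    "us-east-1",                               // region name
    InitialPositionInStream.LATEST,
    Seconds(10),                               // Kinesis checkpoint interval
    StorageLevel.MEMORY_AND_DISK_2)

  // Trivial output operation so the context has something to run.
  stream.map(bytes => new String(bytes)).print()

  ssc
}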
However, when I run the following to start the streaming context:
// Stop any existing StreamingContext
val stopActiveContext = true
if (stopActiveContext) {
  StreamingContext.getActive.foreach { _.stop(stopSparkContext = false) }
}

// Get or create a streaming context.
val ssc = StreamingContext.getActiveOrCreate(() => main.creatingFunc(sc))

// This starts the streaming context in the background.
ssc.start()
The last call, ssc.start(), hangs indefinitely without producing any log messages. I am running this on a freshly created cluster with no other notebooks attached, so there are no other streaming contexts.
Any thoughts?
Also, here are the libraries I use (from my build.sbt file):
"org.apache.spark" % "spark-core_2.10" % "1.6.0" "org.apache.spark" % "spark-sql_2.10" % "1.6.0" "org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.6.0" "org.apache.spark" % "spark-streaming_2.10" % "1.6.0"
Edit 1: On a more recent run, ssc.start() eventually failed with the following error, so the apparent hang seems to be the write-ahead log waiting on a DBFS listing that only times out after 2 hours:
java.rmi.RemoteException: java.util.concurrent.TimeoutException: Timed out retrying send to http://10.210.224.74:7070: 2 hours; nested exception is:
  java.util.concurrent.TimeoutException: Timed out retrying send to http://10.210.224.74:7070: 2 hours
  at com.databricks.backend.daemon.data.client.DbfsClient.send0(DbfsClient.scala:71)
  at com.databricks.backend.daemon.data.client.DbfsClient.sendIdempotent(DbfsClient.scala:40)
  at com.databricks.backend.daemon.data.client.DatabricksFileSystem.listStatus(DatabricksFileSystem.scala:189)
  at org.apache.spark.streaming.util.FileBasedWriteAheadLog.initializeOrRecover(FileBasedWriteAheadLog.scala:228)
  at org.apache.spark.streaming.util.FileBasedWriteAheadLog.<init>(FileBasedWriteAheadLog.scala:72)
  at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:141)
  at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:141)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLog(WriteAheadLogUtils.scala:140)
  at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLogForDriver(WriteAheadLogUtils.scala:98)
  at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$createWriteAheadLog$1.apply(ReceivedBlockTracker.scala:254)
  at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$createWriteAheadLog$1.apply(ReceivedBlockTracker.scala:252)
  at scala.Option.map(Option.scala:145)
  at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.createWriteAheadLog(ReceivedBlockTracker.scala:252)
  at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.<init>(ReceivedBlockTracker.scala:75)
  at org.apache.spark.streaming.scheduler.ReceiverTracker.<init>(ReceiverTracker.scala:106)
  at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:80)
  at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:610)
  at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
  at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
  at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
  at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:606)
  at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
Caused by: java.util.concurrent.TimeoutException: Timed out retrying send to http://10.210.224.74:7070: 2 hours
  at com.databricks.rpc.ReliableJettyClient.retryOnNetworkError(ReliableJettyClient.scala:138)
  at com.databricks.rpc.ReliableJettyClient.sendIdempotent(ReliableJettyClient.scala:46)
  at com.databricks.backend.daemon.data.client.DbfsClient.doSend(DbfsClient.scala:83)
  at com.databricks.backend.daemon.data.client.DbfsClient.send0(DbfsClient.scala:60)
  at com.databricks.backend.daemon.data.client.DbfsClient.sendIdempotent(DbfsClient.scala:40)
  at com.databricks.backend.daemon.data.client.DatabricksFileSystem.listStatus(DatabricksFileSystem.scala:189)
  at org.apache.spark.streaming.util.FileBasedWriteAheadLog.initializeOrRecover(FileBasedWriteAheadLog.scala:228)
  at org.apache.spark.streaming.util.FileBasedWriteAheadLog.<init>(FileBasedWriteAheadLog.scala:72)
  at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:141)
  at org.apache.spark.streaming.util.WriteAheadLogUtils$$anonfun$2.apply(WriteAheadLogUtils.scala:141)
  at scala.Option.getOrElse(Option.scala:120)
  at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLog(WriteAheadLogUtils.scala:140)
  at org.apache.spark.streaming.util.WriteAheadLogUtils$.createLogForDriver(WriteAheadLogUtils.scala:98)
  at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$createWriteAheadLog$1.apply(ReceivedBlockTracker.scala:254)
  at org.apache.spark.streaming.scheduler.ReceivedBlockTracker$$anonfun$createWriteAheadLog$1.apply(ReceivedBlockTracker.scala:252)
  at scala.Option.map(Option.scala:145)
  at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.createWriteAheadLog(ReceivedBlockTracker.scala:252)
  at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.<init>(ReceivedBlockTracker.scala:75)
  at org.apache.spark.streaming.scheduler.ReceiverTracker.<init>(ReceiverTracker.scala:106)
  at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:80)
  at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:610)
  at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
  at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:606)
  at org.apache.spark.util.ThreadUtils$$anon$1.run(ThreadUtils.scala:122)