Streaming Kafka 0.10 / Spark 2.1.0 / DCOS / Mesosphere
Ugh, I spent all day on this and must have read this post a dozen times. I tried Spark 2.0.0, Spark 2.0.1, Kafka 0.8, and Kafka 0.10. Stay away from Kafka 0.8 and Spark 2.0.x; the problems are all in the dependencies. Start with the setup below, it is working.
SBT:
"org.apache.hadoop" % "hadoop-aws" % "2.7.3" excludeAll ExclusionRule(organization = "org.apache.hadoop", name = "hadoop-common"), "org.apache.spark" %% "spark-core" % "2.1.0", "org.apache.spark" %% "spark-sql" % "2.1.0" , "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.1.0", "org.apache.spark" % "spark-streaming_2.11" % "2.1.0"
Kafka / Spark Streaming working code:
// Imports needed for this snippet (Spark 2.1.0 with the kafka-0-10 integration)
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val spark = SparkSession
  .builder()
  .appName("ingest")
  .master("local[4]")
  .getOrCreate()

import spark.implicits._

// 2-second micro-batches on top of the existing SparkContext
val ssc = new StreamingContext(spark.sparkContext, Seconds(2))

val topics = Set("water2")

// Consumer configuration; replace broker:port with your Kafka brokers
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker:port,broker:port",
  "bootstrap.servers" -> "broker:port,broker:port",
  "group.id" -> "somegroup",
  "auto.commit.interval.ms" -> "1000",
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> "true"
)

val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

// For each micro-batch, turn the (key, value) records into a Dataset and print a sample
messages.foreachRDD { rdd =>
  if (rdd.count() >= 1) {
    rdd.map(record => (record.key, record.value))
      .toDS()
      .withColumnRenamed("_2", "value")
      .drop("_1")
      .show(5, false)
    println(rdd.getClass)
  }
}

ssc.start()
ssc.awaitTermination()
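As a side note, the foreachRDD body can be written a bit more directly: toDF("key", "value") names the tuple columns up front instead of renaming and dropping them, and rdd.isEmpty avoids counting the whole batch just to check for data. This is only an alternative sketch of the same logic, not part of the working code above:

// Alternative sketch: name the columns via toDF and use the cheaper emptiness check
messages.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    rdd.map(record => (record.key, record.value))
      .toDF("key", "value")
      .show(5, truncate = false)
  }
}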
If this helps you, please upvote so I can get some reputation points. :)