How to Implement Exactly-Once with Spark Streaming + Kafka Integration

I have a question. There is a guide on how to implement exactly-once semantics; here is the code from it: https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#storing-offsets

import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))
 // (the guide shows the following as a separate block)
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

But what if I want to use 'reduceByKeyAndWindow' on a separate line, like this:

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

val lines: DStream[String] = stream.map(record => record.value)
lines.map(row => {
  (row.split(",")(1), 1)
}).reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(30), Seconds(5))
  .foreachRDD(rdd => {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // my code start
    rdd.foreach(println)
    // my code end
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  })

When I try this, I get the following error:

Exception in thread "main" java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka010.HasOffsetRanges
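
The linked guide does note that the cast to HasOffsetRanges only succeeds when it is the first method called on the result of createDirectStream, not later down a chain of methods, which seems to be exactly what I am hitting after reduceByKeyAndWindow. My guess is that I need to grab the offsets on the original stream before any transformations, roughly like the sketch below. This is untested; the transform() step and the driver-side offsetRanges var are my own assumption based on the older integration examples, not something the 0-10 guide shows for windowed streams:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

// Capture offsets while the RDD is still the one produced by createDirectStream;
// the HasOffsetRanges cast only works on that first RDD, not after map/window ops.
var offsetRanges = Array.empty[OffsetRange]

val lines: DStream[String] = stream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map(record => record.value)

lines.map(row => (row.split(",")(1), 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(5))
  .foreachRDD { rdd =>
    rdd.foreach(println)
    // commit the offsets captured on the original Kafka stream
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }

I am also not sure whether committing the per-batch offsets like this is still safe with a 30-second window, since the windowed output is produced later than the batch whose offsets were captured. Is this the right approach?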

Any help? Thanks in advance!
