I want to achieve exactly-once semantics when writing from Spark Streaming to Kafka. But, as the docs say, "Output operations (like foreachRDD) have at-least once semantics, that is, the transformed data may get written to an external entity more than once in the event of a worker failure."
To perform transactional updates, the Spark documentation recommends using the batch time (available in foreachRDD) and the partition index of the RDD to create an identifier. This identifier uniquely identifies a blob of data in the streaming application. Code below:
import org.apache.spark.TaskContext

dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}
But how do I actually use this uniqueId with Kafka to make the writes transactional?
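For context, this is roughly what I have been trying inside foreachPartition: open a transactional KafkaProducer, reuse the uniqueId as its transactional.id, and commit or abort the whole partition as one transaction. It is only a sketch; the broker address, output topic, serializers, and the idea of using uniqueId as the transactional.id are my own guesses, not something from the docs:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

    // Sketch only: runs once per partition, inside rdd.foreachPartition.
    // Assumptions: broker at localhost:9092, output topic "output-topic",
    // records written as Strings, and uniqueId reused as the transactional.id.
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, uniqueId.toString)

    val producer = new KafkaProducer[String, String](props)
    producer.initTransactions()
    try {
      producer.beginTransaction()
      partitionIterator.foreach { record =>
        producer.send(new ProducerRecord[String, String]("output-topic", record.toString))
      }
      // Commit makes all records of this (batch time, partition) visible atomically.
      producer.commitTransaction()
    } catch {
      case e: Exception =>
        producer.abortTransaction()
        throw e
    } finally {
      producer.close()
    }

I am not sure whether one transactional.id per (batch time, partition) is what the documentation intends, or whether re-executed tasks would still produce duplicates.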
Thanks.