Say I have a data structure like this, where ts is some timestamp:
case class Record(ts: Long, id: Int, value: Int)
Given a large number of these records, I want to find the record with the highest timestamp for each id. Using the RDD API, I think the following code does the job:
def findLatest(records: RDD[Record])(implicit spark: SparkSession) = {
  records.keyBy(_.id).reduceByKey {
    (x, y) => if (x.ts > y.ts) x else y
  }.values
}
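For what it's worth, here is a minimal sketch of how I am calling the RDD version (the sample data and the local-mode session are just for illustration):

import org.apache.spark.sql.SparkSession

implicit val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()

val sample = spark.sparkContext.parallelize(Seq(
  Record(1L, 1, 10),
  Record(5L, 1, 20),  // latest record for id 1
  Record(3L, 2, 30)   // only record for id 2
))
findLatest(sample).collect()  // Array(Record(5,1,20), Record(3,2,30)), order not guaranteed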
Similarly, this is my attempt with Datasets:
def findLatest(records: Dataset[Record])(implicit spark: SparkSession) = {
  records.groupByKey(_.id).mapGroups {
    case (id, records) =>
      records.reduceLeft((x, y) => if (x.ts > y.ts) x else y)
  }
}
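Calling the Dataset version looks much the same; note that groupByKey and mapGroups need encoders, which import spark.implicits._ provides for the Int key and the Record case class (same illustrative sample as above):

import spark.implicits._  // encoders for the Int key and the Record case class

val ds = Seq(Record(1L, 1, 10), Record(5L, 1, 20), Record(3L, 2, 30)).toDS()
findLatest(ds).show()  // one row per id, each carrying its highest ts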
I am trying to figure out how to achieve something similar using DataFrames, but to no avail. I understand that I can group with:
records.groupBy($"id")
But this gives me a RelationalGroupedDataset, and I don't understand what aggregation function I need to write to achieve what I want: all the example aggregations I have seen focus on returning a single aggregated column rather than the whole row.
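To make that concrete: the closest I can get is something like the following, which keeps only the id and the maximum timestamp and loses the matching value entirely:

import org.apache.spark.sql.functions.max

records.groupBy($"id").agg(max($"ts"))  // yields (id, max(ts)); the corresponding value column is gone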
Is it possible to achieve this with DataFrames?