I am working on a dataset that represents an event stream (for example, tracking events fired from a website). All events have a timestamp. One use case we often have is finding the first non-empty value of a given field. So, for example, something along these lines gets us most of the way there:
import org.apache.spark.sql.functions.first
import spark.implicits._   // for the $"..." column syntax and .as[...]

val eventsDf = spark.read.json(jsonEventsPath)

case class ProjectedFields(visitId: String, userId: Int, timestamp: Long ... )

val projectedEventsDs = eventsDf.select(
  eventsDf("message.visit.id").alias("visitId"),
  eventsDf("message.property.user_id").alias("userId"),
  eventsDf("message.property.timestamp"),
  ...
).as[ProjectedFields]

projectedEventsDs.groupBy($"visitId").agg(first($"userId", true))
The problem with the code above is that the order of the rows fed into the first aggregation function is not guaranteed. I would like the rows to be sorted by timestamp within each group, to ensure that I get the first non-null userId by timestamp, and not an arbitrary, possibly invalid userId.
Is there a way to define sorting within a grouping?
Using Spark 2.10
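For reference, here is a sketch of the behaviour I am after, written with a window function instead of first (names such as byVisitOrderedByTime and firstUserIdPerVisit are mine, and I am not sure this is the idiomatic way to do it):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank the events of each visit by timestamp, then keep the earliest row
// that actually carries a userId.
val byVisitOrderedByTime = Window.partitionBy(col("visitId")).orderBy(col("timestamp"))

val firstUserIdPerVisit = projectedEventsDs
  .where(col("userId").isNotNull)                            // drop rows without a userId
  .withColumn("rn", row_number().over(byVisitOrderedByTime)) // rank by timestamp within each visit
  .where(col("rn") === 1)                                    // earliest non-null userId per visit
  .select("visitId", "userId")

This feels heavier than a simple groupBy/agg, which is why I am asking whether the sorting can be expressed inside the grouping itself.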
BTW, here is a simple example showing that in Spark 2.10, applying orderBy before groupBy on a DataFrame does not guarantee that the ordering is preserved by the aggregation:
import org.apache.spark.sql.functions.first
import spark.implicits._   // for .toDS()

case class OrderedKeyValue(key: String, value: String, ordering: Int)

val ds = Seq(
  OrderedKeyValue("a", null, 1),
  OrderedKeyValue("a", null, 2),
  OrderedKeyValue("a", "x", 3),
  OrderedKeyValue("a", "y", 4),
  OrderedKeyValue("a", null, 5)
).toDS()

ds.orderBy("ordering").groupBy("key").agg(first("value", true)).collect()
Sometimes this returned Array([a,y]) and sometimes Array([a,x]).
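For completeness, one order-insensitive workaround I have been toying with is to avoid first altogether and take the minimum of an (ordering, value) struct over the non-null rows; if I understand struct ordering correctly, this should always return [a,x] here (a sketch, with firstValue as my own label for the result column):

import org.apache.spark.sql.functions.{col, min, struct}

// Struct columns compare field by field, so min picks the row with the
// smallest "ordering" among the remaining (non-null) values.
ds.where(col("value").isNotNull)
  .groupBy("key")
  .agg(min(struct(col("ordering"), col("value"))).getField("value").alias("firstValue"))
  .collect()

I would still prefer a way to tell the grouping itself which order to use.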