How to find the first nonzero values ​​in groups? (secondary sorting using api dataset)

I am working on a dataset that represents an event stream (for example, it runs as event tracking from a website). All events have a timestamp. In one case, we often try to find the first non-empty value for a given field. So, for example, something similar to us most of all:

val eventsDf = spark.read.json(jsonEventsPath) 

case class ProjectedFields(visitId: String, userId: Int, timestamp: Long ... )

val projectedEventsDs = eventsDf.select(
    eventsDf("message.visit.id").alias("visitId"),
    eventsDf("message.property.user_id").alias("userId"),
    eventsDf("message.property.timestamp"),

    ...

).as[ProjectedFields]

projectedEventsDs.groupBy($"visitId").agg(first($"userId", true))

The problem with the code above is that the order of the data supplied to this aggregation function firstis not guaranteed. I would like it to be sorted using timestamp, to ensure that it is the 1st non-zero userId by timestamp, and not an arbitrary random invalid userId.

Is there a way to define sorting within a grouping?

Using Spark 2.10


BTW, , Spark 2.10 SPARK DataFrame: - - , , :

case class OrderedKeyValue(key: String, value: String, ordering: Int)
val ds = Seq(
  OrderedKeyValue("a", null, 1), 
  OrderedKeyValue("a", null, 2), 
  OrderedKeyValue("a", "x", 3), 
  OrderedKeyValue("a", "y", 4), 
  OrderedKeyValue("a", null, 5)
).toDS()

ds.orderBy("ordering").groupBy("key").agg(first("value", true)).collect()

Array([a,y]), Array([a,x])

+4
1

(... , !)

import org.apache.spark.sql.expressions.Window
val byKeyOrderByOrdering = Window
  .partitionBy("key")
  .orderBy("ordering")
  .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

import org.apache.spark.sql.functions.first
val firsts = ds.withColumn("first",
  first("value", ignoreNulls = true) over byKeyOrderByOrdering)

scala> firsts.show
+---+-----+--------+-----+
|key|value|ordering|first|
+---+-----+--------+-----+
|  a| null|       1|    x|
|  a| null|       2|    x|
|  a|    x|       3|    x|
|  a|    y|       4|    x|
|  a| null|       5|    x|
+---+-----+--------+-----+

. , Spark 2.2.0-SNAPSHOT ( ) rangeBetween, , , .

+2

Source: https://habr.com/ru/post/1673060/


All Articles