Approach 1: window-function solution (carry the partition's first code to every row with `first(...).over(window)`).
Input dataframe:
+---+-----+----+----------+
|id |name |code|date |
+---+-----+----+----------+
|1 |Shaun|B121|2012-03-21|
|2 |Shaun|B121|2011-02-14|
|3 |Shaun|B120|2010-10-19|
|4 |John |B121|2011-09-29|
|5 |John |B121|2013-09-29|
+---+-----+----+----------+
// Within each name-partition, ordering by ("code", "date") makes "B120" sort
// before "B121", so first("code") over the window tells us whether the
// partition ever contained a B120 row.
import org.apache.spark.sql.expressions.Window   // was missing: Window is not in functions._
import org.apache.spark.sql.functions._

val windowSpec = Window.partitionBy("name").orderBy("code", "date")

df.withColumn("temp", first("code").over(windowSpec))
  .withColumn("level",
    // Partition started at B120 and this row is B121 -> upgraded (1);
    // partition started at B120 and this row is still B120 -> baseline (0);
    // partition never contained B120 -> 2.
    when($"temp" === "B120" && $"code" === "B121", 1)
      .when($"temp" === "B120" && $"code" === "B120", 0)
      .otherwise(lit(2)))
  .drop("temp")
+---+-----+----+----------+-----+
|id |name |code|date |level|
+---+-----+----+----------+-----+
|3 |Shaun|B120|2010-10-19|0 |
|2 |Shaun|B121|2011-02-14|1 |
|1 |Shaun|B121|2012-03-21|1 |
|4 |John |B121|2011-09-29|2 |
|5 |John |B121|2013-09-29|2 |
+---+-----+----+----------+-----+
Input dataframe for the more general case (B120 may appear anywhere in the partition):
+---+-----+----+----------+
|id |name |code|date |
+---+-----+----+----------+
|1 |Shaun|B121|2012-03-21|
|2 |Shaun|B121|2011-02-14|
|3 |Shaun|B120|2010-10-29|
|4 |John |B121|2011-09-29|
|5 |John |B120|2011-09-30|
|6 |John |B111|2012-09-30|
|7 |John |B121|2013-09-29|
+---+-----+----+----------+
Approach 2: UDF-based solution for this requirement.
import scala.collection.mutable

import org.apache.spark.sql.functions._
// UDF: receives all rows of one name-partition as [[id, code, date], ...]
// (sorted by date by the caller) and appends a level to each row:
//   "1" -> a B121 row dated strictly after the partition's first B120 row
//   "2" -> a B121 row dated on/before that B120 reference
//   "0" -> everything else (the B120 row itself, other codes, or any row
//          when the partition has no B120 at all)
// Returns the rows extended to [id, code, date, level].
def updateLevel = udf((array: mutable.WrappedArray[mutable.WrappedArray[String]]) => {
  // First row containing "B120"; since the caller sorts by date this is the
  // earliest B120 occurrence. (code, date) of that row is the reference point.
  val reference = array.find(_.contains("B120")).map(row => (row(1), row(2)))
  // Fallback when no B120 exists: empty code guarantees the "B120" guards
  // below fail, so every row lands in the "0" branch — same as the original.
  val (refCode, refDate) = reference.getOrElse(("", "1970-01-01"))
  val refMillis = java.sql.Date.valueOf(refDate).getTime

  array.map { row =>
    val rowMillis = java.sql.Date.valueOf(row(2)).getTime
    val level =
      if (rowMillis > refMillis && refCode == "B120" && row(1) == "B121") "1"
      else if (rowMillis <= refMillis && row(1) == "B121") "2"
      else "0"
    Array(row(0), row(1), row(2), level)
  }
})
Pipeline: pack each row into an array, gather them per name with `collect_list`, run the UDF over the collected arrays, then `explode` the result back into one row per element.
// Collect every (id, code, date) triple of a name-partition into one array
// column, ordered by date so the UDF sees the earliest rows first.
val collected = df
  .orderBy("date")
  .withColumn("tempArray", array("id", "code", "date"))
  .groupBy("name")
  .agg(collect_list("tempArray").as("tempArray"))

// Let the UDF attach a level to each triple, then flatten back to one row
// per element and project the array slots into named columns.
collected
  .withColumn("tempArray", explode(updateLevel($"tempArray")))
  .select(
    $"tempArray"(0).as("id"),
    $"name",
    $"tempArray"(1).as("code"),
    $"tempArray"(2).as("date"),
    $"tempArray"(3).as("level")
  )
Resulting dataframe:
+---+-----+----+----------+-----+
|id |name |code|date |level|
+---+-----+----+----------+-----+
|3 |Shaun|B120|2010-10-29|0 |
|2 |Shaun|B121|2011-02-14|1 |
|1 |Shaun|B121|2012-03-21|1 |
|4 |John |B121|2011-09-29|2 |
|5 |John |B120|2011-09-30|0 |
|6 |John |B111|2012-09-30|0 |
|7 |John |B121|2013-09-29|1 |
+---+-----+----+----------+-----+
,