Spark: checking row history in a DataFrame

I have patient data in this format:

+---+-----+----+----------+
| id| name|code|      date|
+---+-----+----+----------+
|  1|Shaun|B121|2012-03-21|
|  3|Shaun|B120|2010-10-29|
|  2|Shaun|B121|2011-02-14|
|  4| John|B121|2011-09-29|
|  5| John|B120|2011-09-30|
|  6| John|B111|2012-09-30|
|  7| John|B121|2013-09-29|
+---+-----+----+----------+
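
For reference, a minimal sketch for building this sample, assuming a SparkSession named spark (all columns kept as strings, since ISO dates compare correctly that way):

import spark.implicits._

val df = Seq(
  ("1", "Shaun", "B121", "2012-03-21"),
  ("3", "Shaun", "B120", "2010-10-29"),
  ("2", "Shaun", "B121", "2011-02-14"),
  ("4", "John",  "B121", "2011-09-29"),
  ("5", "John",  "B120", "2011-09-30"),
  ("6", "John",  "B111", "2012-09-30"),
  ("7", "John",  "B121", "2013-09-29")
).toDF("id", "name", "code", "date")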

For each row with code B121 I want to check whether code B120 appears earlier in that patient's history: if it does, the level is set to 1, otherwise to 2; rows with code B120 themselves get level 0. In MySQL I used a cursor for this. The result should look as follows:

+---+-----+----+----------+-----+
| id| name|code|      date|level|
+---+-----+----+----------+-----+
|  3|Shaun|B120|2010-10-29|    0|
|  2|Shaun|B121|2011-02-14|    1|
|  1|Shaun|B121|2012-03-21|    1|
|  6| John|B111|2012-09-30|    0|
|  5| John|B120|2011-09-30|    0|
|  4| John|B121|2011-09-29|    2|
|  7| John|B121|2013-09-29|    1|
+---+-----+----+----------+-----+

Edit: I have added a new row with code B111, but I still want to check the history only for code B120.

I tried this solution:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window = Window.partitionBy("name").orderBy("date")
val lagCol = lag(col("date"), 1).over(window)

val pDF = df.withColumn("level", lagCol)

But it gives the following result:

+---+-----+----+----------+----------+
| id| name|code|      date|     level|
+---+-----+----+----------+----------+
|  1|Shaun|B121|2012-03-21|2011-02-14|
|  2|Shaun|B121|2011-02-14|2010-10-19|
|  3|Shaun|B120|2010-10-19|      null|
|  5| John|B121|2013-09-29|2011-09-29|
|  4| John|B121|2011-09-29|      null|
+---+-----+----+----------+----------+

It just takes the previous row, regardless of whether its code is B120 or B121, but I want to look back specifically at the previous row with code B120. I do not know how to use the lag function correctly for this. How do I do it?
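
For what it's worth, lag itself cannot skip rows, but one way to look back only at B120 rows is to combine when with last(..., ignoreNulls = true) over a window that ends at the previous row. A sketch, assuming the df above (lastB120Date is just an illustrative column name):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// all earlier rows of the same patient, not just the immediately preceding one
val history = Window.partitionBy("name").orderBy("date")
  .rowsBetween(Window.unboundedPreceding, -1)

// date of the most recent earlier B120 row, or null if there is none
val lastB120Date = last(when(col("code") === "B120", col("date")), ignoreNulls = true).over(history)

val pDF = df.withColumn("lastB120Date", lastB120Date)

A null in lastB120Date then means no B120 row precedes the current one.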

You won't get there with lag over a Window alone; you can use the first function over a Window instead.

With the original dataframe

+---+-----+----+----------+
|id |name |code|date      |
+---+-----+----+----------+
|1  |Shaun|B121|2012-03-21|
|2  |Shaun|B121|2011-02-14|
|3  |Shaun|B120|2010-10-19|
|4  |John |B121|2011-09-29|
|5  |John |B121|2013-09-29|
+---+-----+----+----------+

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// with only B120/B121 codes, ordering by code puts B120 first,
// so first("code") is B120 whenever the patient has a B120 row
val windowSpec = Window.partitionBy("name").orderBy("code", "date")

df.withColumn("temp", first("code").over(windowSpec))
  .withColumn("level",
    when($"temp" === "B120" && $"code" === "B121", 1)
      .otherwise(when($"temp" === "B120" && $"code" === "B120", 0)
        .otherwise(lit(2))))
  .drop("temp")

+---+-----+----+----------+-----+
|id |name |code|date      |level|
+---+-----+----+----------+-----+
|3  |Shaun|B120|2010-10-19|0    |
|2  |Shaun|B121|2011-02-14|1    |
|1  |Shaun|B121|2012-03-21|1    |
|4  |John |B121|2011-09-29|2    |
|5  |John |B121|2013-09-29|2    |
+---+-----+----+----------+-----+

But with your edited dataframe

+---+-----+----+----------+
|id |name |code|date      |
+---+-----+----+----------+
|1  |Shaun|B121|2012-03-21|
|2  |Shaun|B121|2011-02-14|
|3  |Shaun|B120|2010-10-29|
|4  |John |B121|2011-09-29|
|5  |John |B120|2011-09-30|
|6  |John |B111|2012-09-30|
|7  |John |B121|2013-09-29|
+---+-----+----+----------+

that approach no longer works (B111 would now sort before B120), so a udf function is required:

import scala.collection.mutable
import org.apache.spark.sql.functions._

def updateLevel = udf((array: mutable.WrappedArray[mutable.WrappedArray[String]]) => {
  // each inner array holds (id, code, date); look for the patient's B120 row
  val containsB120 = array.filter(ar => ar.contains("B120")).map(ar => (ar(1), ar(2)))
  var code = ""
  var date = "1970-01-01"
  if (containsB120.size > 0) {
    code = containsB120(0)._1
    date = containsB120(0)._2
  }
  array.map(row => {
    // B121 dated after the B120 row => level 1
    if (java.sql.Date.valueOf(row(2)).getTime > java.sql.Date.valueOf(date).getTime && code == "B120" && row(1) == "B121") {
      Array(row(0).toString, row(1).toString, row(2).toString, "1")
    }
    // B121 dated on or before the B120 row => level 2
    else if (java.sql.Date.valueOf(row(2)).getTime <= java.sql.Date.valueOf(date).getTime && row(1) == "B121") {
      Array(row(0).toString, row(1).toString, row(2).toString, "2")
    }
    // everything else (B120, B111, ...) => level 0
    else {
      Array(row(0).toString, row(1).toString, row(2).toString, "0")
    }
  })
})

The udf function expects each patient's rows as an array of arrays, so id, code and date are packed into an array, collected per name with collect_list, passed through the udf, and then exploded back into a flat dataframe:

df.orderBy("date").withColumn("tempArray", array("id", "code", "date"))
    .groupBy("name")
    .agg(collect_list("tempArray").as("tempArray"))
    .withColumn("tempArray", explode(updateLevel($"tempArray")))
    .select($"tempArray"(0).as("id"), $"name", $"tempArray"(1).as("code"), $"tempArray"(2).as("date"), $"tempArray"(3).as("level"))

which should give you the final dataframe:

+---+-----+----+----------+-----+
|id |name |code|date      |level|
+---+-----+----+----------+-----+
|3  |Shaun|B120|2010-10-29|0    |
|2  |Shaun|B121|2011-02-14|1    |
|1  |Shaun|B121|2012-03-21|1    |
|4  |John |B121|2011-09-29|2    |
|5  |John |B120|2011-09-30|0    |
|6  |John |B111|2012-09-30|0    |
|7  |John |B121|2013-09-29|1    |
+---+-----+----+----------+-----+
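
As an aside, the edited requirement can likely also be met without a udf, since ISO dates compare correctly as strings. A sketch using a conditional min over the whole partition:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val byName = Window.partitionBy("name")

// earliest B120 date per patient, or null if the patient never got B120
val b120Date = min(when(col("code") === "B120", col("date"))).over(byName)

df.withColumn("level",
    when(col("code") =!= "B121", lit(0))                          // B120, B111, ... => 0
      .when(b120Date.isNotNull && col("date") > b120Date, lit(1)) // B121 after B120  => 1
      .otherwise(lit(2)))                                         // B121 otherwise   => 2

On this data it should produce the same output as the udf version while avoiding collecting each patient's rows into a single array; note it assigns level 2 to a B121 row even when the patient has no B120 at all, which matches the stated requirement.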
