SPARK, DataFrame: timestamp column difference by consecutive rows

I have a DateFrame as follows:

+---+---------------------+---------------------+ |id |initDate |endDate | +---+---------------------+---------------------+ |138|2016-04-15 00:00:00.0|2016-04-28 00:00:00.0| |138|2016-05-09 00:00:00.0|2016-05-23 00:00:00.0| |138|2016-06-04 00:00:00.0|2016-06-18 00:00:00.0| |138|2016-06-18 00:00:00.0|2016-07-02 00:00:00.0| |138|2016-07-09 00:00:00.0|2016-07-23 00:00:00.0| |138|2016-07-27 00:00:00.0|2016-08-10 00:00:00.0| |138|2016-08-18 00:00:00.0|2016-09-01 00:00:00.0| |138|2016-09-13 00:00:00.0|2016-09-27 00:00:00.0| |138|2016-10-04 00:00:00.0|null | +---+---------------------+---------------------+ 

The rows are ordered by the id then initDate in ascending order. Both the initDate and endDate are of type Timestamp. To illustrate, I just showed the entries that belong to the same id value.

My goal is to add a new column, indicating for each id difference (in days) between the initDate each row and the endDate previous row .

If there is no previous line, then the value will be -1.

The result should look like this:

 +---+---------------------+---------------------+----------+ |id |initDate |endDate |difference| +---+---------------------+---------------------+----------+ |138|2016-04-15 00:00:00.0|2016-04-28 00:00:00.0|-1 | |138|2016-05-09 00:00:00.0|2016-05-23 00:00:00.0|11 | |138|2016-06-04 00:00:00.0|2016-06-18 00:00:00.0|12 | |138|2016-06-18 00:00:00.0|2016-07-02 00:00:00.0|0 | |138|2016-07-09 00:00:00.0|2016-07-23 00:00:00.0|7 | |138|2016-07-27 00:00:00.0|2016-08-10 00:00:00.0|4 | |138|2016-08-18 00:00:00.0|2016-09-01 00:00:00.0|8 | |138|2016-09-13 00:00:00.0|2016-09-27 00:00:00.0|12 | |138|2016-10-04 00:00:00.0|null |7 | +---+---------------------+---------------------+----------+ 

I am thinking of using a window function to split records by id , but I don't understand how to take the following steps.

+6
source share
3 answers

Thanks to the @lostInOverflow hint, I came up with the following solution:

 import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions._ val w = Window.partitionBy("id").orderBy("initDate") val previousEnd = lag($"endDate", 1).over(w) filteredDF.withColumn("prev", previousEnd) .withColumn("difference", datediff($"initDate", $"prev")) 
+4
source

Try:

 import org.apache.spark.sql.functions._ import org.apache.spark.sql.expressions._ val w = Window.partitionBy("id").orderBy("endDate") df.withColumn("difference", date_sub($"initDate", lag($"endDate", 1).over(w))) 
+6
source

Just an addition to the previously good answers, in case someone wants to try with spark sql or on Hive.

 select tab.tran_id,tab.init_date,tab.end_date,coalesce(tab.day_diff,-1) as day_diffrence from (select *,datediff(day,lag(end_date,1) over(partition by tran_id order by init_date) ,init_date) as day_diff from your_table) tab ; 
0
source

Source: https://habr.com/ru/post/1259035/


All Articles