Spark Scala: how to count() with a condition that spans two rows

I'm new to Scala and Spark, so apologies if this is a stupid question. I'm stuck on a problem that I have simplified as follows:

There is a DataFrame with three columns: "machineID" is the identifier of the machine, "startTime" is the timestamp at which a task starts, and "endTime" is the timestamp at which the task ends.

My goal is to count the number of downtime intervals for each machine.
For example, in the table below, the 1st and 2nd rows show that machine 1 starts at time 0, ends at time 3, and starts again at time 4, so the interval [3, 4] is idle. In the 3rd and 4th rows, machine 1 starts at time 10, ends at time 20, and immediately starts again, so there is no downtime.

machineID, startTime, endTime  
1, 0, 3  
1, 4, 8  
1, 10, 20  
1, 20, 31  
...  
1, 412, 578  
...  
2, 231, 311  
2, 781, 790  
...  

The DataFrame has already been grouped with groupBy("machineID").
I am using Spark 2.0.1 and Scala 2.11.8.
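Before turning to Spark, the counting rule described above can be pinned down on plain Scala collections. This is a sketch under the assumption, consistent with the example, that a machine is idle only when a task starts strictly after the previous task on the same machine ended (back-to-back tasks such as 20 → 20 do not count). The names `IdleRule`, `Task`, `countIdleIntervals`, and `sample` are hypothetical, and the sample leaves out the elided "..." rows, so its counts apply only to the rows shown.

```scala
// Plain-Scala sketch of the idle-interval rule (no Spark involved).
// Assumption: an idle interval exists when next.startTime > prev.endTime
// for consecutive tasks of the same machine, ordered by startTime.
object IdleRule {
  final case class Task(machineID: Int, startTime: Int, endTime: Int)

  def countIdleIntervals(tasks: Seq[Task]): Map[Int, Int] =
    tasks.groupBy(_.machineID).map { case (id, ts) =>
      val sorted = ts.sortBy(_.startTime)
      // Pair each task with the task that follows it on the same machine.
      val gaps = sorted.zip(sorted.drop(1)).count { case (prev, next) =>
        next.startTime > prev.endTime
      }
      id -> gaps
    }

  // Rows copied from the sample table; the elided "..." rows are omitted.
  val sample: Seq[Task] = Seq(
    Task(1, 0, 3), Task(1, 4, 8), Task(1, 10, 20), Task(1, 20, 31), Task(1, 412, 578),
    Task(2, 231, 311), Task(2, 781, 790)
  )
}
```

On these rows, machine 1 has gaps 3 → 4, 8 → 10, and 31 → 412 (three intervals) and machine 2 has one gap, 311 → 781.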

1 answer

You can use a DataFrame Window with the lag function, partitioned by machineId.

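import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, sum}
// provides $"..." and toDF; assumes a SparkSession named spark, as in spark-shell
import spark.implicits._

// DataFrame schema
case class MachineData(id: String, start: Int, end: Int)
// Sample data
val machineDF = Seq(
  MachineData("1", 0, 3), MachineData("1", 4, 8), MachineData("1", 10, 20),
  MachineData("1", 20, 31), MachineData("1", 412, 578),
  MachineData("2", 231, 311), MachineData("2", 781, 790)
).toDF()
machineDF.show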
+---+-----+---+
| id|start|end|
+---+-----+---+
|  1|    0|  3|
|  1|    4|  8|
|  1|   10| 20|
|  1|   20| 31|
|  1|  412|578|
|  2|  231|311|
|  2|  781|790|
+---+-----+---+


// define the window as a partition over machineId, ordered by start (time)
val byMachine = Window.partitionBy($"id").orderBy($"start")
// define a new column, "prevEnd" (previous end), using the lag window function over the window defined above
val prevEnd = lag($"end", 1).over(byMachine)

// new DF with the prevEnd column
val withPrevEnd = machineDF.withColumn("prevEnd", prevEnd)
withPrevEnd.show

+---+-----+---+-------+
| id|start|end|prevEnd|
+---+-----+---+-------+
|  1|    0|  3|   null|
|  1|    4|  8|      3|
|  1|   10| 20|      8|
|  1|   20| 31|     20|
|  1|  412|578|     31|
|  2|  231|311|   null|
|  2|  781|790|    311|
+---+-----+---+-------+
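To make the lag step concrete, the following plain-Scala analogue pairs each row with the `end` of the preceding row in the same `id` partition, ordered by `start`, and uses `None` where Spark produces null. It is a sketch for intuition only, not a Spark API; `LagSketch`, `MachineRow`, `withPrevEnd`, and `sample` are hypothetical names.

```scala
// Collection analogue of lag($"end", 1).over(Window.partitionBy($"id").orderBy($"start")).
object LagSketch {
  final case class MachineRow(id: String, start: Int, end: Int)

  def withPrevEnd(rows: Seq[MachineRow]): Seq[(MachineRow, Option[Int])] =
    // Groups are sorted by id only to give this sketch a stable output order.
    rows.groupBy(_.id).toSeq.sortBy(_._1).flatMap { case (_, group) =>
      val sorted = group.sortBy(_.start)
      // First row of each partition has no predecessor (Spark's null -> None);
      // every other row gets the end of the row just before it.
      val prevEnds: Seq[Option[Int]] = None +: sorted.init.map(r => Some(r.end))
      sorted.zip(prevEnds)
    }

  val sample: Seq[MachineRow] = Seq(
    MachineRow("1", 0, 3), MachineRow("1", 4, 8), MachineRow("1", 10, 20),
    MachineRow("1", 20, 31), MachineRow("1", 412, 578),
    MachineRow("2", 231, 311), MachineRow("2", 781, 790)
  )
}
```

The resulting `prevEnd` values line up with the prevEnd column in the output above, including the null at the start of each machine's partition.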

// as an example, compute each idle interval as the numerical difference start - prevEnd
val idleIntervals = withPrevEnd.withColumn("diff", $"start"-$"prevEnd")
idleIntervals.show

+---+-----+---+-------+----+
| id|start|end|prevEnd|diff|
+---+-----+---+-------+----+
|  1|    0|  3|   null|null|
|  1|    4|  8|      3|   1|
|  1|   10| 20|      8|   2|
|  1|   20| 31|     20|   0|
|  1|  412|578|     31| 381|
|  2|  231|311|   null|null|
|  2|  781|790|    311| 470|
+---+-----+---+-------+----+

// to calculate the total, sum the differences per machine (this gives total idle time, not the number of idle intervals). Adapt this as your business logic requires.
val totalIdleIntervals = idleIntervals.select($"id", $"diff").groupBy($"id").agg(sum("diff"))
totalIdleIntervals.show

+---+---------+
| id|sum(diff)|
+---+---------+
|  1|      384|
|  2|      470|
+---+---------+
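Summing the gaps yields total idle time per machine. Since the question asks for the number of idle intervals, one possible adaptation (a sketch, not the answerer's code) is to count only rows whose gap is positive. The object `IdleIntervalCount`, its method names, and the `local[*]` master are illustrative assumptions; the `diff > 0` threshold assumes back-to-back tasks (start == prevEnd) are not idle, as in the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{count, lag, when}

// Hypothetical helper: same window/lag approach as above, but counting gaps
// per machine instead of summing them.
object IdleIntervalCount {
  def idleIntervalCounts(spark: SparkSession): Map[String, Long] = {
    import spark.implicits._

    // Same sample rows as in the answer.
    val machineDF = Seq(
      ("1", 0, 3), ("1", 4, 8), ("1", 10, 20), ("1", 20, 31), ("1", 412, 578),
      ("2", 231, 311), ("2", 781, 790)
    ).toDF("id", "start", "end")

    val byMachine = Window.partitionBy($"id").orderBy($"start")
    val withDiff = machineDF
      .withColumn("prevEnd", lag($"end", 1).over(byMachine))
      .withColumn("diff", $"start" - $"prevEnd")

    // when(...) without otherwise yields null for non-matching rows (including
    // the null diff of each machine's first row), and count ignores nulls,
    // so only positive gaps are counted.
    val counts = withDiff
      .groupBy($"id")
      .agg(count(when($"diff" > 0, true)).as("idleIntervals"))

    counts.collect().map(r => r.getString(0) -> r.getLong(1)).toMap
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("idle-intervals").getOrCreate()
    try println(idleIntervalCounts(spark))
    finally spark.stop()
  }
}
```

Counting with `count(when(...))` inside the aggregation keeps machines that have no idle interval in the result with a count of 0; filtering first, as in `withDiff.filter($"diff" > 0).groupBy($"id").count()`, would drop them.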

Source: https://habr.com/ru/post/1659433/

