Spark Data Sequences

I have a DataFrame in Spark. Looks like that:

+-------+----------+-------+
|  value|     group|     ts|
+-------+----------+-------+
|      A|         X|      1|
|      B|         X|      2|
|      B|         X|      3|
|      D|         X|      4|
|      E|         X|      5|
|      A|         Y|      1|
|      C|         Y|      2|
+-------+----------+-------+

Endgoal: I would like to find the number of sequences A-B-E(a sequence is just a list of subsequent lines). With the added restriction that subsequent parts of the sequence can be maximum nrows from each other. Consider for this example what nequals 2.

Consider the group X. In this case, there is exactly 1 Dbetween Band E(several consecutive ones are Bignored). This means that Band are Edivided into 1 line and therefore there is a sequenceA-B-E

collect_list(), (, ) . , , , ?

Edit:

, dataframe . dataframe (, , ) .

+4
1

@Tim + "AABE"

, , id :

val df = List(
  (1,"A","X",1),
  (2,"B","X",2),
  (3,"B","X",3),
  (4,"D","X",4),
  (5,"E","X",5),
  (6,"A","Y",1),
  (7,"C","Y",2)
).toDF("id","value","group","ts")

import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy('group).orderBy('id)

, , Column . , "AABE". : "ABAEXX" ):

def createSeq(m:Int) = split(
  concat(
    (1 to 2*m)
      .map(i => coalesce(lag('value,-i).over(w),lit("")))
  :_*),"A")(0)


val m=2
val tmp = df
  .withColumn("seq",createSeq(m))

+---+-----+-----+---+----+
| id|value|group| ts| seq|
+---+-----+-----+---+----+
|  6|    A|    Y|  1|   C|
|  7|    C|    Y|  2|    |
|  1|    A|    X|  1|BBDE|
|  2|    B|    X|  2| BDE|
|  3|    B|    X|  3|  DE|
|  4|    D|    X|  4|   E|
|  5|    E|    X|  5|    |
+---+-----+-----+---+----+

- , API Column, UDF

def patternInSeq(m: Int) = udf((str: String) => {
  var notFound = str
    .split("B")
    .filter(_.contains("E"))
    .filter(_.indexOf("E") <= m)
    .isEmpty
  !notFound
})

val res = tmp
  .filter(('value === "A") && (locate("B",'seq) > 0))
  .filter(locate("B",'seq) <= m && (locate("E",'seq) > 1))
  .filter(patternInSeq(m)('seq))
  .groupBy('group)
  .count
res.show

+-----+-----+
|group|count|
+-----+-----+
|    X|    1|
+-----+-----+

( )

, , . , ( "ABAE" ) (. ). , , -, ( "Z", )

val df = List(
  (1,"A","X",1),
  (2,"B","X",2),
  (3,"B","X",3),
  (4,"D","X",4),
  (5,"E","X",5),
  (6,"A","Y",1),
  (7,"C","Y",2),
  ( 8,"A","Z",1),
  ( 9,"B","Z",2),
  (10,"D","Z",3),
  (11,"B","Z",4),
  (12,"E","Z",5)
).toDF("id","value","group","ts")

import org.apache.spark.sql.DataFrame
def createSeq(m:Int) = array((0 to 2*m).map(i => coalesce(lag('value,-i).over(w),lit(""))):_*)
def filterPairUdf(m: Int, t: (String,String)) = udf((ar: Array[String]) => {
  val (a,b) = t
  val foundAt = ar
    .dropWhile(_ != a)
    .takeWhile(_ != a)
    .indexOf(b)
  foundAt != -1 && foundAt <= m
})

, ,

def filterSeq(seq: List[String], m: Int)(df: DataFrame): DataFrame = {
  var a = seq(0)
  seq.tail.foldLeft(df){(df: DataFrame, b: String) => {
    val res  = df.filter(filterPairUdf(m,(a,b))('seq))
    a = b
    res
  }}
}

, ,

val m = 2
val tmp = df
  .filter('value === "A") // reduce problem
  .withColumn("seq",createSeq(m))

scala> tmp.show()
+---+-----+-----+---+---------------+
| id|value|group| ts|            seq|
+---+-----+-----+---+---------------+
|  6|    A|    Y|  1|   [A, C, , , ]|
|  8|    A|    Z|  1|[A, B, D, B, E]|
|  1|    A|    X|  1|[A, B, B, D, E]|
+---+-----+-----+---+---------------+

val res = tmp.transform(filterSeq(List("A","B","E"),m))

scala> res.show()
+---+-----+-----+---+---------------+
| id|value|group| ts|            seq|
+---+-----+-----+---+---------------+
|  1|    A|    X|  1|[A, B, B, D, E]|
+---+-----+-----+---+---------------+

(transform DataFrame => DataFrame)

res
  .groupBy('group)
  .count
  .show

+-----+-----+
|group|count|
+-----+-----+
|    X|    1|
+-----+-----+

, " " , , , .

+4

Source: https://habr.com/ru/post/1658451/


All Articles