How to aggregate over a rolling time window with groups in Spark

I have some data that I want to group by a specific column, and then aggregate a series of fields based on a rolling time window within each group.

Here are some sample data:

from pyspark.sql import Row

df = spark.createDataFrame([Row(date='2016-01-01', group_by='group1', get_avg=5, get_first=1),
                            Row(date='2016-01-10', group_by='group1', get_avg=5, get_first=2),
                            Row(date='2016-02-01', group_by='group2', get_avg=10, get_first=3),
                            Row(date='2016-02-28', group_by='group2', get_avg=20, get_first=3),
                            Row(date='2016-02-29', group_by='group2', get_avg=30, get_first=3),
                            Row(date='2016-04-02', group_by='group2', get_avg=8, get_first=4)])

I want to group by group_by, and then create time windows that start at the earliest date for the group and extend until there have been 30 days with no record for that group. Once those 30 days are over, the next time window starts with the date of the next row that did not fall into the previous window.

Then I want to aggregate, for example taking the average of get_avg and the first value of get_first.

So the output for this example should be:

group_by    first date of window    get_avg  get_first
group1      2016-01-01              5        1
group2      2016-02-01              20       3
group2      2016-04-02              8        4
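
To make the intended windowing concrete, here is a tiny plain-Python sketch (not Spark) of the grouping rule I have in mind; the helper name split_windows is just for illustration:

from datetime import date

def split_windows(dates, max_gap_days=30):
    # Split an ordered list of dates into subgroups: a new subgroup
    # starts whenever the gap from the previous date exceeds max_gap_days.
    groups, current = [], []
    for d in dates:
        if current and (d - current[-1]).days > max_gap_days:
            groups.append(current)
            current = []
        current.append(d)
    if current:
        groups.append(current)
    return groups

# group2 dates from the sample data: the 33-day gap before 2016-04-02
# starts a second window, matching the expected output above.
print(split_windows([date(2016, 2, 1), date(2016, 2, 28),
                     date(2016, 2, 29), date(2016, 4, 2)]))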

Edit: sorry, my question was not specified properly at first. I actually want a window that ends after 30 days of inactivity; I have modified the group2 portion of the example accordingly.


You can do this with window functions. The required imports:

from pyspark.sql.functions import coalesce, col, datediff, lag, lit, sum as sum_
from pyspark.sql.window import Window

The window definition:

w = Window.partitionBy("group_by").orderBy("date")

Cast date to DateType:

df_ = df.withColumn("date", col("date").cast("date"))

Next, the expressions that assign each row to a subgroup:

# Difference from the previous record or 0 if this is the first one
diff = coalesce(datediff("date", lag("date", 1).over(w)), lit(0))

# 0 if diff <= 30, 1 otherwise
indicator = (diff > 30).cast("integer")

# Cumulative sum of indicators over the window
subgroup = sum_(indicator).over(w).alias("subgroup")
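
As a quick sanity check, you can look at the per-row subgroup assignment this expression produces:

# Expected from the sample data: every group1 row falls in subgroup 0;
# for group2 the first three rows are subgroup 0 and 2016-04-02 is
# subgroup 1, because its gap from the previous row (33 days) exceeds 30.
df_.select("date", "group_by", subgroup).orderBy("group_by", "date").show()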

Finally, add the subgroup column and aggregate:

df_.select("*", subgroup).groupBy("group_by", "subgroup").avg("get_avg").show()

+--------+--------+------------+
|group_by|subgroup|avg(get_avg)|
+--------+--------+------------+
|  group1|       0|         5.0|
|  group2|       0|        20.0|
|  group2|       1|         8.0|
+--------+--------+------------+

first is not meaningful as an aggregate here in general, but if the column is monotonically increasing you can use min instead. Otherwise you will have to use window functions for it as well.
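
A sketch of a full aggregation along those lines, which also recovers the first date of each window (assuming get_first is non-decreasing within each window, as in the sample data; the column aliases are my own):

from pyspark.sql.functions import avg, min as min_

(df_
 .select("*", subgroup)
 .groupBy("group_by", "subgroup")
 .agg(min_("date").alias("window_start"),
      avg("get_avg").alias("get_avg"),
      min_("get_first").alias("get_first"))
 .show())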

Tested with Spark 2.1. Earlier Spark releases may require subqueries and a Window instance.

(Optional, not exactly what was asked)

Since Spark 2.0 you should also be able to use the window function:

It bucketizes rows into one or more time windows given a timestamp column. Window starts are inclusive but window ends are exclusive, e.g. 12:05 will be in the window [12:05, 12:10) but not in [12:00, 12:05).

from pyspark.sql.functions import window

df.groupBy(window("date", windowDuration="30 days")).count()

but, as you can see from the result,

+---------------------------------------------+-----+
|window                                       |count|
+---------------------------------------------+-----+
|[2016-01-30 01:00:00.0,2016-02-29 01:00:00.0]|1    |
|[2015-12-31 01:00:00.0,2016-01-30 01:00:00.0]|2    |
|[2016-03-30 02:00:00.0,2016-04-29 02:00:00.0]|1    |
+---------------------------------------------+-----+

you have to be very careful when it comes to time zones.
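
One way to make those boundaries more predictable, if you are on a release that supports it (the spark.sql.session.timeZone setting appeared in Spark 2.2), is to pin the session time zone before bucketing; a minimal sketch:

# Compute window() boundaries in UTC instead of the JVM default time zone.
spark.conf.set("spark.sql.session.timeZone", "UTC")

df.groupBy(window("date", windowDuration="30 days")).count().show(truncate=False)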


Source: https://habr.com/ru/post/1667095/

