You can execute the count function over a window and then use built-in functions to get the final dataframe you want, by doing the following:
from pyspark.sql import Window
from pyspark.sql import functions as F

windowSpec = Window.partitionBy("c1")

result = (df.withColumn("cnt_orig", F.count("c1").over(windowSpec))
            .orderBy("c3")
            .groupBy("c1", "c2", "cnt_orig")
            .agg(F.first("c3").alias("c3"))
            # stringify the [c2, c3] pair, strip the brackets, and turn the comma into " : "
            .withColumn("c2", F.regexp_replace(
                F.regexp_replace(F.array("c2", "c3").cast("string"), "[\\[\\]]", ""),
                ",", " : "))
            .groupBy("c1", "cnt_orig")
            .agg(F.collect_list("c2").alias("map_category_room_date")))
You should get the following result
+---+--------+----------------------+
|c1 |cnt_orig|map_category_room_date|
+---+--------+----------------------+
|A  |4       |[b : 09:00, c : 22:00]|
|b  |1       |[c : 09:00]           |
+---+--------+----------------------+
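For reference, here is a minimal sketch of an input that would produce a table like the one above. The rows are hypothetical sample data (assuming c3 holds time strings and an active SparkSession named spark):

# hypothetical sample data, for illustration only
df = spark.createDataFrame(
    [("A", "b", "09:00"), ("A", "b", "10:00"),
     ("A", "c", "22:00"), ("A", "c", "23:00"),
     ("b", "c", "09:00")],
    ["c1", "c2", "c3"])

# result is the DataFrame built by the chain above
result.show(truncate=False)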
Scala way
Working code to get the desired result in Scala:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import spark.implicits._

val windowSpec = Window.partitionBy("c1")

df.withColumn("cnt_orig", count("c1").over(windowSpec))
  .orderBy("c3")
  .groupBy("c1", "c2", "cnt_orig")
  .agg(first("c3").as("c3"))
  // stringify the [c2, c3] pair, strip the brackets, and turn the comma into " : "
  .withColumn("c2", regexp_replace(regexp_replace(array($"c2", $"c3").cast(StringType), "[\\[\\]]", ""), ",", " : "))
  .groupBy("c1", "cnt_orig")
  .agg(collect_list("c2").as("map_category_room_date"))
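Note that cnt_orig is computed with a window function rather than inside the later aggregations: the window count captures the number of rows per c1 in the original dataframe, whereas a count taken after the groupBy steps would only see the rows that remain once duplicate (c1, c2) combinations have been collapsed.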