I am processing HTTP session logs in Spark SQL. The task lends itself to distributed processing: once all the events for a given HTTP session have been shuffled to the same node in the cluster, they can be processed entirely locally.
However, I keep falling into one of two traps: either I use operations such as join, groupBy, or Window, which require an exchange (shuffle) between nodes, or I write one giant UDF in plain Scala.
Is there a way to work with each group as a small DataFrame or Dataset on a single node, without any shuffle of the sub-groups to other nodes?
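Conceptually, I want to write something like the following, where each session's events arrive as a plain local iterator after a single shuffle. This is only a rough sketch of the shape I am after: Event and SessionSummary are placeholder types standing in for my real schema, and I am assuming the typed Dataset API's groupByKey/mapGroups is the closest existing fit.

import org.apache.spark.sql.{Dataset, SparkSession}

// Placeholder types for my actual session-log schema.
case class Event(sessionId: String, url: String, ts: Long)
case class SessionSummary(sessionId: String, hits: Int, durationMs: Long)

def summarize(events: Dataset[Event])(implicit spark: SparkSession): Dataset[SessionSummary] = {
  import spark.implicits._
  events
    .groupByKey(_.sessionId)            // one shuffle to co-locate each session
    .mapGroups { (id, evs) =>           // evs: Iterator[Event], entirely local to one task
      val ts = evs.map(_.ts).toVector
      SessionSummary(id, ts.size, ts.max - ts.min)
    }
}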
Example: consider this DataFrame from a question by Ramesh:
import spark.implicits._  // assuming a SparkSession named spark; needed for toDF and the $"..." syntax

val df = Seq(
  (1, "a", "10"), (1, "b", "12"), (1, "c", "13"), (2, "a", "14"),
  (2, "c", "11"), (1, "b", "12"), (2, "c", "12"), (3, "r", "11")
).toDF("col1", "col2", "col3")
And this code from an answer, which produces a sum per groupBy("col1", "col2") as well as a sum per groupBy("col1"):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

val w = Window.partitionBy($"col1")

df.groupBy("col1", "col2")
  .agg(sum($"col3").as("sum_level2"))
  .withColumn("sum_level1", sum($"sum_level2").over(w))
  .show()
When I execute this code, I see two exchanges: one for the groupBy("col1", "col2") and one for the window over "col1".
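(The two exchanges show up as Exchange hashpartitioning nodes in the physical plan, which you can print with explain():)

df.groupBy("col1", "col2")
  .agg(sum($"col3").as("sum_level2"))
  .withColumn("sum_level1", sum($"sum_level2").over(w))
  .explain()  // look for two Exchange hashpartitioning(...) operators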
Is there a way to avoid the second exchange? It seems that once you have done groupBy("col1"), you should be able to do a sub-groupBy on "col2" within each col1 group and avoid shuffling altogether.
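For comparison, this is the kind of single-shuffle version I am imagining, using groupByKey plus flatMapGroups so that both levels of sums are computed locally within each col1 group. It is a rough, unbenchmarked sketch: it buffers each group in memory and gives up map-side partial aggregation.

df.as[(Int, String, String)]              // tuple fields map to the columns by ordinal
  .groupByKey(_._1)                       // the single shuffle, on col1
  .flatMapGroups { (col1, rows) =>
    val buffered = rows.toSeq             // the whole group is local to one task
    val level2 = buffered
      .groupBy(_._2)                      // local sub-groupBy on col2
      .map { case (col2, grp) => col2 -> grp.map(_._3.toDouble).sum }
    val level1 = level2.values.sum        // level-1 total, no second exchange
    level2.map { case (col2, s2) => (col1, col2, s2, level1) }
  }
  .toDF("col1", "col2", "sum_level2", "sum_level1")
  .show()

Is this the right kind of approach to reach for, or is there a way to get the same single-shuffle behavior while keeping the optimizer in play?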