Rearrange / merge DataFrame rows in Spark

I have a DataFrame that looks like this:

    scala> data.show
    +-----+---+---------+
    |label| id| features|
    +-----+---+---------+
    |  1.0|  1|[1.0,2.0]|
    |  0.0|  2|[5.0,6.0]|
    |  1.0|  1|[3.0,4.0]|
    |  0.0|  2|[7.0,8.0]|
    +-----+---+---------+

I want to regroup the features based on "id" to get the following:

    scala> data.show
    +---------+---+-----------------+
    |    label| id|         features|
    +---------+---+-----------------+
    |  1.0,1.0|  1|[1.0,2.0,3.0,4.0]|
    |  0.0,0.0|  2|[5.0,6.0,7.0,8.0]|
    +---------+---+-----------------+

This is the code I use to create the DataFrame above:

    import org.apache.spark.mllib.linalg.Vectors

    val rdd = sc.parallelize(List(
      (1.0, 1, Vectors.dense(1.0, 2.0)),
      (0.0, 2, Vectors.dense(5.0, 6.0)),
      (1.0, 1, Vectors.dense(3.0, 4.0)),
      (0.0, 2, Vectors.dense(7.0, 8.0))))
    val data = rdd.toDF("label", "id", "features")

I have tried different things with both RDDs and DataFrames. The most promising approach so far has been filtering by id:

    data.filter($"id".equalTo(1)).show
    +-----+---+---------+
    |label| id| features|
    +-----+---+---------+
    |  1.0|  1|[1.0,2.0]|
    |  1.0|  1|[3.0,4.0]|
    +-----+---+---------+

But I have two bottlenecks:

1) How do I automate the filtering for all the distinct values that "id" can take?

The following attempt generates an error:

 data.select("id").distinct.foreach(x => data.filter($"id".equalTo(x))) 

2) How do I merge the "features" that share the same "id"? I have not tried much yet, since I am still stuck on 1).

Any suggestion is more than welcome.

Note: for clarification, the "label" is always the same for each occurrence of "id". Sorry for the confusion; a simple extension of my task would also group the "labels" (example updated).

1 answer

I believe there is no really efficient way to achieve what you want, and the additional ordering requirement does not make the situation any better. The cleanest way I can think of is groupByKey:

    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.sql.functions.monotonicallyIncreasingId
    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD

    val pairs: RDD[((Double, Int), (Long, Vector))] = data
      // Add row identifiers so we can keep the desired order
      .withColumn("uid", monotonicallyIncreasingId)
      // Create a PairwiseRDD where (label, id) is the key
      // and (row-id, vector) is the value
      .map{case Row(label: Double, id: Int, v: Vector, uid: Long) =>
        ((label, id), (uid, v))}

    val rows = pairs.groupByKey.mapValues(xs => {
      val vs = xs
        .toArray
        .sortBy(_._1)                  // Sort by row id to keep the order
        .flatMap(_._2.toDense.values)  // Flatten the vector values
      Vectors.dense(vs)                // Return the concatenated vector
    }).map{case ((label, id), v) => (label, id, v)} // Reshape

    val grouped = rows.toDF("label", "id", "features")

    grouped.show
    // +-----+---+-----------------+
    // |label| id|         features|
    // +-----+---+-----------------+
    // |  0.0|  2|[5.0,6.0,7.0,8.0]|
    // |  1.0|  1|[1.0,2.0,3.0,4.0]|
    // +-----+---+-----------------+

It is also possible to use a UDAF, similar to the one I proposed in "SPARK SQL replacement for MySQL's GROUP_CONCAT aggregation function", but it is even less efficient than this.
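For reference, a minimal sketch of what such a UDAF could look like (assuming Spark 1.5+ for UserDefinedAggregateFunction; it aggregates array&lt;double&gt; and converts to and from Vector with helper UDFs to avoid dealing with VectorUDT directly):

    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.sql.types._

    // Concatenates array<double> values within a group.
    object ConcatDoubleArrays extends UserDefinedAggregateFunction {
      def inputSchema: StructType = StructType(StructField("values", ArrayType(DoubleType)) :: Nil)
      def bufferSchema: StructType = StructType(StructField("acc", ArrayType(DoubleType)) :: Nil)
      def dataType: DataType = ArrayType(DoubleType)
      def deterministic: Boolean = true
      def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = Seq.empty[Double]
      def update(buffer: MutableAggregationBuffer, input: Row): Unit =
        buffer(0) = buffer.getSeq[Double](0) ++ input.getSeq[Double](0)
      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
        buffer1(0) = buffer1.getSeq[Double](0) ++ buffer2.getSeq[Double](0)
      def evaluate(buffer: Row): Any = buffer.getSeq[Double](0)
    }

    // Vector <-> array<double> helpers, since the UDAF works on plain arrays.
    val toArr = udf((v: Vector) => v.toArray)
    val toVec = udf((xs: Seq[Double]) => Vectors.dense(xs.toArray))

    val groupedUdaf = data
      .withColumn("arr", toArr($"features"))
      .groupBy($"label", $"id")
      .agg(ConcatDoubleArrays($"arr").alias("arr"))
      .withColumn("features", toVec($"arr"))
      .select("label", "id", "features")

Like the groupByKey version, this would still need the uid-based sorting trick if the original row order has to be preserved, since rows reach the aggregation buffer in no guaranteed order.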

