Spark Scala: split data into equal numbers of rows

I have a DataFrame and want to split it into parts with an equal number of rows each.

In other words, I want a list of DataFrames where each one is a disjoint subset of the original DataFrame.

Let's say the input DataFrame is as follows:

  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  100|[15.5097750043269...|
  |15.509775004326936|          0|  101|[15.5097750043269...|
  |15.509775004326936|          0|  102|[15.5097750043269...|
  |15.509775004326936|          0|  103|[15.5097750043269...|
  |15.509775004326936|          0|  104|[15.5097750043269...|
  |15.509775004326936|          0|  105|[15.5097750043269...|
  |15.509775004326936|          0|  106|[15.5097750043269...|
  |15.509775004326936|          0|  107|[15.5097750043269...|
  |15.509775004326936|          0|  108|[15.5097750043269...|
  |15.509775004326936|          0|  109|[15.5097750043269...|
  |15.509775004326936|          0|  110|[15.5097750043269...|
  |15.509775004326936|          0|  111|[15.5097750043269...|
  |15.509775004326936|          0|  112|[15.5097750043269...|
  |15.509775004326936|          0|  113|[15.5097750043269...|
  |15.509775004326936|          0|  114|[15.5097750043269...|
  |15.509775004326936|          0|  115|[15.5097750043269...|
  | 43.01955000865387|          0|  116|[43.0195500086538...|
  +------------------+-----------+-----+--------------------+

I want to split it into K equal-sized parts. If K = 4, then the following is one possible result:

  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  106|[15.5097750043269...|
  |15.509775004326936|          0|  107|[15.5097750043269...|
  |15.509775004326936|          0|  110|[15.5097750043269...|
  |15.509775004326936|          0|  111|[15.5097750043269...|
  +------------------+-----------+-----+--------------------+

  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  104|[15.5097750043269...|
  |15.509775004326936|          0|  108|[15.5097750043269...|
  |15.509775004326936|          0|  112|[15.5097750043269...|
  |15.509775004326936|          0|  114|[15.5097750043269...|
  +------------------+-----------+-----+--------------------+


  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  100|[15.5097750043269...|
  |15.509775004326936|          0|  105|[15.5097750043269...|
  |15.509775004326936|          0|  109|[15.5097750043269...|
  |15.509775004326936|          0|  115|[15.5097750043269...|
  +------------------+-----------+-----+--------------------+


  +------------------+-----------+-----+--------------------+
  |         eventName|original_dt|count|            features|
  +------------------+-----------+-----+--------------------+
  |15.509775004326936|          0|  101|[15.5097750043269...|
  |15.509775004326936|          0|  102|[15.5097750043269...|
  |15.509775004326936|          0|  103|[15.5097750043269...|
  |15.509775004326936|          0|  113|[15.5097750043269...|
  | 43.01955000865387|          0|  116|[43.0195500086538...|
  +------------------+-----------+-----+--------------------+
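For reference, if the parts only need to be approximately equal in size, Spark's built-in randomSplit covers this directly. A minimal sketch, assuming df is the DataFrame shown above:

// Minimal sketch: randomSplit returns K disjoint DataFrames whose
// sizes are only approximately equal (the split is probabilistic).
// Assumes `df` is the input DataFrame shown above.
val k = 4
val weights = Array.fill(k)(1.0 / k)
val parts = df.randomSplit(weights, seed = 42L)
parts.foreach(_.show())

The answers below address the stricter requirement of exactly equal row counts.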
3 answers

As I understand your input and the required output, you can create row numbers by grouping the DataFrame under a single constant groupId.

Then filter the DataFrame by row-number ranges, storing each slice. A working example follows.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

val k = 4

// Put every row in the same group so row_number() numbers the whole frame
val windowSpec = Window.partitionBy("grouped").orderBy("original_dt")

val newDF = dataFrame.withColumn("grouped", lit("grouping"))

val latestDF = newDF.withColumn("row", row_number() over windowSpec)

val totalCount = latestDF.count()
var lowLimit = 0
var highLimit = lowLimit + k

// Slide over the numbered rows, k rows at a time
while (lowLimit < totalCount) {
  latestDF.where(s"row <= ${highLimit} and row > ${lowLimit}").show(false)
  lowLimit = lowLimit + k
  highLimit = highLimit + k
}
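If you need the slices as an actual list of DataFrames rather than printed output (which is what the question asks for), the same row-number filter can be collected into a sequence. A sketch building on latestDF and totalCount from the code above:

// Sketch: materialize each k-row slice as its own DataFrame
// instead of only calling .show on it.
val slices = (0L until totalCount by k).map { low =>
  latestDF.where(s"row > $low and row <= ${low + k}")
}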


Another option is to use limit and except: repeatedly take a fixed number of rows from the DataFrame, then remove them from the input before the next iteration. Note that which rows end up in which DataFrame is not deterministic, since limit without an ordering makes no guarantees.

import spark.implicits._  // needed for .toDF on local collections

var numberOfNew = 4
var input = List(1,2,3,4,5,6,7,8,9).toDF
val newFrames = (0 to numberOfNew).map(_ => Seq.empty[Int].toDF).toArray
var size = input.count()
val limit = (size / numberOfNew).toInt

while (size > 0) {
  // take the next chunk, then remove it from the remaining input
  newFrames(numberOfNew) = input.limit(limit)
  input = input.except(newFrames(numberOfNew))
  size = size - limit
  numberOfNew = numberOfNew - 1
}

newFrames.foreach(_.show)

+-----+
|value|
+-----+
|    7|
+-----+

+-----+
|value|
+-----+
|    4|
|    8|
+-----+

+-----+
|value|
+-----+
|    5|
|    9|
+-----+

...
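A caveat worth adding here (my note, not part of the original answer): except is a set operation, so if the input contains duplicate rows this scheme can drop them or distribute them unpredictably. Tagging each row with a unique id first sidesteps that:

// Sketch: except() compares whole rows, so duplicate rows break the
// limit/except scheme. A monotonically increasing id makes each row
// distinct; drop the "uid" column again after splitting.
import org.apache.spark.sql.functions.monotonically_increasing_id

val uniqueInput = input.withColumn("uid", monotonically_increasing_id())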

This is an improved version of Steffen Schmitz's answer above, which is in fact subtly incorrect. I fixed it for posterity and generalized it. However, I do wonder about its performance at scale.

import spark.implicits._  // needed for .toDF on local collections

var numberOfNew = 4
var input = Seq((1,2),(3,4),(5,6),(7,8),(9,10),(11,12)).toDF
val newFrames = (0 until numberOfNew).map(_ => Seq.empty[(Int, Int)].toDF).toArray
val size = input.count()
val limit = (size / numberOfNew).toInt
// rows left for the last frame after the first numberOfNew - 1 frames
// each take `limit` rows
val residual = (size - limit * (numberOfNew - 1)).toInt
var limit_to_use = limit

while (numberOfNew > 0) {
  // the final frame absorbs the remainder so no rows are lost
  if (numberOfNew == 1 && residual != 0) limit_to_use = residual

  newFrames(numberOfNew - 1) = input.limit(limit_to_use)
  input = input.except(newFrames(numberOfNew - 1))
  numberOfNew = numberOfNew - 1
}

newFrames.foreach(_.show)

// stitch the frames back together to verify no rows were lost
val singleDF = newFrames.reduce(_ union _)
singleDF.show(false)

This returns the individual DataFrames:

+---+---+
| _1| _2|
+---+---+
|  7|  8|
|  3|  4|
| 11| 12|
+---+---+

+---+---+
| _1| _2|
+---+---+
|  5|  6|
+---+---+

+---+---+
| _1| _2|
+---+---+
|  9| 10|
+---+---+

+---+---+
| _1| _2|
+---+---+
|  1|  2|
+---+---+
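On the scale concern raised above: each except pass rescans and shuffles the remaining rows, so the loop makes roughly K full passes over ever-shrinking data. A hedged alternative sketch (my own, not from the answers) that indexes every row once with RDD zipWithIndex and buckets by index modulo K:

// Sketch: split into K parts in a single pass. zipWithIndex assigns
// a stable 0-based index per row; index % K selects the bucket, so
// bucket sizes differ by at most one row.
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

def splitIntoK(df: DataFrame, k: Int): Seq[DataFrame] = {
  val withIdx = df.rdd.zipWithIndex.map { case (row, idx) =>
    Row.fromSeq(row.toSeq :+ idx)
  }
  val schema = StructType(df.schema.fields :+ StructField("idx", LongType, nullable = false))
  val indexed = df.sparkSession.createDataFrame(withIdx, schema)
  (0 until k).map(i => indexed.where(indexed("idx") % k === i).drop("idx"))
}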

Source: https://habr.com/ru/post/1677769/