How to get all columns after groupby in dataset <Row> in spark sql 2.1.0

First of all, I am very new to Spark.

I have millions of records in my dataset. I want to group by the name column and find, for each name, the maximum age. I get the correct result, but I need all the columns in my result set.

Dataset<Row> resultset = studentDataSet.select("*").groupBy("name").max("age");
resultset.show(1000,false);

Instead, I get only name and max(age) in the resulting dataset.

+7
6 answers

For your use case you need to try a different approach. You were almost there, but let me help you understand.

Dataset<Row> resultset = studentDataSet.groupBy("name").max("age");

You can now join resultset with studentDataSet:

Dataset<Row> joinedDS = studentDataSet.join(resultset, "name");

The catch with groupBy is that it returns a RelationalGroupedDataset, so the result depends on which aggregate function you apply next (sum, min, mean, max, etc.): the output contains only the groupBy columns together with the aggregated columns.

In your case, the name column is grouped with the max of age, so only those two columns are returned. If you instead applied groupBy on age and max on "age", you would again get just two columns: age and max(age).

Note: this code is untested, so please verify it before using it.
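To make that concrete, here is a minimal sketch of the join-back pattern in Scala (the sample rows are invented). Note that the aggregate is renamed, and the join also matches on age so that only each name's max-age row survives:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

val spark = SparkSession.builder().appName("JoinBack").master("local").getOrCreate()
import spark.implicits._

// Hypothetical sample data standing in for studentDataSet.
val studentDataSet = Seq(
  ("alice", 20, "NYC"),
  ("alice", 25, "LA"),
  ("bob",   30, "SF")
).toDF("name", "age", "city")

// Rename the aggregate so the join condition below is unambiguous.
val maxAges = studentDataSet.groupBy("name").agg(max("age").as("max_age"))

// Join on name AND age == max_age so only each name's max-age row survives.
val joined = studentDataSet
  .join(maxAges, studentDataSet("name") === maxAges("name")
    && studentDataSet("age") === maxAges("max_age"))
  .drop(maxAges("name"))
  .drop("max_age")

joined.show(false)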

+12

Aggregate functions reduce the rows within each group, but you can keep the other columns by aggregating them as well. Here is an approach that uses only DataFrames, without a join.

Create some sample data:

val df = Seq(
  ("bob", 20, "blah"),
  ("bob", 40, "blah"),
  ("karen", 21, "hi"),
  ("monica", 43, "candy"),
  ("monica", 99, "water")
).toDF("name", "age", "another_column")

Group by name, take the max of every other column, and drop the duplicated grouping column:

df
  .groupBy("name")
  .agg(
    max("name").as("name1_dup"), 
    max("another_column").as("another_column"),  
    max("age").as("age")
  ).drop(
    "name1_dup"
  ).show()

+------+--------------+---+
|  name|another_column|age|
+------+--------------+---+
|monica|         water| 99|
| karen|            hi| 21|
|   bob|          blah| 40|
+------+--------------+---+
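One caveat with this approach: because each column is aggregated independently, the max of another_column and the max of age are not guaranteed to come from the same row. An alternative that always keeps values from a single row is to take the max of a struct, since structs compare field by field from left to right. A sketch using the same df (spark.implicits._ assumed in scope):

import org.apache.spark.sql.functions.{max, struct}

// Put "age" first so the struct is ordered by age; the other fields ride along.
df.groupBy("name")
  .agg(max(struct("age", "another_column")).as("row"))
  .select($"name", $"row.age", $"row.another_column")
  .show()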
+1

Keep in mind that aggregate functions reduce the rows, so you have to specify which row's age you want with a reducing function. If you want to retain all rows of a group (warning: this can cause explosions or skewed partitions), you can collect them as a list. You can then use a UDF (user-defined function) to reduce them by your criteria, in this example funniness_of_requisite, and then extract the columns of the single reduced row with another UDF. For the purposes of this answer I assume you want to retain the age of the person who has the maximum funniness_of_requisite.

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType}

import scala.collection.mutable


object TestJob4 {

def main (args: Array[String]): Unit = {

val sparkSession = SparkSession
  .builder()
  .appName(this.getClass.getName.replace("$", ""))
  .master("local")
  .getOrCreate()

val sc = sparkSession.sparkContext

import sparkSession.sqlContext.implicits._

val rawDf = Seq(
  (1, "Moe",  "Slap",  7.9, 118),
  (2, "Larry",  "Spank",  8.0, 115),
  (3, "Curly",  "Twist", 6.0, 113),
  (4, "Laurel", "Whimper", 7.53, 119),
  (5, "Hardy", "Laugh", 6.0, 18),
  (6, "Charley",  "Ignore",   9.7, 115),
  (2, "Moe",  "Spank",  6.8, 118),
  (3, "Larry",  "Twist", 6.0, 115),
  (3, "Charley",  "fall", 9.0, 115)
).toDF("id", "name", "requisite", "funniness_of_requisite", "age")

rawDf.show(false)
rawDf.printSchema

val rawSchema = rawDf.schema

// UDF that reduces the collected rows of a group to the one with the max funniness
val fUdf = udf(reduceByFunniness, rawSchema)

// UDF that extracts the age from the reduced row
val nameUdf = udf(extractAge, IntegerType)

val aggDf = rawDf
  .groupBy("name")
  .agg(
    count(struct("*")).as("count"),
    max(col("funniness_of_requisite")),
    collect_list(struct("*")).as("horizontal")
  )
  .withColumn("short", fUdf($"horizontal"))
  .withColumn("age", nameUdf($"short"))
  .drop("horizontal")

aggDf.printSchema

aggDf.show(false)
}

def reduceByFunniness = (x: Any) => {

val d = x.asInstanceOf[mutable.WrappedArray[GenericRowWithSchema]]

val red = d.reduce((r1, r2) => {

  val funniness1 = r1.getAs[Double]("funniness_of_requisite")
  val funniness2 = r2.getAs[Double]("funniness_of_requisite")

  val r3 = funniness1 match {
    case a if a >= funniness2 =>
      r1
    case _ =>
      r2
  }

  r3
})

red
}

def extractAge = (x: Any) => {

val d = x.asInstanceOf[GenericRowWithSchema]

d.getAs[Int]("age")
}
 }


+-------+-----+---------------------------+-------------------------------+---+
|name   |count|max(funniness_of_requisite)|short                          |age|
+-------+-----+---------------------------+-------------------------------+---+
|Hardy  |1    |6.0                        |[5, Hardy, Laugh, 6.0, 18]     |18 |
|Moe    |2    |7.9                        |[1, Moe, Slap, 7.9, 118]       |118|
|Curly  |1    |6.0                        |[3, Curly, Twist, 6.0, 113]    |113|
|Larry  |2    |8.0                        |[2, Larry, Spank, 8.0, 115]    |115|
|Laurel |1    |7.53                       |[4, Laurel, Whimper, 7.53, 119]|119|
|Charley|2    |9.7                        |[6, Charley, Ignore, 9.7, 115] |115|
+-------+-----+---------------------------+-------------------------------+---+
0

Here is a solution using a window function that keeps exactly 1 row per group.

import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._


object TestJob5 {

  def main (args: Array[String]): Unit = {

    val sparkSession = SparkSession
      .builder()
      .appName(this.getClass.getName.replace("$", ""))
      .master("local")
      .getOrCreate()

    val sc = sparkSession.sparkContext
    sc.setLogLevel("ERROR")

    import sparkSession.sqlContext.implicits._

    val rawDf = Seq(
      ("Moe",  "Slap",  7.9, 118),
      ("Larry",  "Spank",  8.0, 115),
      ("Curly",  "Twist", 6.0, 113),
      ("Laurel", "Whimper", 7.53, 119),
      ("Hardy", "Laugh", 6.0, 118),
      ("Charley",  "Ignore",   9.7, 115),
      ("Moe",  "Spank",  6.8, 118),
      ("Larry",  "Twist", 6.0, 115),
      ("Charley",  "fall", 9.0, 115)
    ).toDF("name", "requisite", "funniness_of_requisite", "age")

    rawDf.show(false)
    rawDf.printSchema

    val nameWindow = Window
      .partitionBy("name")

    val aggDf = rawDf
      .withColumn("id", monotonically_increasing_id)
      .withColumn("maxFun", max("funniness_of_requisite").over(nameWindow))
      .withColumn("count", count("name").over(nameWindow))
      .withColumn("minId", min("id").over(nameWindow))
      .where(col("maxFun") === col("funniness_of_requisite") && col("minId") === col("id") )
      .drop("maxFun")
      .drop("minId")
      .drop("id")

    aggDf.printSchema

    aggDf.show(false)
  }

}

Adding a monotonically increasing id and filtering on minId guarantees exactly 1 row per group even when several rows tie on the maximum funniness; without it, the where clause could keep more than one row per name.
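If one row per group is all that is needed, a common alternative is row_number over an ordered window. A sketch against the same rawDf (ties on funniness are broken arbitrarily by the ordering):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Rank rows within each name by descending funniness and keep the top one.
val byFunniness = Window
  .partitionBy("name")
  .orderBy(col("funniness_of_requisite").desc)

val topPerName = rawDf
  .withColumn("rn", row_number().over(byFunniness))
  .where(col("rn") === 1) // exactly one row per name, even on ties
  .drop("rn")

topPerName.show(false)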

0

If you are working with typed Datasets, you can keep every column by reducing each group yourself with groupByKey and mapGroups. For example:

case class People(name: String, age: Int, other: String)   
val df = Seq(
  People("Rob", 20, "cherry"),
  People("Rob", 55, "banana"),
  People("Rob", 40, "apple"),
  People("Ariel", 55, "fox"),
  People("Vera", 43, "zebra"),
  People("Vera", 99, "horse")
).toDS

val oldestResults = df
  .groupByKey(_.name)
  .mapGroups { case (nameKey, peopleIter) =>
    var oldestPerson = peopleIter.next
    while (peopleIter.hasNext) {
      val nextPerson = peopleIter.next
      if (nextPerson.age > oldestPerson.age) oldestPerson = nextPerson
    }
    oldestPerson
  }

oldestResults.show

Which outputs:

+-----+---+------+
| name|age| other|
+-----+---+------+
|Ariel| 55|   fox|
|  Rob| 55|banana|
| Vera| 99| horse|
+-----+---+------+
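The same reduction can be written more compactly with reduceGroups, which pairs each key with its reduced value. A sketch reusing the People case class and df from above (spark.implicits._ assumed in scope):

// reduceGroups keeps the whole row while reducing each group to one element.
val oldest = df
  .groupByKey(_.name)
  .reduceGroups((a, b) => if (a.age >= b.age) a else b) // keep the older person
  .map(_._2)                                            // drop the key, keep the row

oldest.show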
0

Given this input data:

name    age   id
abc     24   1001
cde     24   1002
efg     22   1003
ghi     21   1004
ijk     20   1005
klm     19   1006
mno     18   1007
pqr     17   1008
rst     26   1009
tuv     27   1010
pqr     18   1011
rst     28   1012
tuv     29   1013

Apply groupBy with an aggregation:

df.select("name","age","id").groupBy("name","id").agg(functions.max("age")).show();

Output:

+----+----+--------+
|name|  id|max(age)|
+----+----+--------+
| tuv|1010|      27|
| ijk|1005|      20|
| cde|1002|      24|
| rst|1013|      28|
| klm|1006|      19|
| tuv|1011|      29|
| pqr|1012|      18|
| abc|1001|      24|
| pqr|1008|      17|
| ghi|1004|      21|
| mno|1007|      18|
| efg|1003|      22|
| rst|1009|      26|
+----+----+--------+
-1

Source: https://habr.com/ru/post/1665830/

