Spark: group_concat equivalent in a Scala RDD

I have the following DataFrame:

|-----id-------|----value------|-----desc------|
| 1            | v1            | d1            |
| 1            | v2            | d2            |
| 2            | v21           | d21           |
| 2            | v22           | d22           |
|--------------|---------------|---------------|

I want to convert it to:

|-----id-------|----value------|-----desc------|
| 1            | v1;v2         | d1;d2         |
| 2            | v21;v22       | d21;d22       |
|--------------|---------------|---------------|

  • Is this possible through operations with data frames?
  • What will the rdd conversion look like in this case?

I assume that rdd.reduceByKey is the key, but I do not know how to adapt it to this scenario.

4 answers

You can do this with Spark SQL:

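    import sqlContext.implicits._  // needed for .toDF

    case class Test(id: Int, value: String, desc: String)

    val data = sc.parallelize(Seq((1, "v1", "d1"), (1, "v2", "d2"), (2, "v21", "d21"), (2, "v22", "d22")))
      .map(line => Test(line._1, line._2, line._3))
      .toDF()

    // Register the DataFrame so it can be queried by name
    // (on Spark 2.0+, createOrReplaceTempView replaces registerTempTable).
    data.registerTempTable("data")

    // `desc` is backtick-quoted because it is also a SQL keyword.
    val result = sqlContext.sql(
      "select id, concat_ws(';', collect_list(value)) as value, " +
        "concat_ws(';', collect_list(`desc`)) as `desc` " +
        "from data group by id")
    result.show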
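
The same aggregation can also be written with the DataFrame API instead of a SQL string. A minimal sketch, assuming Spark 2.0 or later (SparkSession and the built-in collect_list); GroupConcatDf and groupConcat are hypothetical names used for illustration, not part of Spark:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, collect_list, concat_ws}

object GroupConcatDf {
  // Groups by `key` and joins the values of each column in `cols` with `sep`.
  // Assumes `cols` is non-empty.
  def groupConcat(df: DataFrame, key: String, cols: Seq[String], sep: String = ";"): DataFrame = {
    val aggs = cols.map(c => concat_ws(sep, collect_list(col(c))).as(c))
    df.groupBy(col(key)).agg(aggs.head, aggs.tail: _*)
  }

  def main(args: Array[String]): Unit = {
    // Local session for demonstration only.
    val spark = SparkSession.builder().master("local[*]").appName("group-concat").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "v1", "d1"), (1, "v2", "d2"), (2, "v21", "d21"), (2, "v22", "d22"))
      .toDF("id", "value", "desc")

    groupConcat(df, "id", Seq("value", "desc")).orderBy("id").show()
    spark.stop()
  }
}
```

Note that collect_list does not guarantee the order of the elements within each group.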

Suppose you have something like

    import scala.util.Random
    import org.apache.spark.sql.SQLContext

    val sqlc: SQLContext = ???  // your existing SQLContext

    case class Record(id: Long, value: String, desc: String)

    // 30 records with random ids and suffixes in [0, 5)
    val testData = for {
      (i, j) <- List.fill(30)((Random.nextInt(5), Random.nextInt(5)))
    } yield Record(i, s"v$i$j", s"d$i$j")

    val df = sqlc.createDataFrame(testData)

You can then aggregate the data like this:

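    import sqlc.implicits._

    // Collect all values of `col` per id into a Vector.
    // (.rdd is required on Spark 2.0+, where df.map no longer returns an RDD.)
    def aggConcat(col: String) = df.rdd
      .map(row => (row.getAs[Long]("id"), row.getAs[String](col)))
      .aggregateByKey(Vector[String]())(_ :+ _, _ ++ _)

    // Join on id rather than zip, which would rely on both RDDs having
    // identical partitioning and element order.
    val result = aggConcat("value").join(aggConcat("desc")).map {
      case (id, (values, descs)) => (id, values, descs)
    }.toDF("id", "values", "descs")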

If you want concatenated strings instead of arrays, you can then run:

    import org.apache.spark.sql.functions._

    val resultConcat = result
      .withColumn("values", concat_ws(";", $"values"))
      .withColumn("descs", concat_ws(";", $"descs"))

If you are working with DataFrames, use a UDAF (user-defined aggregate function):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}

    class ConcatStringsUDAF(InputColumnName: String, sep: String = ",") extends UserDefinedAggregateFunction {
      def inputSchema: StructType = StructType(StructField(InputColumnName, StringType) :: Nil)
      def bufferSchema: StructType = StructType(StructField("concatString", StringType) :: Nil)
      def dataType: DataType = StringType
      def deterministic: Boolean = true

      // Start every group with an empty string.
      def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = ""

      // Join two partial results with sep, skipping empty strings and nulls.
      private def concatStrings(str1: String, str2: String): String =
        (str1, str2) match {
          case (s1: String, s2: String) => Seq(s1, s2).filter(_ != "").mkString(sep)
          case (null, s: String) => s
          case (s: String, null) => s
          case _ => ""
        }

      // Fold one input row into the buffer.
      def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        val acc1 = buffer.getAs[String](0)
        val acc2 = input.getAs[String](0)
        buffer(0) = concatStrings(acc1, acc2)
      }

      // Combine two partial buffers.
      def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        val acc1 = buffer1.getAs[String](0)
        val acc2 = buffer2.getAs[String](0)
        buffer1(0) = concatStrings(acc1, acc2)
      }

      def evaluate(buffer: Row): Any = buffer.getAs[String](0)
    }

Then use it like this:

    // The column names (aaid, os_country, X) are not the question's columns;
    // substitute your own.
    val stringConcatener = new ConcatStringsUDAF("Category_ID", ",")
    data.groupBy("aaid", "os_country").agg(stringConcatener(data("X")).as("Xs"))

As of Spark 1.6, also take a look at Datasets and Aggregator.
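
A rough sketch of what the typed Aggregator route could look like, assuming Spark 2.0 or later (where Aggregator also requires bufferEncoder and outputEncoder). Rec, ConcatField and GroupConcatAgg are hypothetical names introduced here for illustration:

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical row type matching the question's columns.
case class Rec(id: Int, value: String, desc: String)

// Collects one string field per group, then joins the collected values with `sep`.
class ConcatField(extract: Rec => String, sep: String)
    extends Aggregator[Rec, Vector[String], String] {
  def zero: Vector[String] = Vector.empty
  def reduce(buf: Vector[String], r: Rec): Vector[String] = buf :+ extract(r)
  def merge(a: Vector[String], b: Vector[String]): Vector[String] = a ++ b
  def finish(buf: Vector[String]): String = buf.mkString(sep)
  // Kryo encoding for the intermediate buffer keeps the sketch short.
  def bufferEncoder: Encoder[Vector[String]] = Encoders.kryo[Vector[String]]
  def outputEncoder: Encoder[String] = Encoders.STRING
}

object GroupConcatAgg {
  // Groups by id and concatenates `value` and `desc` within each group.
  def groupConcat(ds: Dataset[Rec], sep: String = ";"): Dataset[(Int, String, String)] =
    ds.groupByKey((r: Rec) => r.id)(Encoders.scalaInt).agg(
      new ConcatField(_.value, sep).toColumn,
      new ConcatField(_.desc, sep).toColumn)

  def main(args: Array[String]): Unit = {
    // Local session for demonstration only.
    val spark = SparkSession.builder().master("local[*]").appName("group-concat-agg").getOrCreate()
    import spark.implicits._

    val ds = Seq(Rec(1, "v1", "d1"), Rec(1, "v2", "d2"), Rec(2, "v21", "d21"), Rec(2, "v22", "d22")).toDS()
    groupConcat(ds).show()
    spark.stop()
  }
}
```

Compared with the UDAF, the aggregation logic here is checked against the Rec type at compile time, at the cost of working with a Dataset[Rec] instead of an untyped DataFrame.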


After some research, I came up with this:

    val data = sc.parallelize(
        List(("1", "v1", "d1"), ("1", "v2", "d2"), ("2", "v21", "d21"), ("2", "v22", "d22")))
      .map { case (id, value, desc) => (id, (value, desc)) }
      // Combine both operands field by field: x and y, not x twice.
      .reduceByKey((x, y) => (x._1 + ";" + y._1, x._2 + ";" + y._2))
      .map { case (id, (value, desc)) => (id, value, desc) }
      .toDF("id", "value", "desc")

    data.show()

which gives me:

    +---+-------+-------+
    | id|  value|   desc|
    +---+-------+-------+
    |  1|  v1;v2|  d1;d2|
    |  2|v21;v22|d21;d22|
    +---+-------+-------+
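
The pairwise merge passed to reduceByKey can be checked locally on plain Scala collections, without a cluster. A sketch under that assumption; PairwiseConcat, mergePair and reduceByKeyLocal are hypothetical names, and groupBy followed by reduce only stands in for what reduceByKey does per key:

```scala
object PairwiseConcat {
  // Merge two (value, desc) pairs component-wise: the left operand's field
  // first, then the right operand's field.
  def mergePair(x: (String, String), y: (String, String), sep: String = ";"): (String, String) =
    (x._1 + sep + y._1, x._2 + sep + y._2)

  // Local stand-in for reduceByKey: group rows by id, then reduce each group.
  def reduceByKeyLocal(rows: Seq[(String, String, String)]): Map[String, (String, String)] =
    rows
      .groupBy(_._1)
      .map { case (id, group) =>
        (id, group.map { case (_, v, d) => (v, d) }.reduce((a, b) => mergePair(a, b)))
      }

  def main(args: Array[String]): Unit = {
    val rows = Seq(("1", "v1", "d1"), ("1", "v2", "d2"), ("2", "v21", "d21"), ("2", "v22", "d22"))
    // Print the merged (value, desc) pair for each id.
    reduceByKeyLocal(rows).toSeq.sortBy(_._1).foreach(println)
  }
}
```

On a real RDD the order in which reduceByKey combines elements depends on partitioning, so the order inside each concatenated string may differ from the input order.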

Source: https://habr.com/ru/post/1237632/

