Suppose you have something like this:
```scala
import scala.util.Random
import org.apache.spark.sql.SQLContext

val sqlc: SQLContext = ???

case class Record(id: Long, value: String, desc: String)

// 30 random records with ids in 0..4, so each id typically occurs several times
val testData = for {
  (i, j) <- List.fill(30)((Random.nextInt(5), Random.nextInt(5)))
} yield Record(i, s"v$i$j", s"d$i$j")

val df = sqlc.createDataFrame(testData)
```
You can then combine the values and descriptions per id like this:
```scala
import sqlc.implicits._

// df.map on a (Spark 1.x) DataFrame returns an RDD[Row], so the
// PairRDD operation aggregateByKey is available on the result.
def aggConcat(col: String) = df
  .map(row => (row.getAs[Long]("id"), row.getAs[String](col)))
  .aggregateByKey(Vector[String]())(_ :+ _, _ ++ _)

val result = aggConcat("value").zip(aggConcat("desc")).map {
  case ((id, value), (_, desc)) => (id, value, desc)
}.toDF("id", "values", "descs")
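```

The zip step pairs the two RDDs element by element, which relies on both aggregations producing the same keys in the same order. A minimal plain-Scala sketch (with made-up data) of the tuple shapes the pattern match takes apart:

```scala
// Hypothetical aggregated pairs, mirroring the (id, Vector[String]) shape above
val values = Vector((1L, Vector("v10", "v11")), (2L, Vector("v20")))
val descs  = Vector((1L, Vector("d10", "d11")), (2L, Vector("d20")))

val merged = values.zip(descs).map {
  case ((id, vs), (_, ds)) => (id, vs, ds)
}
// merged == Vector((1L, Vector("v10", "v11"), Vector("d10", "d11")),
//                  (2L, Vector("v20"), Vector("d20")))
```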
If you want concatenated strings instead of arrays, you can then run:
```scala
import org.apache.spark.sql.functions._

val resultConcat = result
  .withColumn("values", concat_ws(";", $"values"))
  .withColumn("descs",  concat_ws(";", $"descs"))
```
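To sanity-check the final frame, a quick sketch using the standard DataFrame inspection methods:

```scala
// id stays a long; values/descs are now ";"-joined strings
resultConcat.printSchema()
resultConcat.show(5)
```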