This is an input dataset.
$ cat input.csv
Sno|UserID|TypeExp
1|JAS123|MOVIE
2|ASP123|GAMES
3|JAS123|CLOTHING
4|DPS123|MOVIE
5|DPS123|CLOTHING
6|ASP123|MEDICAL
7|JAS123|OTH
8|POQ133|MEDICAL
First, pivot the data, grouping by UserID, so that each TypeExp value becomes its own column.
// Read the pipe-delimited CSV with a header row, then pivot on TypeExp
// so each expense type becomes its own column holding the per-user count.
// User/type combinations that never occur come back as null, so fill with 0.
val bins = spark.read
  .options(Map("sep" -> "|", "header" -> "true"))
  .csv("input.csv")
  .groupBy("UserID")
  .pivot("TypeExp")
  .count()
  .na.fill(0)
scala> bins.show
+------+--------+-----+-------+-----+---+
|UserID|CLOTHING|GAMES|MEDICAL|MOVIE|OTH|
+------+--------+-----+-------+-----+---+
|POQ133| 0| 0| 1| 0| 0|
|JAS123| 1| 0| 0| 1| 1|
|DPS123| 1| 0| 0| 1| 0|
|ASP123| 0| 1| 1| 0| 0|
+------+--------+-----+-------+-----+---+
In this sample every count is already 0 or 1, so we can simply pack the pivoted columns into a single array column.
// Collect the pivoted per-type count columns into one array column.
// This is enough here only because every count in the sample is 0 or 1.
val solution = bins.select(
  $"UserID".as("User"),
  array($"MOVIE", $"GAMES", $"CLOTHING", $"MEDICAL", $"OTH").as("TypeExpList"))
scala> solution.show
+------+---------------+
| User| TypeExpList|
+------+---------------+
|POQ133|[0, 0, 0, 1, 0]|
|JAS123|[1, 0, 1, 0, 1]|
|DPS123|[1, 0, 1, 0, 0]|
|ASP123|[0, 1, 0, 1, 0]|
+------+---------------+
In general, however, a count can be greater than 1, so each count column must first be normalized to a 0/1 flag before building the array.
// Normalize every pivoted count column to a 0/1 flag.
// Using the built-in when/otherwise expression instead of a Scala UDF keeps
// the logic visible to the Catalyst optimizer, avoids closure-serialization
// overhead, and is null-safe (when(null > 0, ...) falls through to 0, so the
// result is correct even without the earlier na.fill(0)).
val binaryCols = bins
  .columns
  .filterNot(_ == "UserID")                        // every column except the key
  .map(name => when(col(name) > 0, 1).otherwise(0) as name)
val selectCols = ($"UserID" as "User") +: binaryCols
val solution = bins
  .select(selectCols: _*)
  .select(
    $"User",
    // fixed column order for the output array
    array("MOVIE","GAMES","CLOTHING","MEDICAL","OTH") as "TypeExpList")
scala> solution.show
+------+---------------+
| User| TypeExpList|
+------+---------------+
|POQ133|[0, 0, 0, 1, 0]|
|JAS123|[1, 0, 1, 0, 1]|
|DPS123|[1, 0, 1, 0, 0]|
|ASP123|[0, 1, 0, 1, 0]|
+------+---------------+