How to use a Spark SQL DataFrame with flatMap?

I am using the Spark Scala API. I have a Spark SQL DataFrame (read from an Avro file) with the following schema:

    root
     |-- ids: array (nullable = true)
     |    |-- element: map (containsNull = true)
     |    |    |-- key: integer
     |    |    |-- value: string (valueContainsNull = true)
     |-- match: array (nullable = true)
     |    |-- element: integer (containsNull = true)

Essentially two columns [ids: List[Map[Int, String]], match: List[Int]]. Sample data looks like this:

    [List(Map(1 -> a), Map(2 -> b), Map(3 -> c), Map(4 -> d)), List(0, 0, 1, 0)]
    [List(Map(5 -> c), Map(6 -> a), Map(7 -> e), Map(8 -> d)), List(1, 0, 1, 0)]
    ...

What I would like to do is flatMap() each row to produce 3 columns [id, property, match]. Using the above 2 rows as input, we get:

    [1,a,0]
    [2,b,0]
    [3,c,1]
    [4,d,0]
    [5,c,1]
    [6,a,0]
    [7,e,1]
    [8,d,0]
    ...

and then groupBy the String property (for example: a, b, ...) to compute count("property") and sum("match"):

    a 2 0
    b 1 0
    c 2 2
    d 2 0
    e 1 1

I would like to do something like:

    val result = myDataFrame.select("ids", "match").flatMap(
      (row: Row) => row.getList[Map[Int, String]](1).toArray()
    )
    result.groupBy("property").agg(Map(
      "property" -> "count",
      "match"    -> "sum"
    ))

The problem is that flatMap converts the DataFrame into an RDD. Is there a good way to do a flatMap-style operation followed by groupBy while staying with DataFrames?

2 answers

What is it that flatMap gives you that you want? It converts each input row into 0 or more rows. It can filter rows out, or it can add new ones. In SQL, you get the same functionality with a join. Can you do what you want with a join?

Alternatively, you could also look at Dataframe.explode, which is just a specific kind of join (you can easily craft your own explode by joining a DataFrame to a UDF). explode takes a single column as input and lets you split it or convert it into multiple values, then joins the original row back onto the new rows. So:

    user     groups
    griffin  mkt,it,admin

Can be:

    user     group
    griffin  mkt
    griffin  it
    griffin  admin
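
For that specific shape, here is a minimal sketch using the explode function from org.apache.spark.sql.functions (the function form, rather than the old Dataframe.explode method); the SparkSession, the toy data, and the user/groups column names are assumptions for illustration only:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, explode, split}

    val spark = SparkSession.builder().appName("explode-example").master("local[*]").getOrCreate()
    import spark.implicits._

    // Toy data matching the table above.
    val users = Seq(("griffin", "mkt,it,admin")).toDF("user", "groups")

    // Split the comma-separated string into an array, then explode it so
    // each array element becomes its own row, keeping `user` alongside.
    val perGroup = users.select(
      col("user"),
      explode(split(col("groups"), ",")).as("group")
    )
    perGroup.show()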

So, I would say take a look at Dataframe.explode, and if that doesn't get you there easily, try joins with UDFs.
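
Applied to the question's schema, a hedged sketch of that explode route might look like the following (assuming Spark 2.4+ for posexplode, element_at, and exploding map columns; myDataFrame is the question's DataFrame):

    import org.apache.spark.sql.functions._

    // Sketch only; column names ids / match are taken from the question's schema.
    val flattened = myDataFrame
      // one row per element of `ids`, keeping its position so we can
      // look up the corresponding entry of `match`
      .select(col("match"), posexplode(col("ids")).as(Seq("pos", "idMap")))
      // each map holds a single (id -> property) pair; explode it into two columns
      .select(col("match"), col("pos"), explode(col("idMap")).as(Seq("id", "property")))
      // element_at is 1-based, posexplode positions are 0-based
      .select(col("id"), col("property"), element_at(col("match"), col("pos") + 1).as("match"))

    val result = flattened
      .groupBy("property")
      .agg(count("property").as("cnt"), sum("match").as("match_sum"))

    result.show()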


My SQL is a little rusty, but one option is to create a list of Row objects in your flatMap, and then convert the resulting RDD back into a DataFrame.
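
A rough sketch of that round trip (assuming an existing SparkSession named spark and the schema from the question; the exact collection types returned by the Row getters can differ between Spark versions):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    // Target schema for the flattened rows.
    val outSchema = StructType(Seq(
      StructField("id", IntegerType, nullable = false),
      StructField("property", StringType, nullable = true),
      StructField("match", IntegerType, nullable = true)
    ))

    // flatMap each input row into one Row per (id -> property) pair,
    // pairing it positionally with the matching flag.
    val rowRdd = myDataFrame.select("ids", "match").rdd.flatMap { row =>
      val ids     = row.getSeq[scala.collection.Map[Int, String]](0)
      val matches = row.getSeq[Int](1)
      ids.zip(matches).flatMap { case (m, flag) =>
        m.map { case (id, property) => Row(id, property, flag) }
      }
    }

    // Back to a DataFrame, so groupBy/agg work as in the question.
    val flattened = spark.createDataFrame(rowRdd, outSchema)
    val result = flattened
      .groupBy("property")
      .agg(Map("property" -> "count", "match" -> "sum"))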

