Using groupBy in Spark and returning to DataFrame

I am having difficulty working with DataFrames in Spark using Scala. When I have a DataFrame from which I want to extract a column of unique values, using groupBy does not give me a DataFrame back.

For example, I have a DataFrame called logs that looks like this:

 machine_id | event     | other_stuff
 34131231   | thing     | stuff
 83423984   | notathing | notstuff
 34131231   | thing     | morestuff

and I would like a new DataFrame containing the unique machine IDs where the event is thing, so that I can do some filtering. Using

 val machineId = logs
   .where($"event" === "thing")
   .select("machine_id")
   .groupBy("machine_id")

I get a GroupedData back, which is a pain to use (or I don't know how to use this type of object correctly). Having obtained this list of unique machine IDs, I then want to use it to filter another DataFrame and retrieve all events for those machine IDs.

I expect to need this kind of thing pretty regularly; the basic workflow is:

  • Retrieve unique identifiers from the log table.
  • Use the unique identifiers to retrieve all events for each identifier.
  • Run some kind of analysis on the extracted data.
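As a rough sketch of the first two steps, here is the same logic on plain Scala collections as a stand-in for DataFrames (the Log case class and sample rows are made up for illustration; the real code would use the Spark API):

```scala
// Hypothetical stand-in for a row of the logs DataFrame.
case class Log(machineId: Long, event: String, otherStuff: String)

val logs = Seq(
  Log(34131231L, "thing", "stuff"),
  Log(83423984L, "notathing", "notstuff"),
  Log(34131231L, "thing", "morestuff")
)

// Step 1: unique machine IDs where the event is "thing".
val machineIds = logs.filter(_.event == "thing").map(_.machineId).distinct

// Step 2: all rows for those machine IDs (the filter on another table
// would have the same shape).
val idSet = machineIds.toSet
val selected = logs.filter(log => idSet.contains(log.machineId))
```

In Spark the second step would typically be a join against the distinct IDs rather than a collected set, but the overall shape is the same.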

It is the first two steps that I would appreciate help with here.

I appreciate that this example is contrived, but I hope it illustrates my problem. Perhaps I do not know enough about GroupedData objects, or (as I hope) I am missing something in the DataFrame API that makes this easier. I am using Spark 1.5 built on Scala 2.10.4.

thanks

2 answers

Just use distinct, not groupBy:

 val machineId = logs
   .where($"event" === "thing")
   .select("machine_id")
   .distinct

which is equivalent to the SQL:

 SELECT DISTINCT machine_id
 FROM logs
 WHERE event = 'thing'

GroupedData is not intended for direct use. It provides a number of methods, of which agg is the most common; it applies aggregate functions and converts the result back to a DataFrame. From an SQL perspective, what you have after where and groupBy is equivalent to something like this:

 SELECT machine_id, ...
 FROM logs
 WHERE event = 'thing'
 GROUP BY machine_id

where ... should be provided by agg or an equivalent method.
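To illustrate why the aggregate is needed: groupBy alone only buckets the rows, and an aggregate collapses each bucket back into a single row. A sketch of the same distinction on plain Scala collections (a stand-in for illustration, not the Spark API itself):

```scala
// Each tuple stands in for a row: (machine_id, other_stuff).
val rows = Seq(
  (34131231L, "stuff"),
  (83423984L, "notstuff"),
  (34131231L, "morestuff")
)

// groupBy alone yields buckets of rows, not a flat table...
val grouped: Map[Long, Seq[(Long, String)]] = rows.groupBy(_._1)

// ...and an aggregate (here: max of other_stuff, analogous to
// agg(max("other_stuff"))) collapses each bucket into one row.
val aggregated: Map[Long, String] =
  grouped.map { case (id, bucket) => id -> bucket.map(_._2).max }
```

This is why Spark makes you call agg (or a shorthand like count or max) on a GroupedData before you get a DataFrame back.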


groupBy in Spark, followed by an aggregation and then a select statement, will return a DataFrame. For your example it should be something like:

 val machineId = logs
   .groupBy("machine_id", "event")
   .agg(max("other_stuff"))
   .where($"event" === "thing")
   .select($"machine_id")

Source: https://habr.com/ru/post/1235775/

