Pyspark: grouby, then get the maximum value of each group

I would like to group by value and then find the maximum value in each group using PySpark. I have the following code, but now I am a little stuck on how to extract the maximum value.

# some file contains tuples ('user', 'item', 'occurrences') data_file = sc.textData('file:///some_file.txt') # Create the triplet so I index stuff data_file = data_file.map(lambda l: l.split()).map(lambda l: (l[0], l[1], float(l[2]))) # Group by the user ie r[0] grouped = data_file.groupBy(lambda r: r[0]) # Here is where I am stuck group_list = grouped.map(lambda x: (list(x[1]))) #? 

It returns something like:

 [[(u'u1', u's1', 20), (u'u1', u's2', 5)], [(u'u2', u's3', 5), (u'u2', u's2', 10)]] 

I want to find the maximum "appearance" for each user now. The end result after executing max would result in an RDD that would look like this:

 [[(u'u1', u's1', 20)], [(u'u2', u's2', 10)]] 

Where only the maximum data set will remain for each user in the file. In other words, I want to change the RDD value to contain only one triplet, each of which contains the maximum number of users.

+5
source share
2 answers

There is no need for groupBy . A simple reduceByKey will work reduceByKey fine, and most of the time will be more efficient:

 data_file = sc.parallelize([ (u'u1', u's1', 20), (u'u1', u's2', 5), (u'u2', u's3', 5), (u'u2', u's2', 10)]) max_by_group = (data_file .map(lambda x: (x[0], x)) # Convert to PairwiseRD # Take maximum of the passed arguments by the last element (key) # equivalent to: # lambda x, y: x if x[-1] > y[-1] else y .reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-1])) .values()) # Drop keys max_by_group.collect() ## [('u2', 's2', 10), ('u1', 's1', 20)] 
+9
source

I think I found a solution:

 from pyspark import SparkContext, SparkConf def reduce_by_max(rdd): """ Helper function to find the max value in a list of values ie triplets. """ max_val = rdd[0][2] the_index = 0 for idx, val in enumerate(rdd): if val[2] > max_val: max_val = val[2] the_index = idx return rdd[the_index] conf = SparkConf() \ .setAppName("Collaborative Filter") \ .set("spark.executor.memory", "5g") sc = SparkContext(conf=conf) # some file contains tuples ('user', 'item', 'occurrences') data_file = sc.textData('file:///some_file.txt') # Create the triplet so I can index stuff data_file = data_file.map(lambda l: l.split()).map(lambda l: (l[0], l[1], float(l[2]))) # Group by the user ie r[0] grouped = data_file.groupBy(lambda r: r[0]) # Get the values as a list group_list = grouped.map(lambda x: (list(x[1]))) # Get the max value for each user. max_list = group_list.map(reduce_by_max).collect() 
+2
source

Source: https://habr.com/ru/post/1235972/


All Articles