Here is an example that illustrates what happens.
Consider what happens when you call reduce on a list using some function f:
reduce(f, [a,b,c]) = f(f(a,b),c)
In your example, f = lambda u, v: u[1] + v[1], so the expression above expands to:
reduce(f, [a,b,c]) = f(f(a,b),c) = f(a[1]+b[1],c)
But a[1] + b[1] is an integer, which has no __getitem__ method, so the outer call fails when it tries to index it. Hence your error.
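The same failure can be reproduced without Spark. The following is a minimal sketch using functools.reduce on a plain list; the sample values are taken from the key 'A' in your data, and the variable names are only illustrative.

```python
from functools import reduce

# Same function as in the question.
f = lambda u, v: u[1] + v[1]

# The three values that belong to key 'A'.
values = [('toto', 10), ('titi', 30), ('toto', 10)]

# With only two values, f is called once on two tuples and succeeds.
two_values_total = reduce(f, values[:2])

# With three values, the second call is f(40, ('toto', 10)),
# and indexing the integer 40 raises a TypeError.
try:
    reduce(f, values)
    error = None
except TypeError as exc:
    error = exc

print(two_values_total, type(error).__name__)
```

The first call returns an integer, so the second call receives an integer where it expects a tuple.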
In general, the best approach (as shown below) is to first use map() to extract the data in the desired format, and then apply reduceByKey().
MCVE with your data
element = sc.parallelize([('A', ('toto', 10)), ('A', ('titi', 30)), ('5', ('tata', 10)), ('A', ('toto', 10))])
You might try to get the desired result with a more complex reduce function:
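def add_tuple_values(a, b):
    # Each argument is either an original (name, number) tuple
    # or an integer produced by an earlier call.
    try:
        u = a[1]
    except TypeError:
        u = a
    try:
        v = b[1]
    except TypeError:
        v = b
    return u + v

print(element.reduceByKey(add_tuple_values).collect())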
However, this results in:
[('A', 50), ('5', ('tata', 10))]
Why? Because there is only one value for the key '5', so there is nothing to reduce and the function is never called for that key.
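To see this behaviour in isolation, here is a rough local stand-in for reduceByKey written in plain Python. The helper reduce_by_key_local is hypothetical and only mimics the grouping and reducing on a single machine; it is not how Spark implements the operation.

```python
from collections import defaultdict
from functools import reduce

def reduce_by_key_local(pairs, func):
    """Hypothetical local imitation of reduceByKey for illustration only."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    # reduce() on a one-element list returns that element without calling func.
    return {key: reduce(func, values) for key, values in grouped.items()}

def add_tuple_values(a, b):
    # Accept either a (name, number) tuple or an already-summed integer.
    try:
        u = a[1]
    except TypeError:
        u = a
    try:
        v = b[1]
    except TypeError:
        v = b
    return u + v

data = [('A', ('toto', 10)), ('A', ('titi', 30)),
        ('5', ('tata', 10)), ('A', ('toto', 10))]

result = reduce_by_key_local(data, add_tuple_values)
print(result)
```

The key '5' keeps its original tuple because the reduce function never runs on a single value.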
For these reasons, it is better to call map first. To get the desired result, you can do:
>>> print(element.map(lambda x: (x[0], x[1][1])).reduceByKey(lambda u, v: u+v).collect())
[('A', 50), ('5', 10)]
Update 1
Here is another approach:
You can return a tuple from your reduce function, and then call map to extract the desired value. (Essentially, swap the order of map and reduce.)
print(
    element.reduceByKey(lambda u, v: (0, u[1] + v[1]))
    .map(lambda x: (x[0], x[1][1]))
    .collect()
)
# [('A', 50), ('5', 10)]
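The reason this variant works is that the reduce function now returns a value of the same shape as its inputs, so its result can be fed back into it. Here is a small sketch with functools.reduce on plain lists; keep_shape is just an illustrative name for the lambda above.

```python
from functools import reduce

def keep_shape(u, v):
    # Return a tuple so the result can be indexed with [1] like the inputs.
    return (0, u[1] + v[1])

values_a = [('toto', 10), ('titi', 30), ('toto', 10)]  # values for key 'A'
values_5 = [('tata', 10)]                              # single value for key '5'

reduced_a = reduce(keep_shape, values_a)  # function called twice
reduced_5 = reduce(keep_shape, values_5)  # function never called

# Both results are tuples, so the final [1] lookup works for either key.
print(reduced_a[1], reduced_5[1])
```

Whether or not the function was called for a key, index 1 holds the number, which is why the trailing map step is safe.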
Notes
- If there were at least 2 entries for each key, using add_tuple_values() would give you the correct result.