Error using reduceByKey: int object is unsubscriptable

I get the error message "int object is unsubscriptable" when I run the following script:

element.reduceByKey( lambda x , y : x[1]+y[1]) 

where element is an RDD of key-value pairs whose values are tuples. Input example:

 (A, (toto , 10)) (A, (titi , 30)) (5, (tata, 10)) (A, (toto, 10)) 

I understand that reduceByKey takes (K, V) pairs and applies the function to all values of each key to get the final reduced result, as in the reduceByKey example in the Apache Spark documentation.

Any help please?

2 answers

Here is an example that illustrates what happens.

Consider what happens when you call reduce on a list using some function f:

 reduce(f, [a,b,c]) = f(f(a,b),c) 

In your example, f = lambda u, v: u[1] + v[1], so the expression above expands to:

 reduce(f, [a,b,c]) = f(f(a,b),c) = f(a[1]+b[1],c) 

But a[1] + b[1] is an integer, and integers have no __getitem__ method, so the next call fails when it evaluates u[1]. Hence your error.
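You can watch this happen without Spark by replaying the reduction for key 'A' with Python's functools.reduce. This is a minimal local sketch, assuming the values for 'A' are exactly the three tuples from the question; Spark itself is not involved.

```python
from functools import reduce

# Values for key 'A' from the question's example input
values = [("toto", 10), ("titi", 30), ("toto", 10)]

# The reduce function from the question: it indexes BOTH arguments
f = lambda x, y: x[1] + y[1]

# The first call works on two tuples and returns a plain int
first = f(values[0], values[1])
print(first)  # 40

# The next call is f(40, ("toto", 10)), and 40[1] raises TypeError
error_message = None
try:
    reduce(f, values)
except TypeError as exc:
    error_message = str(exc)
print(error_message)  # Python 3 wording: 'int' object is not subscriptable
```

Older Python versions word the message differently, but it is the same TypeError.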

In general, the best approach (as shown below) is to use map() to first extract the data in the desired format, and then apply reduceByKey() .


MCVE with your data

 element = sc.parallelize( [ ('A', ('toto' , 10)), ('A', ('titi' , 30)), ('5', ('tata', 10)), ('A', ('toto', 10)) ] ) 

You can get the desired result with a more complex reduction function:

 def add_tuple_values(a, b):
     # Take element [1] from a tuple; an int (an earlier partial sum) is used as-is
     try:
         u = a[1]
     except TypeError:
         u = a
     try:
         v = b[1]
     except TypeError:
         v = b
     return u + v

 print(element.reduceByKey(add_tuple_values).collect())

Except that this results in:

 [('A', 50), ('5', ('tata', 10))] 

Why? Because there is only one value for the key '5', so the reduce function is never called for it and the original tuple comes back unchanged.
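A rough single-machine emulation makes the single-value case visible. local_reduce_by_key is a hypothetical helper, not a Spark API: it ignores partitioning, groups all values per key, and folds each group with functools.reduce, which (like reduceByKey) never calls the function when a key has only one value.

```python
from functools import reduce


def local_reduce_by_key(pairs, func):
    """Hypothetical stand-in for RDD.reduceByKey on a plain list (no partitions)."""
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    # reduce() on a one-element list returns that element without calling func
    return [(key, reduce(func, vals)) for key, vals in groups.items()]


def add_tuple_values(a, b):
    # Take element [1] from a tuple; an int (an earlier partial sum) is used as-is
    try:
        u = a[1]
    except TypeError:
        u = a
    try:
        v = b[1]
    except TypeError:
        v = b
    return u + v


data = [("A", ("toto", 10)), ("A", ("titi", 30)), ("5", ("tata", 10)), ("A", ("toto", 10))]
result = local_reduce_by_key(data, add_tuple_values)
print(result)  # [('A', 50), ('5', ('tata', 10))]
```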

For these reasons, it is better to call map first. To get the desired result, you can do:

 >>> print(element.map(lambda x: (x[0], x[1][1])).reduceByKey(lambda u, v: u + v).collect())
 [('A', 50), ('5', 10)]

Update 1

Here is another approach:

You can return a tuple from your reduce function, so its output has the same shape as its inputs, and then call map to extract the desired value. (Essentially, this swaps the order of map and reduce.)

 print(
     element.reduceByKey(lambda u, v: (0, u[1] + v[1]))
            .map(lambda x: (x[0], x[1][1]))
            .collect()
 )
 [('A', 50), ('5', 10)]
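Spark may reduce values within each partition first and then merge the partial results, so a reducer whose output can be fed back into another call keeps working whatever the grouping. A small pure-Python check of the shape-preserving reducer, assuming a hypothetical two-partition split of the values for 'A' (the split is for illustration only, not how Spark necessarily partitions this data):

```python
from functools import reduce


# Reducer from Update 1: returns a tuple shaped like its inputs
def add_keep_shape(u, v):
    return (0, u[1] + v[1])


values_a = [("toto", 10), ("titi", 30), ("toto", 10)]

# Fold all values in one pass
whole = reduce(add_keep_shape, values_a)

# Hypothetical split: reduce each "partition", then merge the partial results
left = reduce(add_keep_shape, values_a[:1])   # one value, returned unchanged
right = reduce(add_keep_shape, values_a[1:])  # (0, 40)
merged = add_keep_shape(left, right)

print(whole[1], merged[1])  # 50 50
```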

Notes

  • If there were at least 2 entries for each key, using add_tuple_values() would give you the correct result.

Another approach is to use a DataFrame:

 from pyspark.sql import functions as F

 rdd = sc.parallelize([('A', ('toto', 10)), ('A', ('titi', 30)), ('5', ('tata', 10)), ('A', ('toto', 10))])
 (rdd.map(lambda kv: (kv[0], kv[1][0], kv[1][1]))  # flatten (a, (b, c)) into (a, b, c)
     .toDF(['a', 'b', 'c'])
     .groupBy('a')
     .agg(F.sum('c'))
     .rdd.map(tuple)                               # Row -> plain tuple
     .collect())
 [('5', 10), ('A', 50)]

Source: https://habr.com/ru/post/986205/

