How to pass a function with multiple arguments to Python's concurrent.futures.ProcessPoolExecutor.map()?

I would like concurrent.futures.ProcessPoolExecutor.map() to call a function that takes 2 or more arguments. In the example below, I resorted to using a lambda function and defined ref as a list of the same size as numberlist, filled with a single repeated value.

Question 1: Is there a better way to do this? numberlist may hold anywhere from a million to a billion elements, and since ref has to be the same size as numberlist, this approach needlessly uses precious memory, which I would like to avoid. I did it this way because I read that map stops as soon as the shortest iterable is exhausted.

    import concurrent.futures as cf

    nmax = 10
    numberlist = range(nmax)
    ref = [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
    workers = 3


    def _findmatch(listnumber, ref):
        print('def _findmatch(listnumber, ref):')
        x=''
        listnumber=str(listnumber)
        ref = str(ref)
        print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
        if ref in listnumber:
            x = listnumber
        print('x = {0}'.format(x))
        return x

    a = map(lambda x, y: _findmatch(x, y), numberlist, ref)
    for n in a:
        print(n)
        if str(ref[0]) in n:
            print('match')

    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        #for n in executor.map(_findmatch, numberlist):
        for n in executor.map(lambda x, y: _findmatch(x, ref), numberlist, ref):
            print(type(n))
            print(n)
            if str(ref[0]) in n:
                print('match')

Running the code above, I found that the built-in map function produces the desired result. However, when I passed the same arguments to concurrent.futures.ProcessPoolExecutor.map(), Python 3.5 failed with this error:

    Traceback (most recent call last):
      File "/usr/lib/python3.5/multiprocessing/queues.py", line 241, in _feed
        obj = ForkingPickler.dumps(obj)
      File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
        cls(buf, protocol).dump(obj)
    _pickle.PicklingError: Can't pickle <function <lambda> at 0x7fd2a14db0d0>: attribute lookup <lambda> on __main__ failed

Question 2: Why did this error occur, and how do I get concurrent.futures.ProcessPoolExecutor.map() to call a function with more than one argument?

3 answers

To answer your second question: you get the exception because a lambda function like the one you are using cannot be pickled. This is a problem because Python uses the pickle protocol to serialize the data passed between the main process and the ProcessPoolExecutor's worker processes. It is not clear why you are using a lambda at all. Your lambda takes two arguments, just like the original function, so you can pass _findmatch directly instead of the lambda, and it should work.

    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        for n in executor.map(_findmatch, numberlist, ref):
            ...
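The pickling failure can be reproduced without any executor, which may help when checking whether a given callable is safe to hand to a process pool. The following is only a minimal sketch: find_match is a simplified, hypothetical stand-in for _findmatch, and can_pickle is a helper name invented for this illustration.

```python
import pickle


def find_match(number, ref):
    """Hypothetical stand-in for _findmatch.

    Returns str(number) if it contains str(ref), otherwise ''.
    """
    number, ref = str(number), str(ref)
    return number if ref in number else ""


def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError):
        # Functions are pickled by name; a lambda or a nested function
        # has no importable name that pickle could look up again.
        return False
    return True


if __name__ == "__main__":
    # A lambda cannot be looked up by name, so this prints False.
    print(can_pickle(lambda x, y: find_match(x, y)))
    # A function defined at module top level is pickled by reference;
    # when this file is run as a script, this should print True.
    print(can_pickle(find_match))
```

Only the name of a function travels to the worker process, not its code, which is why the function has to be importable on the other side.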

Regarding your first question, passing a second, constant argument without creating a giant list, there are several ways to solve this. One approach is to use itertools.repeat to create an iterable that yields the same value forever when iterated.
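To see why an endless iterator is safe here, the sketch below uses the built-in map, so no worker processes are involved. It relies on map stopping as soon as the shortest iterable is exhausted; find_match is a simplified, hypothetical stand-in for _findmatch without the debug printing.

```python
import itertools


def find_match(number, ref):
    """Hypothetical stand-in for _findmatch.

    Returns str(number) if it contains str(ref), otherwise ''.
    """
    number, ref = str(number), str(ref)
    return number if ref in number else ""


numbers = range(10)

# repeat(5) stores a single value and yields it endlessly, so it costs
# the same memory for ten numbers as for a billion. map stops when the
# shorter iterable (numbers) runs out.
results = list(map(find_match, numbers, itertools.repeat(5)))
print(results)  # ['', '', '', '', '', '5', '', '', '', '']
```

Executor.map takes its iterables the same way, so the same itertools.repeat(5) argument can be passed to it in place of the ref list.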

But a better approach would probably be to write a helper function that supplies the constant argument for you. (Perhaps this is what you were trying to do with the lambda?) It should work as long as the helper is defined at the top level of the module, so that it can be pickled by name:

    def _helper(x):
        return _findmatch(x, 5)

    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        for n in executor.map(_helper, numberlist):
            ...

(1) There is no need to make a list. You can use itertools.repeat to create an iterator that just repeats some value.

(2) You need to pass a named function to map, because it will be sent to a subprocess for execution. map uses the pickle protocol to send things, and lambdas cannot be pickled, so they cannot be passed to map. But the lambda is completely unnecessary anyway. All it did was call your two-parameter function with two parameters. Remove it completely.

Working code

    import concurrent.futures as cf
    import itertools

    nmax = 10
    numberlist = range(nmax)
    workers = 3


    def _findmatch(listnumber, ref):
        print('def _findmatch(listnumber, ref):')
        x=''
        listnumber=str(listnumber)
        ref = str(ref)
        print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
        if ref in listnumber:
            x = listnumber
        print('x = {0}'.format(x))
        return x

    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        #for n in executor.map(_findmatch, numberlist):
        for n in executor.map(_findmatch, numberlist, itertools.repeat(5)):
            print(type(n))
            print(n)
            #if str(ref[0]) in n:
            #    print('match')

As for your first question, do I understand correctly that you want to pass an argument whose value is only known when you call map, but which is constant for every invocation of the mapped function? If so, I would call map with a function derived from a "template function", with the second argument (ref in your example) baked into it using functools.partial:

    from functools import partial

    refval = 5

    def _findmatch(ref, listnumber):  # arguments swapped
        ...

    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        for n in executor.map(partial(_findmatch, refval), numberlist):
            ...
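As a variation on the snippet above, functools.partial can also bind the constant by keyword, which leaves the original argument order untouched. This is a sketch rather than a drop-in replacement: find_match is a simplified, hypothetical stand-in for _findmatch, and find_matches is a wrapper name invented for the example.

```python
import concurrent.futures as cf
from functools import partial


def find_match(number, ref):
    """Hypothetical stand-in for _findmatch.

    Returns str(number) if it contains str(ref), otherwise ''.
    """
    number, ref = str(number), str(ref)
    return number if ref in number else ""


def find_matches(numbers, ref, workers=3):
    """Apply find_match to every number in worker processes, with ref fixed."""
    # Binding ref by keyword means the argument order need not be swapped.
    bound = partial(find_match, ref=ref)
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(bound, numbers))


if __name__ == "__main__":
    # The guard matters on platforms that start worker processes by
    # re-importing the main module: without it, each worker would try
    # to run this block again.
    print(find_matches(range(10), ref=5))
```

The partial object can be sent to the workers because it wraps a function defined at module top level, which pickle can find again by name.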

Re. question 2, first part: I did not find the exact piece of code that tries to pickle (serialize) the function that is to be executed in parallel, but it seems natural that this has to happen. Not only the arguments but also the function itself must be passed to the workers somehow, and most likely it has to be serialized for that transfer. The fact that partial objects can be pickled while lambdas cannot is mentioned elsewhere, for example here: fooobar.com/questions/44572 / ....

Re. question 2, second part: if you want to call a function with several arguments via ProcessPoolExecutor.map, pass the function as the first argument, followed by an iterable of the function's first arguments, followed by an iterable of its second arguments, and so on. In your case:

    for n in executor.map(_findmatch, numberlist, ref):
        ...

Source: https://habr.com/ru/post/1263848/

