A cleaned version of the code , including a solution to the problem (thanks @JohanL!) Can be found as a Gist on GitHub .
The following code snippet (CPython 3. [4,5,6]) illustrates my intention (as well as my problem):
from functools import partial
import multiprocessing
from pprint import pprint as pp
NUM_CORES = multiprocessing.cpu_count()
class some_class:
some_dict = {'some_key': None, 'some_other_key': None}
def some_routine(self):
self.some_dict.update({'some_key': 'some_value'})
def some_other_routine(self):
self.some_dict.update({'some_other_key': 77})
def run_routines_on_objects_in_parallel_and_return(in_object_list, routine_list):
func_handle = partial(__run_routines_on_object_and_return__, routine_list)
with multiprocessing.Pool(processes = NUM_CORES) as p:
out_object_list = list(p.imap_unordered(
func_handle,
(in_object for in_object in in_object_list)
))
return out_object_list
def __run_routines_on_object_and_return__(routine_list, in_object):
for routine_name in routine_list:
getattr(in_object, routine_name)()
return in_object
object_list = [some_class() for item in range(20)]
pp([item.some_dict for item in object_list])
new_object_list = run_routines_on_objects_in_parallel_and_return(
object_list,
['some_routine', 'some_other_routine']
)
pp([item.some_dict for item in new_object_list])
verification_object_list = [
__run_routines_on_object_and_return__(
['some_routine', 'some_other_routine'],
item
) for item in object_list
]
pp([item.some_dict for item in verification_object_list])
I work with a list of type objects some_class
. some_class
It has a property with the name dictionary some_dict
and a few methods that can change the dict ( some_routine
and some_other_routine
). Sometimes I want to call a sequence of methods for all objects in a list. Since it is computationally intensive, I intend to distribute the objects across several CPU cores (using multiprocessing.Pool
and imap_unordered
- the order of the list does not matter).
__run_routines_on_object_and_return__
. , , . functools.partial
- .
, ... . , , imap_unordered
, , . , . , - , - , .
verification_object_list
( /). new_object_list
object_list
, .
?
, . , :
import random
from multiprocessing import Pool, Manager
class Tester(object):
def __init__(self, num=0.0, name='none'):
self.num = num
self.name = name
def modify_me(self):
self.num += random.normalvariate(mu=0, sigma=1)
self.name = 'pla' + str(int(self.num * 100))
def __repr__(self):
return '%s(%r, %r)' % (self.__class__.__name__, self.num, self.name)
def init(L):
global tests
tests = L
def modify(i_t_nn):
i, t, nn = i_t_nn
for method_name in nn:
getattr(t, method_name)()
tests[i] = t
return i
def main():
num_processes = num = 10
manager = Manager()
tests = manager.list([Tester(num=i) for i in range(num)])
print(tests[:2])
args = ((i, t, ['modify_me']) for i, t in enumerate(tests))
pool = Pool(processes=num_processes, initializer=init, initargs=(tests,))
for i in pool.imap_unordered(modify, args):
print("done %d" % i)
pool.close()
pool.join()
print(tests[:2])
if __name__ == '__main__':
main()
some_class
, some_dict
. :
import random
from multiprocessing import Pool, Manager
from pprint import pformat as pf
class some_class:
some_dict = {'some_key': None, 'some_other_key': None}
def some_routine(self):
self.some_dict.update({'some_key': 'some_value'})
def some_other_routine(self):
self.some_dict.update({'some_other_key': 77})
def __repr__(self):
return pf(self.some_dict)
def init(L):
global tests
tests = L
def modify(i_t_nn):
i, t, nn = i_t_nn
for method_name in nn:
getattr(t, method_name)()
tests[i] = t
return i
def main():
num_processes = num = 10
manager = Manager()
tests = manager.list([some_class() for i in range(num)])
print(tests[:2])
args = ((i, t, ['some_routine', 'some_other_routine']) for i, t in enumerate(tests))
pool = Pool(processes=num_processes, initializer=init, initargs=(tests,))
for i in pool.imap_unordered(modify, args):
print("done %d" % i)
pool.close()
pool.join()
print(tests[:2])
if __name__ == '__main__':
main()
, :
diff --git a/test.py b/test.py
index b12eb56..0aa6def 100644
--- a/test.py
+++ b/test.py
@@ -1,15 +1,15 @@
import random
from multiprocessing import Pool, Manager
+from pprint import pformat as pf
-class Tester(object):
- def __init__(self, num=0.0, name='none'):
- self.num = num
- self.name = name
- def modify_me(self):
- self.num += random.normalvariate(mu=0, sigma=1)
- self.name = 'pla' + str(int(self.num * 100))
+class some_class:
+ some_dict = {'some_key': None, 'some_other_key': None}
+ def some_routine(self):
+ self.some_dict.update({'some_key': 'some_value'})
+ def some_other_routine(self):
+ self.some_dict.update({'some_other_key': 77})
def __repr__(self):
- return '%s(%r, %r)' % (self.__class__.__name__, self.num, self.name)
+ return pf(self.some_dict)
def init(L):
global tests
@@ -25,10 +25,10 @@ def modify(i_t_nn):
def main():
num_processes = num = 10
manager = Manager()
- tests = manager.list([Tester(num=i) for i in range(num)])
+ tests = manager.list([some_class() for i in range(num)])
print(tests[:2])
- args = ((i, t, ['modify_me']) for i, t in enumerate(tests))
+ args = ((i, t, ['some_routine', 'some_other_routine']) for i, t in enumerate(tests))
?