Multiprocessing pool: how to call an arbitrary list of methods on a list of class objects

A cleaned-up version of the code, including a solution to the problem (thanks @JohanL!), can be found as a Gist on GitHub.


The following code snippet (CPython 3.[4,5,6]) illustrates my intention (as well as my problem):

from functools import partial
import multiprocessing
from pprint import pprint as pp

NUM_CORES = multiprocessing.cpu_count()

class some_class:
    """Toy object whose two routines each fill in one entry of ``some_dict``."""

    # Defined on the class rather than in __init__: every instance reads and
    # mutates this one dict object.
    some_dict = {'some_key': None, 'some_other_key': None}

    def some_routine(self):
        """Set ``some_dict['some_key']`` to ``'some_value'``."""
        self.some_dict['some_key'] = 'some_value'

    def some_other_routine(self):
        """Set ``some_dict['some_other_key']`` to ``77``."""
        self.some_dict['some_other_key'] = 77

def run_routines_on_objects_in_parallel_and_return(in_object_list, routine_list):
    """Run the named routines on every object using a pool of worker processes.

    Args:
        in_object_list: iterable of objects to process.
        routine_list: names of the methods to call on each object, in order.

    Returns:
        A list with one entry per input object, holding whatever the worker
        function returned for it. ``imap_unordered`` is used, so entries
        arrive in completion order rather than input order.
    """
    # Bind the routine names up front so the pool only has to map over objects.
    worker = partial(__run_routines_on_object_and_return__, routine_list)
    with multiprocessing.Pool(processes=NUM_CORES) as pool:
        # Materialise the results before the pool is torn down on exit.
        return list(pool.imap_unordered(worker, in_object_list))

def __run_routines_on_object_and_return__(routine_list, in_object):
    """Call each named method on ``in_object`` in order, then return the object.

    Args:
        routine_list: iterable of method names; each is looked up on
            ``in_object`` with ``getattr`` and called without arguments.
        in_object: the object whose methods are invoked.

    Returns:
        The same ``in_object`` that was passed in.
    """
    for name in routine_list:
        bound_method = getattr(in_object, name)
        bound_method()
    return in_object

if __name__ == '__main__':
    # multiprocessing may re-import this module in every worker process
    # (spawn start method, e.g. on Windows). Without this guard each worker
    # would re-run the demo below and try to start a pool of its own.
    object_list = [some_class() for _ in range(20)]
    pp([item.some_dict for item in object_list])

    # Parallel run: the routines are executed by the pool's worker processes.
    new_object_list = run_routines_on_objects_in_parallel_and_return(
        object_list,
        ['some_routine', 'some_other_routine'],
    )
    pp([item.some_dict for item in new_object_list])

    # Same routines applied one object at a time in this process, for
    # comparison with the parallel result.
    verification_object_list = [
        __run_routines_on_object_and_return__(
            ['some_routine', 'some_other_routine'],
            item,
        )
        for item in object_list
    ]
    pp([item.some_dict for item in verification_object_list])

I work with a list of objects of type some_class. some_class has a property, a dictionary named some_dict, and a few methods that can change the dict (some_routine and some_other_routine). Sometimes I want to call a sequence of methods on all objects in the list. Since this is computationally intensive, I intend to distribute the objects across several CPU cores (using multiprocessing.Pool and imap_unordered - the order of the list does not matter).

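The routine __run_routines_on_object_and_return__ takes care of calling the list of methods on a single object. As far as I can tell, this works fine. I use functools.partial to simplify the structure of the code a bit - the multiprocessing pool then only has to handle the list of objects as its input.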

, ... . , , imap_unordered, , . , . , - , - , .

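verification_object_list contains the correct result (although it is produced by a single process/thread). new_object_list is identical to object_list, which should not be the case.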

?


, . , :

import random
from multiprocessing import Pool, Manager

class Tester(object):
    """Small mutable record with a number and a name derived from it."""

    def __init__(self, num=0.0, name='none'):
        self.num, self.name = num, name

    def modify_me(self):
        """Add a standard-normal random offset to ``num`` and rebuild ``name``."""
        self.num += random.normalvariate(mu=0, sigma=1)
        # The name encodes the new value scaled by 100 and truncated to an int.
        self.name = 'pla{}'.format(int(self.num * 100))

    def __repr__(self):
        return '{}({!r}, {!r})'.format(type(self).__name__, self.num, self.name)

def init(L):
    """Bind *L* to the module-level global ``tests``.

    ``main`` passes this function to ``Pool`` as ``initializer`` (with the
    shared list as ``initargs``), which makes ``tests`` available to
    ``modify``.
    """
    global tests
    tests = L


def modify(i_t_nn):
    """Apply the named methods to one object and store it back in ``tests``.

    Args:
        i_t_nn: a 3-tuple ``(index, obj, method_names)``.

    Returns:
        The index taken from the tuple.
    """
    index, target, method_names = i_t_nn
    for name in method_names:
        getattr(target, name)()
    # Write the mutated object back into its slot of the global list.
    tests[index] = target
    return index

def main():
    """Modify a managed list of ``Tester`` objects through a process pool.

    Builds a ``Manager`` list of ``Tester`` instances, prints the first two,
    runs ``modify_me`` on every element via ``Pool.imap_unordered`` (each
    worker writes its result back into the list through ``modify``), then
    prints the first two elements again.
    """
    num_processes = num = 10  # note: num_processes and num may differ
    # Context managers make sure the manager and the pool are released even
    # if a worker raises while the results are being consumed.
    with Manager() as manager:
        tests = manager.list([Tester(num=i) for i in range(num)])
        print(tests[:2])

        # Each task carries the slot index, the object and the method names.
        args = ((i, t, ['modify_me']) for i, t in enumerate(tests))
        with Pool(processes=num_processes, initializer=init,
                  initargs=(tests,)) as pool:
            for i in pool.imap_unordered(modify, args):
                print("done %d" % i)
            pool.close()
            pool.join()
        # Read the list while the manager is still running.
        print(tests[:2])


if __name__ == '__main__':
    main()

some_class, some_dict. :

import random
from multiprocessing import Pool, Manager
from pprint import pformat as pf

class some_class:
    """Toy object whose two routines each fill in one entry of ``some_dict``."""

    # Defined on the class rather than in __init__: every instance reads and
    # mutates this one dict object.
    some_dict = {'some_key': None, 'some_other_key': None}

    def some_routine(self):
        """Set ``some_dict['some_key']`` to ``'some_value'``."""
        self.some_dict['some_key'] = 'some_value'

    def some_other_routine(self):
        """Set ``some_dict['some_other_key']`` to ``77``."""
        self.some_dict['some_other_key'] = 77

    def __repr__(self):
        # Represent the object by the pretty-printed dict alone.
        formatted = pf(self.some_dict)
        return formatted

def init(L):
    """Bind *L* to the module-level global ``tests``.

    ``main`` passes this function to ``Pool`` as ``initializer`` (with the
    shared list as ``initargs``), which makes ``tests`` available to
    ``modify``.
    """
    global tests
    tests = L


def modify(i_t_nn):
    """Apply the named methods to one object and store it back in ``tests``.

    Args:
        i_t_nn: a 3-tuple ``(index, obj, method_names)``.

    Returns:
        The index taken from the tuple.
    """
    index, target, method_names = i_t_nn
    for name in method_names:
        getattr(target, name)()
    # Write the mutated object back into its slot of the global list.
    tests[index] = target
    return index

def main():
    """Run the ``some_class`` routines on a managed list through a process pool.

    Builds a ``Manager`` list of ``some_class`` instances, prints the first
    two, runs ``some_routine`` and ``some_other_routine`` on every element via
    ``Pool.imap_unordered`` (each worker writes its result back into the list
    through ``modify``), then prints the first two elements again.
    """
    num_processes = num = 10  # note: num_processes and num may differ
    # Context managers make sure the manager and the pool are released even
    # if a worker raises while the results are being consumed.
    with Manager() as manager:
        tests = manager.list([some_class() for i in range(num)])
        print(tests[:2])

        # Each task carries the slot index, the object and the method names.
        args = ((i, t, ['some_routine', 'some_other_routine'])
                for i, t in enumerate(tests))
        with Pool(processes=num_processes, initializer=init,
                  initargs=(tests,)) as pool:
            for i in pool.imap_unordered(modify, args):
                print("done %d" % i)
            pool.close()
            pool.join()
        # Read the list while the manager is still running.
        print(tests[:2])


if __name__ == '__main__':
    main()

, :

diff --git a/test.py b/test.py
index b12eb56..0aa6def 100644
--- a/test.py
+++ b/test.py
@@ -1,15 +1,15 @@
 import random
 from multiprocessing import Pool, Manager
+from pprint import pformat as pf

-class Tester(object):
-       def __init__(self, num=0.0, name='none'):
-               self.num  = num
-               self.name = name
-       def modify_me(self):
-               self.num += random.normalvariate(mu=0, sigma=1)
-               self.name = 'pla' + str(int(self.num * 100))
+class some_class:
+       some_dict = {'some_key': None, 'some_other_key': None}
+       def some_routine(self):
+               self.some_dict.update({'some_key': 'some_value'})
+       def some_other_routine(self):
+               self.some_dict.update({'some_other_key': 77})
        def __repr__(self):
-               return '%s(%r, %r)' % (self.__class__.__name__, self.num, self.name)
+               return pf(self.some_dict)

 def init(L):
        global tests
@@ -25,10 +25,10 @@ def modify(i_t_nn):
 def main():
        num_processes = num = 10 #note: num_processes and num may differ
        manager = Manager()
-       tests = manager.list([Tester(num=i) for i in range(num)])
+       tests = manager.list([some_class() for i in range(num)])
        print(tests[:2])

-       args = ((i, t, ['modify_me']) for i, t in enumerate(tests))
+       args = ((i, t, ['some_routine', 'some_other_routine']) for i, t in enumerate(tests))

?

+4
1

; , .

, , . , . (.. class).

, . , , , . , .

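To fix this, some_dict should be an instance variable rather than a class variable. This is done by assigning it in __init__(). So some_class should look like this: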

class some_class:
    """Toy object whose two routines each fill in one entry of ``some_dict``."""

    def __init__(self):
        # Created per instance, so each object owns its own dict.
        self.some_dict = {'some_key': None, 'some_other_key': None}

    def some_routine(self):
        """Set ``some_dict['some_key']`` to ``'some_value'``."""
        self.some_dict['some_key'] = 'some_value'

    def some_other_routine(self):
        """Set ``some_dict['some_other_key']`` to ``77``."""
        self.some_dict['some_other_key'] = 77

, . __init__(), , ( ). , , .

EDIT: , , class . , , , class, . , , . , , , class.

+3

Source: https://habr.com/ru/post/1685793/


All Articles