Using multiprocessing inside a Python class method

I have a class that stores some processed values and reuses them in other methods.

The problem is that I tried to split a class method across several processes to speed it up, but it did not seem to work (the task manager showed only 1 process running) and the result was never delivered.

I searched around and found that pathos.multiprocessing can do this, but I wonder whether the standard library can solve this problem?

    from multiprocessing import Pool

    class A():
        def __init__(self, vl):
            self.vl = vl

        def cal(self, nb):
            return nb * self.vl

        def run(self, dt):
            t = Pool(processes=4)
            rs = t.map(self.cal, dt)
            t.close()
            return t

    a = A(2)
    a.run(list(range(10)))
2 answers

Your code fails because Python cannot pickle the instance method (self.cal), which it tries to do when you spawn multiple processes by mapping them to multiprocessing.Pool (well, there is a way to do it, but it is too convoluted and not particularly useful anyway). Since there is no shared memory access, it has to "pack" the data and send it to the spawned process for unpacking. The same would happen if you tried to pickle the a instance.
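To see what Pool has to do with every callable and argument it sends to a worker, a minimal sketch can push objects through pickle by hand. The helper can_pickle is hypothetical and exists only for this illustration. One assumption to flag: whether a bound method pickles depends on the interpreter. Python 2 rejects it, while recent Python 3 releases generally accept it for instances of module-level classes, so the sketch reports that result instead of asserting it.

```python
import pickle


class A(object):
    def __init__(self, vl):
        self.vl = vl

    def cal(self, nb):
        return nb * self.vl


def can_pickle(obj):
    """Hypothetical helper: True if obj survives pickle.dumps()."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:  # pickle raises several different error types
        return False


if __name__ == "__main__":
    a = A(2)
    print("plain data:", can_pickle([1, 2, 3]))     # plain data is picklable
    print("lambda:", can_pickle(lambda x: x * 2))   # lambdas are not
    print("bound method:", can_pickle(a.cal))       # depends on the Python version
```

Anything that fails this check cannot be handed to Pool.map as the function or as an argument.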

The only shared-memory option in the multiprocessing package is the little-known multiprocessing.pool.ThreadPool, so if you really want to do this:

    from multiprocessing.pool import ThreadPool

    class A():
        def __init__(self, vl):
            self.vl = vl

        def cal(self, nb):
            return nb * self.vl

        def run(self, dt):
            t = ThreadPool(processes=4)
            rs = t.map(self.cal, dt)
            t.close()
            return rs

    a = A(2)
    print(a.run(list(range(10))))
    # prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

But this will not give you parallelization, since it essentially maps to regular threads, which do have access to shared memory. Instead, you should pass class/static methods (if you need them), together with the data you want them to work on (in your case self.vl). If you need to share that data between processes, you will have to use some shared-memory abstraction such as multiprocessing.Value, applying a mutex along the way, of course.
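The advice to pass plain functions plus data might look like the sketch below. The module-level function cal and the use of functools.partial are my own choices for illustration, not part of the original answer; the point is only that the worker receives a module-level function and a number, both of which pickle without trouble.

```python
from functools import partial
from multiprocessing import Pool


def cal(vl, nb):
    # Module-level function: worker processes can look it up by name.
    return nb * vl


class A(object):
    def __init__(self, vl):
        self.vl = vl

    def run(self, dt):
        # Bind only the data the worker needs (self.vl) instead of sending self.
        worker = partial(cal, self.vl)
        pool = Pool(processes=4)
        try:
            return pool.map(worker, dt)
        finally:
            pool.close()
            pool.join()


if __name__ == "__main__":
    print(A(2).run(list(range(10))))
```

The class keeps its state, but nothing tied to the instance ever has to cross the process boundary.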

UPDATE

I said that you can do it (and there are modules that more or less do it, such as pathos.multiprocessing), but I don't think it is worth the trouble: when you reach the point where you have to trick your system into doing what you want, chances are you are either using the wrong system or should rethink your design. But for the sake of completeness, here is one way to do what you want in a multiprocessing setting:

    import sys
    from multiprocessing import Pool

    def parallel_call(params):  # a helper for calling 'remote' instances
        cls = getattr(sys.modules[__name__], params[0])  # get our class type
        instance = cls.__new__(cls)  # create a new instance without invoking __init__
        instance.__dict__ = params[1]  # apply the passed state to the new instance
        method = getattr(instance, params[2])  # get the requested method
        args = params[3] if isinstance(params[3], (list, tuple)) else [params[3]]
        return method(*args)  # expand arguments, call our method and return the result

    class A(object):

        def __init__(self, vl):
            self.vl = vl

        def cal(self, nb):
            return nb * self.vl

        def run(self, dt):
            t = Pool(processes=4)
            rs = t.map(parallel_call, self.prepare_call("cal", dt))
            t.close()
            return rs

        def prepare_call(self, name, args):  # creates a 'remote call' package for each argument
            for arg in args:
                yield [self.__class__.__name__, self.__dict__, name, arg]

    if __name__ == "__main__":  # important protection for cross-platform use
        a = A(2)
        print(a.run(list(range(10))))
        # prints: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

I think it is fairly self-explanatory, but in short: it passes the name of your class, its current state (minus signals, though), the name of the method to call, and the arguments to call it with to the parallel_call function, which gets called for each process in the Pool. Python automatically pickles and unpickles all this data, so all parallel_call needs to do is reconstruct the original object, find the desired method on it, and call it with the provided parameter(s).
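To make that round trip concrete, here is a small single-process sketch of what happens to one "package": it is pickled, unpickled, and used to rebuild an equivalent object. The function rebuild_and_call is a hypothetical, simplified stand-in for parallel_call that takes a class registry as an argument instead of looking the class up in sys.modules, and it handles a single argument only.

```python
import pickle


class A(object):
    def __init__(self, vl):
        self.vl = vl

    def cal(self, nb):
        return nb * self.vl


def rebuild_and_call(package, classes):
    # package = [class name, instance state, method name, single argument]
    cls_name, state, method_name, arg = package
    cls = classes[cls_name]
    instance = cls.__new__(cls)      # skip __init__, as parallel_call does
    instance.__dict__.update(state)  # restore the transmitted state
    return getattr(instance, method_name)(arg)


if __name__ == "__main__":
    a = A(2)
    package = ["A", a.__dict__, "cal", 7]
    # Simulate the trip to a worker: only plain data crosses the boundary.
    received = pickle.loads(pickle.dumps(package))
    print(rebuild_and_call(received, {"A": A}))  # 7 * 2
```

Nothing in the package is a live object, which is why the transfer itself never trips up pickle.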

This way we only pass data around, without trying to pass active objects, so Python does not complain (try adding a reference to an instance method to your class parameters and see what happens) and everything works fine.

If you want to go heavy on the "magic", you can make it look exactly like your original code (create your own Pool handler, pick up names from the functions and send those names to the actual processes, etc.), but this should be sufficient for your example.

However, before you get your hopes up, keep in mind that this only works when sharing a "static" instance (an instance that does not change its initial state once you start invoking it in a multiprocessing context). If the A.cal method changes the internal state of the vl property, the change affects only the instance where it happens (unless it changes in the main instance, the one that calls the Pool, between calls). If you want to share the state as well, you can extend parallel_call to pick up instance.__dict__ after the call and return it together with the result of the method call; on the calling side you would then have to update the local __dict__ with the returned data to change the original state. And even that is not enough: you would actually have to create a shared dict and handle all the mutex work so that every process can access it concurrently (you can use multiprocessing.Manager for that).
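One way to picture the idea of returning the state with the result is the sketch below. It assumes a hypothetical class Counter whose method mutates the instance, and a hypothetical helper call_and_return_state. It runs in a single process to show the data flow only, and it does not address concurrent updates, which is where multiprocessing.Manager or explicit locks would come in.

```python
class Counter(object):
    def __init__(self, total):
        self.total = total

    def add(self, nb):
        self.total += nb  # mutates state inside the "worker" copy
        return self.total


def call_and_return_state(package):
    # package = (class, state dict, method name, argument)
    cls, state, method_name, arg = package
    instance = cls.__new__(cls)
    instance.__dict__.update(state)  # copy, so the caller's dict is untouched
    result = getattr(instance, method_name)(arg)
    return result, instance.__dict__


if __name__ == "__main__":
    c = Counter(10)
    package = (Counter, dict(c.__dict__), "add", 5)
    result, new_state = call_and_return_state(package)
    print(c.total)                # still 10: only the copy was changed
    c.__dict__.update(new_state)  # the caller merges the returned state
    print(c.total)                # now 15
```

With several workers returning state at once, the last merge wins, which is the race the answer warns about.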

So, as I said, more trouble than it is worth...


Question: it seems that it does not work (I saw only 1 process running in the task manager), and the result is never delivered.

You see only 1 process because Pool works out how many processes it actually uses as follows:
You pass range(10), i.e. task indices 0..9, so Pool computes (10 / 4) * 4 = 8 + 1 = 9.
After the first process starts, there are no more tasks left.
Use range(32) and you will see 4 running processes.
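For reference, and hedged as my own reading of CPython instead of something this answer states: when no chunksize is given, Pool.map appears to split the input into roughly four chunks per worker. The sketch below reproduces that assumed heuristic with hypothetical helper names and assumes a non-empty input. Whatever the exact arithmetic, the practical point is the same: with ten trivial multiplications, one worker can finish everything before the others pick up any work.

```python
def default_chunksize(n_items, n_workers):
    # Assumed heuristic, mirroring CPython's Pool.map when chunksize is None:
    # aim for roughly four chunks per worker, rounding the chunk size up.
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize


def n_chunks(n_items, n_workers):
    # Number of tasks actually queued for the workers (n_items must be > 0).
    size = default_chunksize(n_items, n_workers)
    return -(-n_items // size)  # ceiling division


if __name__ == "__main__":
    for n in (10, 32):
        print(n, default_chunksize(n, 4), n_chunks(n, 4))
```

Printing os.getpid() inside the worker function is a more direct way to check how many processes actually took part.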

You return t (the pool) instead of the result, rs = pool.map(...


For example, this will work:

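    import multiprocessing as mp
    import os

    class A(object):
        def __init__(self, vl):
            self.vl = vl

        def cal(self, nb):
            # print the worker's PID to show which process handles each item
            print('pid:{} cal({})'.format(os.getpid(), nb))
            return nb * self.vl

        def run(self, df):
            with mp.Pool(processes=4) as pool:
                return pool.map(self.cal, df)

    if __name__ == '__main__':
        a = A(2)
        result = a.run(list(range(32)))
        print(result)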

Tested with Python: 3.4.2


Source: https://habr.com/ru/post/1269803/

