Python multiprocessing module: calling an instance method in a process

Using Python's multiprocessing module, I need to do the following:

- Create an ever-running process that can be interrupted by a specific event.

- In this process, receive a message from a client and pass this message to a handler method of the object instance.

The base code is below (some details are omitted). The problem is that I call the instance method (self.enroll(message)) but, as expected, it has no effect. I know the reason: each process uses its own memory. I have already implemented the solution presented in "Can't pickle <type 'instancemethod'> when using python's multiprocessing Pool.map()" to solve the problem of pickling bound methods, and I have tried different approaches using Manager, Queue, Pool... Since none of them worked, I decided to post the code as raw as possible so you can understand my intention. Any help is appreciated.

    import pickle
    import select
    from multiprocessing import Event, Process
    from multiprocessing.connection import Listener
    from time import sleep


    class DistManager:
        def __init__(self, name, network_address, password):
            self.name = name
            self.network_address = network_address
            self.password = password
            self.distribution_clients = {}

        def _run_distribution_process(self):
            # Runs in the child process until the stop event is set.
            while not self.should_stop_distribution_service.is_set():
                (sread, swrite, sexc) = select.select([self.distribution_listener], [], [], 0)
                if sread:
                    connection = self.distribution_listener.accept()
                    serialized_message = connection.recv()  # currently only receiving
                    connection.close()
                    message = pickle.loads(serialized_message)
                    self.enroll(message)  # THE PROBLEM IS HERE

        def start_distribution_service(self, distribution_port):
            self.distribution_port = distribution_port
            # patch for making Listener work with select.select during run
            Listener.fileno = lambda self: self._listener._socket.fileno()
            self.distribution_listener = Listener(address=(self.network_address, self.distribution_port),
                                                  authkey=self.password)
            self.should_stop_distribution_service = Event()
            self.distribution_process = Process(name='Distribution Runner',
                                                target=self._run_distribution_process)
            self.distribution_process.daemon = True
            self.distribution_process.start()

        def stop_distribution_service(self):
            self.should_stop_distribution_service.set()
            sleep(1)
            self.distribution_listener.close()
            self.distribution_process.terminate()
            return self.distribution_process.exitcode

        def _enroll_distribution_client(self, identifier, network_address, phone_number):
            self.distribution_clients[identifier] = (network_address, phone_number)

        def enroll(self, message):
            if type(message.content) is tuple:
                self._enroll_distribution_client(message.generator_identifier,
                                                 message.content[0], message.content[1])
            else:
                raise TypeError("Tuple expected")
            return message.code
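The "no effect" behaviour described above can be reproduced in a few lines. The following is only a minimal sketch, not the code from the question: `Registry`, `_enroll_in_child`, `demo` and the client values are hypothetical names made up for illustration, and it assumes Python 3 on a Unix-like system where the "fork" start method is available. It shows that a method called inside the child changes the child's copy of the instance, while the parent's instance stays as it was.

```python
import multiprocessing as mp


class Registry(object):
    """Hypothetical stand-in for DistManager: it only stores clients."""

    def __init__(self):
        self.clients = {}

    def enroll(self, identifier, address):
        self.clients[identifier] = address


def _enroll_in_child(registry, conn):
    # Runs in the child process and mutates the child's copy of `registry`.
    registry.enroll("client-1", "10.0.0.5")
    conn.send(dict(registry.clients))  # report what the child sees
    conn.close()


def demo():
    # Assumption: a Unix-like system, where the "fork" start method exists.
    ctx = mp.get_context("fork")
    registry = Registry()
    parent_conn, child_conn = ctx.Pipe()
    child = ctx.Process(target=_enroll_in_child, args=(registry, child_conn))
    child.start()
    # Wait a bounded time for the child's report instead of blocking forever.
    if not parent_conn.poll(10):
        child.terminate()
        raise RuntimeError("child did not report in time")
    seen_in_child = parent_conn.recv()
    child.join()
    # The parent's instance was never touched by the child.
    return seen_in_child, dict(registry.clients)
```

Calling `demo()` returns the child's view and the parent's view of the `clients` dictionary. If the parent needs the enrolled clients, the child has to send them back explicitly (over a pipe, as in this sketch, or a queue), and the parent then applies them to its own instance.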
1 answer

You can use multiprocessing.Pool to do this without pickling errors.

Add the following lines to the code that uses multiprocessing with the class, and you can then pass the method through the pool. The code must go above the class definition.

    import copy_reg
    import types

    def _reduce_method(meth):
        return (getattr, (meth.__self__, meth.__func__.__name__))

    copy_reg.pickle(types.MethodType, _reduce_method)

For a better understanding of how to pickle a method, see http://docs.python.org/2/library/copy_reg.html
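To see what the registration does, here is a small sketch that round-trips a bound method through pickle, which is the step a pool performs when it sends a task to a worker. The `Greeter` class and the `roundtrip` helper are hypothetical and exist only for this illustration. The import fallback is an addition that lets the same snippet run on Python 3, where the module is named `copyreg`. On recent Python 3 versions, bound methods of picklable instances can generally be pickled even without this registration.

```python
import pickle
import types

try:
    import copy_reg              # Python 2 name, as used in the answer
except ImportError:
    import copyreg as copy_reg   # Python 3 name of the same module


def _reduce_method(meth):
    # On unpickling, rebuild the bound method as getattr(instance, "name").
    return (getattr, (meth.__self__, meth.__func__.__name__))


copy_reg.pickle(types.MethodType, _reduce_method)


class Greeter(object):
    """Hypothetical class used only to have a bound method to pickle."""

    def __init__(self, greeting):
        self.greeting = greeting

    def greet(self, name):
        return "%s, %s" % (self.greeting, name)


def roundtrip(bound_method):
    # Serialize and restore the method, as a pool would between processes.
    return pickle.loads(pickle.dumps(bound_method))
```

The restored method is bound to a copy of the original instance, not to the instance itself. This makes the method callable in a worker, but any attribute the worker changes on that copy is still not visible in the parent process.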


Source: https://habr.com/ru/post/957146/

