Unable to stop / kill all processes that are running simultaneously under multiprocessing.

I need to stop / kill all processes when an error / exception occurs. I found a solution on Stack Overflow that kills all processes using psutil, but from time to time I have a problem: while psutil is killing the child and main processes, new processes may start and the code continues to run.

    import os
    import multiprocessing

    import psutil


    class MyClass:
        parent_pid = 0
        ids_list = range(300)

        def main(self):
            self.parent_pid = os.getpid()
            pool = multiprocessing.Pool(3)
            for osm_id in self.ids_list:
                pool.apply_async(self.handle_country_or_region,
                                 kwds=dict(country_id=osm_id),
                                 error_callback=self.kill_proc_tree)
            pool.close()
            pool.join()

        def kill_proc_tree(self, including_parent=True):
            parent = psutil.Process(self.parent_pid)
            children = parent.children(recursive=True)
            for child in children:
                child.kill()
            psutil.wait_procs(children, timeout=5)
            if including_parent:
                parent.kill()
                parent.wait(5)

        def handle_country_or_region(self, country_id=None, queue=None):
            pass  # here I do some task

It seems I need to stop the pool rather than kill the processes, but in that case, if I do

    pool.close()
    pool.terminate()
    pool.join()

my terminal hangs: the new line is completely empty (that is, no ">>>" prompt appears) and nothing happens.

Ideally, I want the following flow: if there is any error / exception, stop / kill all execution of the code and return to the interactive prompt in the terminal.

Can someone help me make this work correctly? I am using Python 3.5 and Ubuntu 15.10.

1 answer

The solution is quite simple: define the "killer" function inside main, so that it closes over the pool object and can terminate it from the error callback.

The full code is as follows:

    import multiprocessing


    class MyClass:
        ids_list = range(300)

        def main(self):
            pool = multiprocessing.Pool(3)

            # Defined inside main so it can close over `pool`.
            def kill_pool(err_msg):
                print(err_msg)
                pool.terminate()

            for osm_id in self.ids_list:
                pool.apply_async(self.handle_country_or_region,
                                 kwds=dict(country_id=osm_id),
                                 error_callback=kill_pool)
            pool.close()
            pool.join()

        def handle_country_or_region(self, country_id=None, queue=None):
            pass  # here I do some task
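As a quick sanity check, here is a minimal self-contained sketch of the same pattern (the failing_worker function and the deliberately raised ValueError are my own illustration, not part of the original post): a task that raises fires error_callback in the parent process, which terminates the pool, and control returns to the prompt:

    import multiprocessing

    def failing_worker(n):
        # Every third task raises, which fires error_callback in the parent.
        if n % 3 == 0:
            raise ValueError('boom on task %d' % n)
        return n

    def main():
        pool = multiprocessing.Pool(3)

        # Defined inside main so it can close over `pool`.
        def kill_pool(err_msg):
            print(err_msg)
            pool.terminate()

        for n in range(10):
            pool.apply_async(failing_worker, args=(n,),
                             error_callback=kill_pool)
        pool.close()
        pool.join()
        print('pool finished, back at the prompt')

    if __name__ == '__main__':
        main()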

If someone needs to use a queue, the following is an extended version of the code that shows how to finish the queue correctly, avoiding zombie processes:

    import pickle
    import os
    import multiprocessing


    class MyClass:
        ids_list = list(range(300))  # a list, not a range, so remove() works
        folder = os.path.join(os.getcwd(), 'app_geo')
        STOP_TOKEN = 'stop queue'

        def main(self):
            # >>> Queue part shared between processes <<<
            manager = multiprocessing.Manager()
            remove_id_queue = manager.Queue()
            remove_id_process = multiprocessing.Process(
                target=self.remove_id_from_file,
                args=(remove_id_queue,))
            remove_id_process.start()
            # >>> End of queue part <<<

            pool = multiprocessing.Pool(3)

            def kill_pool(err_msg):
                print(err_msg)
                pool.terminate()

            for osm_id in self.ids_list:
                # The queue must be passed to the worker, otherwise
                # `queue` stays None inside handle_country_or_region.
                pool.apply_async(self.handle_country_or_region,
                                 kwds=dict(country_id=osm_id,
                                           queue=remove_id_queue),
                                 error_callback=kill_pool)
            pool.close()
            pool.join()

            # >>> Anti-zombie-processes queue part <<<
            remove_id_queue.put(self.STOP_TOKEN)
            remove_id_process.join()
            manager.shutdown()
            # >>> End <<<

        def handle_country_or_region(self, country_id=None, queue=None):
            # here I do some task
            queue.put(country_id)

        def remove_id_from_file(self, some_queue):
            while True:
                osm_id = some_queue.get()
                if osm_id == self.STOP_TOKEN:
                    return
                self.ids_list.remove(osm_id)
                with open(self.folder + '/ids_list.pickle', 'wb') as f:
                    pickle.dump(self.ids_list, f)
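One detail worth calling out: the code uses manager.Queue() rather than a plain multiprocessing.Queue(). A plain Queue cannot be pickled into pool tasks, so passing one through apply_async fails, whereas a manager queue is a picklable proxy object. A minimal sketch illustrating the difference (the worker function here is hypothetical, for demonstration only):

    import multiprocessing

    def worker(q):
        q.put('done')

    if __name__ == '__main__':
        manager = multiprocessing.Manager()
        good_q = manager.Queue()  # proxy object: picklable, safe in pool tasks
        pool = multiprocessing.Pool(2)
        pool.apply_async(worker, args=(good_q,)).get()
        print(good_q.get())       # -> 'done'
        pool.close()
        pool.join()
        manager.shutdown()

        # By contrast, passing a plain multiprocessing.Queue() through
        # apply_async raises RuntimeError, since such queues may only be
        # shared with child processes through inheritance.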

Source: https://habr.com/ru/post/1246410/

