Finding the cause of a BrokenProcessPool in Python's concurrent.futures

In a nutshell

I get a BrokenProcessPool exception when parallelizing my code with concurrent.futures. No further error is displayed. I want to find the cause of the error and am asking for ideas on how to do that.

Complete problem

I use concurrent.futures to parallelize some code.

    from concurrent.futures import ProcessPoolExecutor

    with ProcessPoolExecutor() as pool:
        mapObj = pool.map(myMethod, args)

I end up with the following exception, and only this exception:

 concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore 

Unfortunately, the program is complex, and the error appears only after the program has been running for about 30 minutes. Therefore, I cannot provide a good minimal example.

To find the cause of the problem, I wrapped the method that I run in parallel in a try-except block:

    def myMethod(*args):
        try:
            ...
        except Exception as e:
            print(e)

The problem remained the same, and the except block was never entered. I concluded that the exception does not come from my code.

The next step was to write a custom ProcessPoolExecutor class, which is a child of the original ProcessPoolExecutor and allows me to replace some methods with customized ones. I copied and pasted the source code of the _process_worker method and added some print statements.

    def _process_worker(call_queue, result_queue):
        """Evaluates calls from call_queue and places the results in result_queue.
        ...
        """
        while True:
            call_item = call_queue.get(block=True)
            if call_item is None:
                # Wake up queue management thread
                result_queue.put(os.getpid())
                return
            try:
                r = call_item.fn(*call_item.args, **call_item.kwargs)
            except BaseException as e:
                print("??? Exception ???")  # newly added
                print(e)                    # newly added
                exc = _ExceptionWithTraceback(e, e.__traceback__)
                result_queue.put(_ResultItem(call_item.work_id, exception=exc))
            else:
                result_queue.put(_ResultItem(call_item.work_id, result=r))

Again, an except block is never entered. This was to be expected, since I was already convinced that my code does not raise an exception (and if everything works well, the exception should be passed to the main process).
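As an illustration of how a worker can disappear without any except block running, the sketch below starts a child interpreter that wraps its work in try/except BaseException and then kills itself with SIGKILL, which here stands in for a crash in native code. The names CHILD_SOURCE and run_child are hypothetical, and the example assumes a POSIX system; it is not taken from the program in question.

```python
import subprocess
import sys

# Hypothetical child program: it guards its work with try/except BaseException,
# then sends SIGKILL to itself to stand in for a crash in native code.
CHILD_SOURCE = """
import os, signal
try:
    os.kill(os.getpid(), signal.SIGKILL)
except BaseException as e:
    print("caught:", e)
"""


def run_child():
    """Run the child interpreter and return (exit code, captured stdout)."""
    proc = subprocess.run(
        [sys.executable, "-c", CHILD_SOURCE],
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    return proc.returncode, proc.stdout


if __name__ == "__main__":
    code, out = run_child()
    print("exit code:", code)    # negative on POSIX: terminated by a signal
    print("stdout:", repr(out))  # empty: the except block never ran
```

On Linux the child prints nothing and its return code is the negated signal number, because the signal ends the process before the interpreter has a chance to raise anything.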

Now I am running out of ideas for how to find the error. The exception is raised here:

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool('A child process terminated '
                    'abruptly, the process pool is not usable anymore')
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
            # Wake up queue management thread
            self._result_queue.put(None)

            self._start_queue_management_thread()
            return f

The process pool is marked as broken here:

    def _queue_management_worker(executor_reference,
                                 processes,
                                 pending_work_items,
                                 work_ids_queue,
                                 call_queue,
                                 result_queue):
        """Manages the communication between this process and the worker processes.
        ...
        """
        executor = None

        def shutting_down():
            return _shutdown or executor is None or executor._shutdown_thread

        def shutdown_worker():
            ...

        reader = result_queue._reader

        while True:
            _add_call_item_to_queue(pending_work_items,
                                    work_ids_queue,
                                    call_queue)

            sentinels = [p.sentinel for p in processes.values()]
            assert sentinels
            ready = wait([reader] + sentinels)
            if reader in ready:
                result_item = reader.recv()
            else:
                # THIS BLOCK IS ENTERED WHEN THE ERROR OCCURS
                # Mark the process pool broken so that submits fail right now.
                executor = executor_reference()
                if executor is not None:
                    executor._broken = True
                    executor._shutdown_thread = True
                    executor = None
                # All futures in flight must be marked failed
                for work_id, work_item in pending_work_items.items():
                    work_item.future.set_exception(
                        BrokenProcessPool(
                            "A process in the process pool was "
                            "terminated abruptly while the future was "
                            "running or pending."
                        ))
                    # Delete references to object. See issue16284
                    del work_item
                pending_work_items.clear()
                # Terminate remaining workers forcibly: the queues or their
                # locks may be in a dirty state and block forever.
                for p in processes.values():
                    p.terminate()
                shutdown_worker()
                return
            ...

It is (or seems to be) a fact that a worker process terminates, but I do not know why. Is my reasoning correct so far? What are the possible reasons for a process to terminate without any message? (Is that even possible?) Where could I apply further diagnostics? What questions should I ask myself to get closer to a solution?

I am using Python 3.5 on 64-bit Linux.

1 answer

I think I got as far as I could:

I changed the _queue_management_worker method in my modified ProcessPoolExecutor module so that the exit code of the failed process is printed:

    def _queue_management_worker(executor_reference,
                                 processes,
                                 pending_work_items,
                                 work_ids_queue,
                                 call_queue,
                                 result_queue):
        """Manages the communication between this process and the worker processes.
        ...
        """
        executor = None

        def shutting_down():
            return _shutdown or executor is None or executor._shutdown_thread

        def shutdown_worker():
            ...

        reader = result_queue._reader

        while True:
            _add_call_item_to_queue(pending_work_items,
                                    work_ids_queue,
                                    call_queue)

            sentinels = [p.sentinel for p in processes.values()]
            assert sentinels
            ready = wait([reader] + sentinels)
            if reader in ready:
                result_item = reader.recv()
            else:
                # BLOCK INSERTED FOR DIAGNOSIS ONLY ---------
                vals = list(processes.values())
                for s in ready:
                    j = sentinels.index(s)
                    print("is_alive()", vals[j].is_alive())
                    print("exitcode", vals[j].exitcode)
                # -------------------------------------------

                # Mark the process pool broken so that submits fail right now.
                executor = executor_reference()
                if executor is not None:
                    executor._broken = True
                    executor._shutdown_thread = True
                    executor = None
                # All futures in flight must be marked failed
                for work_id, work_item in pending_work_items.items():
                    work_item.future.set_exception(
                        BrokenProcessPool(
                            "A process in the process pool was "
                            "terminated abruptly while the future was "
                            "running or pending."
                        ))
                    # Delete references to object. See issue16284
                    del work_item
                pending_work_items.clear()
                # Terminate remaining workers forcibly: the queues or their
                # locks may be in a dirty state and block forever.
                for p in processes.values():
                    p.terminate()
                shutdown_worker()
                return
            ...

Afterwards, I looked up the meaning of the exit code:

    from multiprocessing.process import _exitcode_to_name

    print(_exitcode_to_name[my_exit_code])

where my_exit_code is the exit code that was printed by the block I inserted in _queue_management_worker. In my case, the code was -11, which means that a worker was terminated by signal 11 (SIGSEGV), i.e. I ran into a segmentation fault. Finding the cause of that will be a huge task, but it is beyond the scope of this question.
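The lookup shown earlier in this answer relies on a private dictionary from multiprocessing.process. A possible alternative, assuming Python 3.5 or later, is the public signal.Signals enum. The helper describe_exit_code below is a hypothetical name introduced for illustration; it is not part of the standard library.

```python
import signal


def describe_exit_code(exitcode):
    """Return a readable description of a multiprocessing.Process exit code."""
    if exitcode is None:
        # multiprocessing reports None while the process has not terminated.
        return "still running"
    if exitcode < 0:
        # A negative value -N means the process was terminated by signal N.
        try:
            name = signal.Signals(-exitcode).name
        except ValueError:
            name = "unknown signal {}".format(-exitcode)
        return "terminated by {}".format(name)
    return "exited with status {}".format(exitcode)


if __name__ == "__main__":
    print(describe_exit_code(-11))
```

On Linux, where SIGSEGV has the number 11, this prints "terminated by SIGSEGV" for an exit code of -11.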


Source: https://habr.com/ru/post/1262248/

