In a nutshell
I get a BrokenProcessPool exception when parallelizing my code with concurrent.futures. No further error message is displayed. I want to find the cause of the error and am asking for ideas on how to do that.
Complete problem
I use concurrent.futures to parallelize some code.
    with ProcessPoolExecutor() as pool:
        mapObj = pool.map(myMethod, args)
I end up with (and only with) the following exception:
    concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore
Unfortunately, the program is complicated, and the error appears only about 30 minutes after the program starts. Therefore, I cannot provide a nice minimal example.
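Although I cannot post the real program, the symptom itself is easy to reproduce with a toy sketch (all names below are mine, not from my program): a worker that dies abruptly, without ever raising a Python exception, produces exactly this BrokenProcessPool and nothing else.

```python
import os
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool

def dying_worker(x):
    # Terminate the child immediately; no Python exception is ever raised,
    # so the parent only sees that the process disappeared.
    os._exit(1)

def provoke_broken_pool():
    try:
        with ProcessPoolExecutor(max_workers=2) as pool:
            list(pool.map(dying_worker, range(4)))
    except BrokenProcessPool:
        return True
    return False

if __name__ == "__main__":
    print(provoke_broken_pool())  # prints True
```

So the exception text "terminated abruptly" should be taken literally: the parent noticed a child vanishing, not a child reporting an error.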
To find the cause of the problem, I wrapped the method that I run in parallel in a try-except block:
    def myMethod(*args):
        try:
            ...
        except Exception as e:
            print(e)
The problem remained the same, and the except block was never entered. I conclude that the exception does not come from my code.
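As a sanity check for this conclusion, I convinced myself that an ordinary exception raised in a worker does propagate back: it is pickled in the child and re-raised in the parent when the map result is consumed. A small sketch (names are mine):

```python
from concurrent.futures import ProcessPoolExecutor

def failing_worker(x):
    # An ordinary Python exception in the worker...
    raise ValueError("boom %d" % x)

def run():
    try:
        with ProcessPoolExecutor(max_workers=2) as pool:
            list(pool.map(failing_worker, range(4)))
    except ValueError as e:
        # ...reappears here, in the parent process.
        return str(e)

if __name__ == "__main__":
    print(run())  # prints "boom 0"
```

Since no such traceback ever reaches my main process, the worker must be dying before it can report anything.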
The next step was to write a custom ProcessPoolExecutor class, a child of the original ProcessPoolExecutor, which allows me to replace some methods with customized ones. I copied and pasted the source code of the method _process_worker and added some print statements.
    def _process_worker(call_queue, result_queue):
        """Evaluates calls from call_queue and places the results in result_queue.
        ...
        """
        while True:
            call_item = call_queue.get(block=True)
            if call_item is None:
Again, the except block is never entered. This was to be expected, since I was already convinced that my code does not raise an exception (and even if it did, the exception would be passed to the main process).
Now I am running out of ideas for how to find the mistake. The exception is raised here:
    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._broken:
                raise BrokenProcessPool('A child process terminated '
                    'abruptly, the process pool is not usable anymore')
            if self._shutdown_thread:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._pending_work_items[self._queue_count] = w
            self._work_ids.put(self._queue_count)
            self._queue_count += 1
The process pool is marked as broken here:
    def _queue_management_worker(executor_reference,
                                 processes,
                                 pending_work_items,
                                 work_ids_queue,
                                 call_queue,
                                 result_queue):
        """Manages the communication between this process and the worker processes.
        ...
        """
        executor = None

        def shutting_down():
            return _shutdown or executor is None or executor._shutdown_thread

        def shutdown_worker():
            ...

        reader = result_queue._reader

        while True:
            _add_call_item_to_queue(pending_work_items,
                                    work_ids_queue,
                                    call_queue)

            sentinels = [p.sentinel for p in processes.values()]
            assert sentinels
            ready = wait([reader] + sentinels)
            if reader in ready:
                result_item = reader.recv()
            else:
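As I understand it, the sentinels in this loop become ready exactly when a worker process dies. A side experiment (names `victim` and `spawn_and_kill` are mine) shows how such a death looks from the parent: on Linux, multiprocessing reports a negative exit code, the negated number of the signal that killed the child.

```python
import multiprocessing
import os
import signal

def victim():
    # Simulate a worker being killed from outside
    # (e.g. the OOM killer, or a kill -9 from another process).
    os.kill(os.getpid(), signal.SIGKILL)

def spawn_and_kill():
    p = multiprocessing.Process(target=victim)
    p.start()
    p.join()
    return p.exitcode

if __name__ == "__main__":
    print(spawn_and_kill())  # prints -9 on Linux: terminated by SIGKILL
```

Logging `p.exitcode` for the pool's worker processes before the pool tears them down might therefore reveal *how* the child died, even when Python itself reports nothing.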
It is (or seems to be) the case that a worker process is terminating, but I do not know why. Is my reasoning so far correct? What are possible reasons for a process to end without reporting anything? (Is that even possible?) Where can I apply further diagnostics? Which questions should I ask myself in order to get closer to a solution?
I am using Python 3.5 on 64-bit Linux.
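One diagnostic I would still try rests on an assumption of mine: that the workers die from a native crash (segfault, abort) or an external kill, which no try/except can ever observe. The faulthandler module dumps a low-level traceback on fatal signals; since ProcessPoolExecutor on Python 3.5 has no initializer argument, it can be enabled at the top of the worker function itself (the `...` is a placeholder for the real work):

```python
import faulthandler
import sys

def myMethod(*args):
    # Dump a C-level traceback to stderr if this worker crashes hard
    # (segfault, abort); a plain try/except can never see such a crash.
    faulthandler.enable(file=sys.stderr)
    ...  # placeholder for the real work
```

In the real program one would probably point `file=` at a per-process log file instead of stderr, since all workers share the parent's stderr; a crash from an external SIGKILL (e.g. the kernel's OOM killer, visible in `dmesg`) would still leave no trace even with faulthandler enabled.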