Sorry, this is going to be a long one...
Perhaps related:
Python Multiprocessing atexit Error "Error in atexit._run_exitfuncs"
Definitely related:
python parallel map (multiprocessing.Pool.map) with global data
Keyboard interrupts with python multiprocessing pool
Here's a "simple" script I hacked together to illustrate my problem...
import time
import multiprocessing as multi
import atexit

cleanup_stuff=multi.Manager().list([])

##################################################
# Some code to allow keyboard interrupts
##################################################
was_interrupted=multi.Manager().list([])

class _interrupt(object):
    """
    Toy class to allow retrieval of the interrupt that triggered its execution
    """
    def __init__(self,interrupt):
        self.interrupt=interrupt

def interrupt():
    was_interrupted.append(1)

def interruptable(func):
    """
    Decorator to allow functions to be "interruptable" by a keyboard
    interrupt when run inside multiprocessing.Pool.map.

    **Note**: this won't actually cause the map to be interrupted;
    it will merely cause subsequent calls not to be executed.
    """
    def newfunc(*args,**kwargs):
        try:
            if(not was_interrupted):
                return func(*args,**kwargs)
            else:
                return False
        except KeyboardInterrupt as e:
            interrupt()
            return _interrupt(e)  #If we really want to know about the interrupt...
    return newfunc

@atexit.register
def cleanup():
    for i in cleanup_stuff:
        print(i)
    return

@interruptable
def func(i):
    print(i)
    cleanup_stuff.append(i)
    time.sleep(float(i)/10.)
    return i

#Must wrap func here, otherwise it won't be found in __main__ dict
#Maybe because it was created dynamically using the decorator?
def wrapper(*args):
    return func(*args)


if __name__ == "__main__":
    #This is an attempt to use signals -- I also attempted something similar where
    #The signals were only caught in the child processes...Or only on the main process...
    #
    #import signal
    #def onSigInt(*args): interrupt()
    #signal.signal(signal.SIGINT,onSigInt)

    #Try 2 with signals (only catch signal on main process)
    #import signal
    #def onSigInt(*args): interrupt()
    #signal.signal(signal.SIGINT,onSigInt)
    #def startup(): signal.signal(signal.SIGINT,signal.SIG_IGN)
    #p=multi.Pool(processes=4,initializer=startup)

    #Try 3 with signals (only catch signal on child processes)
    #import signal
    #def onSigInt(*args): interrupt()
    #signal.signal(signal.SIGINT,signal.SIG_IGN)
    #def startup(): signal.signal(signal.SIGINT,onSigInt)
    #p=multi.Pool(processes=4,initializer=startup)

    p=multi.Pool(4)
    try:
        out=p.map(wrapper,range(30))
        #out=p.map_async(wrapper,range(30)).get()  #This doesn't work either...

        #The following lines don't work either
        #Effectively trying to roll my own p.map() with p.apply_async
        # results=[p.apply_async(wrapper,args=(i,)) for i in range(30)]
        # out = [ r.get() for r in results ]
    except KeyboardInterrupt:
        print ("Hello!")
        out=None
    finally:
        p.terminate()
        p.join()

    print (out)
This works fine as long as no KeyboardInterrupt is raised. However, if I press Ctrl+C, I get the following exception:
10
7
9
12
^CHello!
None
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python2.6/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "test.py", line 58, in cleanup
    for i in cleanup_stuff:
  File "<string>", line 2, in __getitem__
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod
    self._connect()
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 143, in Client
    c = SocketClient(address)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 263, in SocketClient
    s.connect(address)
  File "<string>", line 1, in connect
error: [Errno 2] No such file or directory
Error in sys.exitfunc:
Traceback (most recent call last):
  File "/usr/lib/python2.6/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "test.py", line 58, in cleanup
    for i in cleanup_stuff:
  File "<string>", line 2, in __getitem__
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod
    self._connect()
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 143, in Client
    c = SocketClient(address)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 263, in SocketClient
    s.connect(address)
  File "<string>", line 1, in connect
socket.error: [Errno 2] No such file or directory
Interestingly, the code does break out of Pool.map without calling any further functions... The problem seems to be that the KeyboardInterrupt is not handled properly somewhere, but it's confusing where that happens and why the interruptable wrapper doesn't handle it. Thanks.
Please note: the same problem occurs if I use out=p.map_async(wrapper,range(30)).get()
EDIT 1
A little closer... Wrapping out=p.map(...) in a try/except/finally block gets rid of the first exception... however, the errors in atexit remain. I've updated the code and the traceback above.
EDIT 2
I've added another attempt that doesn't work to the code above, as a comment (same error). It was inspired by:
http://jessenoller.com/2009/01/08/multiprocessingpool-and-keyboardinterrupt/
EDIT 3
Added another unsuccessful attempt using signals to the code above.
EDIT 4
I figured out how to restructure my code so that none of the above is needed. In the (unlikely) event that someone comes across this thread with the same use case as mine, I'll describe my solution...
Use case
I have a function that creates temporary files using the tempfile module, and I want those files to be cleaned up when the program exits. My first attempt was to append each temporary file name to a list and then delete every file in that list from a function registered with atexit.register. The problem was that the list was not shared between processes: file names appended in the worker processes never showed up in the parent's copy. That is where I got the idea of using multiprocessing.Manager to manage the list. Unfortunately, no matter what I tried, this fails on KeyboardInterrupt, because for some reason the communication sockets between the processes get broken.
The solution is simple. Before using multiprocessing, set the temporary directory for the tempfile module, with something like tempfile.tempdir=tempfile.mkdtemp(), and then register a function that deletes that directory. Every process writes to the same temporary directory, so it works. Of course, this solution only works when the shared data is a list of files that need to be deleted at the end of the program's life.
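The temporary-directory workaround can be sketched like this, assuming Python 3 (Pool as a context manager needs 3.3+). use_private_tempdir, _set_tempdir and make_temp_file are hypothetical names. One deviation from the description: the directory is also handed to each worker through the Pool initializer, because only forked workers inherit tempfile.tempdir from the parent; with the "spawn" start method (the default on Windows and macOS) they would not.

```python
import atexit
import multiprocessing
import os
import shutil
import tempfile


def use_private_tempdir():
    """Point the tempfile module at a fresh directory and delete it at exit."""
    tempdir = tempfile.mkdtemp()
    tempfile.tempdir = tempdir
    # ignore_errors: don't fail at exit if the directory is already gone.
    atexit.register(shutil.rmtree, tempdir, ignore_errors=True)
    return tempdir


def _set_tempdir(path):
    # Pool initializer: make every worker use the same directory,
    # regardless of the start method.
    tempfile.tempdir = path


def make_temp_file(i):
    # Hypothetical stand-in for the real work: create a temp file and keep it.
    fd, path = tempfile.mkstemp(suffix="-%d.txt" % i)
    with os.fdopen(fd, "w") as f:
        f.write(str(i))
    return path


if __name__ == "__main__":
    shared = use_private_tempdir()
    with multiprocessing.Pool(4, initializer=_set_tempdir, initargs=(shared,)) as pool:
        paths = pool.map(make_temp_file, range(10))
    # Every worker wrote into the same directory, which atexit removes.
    print(all(os.path.dirname(p) == shared for p in paths))
```

Only the parent registers the rmtree call, so the directory is removed once, when the main program exits, and no cross-process list is needed.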