The following is a Python issue that demonstrates how to execute a function funcin parallel using multiprocessing.Pool. The number of elements Npto iterate. The function funcsimply returns Npminus the iterability index. As you can see, I use the queue to return values from the function when working in parallel mode.
If I install runParallel=False, the program can be executed in sequential mode.
The program works fine, for runParallel=Falseand runParallel=True, but now the main problem arises: As you can see below, if the parameter is problemIndexslightly lower Np(for example, problemIndex=7), then I am making a floating-point exception. I am divided by zero - stupid me :-)
On startup, runParallel=FalseI can see the line number of the source error code, and I will catch the error directly.
$ python map.py
Traceback (most recent call last):
File "map.py", line 63, in <module>
a = func(argList[p])
File "map.py", line 22, in func
ret = 1/(args["index"]-args["problemIndex"])
ZeroDivisionError: integer division or modulo by zero
Nice!
However, for runParallel=TrueI just finished the printed “Bummer” section without indicating the source of the error. Annoyingly!
My question is: for runParallel=True, how can I effectively debug this and get the line number from the buggy line of code from Pool()?
import time
import multiprocessing
import sys
import random
runParallel = True
problemIndex = 13
Np = 10
def func(args):
time.sleep(random.randint(1,4))
ret = args["Np"] - args["index"]
if args["index"]==args["problemIndex"]:
ret = 1/(args["index"]-args["problemIndex"])
if args["runParallel"]:
args["q"].put((args["index"],ret))
else:
return ret
manager = multiprocessing.Manager()
q = manager.Queue()
argList = []
for i in range(Np):
args={}
args["index"] = i
args["Np"] = Np
args["q"] = q
args["problemIndex"] = problemIndex
args["runParallel"] = runParallel
argList.append(args)
if runParallel:
p = multiprocessing.Pool(processes=10)
ret = p.map_async(func, argList)
ret.wait()
qLen = q.qsize()
p.close()
if not qLen == Np:
print "Bummer - one of more worker threads broke down",Np,qLen
sys.exit(0)
resultVector = [None]*Np
for p in range(Np):
if runParallel:
(i,a) = q.get(timeout=0.1)
else:
i = p
a = func(argList[p])
resultVector[i] = a
for i in range(Np):
print "Index", i, "gives",resultVector[i]