I have a function that takes a large array of x, y pairs as input, which does some complicated curve setup using numpy and scipy and then returns a single value. To try to speed things up, I'm trying to use two streams that I pass data using Queue.Queue. As soon as the data is completed. I try to have threads terminate and then terminate the calling process and return control to the shell.
I am trying to understand why I have to resort to a private method in threading.Thread to stop my threads and return control to the command line.
Self.join () does not exit the program. The only way to regain control is to use the private stop method.
def stop(self): print "STOP CALLED" self.finished.set() print "SET DONE"
Here is an approximate code for my code:
class CalcThread(threading.Thread): def __init__(self,in_queue,out_queue,function): threading.Thread.__init__(self) self.in_queue = in_queue self.out_queue = out_queue self.function = function self.finished = threading.Event() def stop(self): print "STOP CALLED" self.finished.set() print "SET DONE" self._Thread__stop() def run(self): while not self.finished.isSet(): params_for_function = self.in_queue.get() try: tm = self.function(paramsforfunction) self.in_queue.task_done() self.out_queue.put(tm) except ValueError as v: #modify params and reinsert into queue window = params_for_function["window"] params_for_function["window"] = window + 1 self.in_queue.put(params_for_function) def big_calculation(well_id,window,data_arrays): # do some analysis to calculate tm return tm if __name__ == "__main__": NUM_THREADS = 2 workers = [] in_queue = Queue() out_queue = Queue() for i in range(NUM_THREADS): w = CalcThread(in_queue,out_queue,big_calculation) w.start() workers.append(w) if options.analyze_all: for i in well_ids: in_queue.put(dict(well_id=i,window=10,data_arrays=my_data_dict)) in_queue.join() print "ALL THREADS SEEM TO BE DONE" # gather data and report it from out_queue for i in well_ids: p = out_queue.get() print p out_queue.task_done() # I had to do this to get the out_queue to proceed if out_queue.qsize() == 0: out_queue.join() break # Calling this stop method does not seem to return control to the command line unless I use threading.Thread private method for aworker in workers: aworker.stop()
source share