Python threading.Thread can only be stopped using the private method self._Thread__stop()

I have a function that takes a large array of x, y pairs as input and, after some complicated curve fitting using numpy and scipy, returns a single value. To try to speed things up, I'm using two threads that I feed data through a Queue.Queue. Once the data is processed, I want the threads to terminate, the calling process to end, and control to return to the shell.

I am trying to understand why I have to resort to a private method of threading.Thread to stop my threads and return control to the command line.

self.join() does not exit the program. The only way to regain control is to use the private stop method.

    def stop(self):
        print "STOP CALLED"
        self.finished.set()
        print "SET DONE"
        # self.join(timeout=None) does not work
        self._Thread__stop()

Here is an approximation of my code:

    import threading
    from Queue import Queue

    class CalcThread(threading.Thread):
        def __init__(self, in_queue, out_queue, function):
            threading.Thread.__init__(self)
            self.in_queue = in_queue
            self.out_queue = out_queue
            self.function = function
            self.finished = threading.Event()

        def stop(self):
            print "STOP CALLED"
            self.finished.set()
            print "SET DONE"
            self._Thread__stop()

        def run(self):
            while not self.finished.isSet():
                params_for_function = self.in_queue.get()
                try:
                    tm = self.function(params_for_function)
                    self.in_queue.task_done()
                    self.out_queue.put(tm)
                except ValueError as v:
                    # modify params and reinsert into queue
                    window = params_for_function["window"]
                    params_for_function["window"] = window + 1
                    self.in_queue.put(params_for_function)

    def big_calculation(well_id, window, data_arrays):
        # do some analysis to calculate tm
        return tm

    if __name__ == "__main__":
        NUM_THREADS = 2
        workers = []
        in_queue = Queue()
        out_queue = Queue()

        for i in range(NUM_THREADS):
            w = CalcThread(in_queue, out_queue, big_calculation)
            w.start()
            workers.append(w)

        if options.analyze_all:
            for i in well_ids:
                in_queue.put(dict(well_id=i, window=10, data_arrays=my_data_dict))

        in_queue.join()
        print "ALL THREADS SEEM TO BE DONE"

        # gather data and report it from out_queue
        for i in well_ids:
            p = out_queue.get()
            print p
            out_queue.task_done()
            # I had to do this to get the out_queue to proceed
            if out_queue.qsize() == 0:
                out_queue.join()
                break

        # Calling this stop method does not seem to return control to the
        # command line unless I use the threading.Thread private method
        for aworker in workers:
            aworker.stop()
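One likely reason self.join() never returns: run() blocks forever in in_queue.get() once the queue is empty, so the finished flag is never re-checked, and _Thread__stop() merely marks the thread as stopped rather than actually killing it. Below is a minimal sketch of a cooperative stop that avoids the private method; this is my reconstruction in Python 3 syntax (module `queue`, method `is_set()`), with a toy lambda standing in for the real function:

```python
import queue
import threading

class CalcThread(threading.Thread):
    """Sketch only: re-checks the 'finished' flag instead of blocking forever."""

    def __init__(self, in_queue, out_queue, function):
        threading.Thread.__init__(self)
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.function = function
        self.finished = threading.Event()

    def stop(self):
        self.finished.set()

    def run(self):
        while not self.finished.is_set():
            try:
                # A timeout lets the loop wake up and re-check the flag,
                # instead of blocking in get() forever on an empty queue.
                params = self.in_queue.get(timeout=0.1)
            except queue.Empty:
                continue
            self.out_queue.put(self.function(params))
            self.in_queue.task_done()

in_q, out_q = queue.Queue(), queue.Queue()
w = CalcThread(in_q, out_q, lambda p: p * 2)  # toy stand-in for big_calculation
w.start()
in_q.put(21)
in_q.join()   # wait for the work itself to finish
w.stop()      # ask the thread to exit its loop
w.join()      # returns promptly; no private method needed
print(out_q.get())   # → 42
print(w.is_alive())  # → False
```

The key design point is that the worker never blocks indefinitely, so setting the event is always enough to let the thread fall out of its loop on its own.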
3 answers

In general, it is a bad idea to kill a thread that modifies a shared resource.

CPU-heavy work spread over multiple threads is worse than useless in Python unless you release the GIL during the calculations. Many numpy functions do release the GIL.

A ThreadPoolExecutor example, adapted from the documentation:

    import concurrent.futures  # on Python 2.x: pip install futures

    calc_args = []
    if options.analyze_all:
        calc_args.extend(dict(well_id=i, ...) for i in well_ids)

    with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
        future_to_args = dict((executor.submit(big_calculation, args), args)
                              for args in calc_args)
        while future_to_args:
            for future in concurrent.futures.as_completed(dict(**future_to_args)):
                args = future_to_args.pop(future)
                if future.exception() is not None:
                    print('%r generated an exception: %s' % (args, future.exception()))
                    if isinstance(future.exception(), ValueError):
                        # modify params and resubmit
                        args["window"] += 1
                        future_to_args[executor.submit(big_calculation, args)] = args
                else:
                    print('%r returned %r' % (args, future.result()))
    print("ALL work SEEMS TO BE DONE")

You can replace ThreadPoolExecutor with ProcessPoolExecutor if there is no shared state. Put the code in your main() function.
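A minimal sketch of that swap: `big_calculation` here is a picklable stand-in for the real function (process pools require module-level functions), and the argument values are made up:

```python
import concurrent.futures

def big_calculation(args):
    # Stand-in for the real CPU-bound work; defined at module top level
    # so the worker processes can unpickle and call it.
    return args["well_id"] * args["window"]

def main():
    calc_args = [dict(well_id=i, window=10) for i in range(4)]
    # Same interface as ThreadPoolExecutor, but true parallelism across
    # CPU cores because each worker is a separate process with its own GIL.
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        return list(executor.map(big_calculation, calc_args))

if __name__ == "__main__":
    print(main())  # → [0, 10, 20, 30]
```

The `if __name__ == "__main__"` guard matters here: on platforms that spawn rather than fork, the child processes re-import the module, and unguarded pool creation would recurse.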


To clarify my comment: if the sole purpose of your threads is to consume values from the queue and perform a function on them, you're clearly better off doing something like this, IMHO:

    q = Queue()
    results = []

    def worker():
        while True:
            x, y = q.get()
            results.append(x ** y)
            q.task_done()

    for _ in range(workerCount):
        t = Thread(target=worker)
        t.daemon = True
        t.start()

    for tup in listOfXYs:
        q.put(tup)

    q.join()
    # Some more code here with the results list.

q.join() blocks until every item put into the queue has been marked done with task_done(). The worker threads keep trying to get values; once the queue is empty they simply wait indefinitely in q.get(). When your script finishes its execution later, the worker threads die with it because they are marked as daemon threads.
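If you prefer the workers to shut down deterministically instead of relying on daemon threads, a common alternative (my addition, not part of this answer) is to push one sentinel value per worker; each thread exits its loop when it sees the sentinel:

```python
import threading
import queue

q = queue.Queue()
results = []
STOP = object()  # unique sentinel marking "no more work"

def worker():
    while True:
        item = q.get()
        if item is STOP:
            q.task_done()
            break  # exit the loop instead of blocking forever
        x, y = item
        results.append(x ** y)
        q.task_done()

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()

for tup in [(2, 3), (3, 2), (10, 2)]:
    q.put(tup)
for _ in threads:
    q.put(STOP)   # one sentinel per worker

for t in threads:
    t.join()      # workers exit on their own; no daemon flag needed
print(sorted(results))  # → [8, 9, 100]
```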


I tried gddc's method and got an interesting result. His exact x ** y calculation worked, with the work spread nicely across the threads.

When I called my own function inside the worker's while True loop, however, the calculations were only spread across multiple threads if I put a time.sleep(1) in the for loop that calls the threads' start() method.

Without the time.sleep(1), my program either exited cleanly with no output at all, or in some cases printed:

"Exception in thread Thread-2 (most likely raised during interpreter shutdown): Exception in thread Thread-1 (most likely raised during interpreter shutdown):"

As soon as I added the time.sleep(1), everything worked fine.

    for aworker in range(5):
        t = Thread(target=worker)
        t.daemon = True
        t.start()
        # This sleep was essential or results for my specific function were None
        time.sleep(1)
        print "Started"
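The sleep most likely papers over a shutdown race rather than fixing anything: the main thread can exit before the daemon workers finish, and the interpreter is torn down under them, which is exactly what the "raised during interpreter shutdown" messages suggest. Blocking on q.join() before using the results removes the need for the sleep. A sketch under that assumption, reusing gddc's x ** y worker (Python 3 syntax):

```python
import threading
import queue

q = queue.Queue()
results = []

def worker():
    while True:
        x, y = q.get()
        results.append(x ** y)
        q.task_done()

for _ in range(5):
    t = threading.Thread(target=worker)
    t.daemon = True
    t.start()   # no sleep needed here

for tup in [(2, 2), (2, 3), (2, 4)]:
    q.put(tup)

# Blocks until task_done() has been called for every queued item,
# so the main thread cannot race ahead of the workers.
q.join()
print(sorted(results))  # → [4, 8, 16]
```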

Source: https://habr.com/ru/post/898796/

