How to let a Python thread finish

I am doing a project involving data collection and logging. I have two threads: a collection thread and a logging thread, both started from main. I am trying to make it possible to terminate the program cleanly with Ctrl-C.

I use a threading.Event to signal the threads to end their respective loops. It works fine for stopping the sim_collectData method, but it doesn't seem to stop the logData thread. The Collection terminated print statement is never executed, and the program simply hangs (it doesn't terminate, it just sits there).

The second while loop in logData is there to make sure everything left in the queue gets logged. The goal is for Ctrl-C to stop the collection thread immediately, then let the logging thread finish emptying the queue, and only then end the program. (Right now the data is just printed out; eventually it will be written to a database.)

I don't understand why the second thread never ends. I based what I did on this answer: Stopping a thread after a certain time. What am I missing?

    import threading
    import Queue
    import random
    import time

    def sim_collectData(input_queue, stop_event):
        '''
        this provides some output simulating the serial
        data from the data logging hardware.
        '''
        n = 0
        while not stop_event.is_set():
            input_queue.put("DATA: <here are some random data> " + str(n))
            stop_event.wait(random.randint(0,5))
            n += 1
        print "Terminating data collection..."
        return

    def logData(input_queue, stop_event):
        n = 0

        # we *don't* want to loop based on queue size because the queue could
        # theoretically be empty while waiting on some data.
        while not stop_event.is_set():
            d = input_queue.get()
            if d.startswith("DATA:"):
                print d
            input_queue.task_done()
            n += 1

        # if the stop event is received and the previous loop terminates,
        # finish logging the rest of the items in the queue.
        print "Collection terminated. Logging remaining data to database..."
        while not input_queue.empty():
            d = input_queue.get()
            if d.startswith("DATA:"):
                print d
            input_queue.task_done()
            n += 1
        return

    def main():
        input_queue = Queue.Queue()
        stop_event = threading.Event()  # used to signal termination to the threads

        print "Starting data collection thread...",
        collection_thread = threading.Thread(target=sim_collectData, args=(input_queue, stop_event))
        collection_thread.start()
        print "Done."

        print "Starting logging thread...",
        logging_thread = threading.Thread(target=logData, args=(input_queue, stop_event))
        logging_thread.start()
        print "Done."

        try:
            while True:
                time.sleep(10)
        except (KeyboardInterrupt, SystemExit):
            # stop data collection. Let the logging thread finish logging
            # everything in the queue
            stop_event.set()

    main()
4 answers

The problem is that your logger is blocked on d = input_queue.get() and never checks the event. One solution is to skip the event entirely and use a special sentinel message that tells the logger to shut down. When you get the termination signal, put that message on the queue.

    import threading
    import Queue
    import random
    import time

    def sim_collectData(input_queue, stop_event):
        '''
        this provides some output simulating the serial
        data from the data logging hardware.
        '''
        n = 0
        while not stop_event.is_set():
            input_queue.put("DATA: <here are some random data> " + str(n))
            stop_event.wait(random.randint(0,5))
            n += 1
        print "Terminating data collection..."
        input_queue.put(None)
        return

    def logData(input_queue):
        n = 0

        # we *don't* want to loop based on queue size because the queue could
        # theoretically be empty while waiting on some data.
        while True:
            d = input_queue.get()
            if d is None:
                input_queue.task_done()
                return
            if d.startswith("DATA:"):
                print d
            input_queue.task_done()
            n += 1

    def main():
        input_queue = Queue.Queue()
        stop_event = threading.Event()  # used to signal termination to the threads

        print "Starting data collection thread...",
        collection_thread = threading.Thread(target=sim_collectData, args=(input_queue, stop_event))
        collection_thread.start()
        print "Done."

        print "Starting logging thread...",
        logging_thread = threading.Thread(target=logData, args=(input_queue,))
        logging_thread.start()
        print "Done."

        try:
            while True:
                time.sleep(10)
        except (KeyboardInterrupt, SystemExit):
            # stop data collection. Let the logging thread finish logging
            # everything in the queue
            stop_event.set()

    main()

I'm no threading expert, but in your logData function the first d = input_queue.get() is blocking, i.e. if the queue is empty it will wait indefinitely until it receives a message. That is probably why the logData thread never ends: it sits there waiting for a message on the queue.

See the [Python Docs] for how to change this to a non-blocking read: use .get(False) or .get_nowait(), but this will require some exception handling for the case where the queue is empty.
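Something along these lines would do it; this is only a rough sketch that reuses the question's input_queue, stop_event and imports:

    # keep going while collection is running, then drain whatever is left
    while not stop_event.is_set() or not input_queue.empty():
        try:
            d = input_queue.get_nowait()   # same as input_queue.get(False)
        except Queue.Empty:
            time.sleep(0.1)                # nothing queued yet; re-check the event
            continue
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()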


You are making blocking calls on your input_queue without a timeout. In either part of logData, if you call input_queue.get() while the queue is empty, it blocks indefinitely, which prevents logging_thread from ever finishing.

To fix this, you want to either call input_queue.get_nowait() or pass a timeout to input_queue.get().

Here is my suggestion:

    def logData(input_queue, stop_event):
        n = 0
        while not stop_event.is_set():
            try:
                d = input_queue.get_nowait()
                if d.startswith("DATA:"):
                    print "LOG: " + d
                n += 1
            except Queue.Empty:
                time.sleep(1)
        return
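Alternatively, you can keep a blocking get() and just give it a timeout, as mentioned above. A sketch of that variant, for illustration only:

    def logData(input_queue, stop_event):
        n = 0
        while not stop_event.is_set():
            try:
                d = input_queue.get(timeout=1)   # wakes up at least once a second
            except Queue.Empty:
                continue                         # no data yet; re-check the stop event
            if d.startswith("DATA:"):
                print "LOG: " + d
            n += 1
        return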

You are also signalling the threads to terminate, but never waiting for (joining) them. Consider doing that in your main function.

    try:
        while True:
            time.sleep(10)
    except (KeyboardInterrupt, SystemExit):
        stop_event.set()
        collection_thread.join()
        logging_thread.join()

Based on tdelaney's answer, I created an iterator-based approach. The iterator stops when the termination message is encountered. I also added a lock-protected counter of how many getters are currently blocked in get, and a stop method that puts exactly that many termination messages on the queue. To avoid a race between incrementing and reading the counter, the stopped flag is checked under the same lock. Also, I don't use None as the termination message, because it cannot be compared with other data types when a PriorityQueue is used.

There are two limitations I didn't need to address. First, the stop method waits for the queue to be fully processed before shutting down the consumer threads. Second, I haven't written any code for reusing the queue after stop. The latter could probably be added fairly easily, while the former requires care about concurrency and the context in which the code is used.

You have to decide whether you want stop to also wait until all the termination messages have been consumed. I chose to put a join there, but you can simply remove it.

So this is the code:

    import threading, queue

    from functools import total_ordering

    @total_ordering
    class Final:
        def __repr__(self):
            return "∞"
        def __lt__(self, other):
            return False
        def __eq__(self, other):
            return isinstance(other, Final)

    Infty = Final()

    class IterQueue(queue.Queue):
        def __init__(self):
            self.lock = threading.Lock()
            self.stopped = False
            self.getters = 0
            super().__init__()

        def __iter__(self):
            return self

        def get(self):
            raise NotImplementedError("This queue may only be used as an iterator.")

        def __next__(self):
            with self.lock:
                if self.stopped:
                    raise StopIteration
                self.getters += 1
            data = super().get()
            if data == Infty:
                self.task_done()
                raise StopIteration
            with self.lock:
                self.getters -= 1
            return data

        def stop(self):
            self.join()
            self.stopped = True
            with self.lock:
                for i in range(self.getters):
                    self.put(Infty)
            self.join()

    class IterPriorityQueue(IterQueue, queue.PriorityQueue):
        pass

Oh, and I wrote this in Python 3.2. So, after backporting it to

    import threading, Queue

    from functools import total_ordering

    @total_ordering
    class Final:
        def __repr__(self):
            return "Infinity"
        def __lt__(self, other):
            return False
        def __eq__(self, other):
            return isinstance(other, Final)

    Infty = Final()

    class IterQueue(Queue.Queue, object):
        def __init__(self):
            self.lock = threading.Lock()
            self.stopped = False
            self.getters = 0
            super(IterQueue, self).__init__()

        def __iter__(self):
            return self

        def get(self):
            raise NotImplementedError("This queue may only be used as an iterator.")

        def next(self):
            with self.lock:
                if self.stopped:
                    raise StopIteration
                self.getters += 1
            data = super(IterQueue, self).get()
            if data == Infty:
                self.task_done()
                raise StopIteration
            with self.lock:
                self.getters -= 1
            return data

        def stop(self):
            self.join()
            self.stopped = True
            with self.lock:
                for i in range(self.getters):
                    self.put(Infty)
            self.join()

    class IterPriorityQueue(IterQueue, Queue.PriorityQueue):
        pass

you would use it like this:

    import random
    import time

    def sim_collectData(input_queue, stop_event):
        '''
        this provides some output simulating the serial
        data from the data logging hardware.
        '''
        n = 0
        while not stop_event.is_set():
            input_queue.put("DATA: <here are some random data> " + str(n))
            stop_event.wait(random.randint(0,5))
            n += 1
        print "Terminating data collection..."
        return

    def logData(input_queue):
        n = 0

        # we *don't* want to loop based on queue size because the queue could
        # theoretically be empty while waiting on some data.
        for d in input_queue:
            if d.startswith("DATA:"):
                print d
            input_queue.task_done()
            n += 1

    def main():
        input_queue = IterQueue()
        stop_event = threading.Event()  # used to signal termination to the threads

        print "Starting data collection thread...",
        collection_thread = threading.Thread(target=sim_collectData, args=(input_queue, stop_event))
        collection_thread.start()
        print "Done."

        print "Starting logging thread...",
        logging_thread = threading.Thread(target=logData, args=(input_queue,))
        logging_thread.start()
        print "Done."

        try:
            while True:
                time.sleep(10)
        except (KeyboardInterrupt, SystemExit):
            # stop data collection. Let the logging thread finish logging
            # everything in the queue
            stop_event.set()
            input_queue.stop()

    main()


