Based on tdelaney's answer, I created an iterator based approach. An iterator exits when a completion message is encountered. I also added a get lock count at the moment and a stop method that sends so many completion messages. To prevent a race condition between incrementing and reading a counter, I set the stop bit there. In addition, I do not use None as a completion message because it cannot be compared with other data types when using PriorityQueue .
There are two limitations that I did not need to eliminate. For one, the stop method first waits until the queue is empty before closing threads. The second limitation is that I did not have any code to reuse the queue after stop . The latter can probably be added quite easily, while the former requires caution regarding concurrency and the context in which the code is used.
You must decide if you want stop also expect all completion messages to be used. I decided to put join there, but you can just delete it.
So this is the code:
import threading, queue from functools import total_ordering @total_ordering class Final: def __repr__(self): return "∞" def __lt__(self, other): return False def __eq__(self, other): return isinstance(other, Final) Infty = Final() class IterQueue(queue.Queue): def __init__(self): self.lock = threading.Lock() self.stopped = False self.getters = 0 super().__init__() def __iter__(self): return self def get(self): raise NotImplementedError("This queue may only be used as an iterator.") def __next__(self): with self.lock: if self.stopped: raise StopIteration self.getters += 1 data = super().get() if data == Infty: self.task_done() raise StopIteration with self.lock: self.getters -= 1 return data def stop(self): self.join() self.stopped = True with self.lock: for i in range(self.getters): self.put(Infty) self.join() class IterPriorityQueue(IterQueue, queue.PriorityQueue): pass
Oh and I wrote this in python 3.2 . So, after you back up,
import threading, Queue from functools import total_ordering @total_ordering class Final: def __repr__(self): return "Infinity" def __lt__(self, other): return False def __eq__(self, other): return isinstance(other, Final) Infty = Final() class IterQueue(Queue.Queue, object): def __init__(self): self.lock = threading.Lock() self.stopped = False self.getters = 0 super(IterQueue, self).__init__() def __iter__(self): return self def get(self): raise NotImplementedError("This queue may only be used as an iterator.") def next(self): with self.lock: if self.stopped: raise StopIteration self.getters += 1 data = super(IterQueue, self).get() if data == Infty: self.task_done() raise StopIteration with self.lock: self.getters -= 1 return data def stop(self): self.join() self.stopped = True with self.lock: for i in range(self.getters): self.put(Infty) self.join() class IterPriorityQueue(IterQueue, Queue.PriorityQueue): pass
would you use it as
import random import time def sim_collectData(input_queue, stop_event): ''' this provides some output simulating the serial data from the data logging hardware. ''' n = 0 while not stop_event.is_set(): input_queue.put("DATA: <here are some random data> " + str(n)) stop_event.wait(random.randint(0,5)) n += 1 print "Terminating data collection..." return def logData(input_queue): n = 0