diff --git a/bstrd/prodqueue.py b/bstrd/prodqueue.py new file mode 100644 index 0000000..0684e6c --- /dev/null +++ b/bstrd/prodqueue.py @@ -0,0 +1,24 @@ +from queue import Queue, Full + + +class ProdQueue(Queue): + + def put_nowait(self, *args, **kwargs): + if self.mutex.locked(): + raise Locked + + super().put_nowait(*args, **kwargs) + + + def clear(self): + with self.mutex: + self.queue.clear() + + + +class Locked(Exception): + "Exception raised by ProdQueue.put_nowait() if mutex is locked." + pass + + + diff --git a/bstrd/prodthread.py b/bstrd/prodthread.py index 43aa8a4..3e59c42 100644 --- a/bstrd/prodthread.py +++ b/bstrd/prodthread.py @@ -1,7 +1,7 @@ import signal from threading import Thread, Event -from queue import Queue, Full +from .prodqueue import ProdQueue, Full, Locked def prepend_signal(sig, func): @@ -38,7 +38,7 @@ class ProdThread: self.func = func self.thread = None - self.queue = Queue(maxsize=maxsize) + self.queue = ProdQueue(maxsize=maxsize) self.get = self.queue.get self.running = Event() @@ -58,7 +58,7 @@ class ProdThread: for data in gen: try: self.queue.put_nowait(data) - except Full: + except (Full, Locked): pass self.running.clear()