From cc4fc3268a1f335ccc95748e9145b6175b420812 Mon Sep 17 00:00:00 2001 From: Sven Augustin Date: Thu, 1 Jun 2023 13:59:58 +0200 Subject: [PATCH] added Queue subclass where put_nowait honors mutex lock, and that can be flushed --- bstrd/prodqueue.py | 24 ++++++++++++++++++++++++ bstrd/prodthread.py | 6 +++--- 2 files changed, 27 insertions(+), 3 deletions(-) create mode 100644 bstrd/prodqueue.py diff --git a/bstrd/prodqueue.py b/bstrd/prodqueue.py new file mode 100644 index 0000000..0684e6c --- /dev/null +++ b/bstrd/prodqueue.py @@ -0,0 +1,24 @@ +from queue import Queue, Full + + +class ProdQueue(Queue): + + def put_nowait(self, *args, **kwargs): + if self.mutex.locked(): + raise Locked + + super().put_nowait(*args, **kwargs) + + + def clear(self): + with self.mutex: + self.queue.clear() + + + +class Locked(Exception): + "Exception raised by ProdQueue.put_nowait() if mutex is locked." + pass + + + diff --git a/bstrd/prodthread.py b/bstrd/prodthread.py index 43aa8a4..3e59c42 100644 --- a/bstrd/prodthread.py +++ b/bstrd/prodthread.py @@ -1,7 +1,7 @@ import signal from threading import Thread, Event -from queue import Queue, Full +from .prodqueue import ProdQueue, Full, Locked def prepend_signal(sig, func): @@ -38,7 +38,7 @@ class ProdThread: self.func = func self.thread = None - self.queue = Queue(maxsize=maxsize) + self.queue = ProdQueue(maxsize=maxsize) self.get = self.queue.get self.running = Event() @@ -58,7 +58,7 @@ class ProdThread: for data in gen: try: self.queue.put_nowait(data) - except Full: + except (Full, Locked): pass self.running.clear()