From 78591549a91047830a274ed2f7d36b07e5986bfd Mon Sep 17 00:00:00 2001 From: Sven Augustin Date: Wed, 8 Sep 2021 18:40:54 +0200 Subject: [PATCH] made BSCache.run() a generator; moved dealing with the queue completely into ProdThread; allowed setting the maxsize of the queue from ProdThread() --- bstrd/bscache.py | 6 +++--- bstrd/prodthread.py | 34 +++++++++++++++++++++++++++------- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/bstrd/bscache.py b/bstrd/bscache.py index 8d6b283..f4fc8b7 100644 --- a/bstrd/bscache.py +++ b/bstrd/bscache.py @@ -17,11 +17,11 @@ class BSCache: def __next__(self): self.pt.start() - self.data = data = self.pt.queue.get() + self.data = data = self.pt.get() return data - def run(self, queue, running): + def run(self, running): channels = self.channels.keys() configs = self.channels.values() with source(channels=configs, receive_timeout=-1) as src: @@ -29,7 +29,7 @@ class BSCache: msg = src.receive() data = repack(channels, msg) if data: - queue.put(data) + yield data def stop(self): diff --git a/bstrd/prodthread.py b/bstrd/prodthread.py index 2c8bbbb..39695ca 100644 --- a/bstrd/prodthread.py +++ b/bstrd/prodthread.py @@ -1,7 +1,7 @@ import signal from threading import Thread, Event -from queue import Queue +from queue import Queue, Full def prepend_signal(sig, func): @@ -17,16 +17,30 @@ def prepend_signal(sig, func): class ProdThread: """ - provided func has to accept two arguments: - - queue = queue.Queue() - - running = threading.Event() + Upon call of start(), + the provided func will be executed with the argument + running = threading.Event() + in a separate thread. + + The result is expected to be iterable + and the yielded values will be filled into a queue. + + If the queue is full, the values will be dropped. + + The oldest entry can be retrieved/removed via get(). + + The iterator should obey the state of the running Event. + + Calling stop() clears the running Event and joins the thread. """ - def __init__(self, func): + def __init__(self, func, maxsize=0): self.func = func self.thread = None - self.queue = Queue() + self.queue = Queue(maxsize=maxsize) + self.get = self.queue.get + self.running = Event() prepend_signal(signal.SIGINT, self.stop) @@ -34,9 +48,15 @@ class ProdThread: def target(self): self.running.set() - self.func(self.queue, self.running) + gen = self.func(self.running) + for data in gen: + try: + self.queue.put_nowait(data) + except Full: + pass self.running.clear() + def start(self): if not self.thread: self.thread = thread = Thread(target=self.target)