made BSCache.run() a generator; moved dealing with the queue completely into ProdThread; allowed setting the maxsize of the queue from ProdThread()

This commit is contained in:
2021-09-08 18:40:54 +02:00
parent 36b14b470c
commit 78591549a9
2 changed files with 30 additions and 10 deletions

View File

@ -17,11 +17,11 @@ class BSCache:
def __next__(self): def __next__(self):
self.pt.start() self.pt.start()
self.data = data = self.pt.queue.get() self.data = data = self.pt.get()
return data return data
def run(self, queue, running): def run(self, running):
channels = self.channels.keys() channels = self.channels.keys()
configs = self.channels.values() configs = self.channels.values()
with source(channels=configs, receive_timeout=-1) as src: with source(channels=configs, receive_timeout=-1) as src:
@ -29,7 +29,7 @@ class BSCache:
msg = src.receive() msg = src.receive()
data = repack(channels, msg) data = repack(channels, msg)
if data: if data:
queue.put(data) yield data
def stop(self): def stop(self):

View File

@ -1,7 +1,7 @@
import signal import signal
from threading import Thread, Event from threading import Thread, Event
from queue import Queue from queue import Queue, Full
def prepend_signal(sig, func): def prepend_signal(sig, func):
@ -17,16 +17,30 @@ def prepend_signal(sig, func):
class ProdThread: class ProdThread:
""" """
provided func has to accept two arguments: Upon call of start(),
- queue = queue.Queue() the provided func will be executed with the argument
- running = threading.Event() running = threading.Event()
in a separate thread.
The result is expected to be iterable
and the yielded values will be filled into a queue.
If the queue is full, the values will be dropped.
The oldest entry can be retrieved/removed via get().
The iterator should obey the state of the running Event.
Calling stop() clears the running Event and joins the thread.
""" """
def __init__(self, func): def __init__(self, func, maxsize=0):
self.func = func self.func = func
self.thread = None self.thread = None
self.queue = Queue() self.queue = Queue(maxsize=maxsize)
self.get = self.queue.get
self.running = Event() self.running = Event()
prepend_signal(signal.SIGINT, self.stop) prepend_signal(signal.SIGINT, self.stop)
@ -34,9 +48,15 @@ class ProdThread:
def target(self): def target(self):
self.running.set() self.running.set()
self.func(self.queue, self.running) gen = self.func(self.running)
for data in gen:
try:
self.queue.put_nowait(data)
except Full:
pass
self.running.clear() self.running.clear()
def start(self): def start(self):
if not self.thread: if not self.thread:
self.thread = thread = Thread(target=self.target) self.thread = thread = Thread(target=self.target)