Files
BStrd/bstrd/bscache.py

187 lines
4.5 KiB
Python

from bsread import dispatcher, source, Source
from bsread import BIND, CONNECT, PUSH, PULL, PUB, SUB # make these easier to access
from .bsvar import BSVar
from .prodthread import ProdThread
FIXED_CHANNELS = {
"pid",
"toc"
}
class BSCache:
def __init__(self, maxsize=100, **kwargs):
kwargs.setdefault("receive_timeout", 1000)
self.channels = kwargs.pop("channels", {})
self.kwargs = kwargs
self.data = None
self.pt = ProdThread(self.run, maxsize=maxsize)
def __repr__(self):
return str(self.data)
def __iter__(self):
return self
def __next__(self):
self.pt.start()
self.data = data = self.pt.get()
return data
def run(self, running):
timeout_counter = 0
configs = self.channels.values() or None
with source(channels=configs, **self.kwargs) as src:
while running.is_set():
msg = src.receive()
if msg is None:
timeout_counter += 1
print("receive timed out", timeout_counter)
continue
data = repack(msg)
if data:
data["toc"] = timeout_counter
yield data
def start(self):
self.pt.start()
while self.data is None:
print("dropping empty data")
next(self)
while not self.channels.keys() <= self.data.keys():
missing = self.channels.keys() - self.data.keys() - FIXED_CHANNELS
print("dropping data that is missing channel(s):", sorted(missing))
next(self)
def stop(self):
self.pt.stop()
def get_vars(self, names):
if not isinstance(names, dict):
names = {n: {} for n in names}
new_chans = {}
for name, kwargs in names.items():
if name not in FIXED_CHANNELS and name not in self.channels:
if self.uses_dispatcher:
check_availability(name)
cfg = make_channel_config(name, *kwargs)
new_chans[name] = cfg
if new_chans:
print("add new channels", sorted(new_chans))
self.add_vars(new_chans)
return {n: BSVar(n, self) for n in names}
def get_var(self, name, modulo=None, offset=None):
if name not in FIXED_CHANNELS and name not in self.channels:
if self.uses_dispatcher:
check_availability(name)
cfg = make_channel_config(name, modulo, offset)
print("add new channel", name)
self.add_var(name, cfg)
return BSVar(name, self)
def add_vars(self, chans):
self.stop()
self.channels.update(chans)
self.start()
def add_var(self, name, cfg):
self.stop()
self.channels[name] = cfg
self.start()
def rem_vars(self, names):
self.stop()
for n in names:
del self.channels[n]
self.start()
def rem_var(self, name):
self.stop()
del self.channels[name]
self.start()
def clear_vars(self):
self.stop()
self.channels.clear()
# cannot start without any channel
def flush(self):
self.pt.queue.clear()
@property
def uses_dispatcher(self):
return not bool(self.kwargs.get("host"))
BSCache.__doc__ = f"""
maxsize: Size of the queue between main and receiving thread.
kwargs are forwarded to bsread.source:{Source.__init__.__doc__}
"""
def check_availability(name):
if not is_available(name):
raise ValueError(f"channel {name} is not available")
def is_available(name):
available = get_available_channels()
return name in available
def get_available_channels():
channels = dispatcher.get_current_channels()
return set(ch["name"] for ch in channels)
def make_channel_config(name, modulo=None, offset=None):
res = {}
if modulo is not None:
res["modulo"] = modulo
if offset is not None:
res["offset"] = offset
if not res:
return name
res["name"] = name
return res
def repack(message):
data = message.data.data
pulse_id = message.data.pulse_id
res = {n: v.value for n, v in data.items()}
# res = {k: v for k, v in res.items() if v is not None}
#TODO: should this be a ValueError?
if any(v is None for v in res.values()):
return None
if not res:
return None
res["pid"] = pulse_id
return res