from bsread import dispatcher, source, Source from bsread import BIND, CONNECT, PUSH, PULL, PUB, SUB # make these easier to access from .bsvar import BSVar from .prodthread import ProdThread FIXED_CHANNELS = { "pid", "toc" } class BSCache: def __init__(self, maxsize=100, **kwargs): kwargs.setdefault("receive_timeout", 1000) self.channels = kwargs.pop("channels", {}) self.kwargs = kwargs self.data = None self.pt = ProdThread(self.run, maxsize=maxsize) def __repr__(self): return str(self.data) def __iter__(self): return self def __next__(self): self.pt.start() self.data = data = self.pt.get() return data def run(self, running): timeout_counter = 0 configs = self.channels.values() or None with source(channels=configs, **self.kwargs) as src: while running.is_set(): msg = src.receive() if msg is None: timeout_counter += 1 print("receive timed out", timeout_counter) continue data = repack(msg) if data: data["toc"] = timeout_counter yield data def start(self): self.pt.start() while self.data is None: print("dropping empty data") next(self) while not self.channels.keys() <= self.data.keys(): missing = self.channels.keys() - self.data.keys() - FIXED_CHANNELS print("dropping data that is missing channel(s):", sorted(missing)) next(self) def stop(self): self.pt.stop() def get_vars(self, names): if not isinstance(names, dict): names = {n: {} for n in names} new_chans = {} for name, kwargs in names.items(): if name not in FIXED_CHANNELS and name not in self.channels: if self.uses_dispatcher: check_availability(name) cfg = make_channel_config(name, *kwargs) new_chans[name] = cfg if new_chans: print("add new channels", sorted(new_chans)) self.add_vars(new_chans) return {n: BSVar(n, self) for n in names} def get_var(self, name, modulo=None, offset=None): if name not in FIXED_CHANNELS and name not in self.channels: if self.uses_dispatcher: check_availability(name) cfg = make_channel_config(name, modulo, offset) print("add new channel", name) self.add_var(name, cfg) return BSVar(name, self) def add_vars(self, chans): self.stop() self.channels.update(chans) self.start() def add_var(self, name, cfg): self.stop() self.channels[name] = cfg self.start() def rem_vars(self, names): self.stop() for n in names: del self.channels[n] self.start() def rem_var(self, name): self.stop() del self.channels[name] self.start() def clear_vars(self): self.stop() self.channels.clear() # cannot start without any channel def flush(self): self.pt.queue.clear() @property def uses_dispatcher(self): return not bool(self.kwargs.get("host")) BSCache.__doc__ = f""" maxsize: Size of the queue between main and receiving thread. kwargs are forwarded to bsread.source:{Source.__init__.__doc__} """ def check_availability(name): if not is_available(name): raise ValueError(f"channel {name} is not available") def is_available(name): available = get_available_channels() return name in available def get_available_channels(): channels = dispatcher.get_current_channels() return set(ch["name"] for ch in channels) def make_channel_config(name, modulo=None, offset=None): res = {} if modulo is not None: res["modulo"] = modulo if offset is not None: res["offset"] = offset if not res: return name res["name"] = name return res def repack(message): data = message.data.data pulse_id = message.data.pulse_id res = {n: v.value for n, v in data.items()} # res = {k: v for k, v in res.items() if v is not None} #TODO: should this be a ValueError? if any(v is None for v in res.values()): return None if not res: return None res["pid"] = pulse_id return res