diff --git a/bs.py b/bs.py new file mode 100644 index 0000000..65f16ac --- /dev/null +++ b/bs.py @@ -0,0 +1,8 @@ +from bscache import BSCache + + +bsstream = BSCache() +BS = bsstream.get_var + + + diff --git a/bscache.py b/bscache.py new file mode 100644 index 0000000..d0823ce --- /dev/null +++ b/bscache.py @@ -0,0 +1,73 @@ +from time import sleep +from bsread import source +from bsvar import BSVar +from prodthread import ProdThread + + +class BSCache: + + def __init__(self): + self.channels = set() + self.data = None + self.pt = ProdThread(self.run) + + + def __iter__(self): + return self + + def __next__(self): + self.pt.start() + self.data = data = self.pt.queue.get() + return data + + + def run(self, queue, running): + channels = self.channels + with source(channels=channels, receive_timeout=-1) as src: + while running.is_set(): + msg = src.receive() + data = repack(channels, msg) + if data: + queue.put(data) + + + def stop(self, *args): + print("\n\nstopping\n\n", args) + self.pt.stop() + + + def get_var(self, name): + if name is not "pid" and not name in self.channels: + self.update_source(name) + return BSVar(name, self) + + + def update_source(self, name): + self.pt.stop() + print("add channel", name) + self.channels.add(name) + self.pt.start() + + + + + +def repack(channels, message): + data = message.data.data + pulse_id = message.data.pulse_id + + res = {n: data[n].value for n in channels} +# res = {k: v for k, v in res.items() if v is not None} + + #TODO: should this be a ValueError? + if any(v is None for v in res.values()): + return None + + if not res: + return None + + res["pid"] = pulse_id + return res + + + diff --git a/bsvar.py b/bsvar.py new file mode 100644 index 0000000..e6e9d7b --- /dev/null +++ b/bsvar.py @@ -0,0 +1,21 @@ + +class BSVar: + + def __init__(self, name, cache): + self.name = name + self.cache = cache + + def get(self): + try: + return self.cache.data[self.name] + except KeyError as e: + print("KeyError:", e) #TODO: remove / KeyError should be impossible to trigger + return None + + value = property(get) + + def __repr__(self): + return f"{self.name} = {self.value}" + + + diff --git a/clock.py b/clock.py new file mode 100644 index 0000000..520c16d --- /dev/null +++ b/clock.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 + +from time import time + + +class Clock(object): + """ + Simple timing clock + + prec sets precision of returned time deltas + + tick() returns time delta since last tick + tock() returns time delta since the start of the clock + """ + + def __init__(self, prec=2): + self.prec = prec + self.start = self.last = time() + + def tick(self): + """Time delta since last tick""" + now = time() + delta = now - self.last + self.last = now + return self._fmt_delta(delta) + + def tock(self): + """Time delta since the start of the clock""" + now = time() + delta = now - self.start + return self._fmt_delta(delta) + + def _fmt_delta(self, delta): + """Format time deltas using the precision given to the constructor""" + if self.prec is not None: + delta = round(delta, self.prec) + return delta + + + + + +if __name__ == "__main__": + from time import sleep + + c = Clock() + for i in range(5): + sleep(float(i) / 10) + print(i, c.tick(), c.tock()) + + + diff --git a/ltddict.py b/ltddict.py new file mode 100644 index 0000000..5ae6395 --- /dev/null +++ b/ltddict.py @@ -0,0 +1,22 @@ +from collections import OrderedDict + + +class LimitedDict(OrderedDict): + + def __init__(self, *args, maxlen=None, **kwargs): + super().__init__(*args, **kwargs) + self.maxlen = maxlen + self._ensure_length() + + def __setitem__(self, *args): + super().__setitem__(*args) + self._ensure_length() + + def _ensure_length(self): + if self.maxlen is None: + return + while len(self) > self.maxlen: + self.popitem(last=False) + + + diff --git a/prodthread.py b/prodthread.py new file mode 100644 index 0000000..2c8bbbb --- /dev/null +++ b/prodthread.py @@ -0,0 +1,52 @@ +import signal + +from threading import Thread, Event +from queue import Queue + + +def prepend_signal(sig, func): + handler = signal.getsignal(sig) + + def wrapper(*args, **kwargs): + func(*args, **kwargs) + handler(*args, **kwargs) + + signal.signal(sig, wrapper) + + + +class ProdThread: + """ + provided func has to accept two arguments: + - queue = queue.Queue() + - running = threading.Event() + """ + + def __init__(self, func): + self.func = func + + self.thread = None + self.queue = Queue() + self.running = Event() + + prepend_signal(signal.SIGINT, self.stop) + + + def target(self): + self.running.set() + self.func(self.queue, self.running) + self.running.clear() + + def start(self): + if not self.thread: + self.thread = thread = Thread(target=self.target) + thread.start() + + def stop(self, *args): # signal() gives some extra args + self.running.clear() + if self.thread: + self.thread.join() + self.thread = None + + + diff --git a/run.py b/run.py new file mode 100755 index 0000000..02f3031 --- /dev/null +++ b/run.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python + +from time import sleep +from bs import BS, bsstream + + +v0 = BS("pid") +v1 = BS("SATFE10-PEPG046:FCUP-INTENSITY-CAL") +#v2 = BS("SATES21-GES1:A1_VALUES") +#v3 = BS("SIN-CVME-TIFGUN-EVR0:RX-PULSEID") + + +#from collections import defaultdict +#h = defaultdict(int) + + +for i, data in enumerate(bsstream): + sleep(0.005) + +# h[mc.queue.qsize()] += 1 + +# print(v0.value) + +# if v0.value is not None and v0.value % 100 == 0: +# for val, count in sorted(h.items()): +# print(val, "x" * count if count <100 else "=" + str(count)) +# print() + + if i == 200: + v2 = BS("SATES21-GES1:A1_VALUES") + + if i < 200: + print(i, v0, v1) + else: + print(i, v0, v1, v2) + + + if i == 400: + break + + +for data in bsstream: + print(data) + +