added package structure
This commit is contained in:
4
bstrd/__init__.py
Normal file
4
bstrd/__init__.py
Normal file
@ -0,0 +1,4 @@
|
||||
|
||||
from .bstrd import *
|
||||
|
||||
|
103
bstrd/bscache.py
Normal file
103
bstrd/bscache.py
Normal file
@ -0,0 +1,103 @@
|
||||
from time import sleep
|
||||
from bsread import source, dispatcher
|
||||
from .bsvar import BSVar
|
||||
from .prodthread import ProdThread
|
||||
|
||||
|
||||
class BSCache:
|
||||
|
||||
def __init__(self):
|
||||
self.channels = {}
|
||||
self.data = None
|
||||
self.pt = ProdThread(self.run)
|
||||
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
self.pt.start()
|
||||
self.data = data = self.pt.queue.get()
|
||||
return data
|
||||
|
||||
|
||||
def run(self, queue, running):
|
||||
channels = self.channels.keys()
|
||||
configs = self.channels.values()
|
||||
with source(channels=configs, receive_timeout=-1) as src:
|
||||
while running.is_set():
|
||||
msg = src.receive()
|
||||
data = repack(channels, msg)
|
||||
if data:
|
||||
queue.put(data)
|
||||
|
||||
|
||||
def stop(self):
|
||||
self.pt.stop()
|
||||
|
||||
|
||||
def get_var(self, name, modulo=None, offset=None):
|
||||
if name is not "pid" and not name in self.channels:
|
||||
if not is_available(name):
|
||||
raise ValueError(f"channel {name} is not available")
|
||||
cfg = make_channel_config(name, modulo, offset)
|
||||
self.update_source(name, cfg)
|
||||
return BSVar(name, self)
|
||||
|
||||
|
||||
def update_source(self, name, cfg):
|
||||
self.pt.stop()
|
||||
print("add channel", name)
|
||||
self.channels[name] = cfg
|
||||
self.pt.start()
|
||||
|
||||
while self.data is None or name not in self.data:
|
||||
print("dropping data that is missing new channel:", name)
|
||||
next(self)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def is_available(name):
|
||||
available = get_available_channels()
|
||||
return name in available
|
||||
|
||||
def get_available_channels():
|
||||
channels = dispatcher.get_current_channels()
|
||||
return set(ch["name"] for ch in channels)
|
||||
|
||||
|
||||
|
||||
def make_channel_config(name, modulo, offset):
|
||||
res = {}
|
||||
if modulo is not None:
|
||||
res["modulo"] = modulo
|
||||
if offset is not None:
|
||||
res["offset"] = offset
|
||||
if not res:
|
||||
return name
|
||||
res["name"] = name
|
||||
return res
|
||||
|
||||
|
||||
|
||||
def repack(channels, message):
|
||||
data = message.data.data
|
||||
pulse_id = message.data.pulse_id
|
||||
|
||||
res = {n: data[n].value for n in channels}
|
||||
# res = {k: v for k, v in res.items() if v is not None}
|
||||
|
||||
#TODO: should this be a ValueError?
|
||||
if any(v is None for v in res.values()):
|
||||
return None
|
||||
|
||||
if not res:
|
||||
return None
|
||||
|
||||
res["pid"] = pulse_id
|
||||
return res
|
||||
|
||||
|
||||
|
8
bstrd/bstrd.py
Normal file
8
bstrd/bstrd.py
Normal file
@ -0,0 +1,8 @@
|
||||
from .bscache import BSCache
|
||||
|
||||
|
||||
bsstream = BSCache()
|
||||
BS = bsstream.get_var
|
||||
|
||||
|
||||
|
25
bstrd/bsvar.py
Normal file
25
bstrd/bsvar.py
Normal file
@ -0,0 +1,25 @@
|
||||
|
||||
class BSVar:
|
||||
|
||||
def __init__(self, name, cache):
|
||||
self.name = name
|
||||
self.cache = cache
|
||||
|
||||
def get(self):
|
||||
return self.cache.data.get(self.name)
|
||||
|
||||
value = property(get)
|
||||
|
||||
def __repr__(self):
|
||||
return f"{self.name} = {self.value}"
|
||||
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
next(self.cache)
|
||||
return self.get()
|
||||
|
||||
|
||||
|
52
bstrd/clock.py
Normal file
52
bstrd/clock.py
Normal file
@ -0,0 +1,52 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
from time import time
|
||||
|
||||
|
||||
class Clock(object):
|
||||
"""
|
||||
Simple timing clock
|
||||
|
||||
prec sets precision of returned time deltas
|
||||
|
||||
tick() returns time delta since last tick
|
||||
tock() returns time delta since the start of the clock
|
||||
"""
|
||||
|
||||
def __init__(self, prec=2):
|
||||
self.prec = prec
|
||||
self.start = self.last = time()
|
||||
|
||||
def tick(self):
|
||||
"""Time delta since last tick"""
|
||||
now = time()
|
||||
delta = now - self.last
|
||||
self.last = now
|
||||
return self._fmt_delta(delta)
|
||||
|
||||
def tock(self):
|
||||
"""Time delta since the start of the clock"""
|
||||
now = time()
|
||||
delta = now - self.start
|
||||
return self._fmt_delta(delta)
|
||||
|
||||
def _fmt_delta(self, delta):
|
||||
"""Format time deltas using the precision given to the constructor"""
|
||||
if self.prec is not None:
|
||||
delta = round(delta, self.prec)
|
||||
return delta
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from time import sleep
|
||||
|
||||
c = Clock()
|
||||
for i in range(5):
|
||||
sleep(float(i) / 10)
|
||||
print(i, c.tick(), c.tock())
|
||||
|
||||
|
||||
|
22
bstrd/ltddict.py
Normal file
22
bstrd/ltddict.py
Normal file
@ -0,0 +1,22 @@
|
||||
from collections import OrderedDict
|
||||
|
||||
|
||||
class LimitedDict(OrderedDict):
|
||||
|
||||
def __init__(self, *args, maxlen=None, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.maxlen = maxlen
|
||||
self._ensure_length()
|
||||
|
||||
def __setitem__(self, *args):
|
||||
super().__setitem__(*args)
|
||||
self._ensure_length()
|
||||
|
||||
def _ensure_length(self):
|
||||
if self.maxlen is None:
|
||||
return
|
||||
while len(self) > self.maxlen:
|
||||
self.popitem(last=False)
|
||||
|
||||
|
||||
|
52
bstrd/prodthread.py
Normal file
52
bstrd/prodthread.py
Normal file
@ -0,0 +1,52 @@
|
||||
import signal
|
||||
|
||||
from threading import Thread, Event
|
||||
from queue import Queue
|
||||
|
||||
|
||||
def prepend_signal(sig, func):
|
||||
handler = signal.getsignal(sig)
|
||||
|
||||
def wrapper(*args, **kwargs):
|
||||
func(*args, **kwargs)
|
||||
handler(*args, **kwargs)
|
||||
|
||||
signal.signal(sig, wrapper)
|
||||
|
||||
|
||||
|
||||
class ProdThread:
|
||||
"""
|
||||
provided func has to accept two arguments:
|
||||
- queue = queue.Queue()
|
||||
- running = threading.Event()
|
||||
"""
|
||||
|
||||
def __init__(self, func):
|
||||
self.func = func
|
||||
|
||||
self.thread = None
|
||||
self.queue = Queue()
|
||||
self.running = Event()
|
||||
|
||||
prepend_signal(signal.SIGINT, self.stop)
|
||||
|
||||
|
||||
def target(self):
|
||||
self.running.set()
|
||||
self.func(self.queue, self.running)
|
||||
self.running.clear()
|
||||
|
||||
def start(self):
|
||||
if not self.thread:
|
||||
self.thread = thread = Thread(target=self.target)
|
||||
thread.start()
|
||||
|
||||
def stop(self, *args): # signal() gives some extra args
|
||||
self.running.clear()
|
||||
if self.thread:
|
||||
self.thread.join()
|
||||
self.thread = None
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user