Files
pxsi/pxsi.py
2025-05-16 15:30:11 +02:00

156 lines
4.9 KiB
Python

"""pxsi: parameter update over serial interface
Markus Zolliker (markus.zolliker@psi.ch)
see README.md for documentation
"""
import sys
import uselect
import json
import __main__
CR = 13
LF = 10
class Serial:
def __init__(self):
# self.select = uselect.poll()
# self.select.register(sys.stdin, uselect.POLLIN)
self.buffer = bytearray()
self._last_char = None
def read(self, timeout=0):
"""read a line or None if not yet available within timeout"""
prev = self._last_char
while uselect.select([sys.stdin], [], [], timeout)[0]:
ch = sys.stdin.buffer.read(1)[0]
if ch < 32: # control characters
if ch == LF:
if prev == CR:
prev = ch
continue
elif ch != CR:
# ignore other control characters
prev = ch
continue
else:
self.buffer.append(ch)
prev = ch
continue
result = self.buffer.decode()
self.buffer = bytearray()
self._last_char = ch
return result
self._last_char = prev
def write(self, line):
"""write a line appended with the line end"""
sys.stdout.write(line.encode() + b'\r\n')
class Pxsi:
def __init__(self, idn, **kwds):
self.idn = f'pxsi,{idn}'
self.io = Serial()
self.io.write(self.idn)
self.main = __main__.__dict__
self.varset = set(kwds)
self.main.update(kwds)
# make methods also available on the class
Pxsi.handle = self.handle
Pxsi.add = self.add
# key access of variables via Pxsi.vars
Pxsi.vars = self
def add(self, **kwds):
self.varset.update(kwds)
self.main.update(kwds)
def __enter__(self):
self._previous = set(__main__.__dict__)
def __exit__(self, exc_type, exc_val, exc_tb):
self.varset.update(set(self.main) - set(self._previous))
def __getitem__(self, key):
if key not in self.varset:
raise KeyError(f'{key!r} is not a declared variable')
return self.main[key]
def __setitem__(self, key, value):
if key not in self.varset:
raise KeyError(f'{key!r} is not a declared variable')
self.main[key] = value
def handle(self, timeout):
"""handle input and output until timeout has expired
or some parameters were changed
"""
request = self.io.read(timeout)
if not request:
return
while request:
reply = {}
errors = []
jsonvalue = request
try:
if request.startswith('{'):
todo = json.loads(request).items()
elif request.startswith('['):
todo = json.loads(request)
elif request == '*IDN?':
self.io.write(self.idn)
request = self.io.read(0)
continue
elif request == '*':
todo = self.varset
else:
split = request.split('=', 1)
if len(split) == 2:
jsonvalue = split[1]
todo = [(split[0], json.loads(jsonvalue))]
else:
todo = split
except ValueError:
errors = [f'invalid JSON: {jsonvalue}']
todo = []
for cmd in todo:
if isinstance(cmd, str):
key = cmd
value = None
else:
key, value = cmd
if key not in self.varset:
errors.append(f"BadKey({key!r})")
continue
if value is None:
reply[key] = self.main[key]
continue
func_name = f'change_{key}'
change_func = self.main.get(func_name)
if not change_func:
change_func = type(self.main[key])
try:
result = change_func(value)
if result is not None:
value = result
except Exception as e:
if func_name == change_func.__name__:
errors.append(f'{e!r} in {func_name}')
else:
errors.append(f'{e!r} converting {key}')
continue
self.main[key] = value
reply[key] = value
if errors:
reply['_errors_'] = errors
self.io.write(json.dumps(reply))
# continue only as long as there is something in the buffer
request = self.io.read(0)
def __repr__(self):
return f"{type(self).__name__}({self.idn!r})"