first running version
This commit is contained in:
155
pxsi.py
Normal file
155
pxsi.py
Normal file
@ -0,0 +1,155 @@
|
||||
"""pxsi: parameter update over serial interface
|
||||
|
||||
Markus Zolliker (markus.zolliker@psi.ch)
|
||||
|
||||
see README.md for documentation
|
||||
"""
|
||||
|
||||
import sys
|
||||
import uselect
|
||||
import json
|
||||
import __main__
|
||||
|
||||
CR = 13
|
||||
LF = 10
|
||||
|
||||
|
||||
class Serial:
|
||||
def __init__(self):
|
||||
# self.select = uselect.poll()
|
||||
# self.select.register(sys.stdin, uselect.POLLIN)
|
||||
self.buffer = bytearray()
|
||||
self._last_char = None
|
||||
|
||||
def read(self, timeout=0):
|
||||
"""read a line or None if not yet available within timeout"""
|
||||
prev = self._last_char
|
||||
while uselect.select([sys.stdin], [], [], timeout)[0]:
|
||||
ch = sys.stdin.buffer.read(1)[0]
|
||||
if ch < 32: # control characters
|
||||
if ch == LF:
|
||||
if prev == CR:
|
||||
prev = ch
|
||||
continue
|
||||
elif ch != CR:
|
||||
# ignore other control characters
|
||||
prev = ch
|
||||
continue
|
||||
else:
|
||||
self.buffer.append(ch)
|
||||
prev = ch
|
||||
continue
|
||||
result = self.buffer.decode()
|
||||
self.buffer = bytearray()
|
||||
self._last_char = ch
|
||||
return result
|
||||
self._last_char = prev
|
||||
|
||||
def write(self, line):
|
||||
"""write a line appended with the line end"""
|
||||
sys.stdout.write(line.encode() + b'\r\n')
|
||||
|
||||
|
||||
class Pxsi:
|
||||
def __init__(self, idn, **kwds):
|
||||
self.idn = f'pxsi,{idn}'
|
||||
self.io = Serial()
|
||||
self.io.write(self.idn)
|
||||
self.main = __main__.__dict__
|
||||
self.varset = set(kwds)
|
||||
self.main.update(kwds)
|
||||
# make methods also available on the class
|
||||
Pxsi.handle = self.handle
|
||||
Pxsi.add = self.add
|
||||
# key access of variables via Pxsi.vars
|
||||
Pxsi.vars = self
|
||||
|
||||
def add(self, **kwds):
|
||||
self.varset.update(kwds)
|
||||
self.main.update(kwds)
|
||||
|
||||
def __enter__(self):
|
||||
self._previous = set(__main__.__dict__)
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.varset.update(set(self.main) - set(self._previous))
|
||||
|
||||
def __getitem__(self, key):
|
||||
if key not in self.varset:
|
||||
raise KeyError(f'{key!r} is not a declared variable')
|
||||
return self.main[key]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
if key not in self.varset:
|
||||
raise KeyError(f'{key!r} is not a declared variable')
|
||||
self.main[key] = value
|
||||
|
||||
def handle(self, timeout):
|
||||
"""handle input and output until timeout has expired
|
||||
|
||||
or some parameters were changed
|
||||
"""
|
||||
request = self.io.read(timeout)
|
||||
if not request:
|
||||
return
|
||||
while request:
|
||||
reply = {}
|
||||
errors = []
|
||||
jsonvalue = request
|
||||
try:
|
||||
if request.startswith('{'):
|
||||
todo = json.loads(request).items()
|
||||
elif request.startswith('['):
|
||||
todo = json.loads(request)
|
||||
elif request == '*IDN?':
|
||||
self.io.write(self.idn)
|
||||
request = self.io.read(0)
|
||||
continue
|
||||
elif request == '*':
|
||||
todo = self.varset
|
||||
else:
|
||||
split = request.split('=', 1)
|
||||
if len(split) == 2:
|
||||
jsonvalue = split[1]
|
||||
todo = [(split[0], json.loads(jsonvalue))]
|
||||
else:
|
||||
todo = split
|
||||
except ValueError:
|
||||
errors = [f'invalid JSON: {jsonvalue}']
|
||||
todo = []
|
||||
for cmd in todo:
|
||||
if isinstance(cmd, str):
|
||||
key = cmd
|
||||
value = None
|
||||
else:
|
||||
key, value = cmd
|
||||
if key not in self.varset:
|
||||
errors.append(f"BadKey({key!r})")
|
||||
continue
|
||||
if value is None:
|
||||
reply[key] = self.main[key]
|
||||
continue
|
||||
func_name = f'change_{key}'
|
||||
change_func = self.main.get(func_name)
|
||||
if not change_func:
|
||||
change_func = type(self.main[key])
|
||||
try:
|
||||
result = change_func(value)
|
||||
if result is not None:
|
||||
value = result
|
||||
except Exception as e:
|
||||
if func_name == change_func.__name__:
|
||||
errors.append(f'{e!r} in {func_name}')
|
||||
else:
|
||||
errors.append(f'{e!r} converting {key}')
|
||||
continue
|
||||
self.main[key] = value
|
||||
reply[key] = value
|
||||
if errors:
|
||||
reply['_errors_'] = errors
|
||||
self.io.write(json.dumps(reply))
|
||||
# continue only as long as there is something in the buffer
|
||||
request = self.io.read(0)
|
||||
|
||||
def __repr__(self):
|
||||
return f"{type(self).__name__}({self.idn!r})"
|
Reference in New Issue
Block a user