"""pxsi: parameter update over serial interface Markus Zolliker (markus.zolliker@psi.ch) see README.md for documentation """ import sys import uselect import json import __main__ CR = 13 LF = 10 class Serial: def __init__(self): # self.select = uselect.poll() # self.select.register(sys.stdin, uselect.POLLIN) self.buffer = bytearray() self._last_char = None def read(self, timeout=0): """read a line or None if not yet available within timeout""" prev = self._last_char while uselect.select([sys.stdin], [], [], timeout)[0]: ch = sys.stdin.buffer.read(1)[0] if ch < 32: # control characters if ch == LF: if prev == CR: prev = ch continue elif ch != CR: # ignore other control characters prev = ch continue else: self.buffer.append(ch) prev = ch continue result = self.buffer.decode() self.buffer = bytearray() self._last_char = ch return result self._last_char = prev def write(self, line): """write a line appended with the line end""" sys.stdout.write(line.encode() + b'\r\n') class Pxsi: def __init__(self, idn, **kwds): self.idn = f'pxsi,{idn}' self.io = Serial() self.io.write(self.idn) self.main = __main__.__dict__ self.varset = set(kwds) self.main.update(kwds) # make methods also available on the class Pxsi.handle = self.handle Pxsi.add = self.add # key access of variables via Pxsi.vars Pxsi.vars = self def add(self, **kwds): self.varset.update(kwds) self.main.update(kwds) def __enter__(self): self._previous = set(__main__.__dict__) def __exit__(self, exc_type, exc_val, exc_tb): self.varset.update(set(self.main) - set(self._previous)) def __getitem__(self, key): if key not in self.varset: raise KeyError(f'{key!r} is not a declared variable') return self.main[key] def __setitem__(self, key, value): if key not in self.varset: raise KeyError(f'{key!r} is not a declared variable') self.main[key] = value def handle(self, timeout): """handle input and output until timeout has expired or some parameters were changed """ request = self.io.read(timeout) if not request: return while request: reply = {} errors = [] jsonvalue = request try: if request.startswith('{'): todo = json.loads(request).items() elif request.startswith('['): todo = json.loads(request) elif request == '*IDN?': self.io.write(self.idn) request = self.io.read(0) continue elif request == '*': todo = self.varset else: split = request.split('=', 1) if len(split) == 2: jsonvalue = split[1] todo = [(split[0], json.loads(jsonvalue))] else: todo = split except ValueError: errors = [f'invalid JSON: {jsonvalue}'] todo = [] for cmd in todo: if isinstance(cmd, str): key = cmd value = None else: key, value = cmd if key not in self.varset: errors.append(f"BadKey({key!r})") continue if value is None: reply[key] = self.main[key] continue func_name = f'change_{key}' change_func = self.main.get(func_name) if not change_func: change_func = type(self.main[key]) try: result = change_func(value) if result is not None: value = result except Exception as e: if func_name == change_func.__name__: errors.append(f'{e!r} in {func_name}') else: errors.append(f'{e!r} converting {key}') continue self.main[key] = value reply[key] = value if errors: reply['_errors_'] = errors self.io.write(json.dumps(reply)) # continue only as long as there is something in the buffer request = self.io.read(0) def __repr__(self): return f"{type(self).__name__}({self.idn!r})"