From bce669a225f05779bb293db4727cea80812c37e7 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Fri, 16 May 2025 15:30:11 +0200 Subject: [PATCH] first running version --- README.md | 28 ++++++---- main.py | 30 +++++++++++ pxsi.py | 155 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 204 insertions(+), 9 deletions(-) create mode 100644 main.py create mode 100644 pxsi.py diff --git a/README.md b/README.md index 510904c..2951a13 100644 --- a/README.md +++ b/README.md @@ -12,23 +12,31 @@ Code example # declare variables visibile over serial - vars = Pxsi('MyBox', # idn will be: pxsi,MyBox - t = 5, - a = 2, - ) + with Pxsi('MyBox'): # idn will be: pxsi,MyBox + pollinterval = 5.0 # important: add decimal point in case you want this to be a float + anint = 2 + afloat = 0.0 + astring = 'text' + + # the given variables are now available in the main namespace # create action functions for some of the variables - def change_t(value): + def change_afloat(value): + value = float(value) # check value is compatible with float # write here what to do when t changes + return value # return the validated value - # your real code + # the validity of has to be checked here, and if any conversion happens + # the converted value has to be returned + # if no change function is present, the value is converted to the type of + # the initial value (e.g. float, int or str) + + # here follows your real code - print(t) # the given variables are now available in the main namesapce - while True: # handle IO, wait at most 0.5 secs - vars.handle_io(0.5) + Pxsi.handle(0.5) # read out sensors, do control logic here # important: do not sleep here! @@ -43,6 +51,8 @@ Commands: ["name1", ["name2", value]] # multiple commands: query name1, change name2. must be a valid JSON array {"name1": null, "name2": value} # alternative form - be aware that order is not guaranteed + The values must match the type of initial values + Special command for identification *IDN? diff --git a/main.py b/main.py new file mode 100644 index 0000000..bf47466 --- /dev/null +++ b/main.py @@ -0,0 +1,30 @@ +# simple example +# + +from machine import Pin +from pxsi import Pxsi + +with Pxsi('MyBox'): + t = 0.5 # blink interval + a = 'x' + b = 0 # changes to this variable will be converted to int ! + +Pxsi.add(d=0.0) + + +# called whenever a is changed remotely +def change_a(value): + print(f'changed t: {value!r}') + return str(value) # return validated value + + +led = Pin(6, Pin.OUT) + + +def main(): + while 1: + led.toggle() + Pxsi.handle(t) # wait at most for the given blink interval + Pxsi.vars['b'] += 1 + +main() \ No newline at end of file diff --git a/pxsi.py b/pxsi.py new file mode 100644 index 0000000..0f70842 --- /dev/null +++ b/pxsi.py @@ -0,0 +1,155 @@ +"""pxsi: parameter update over serial interface + +Markus Zolliker (markus.zolliker@psi.ch) + +see README.md for documentation +""" + +import sys +import uselect +import json +import __main__ + +CR = 13 +LF = 10 + + +class Serial: + def __init__(self): + # self.select = uselect.poll() + # self.select.register(sys.stdin, uselect.POLLIN) + self.buffer = bytearray() + self._last_char = None + + def read(self, timeout=0): + """read a line or None if not yet available within timeout""" + prev = self._last_char + while uselect.select([sys.stdin], [], [], timeout)[0]: + ch = sys.stdin.buffer.read(1)[0] + if ch < 32: # control characters + if ch == LF: + if prev == CR: + prev = ch + continue + elif ch != CR: + # ignore other control characters + prev = ch + continue + else: + self.buffer.append(ch) + prev = ch + continue + result = self.buffer.decode() + self.buffer = bytearray() + self._last_char = ch + return result + self._last_char = prev + + def write(self, line): + """write a line appended with the line end""" + sys.stdout.write(line.encode() + b'\r\n') + + +class Pxsi: + def __init__(self, idn, **kwds): + self.idn = f'pxsi,{idn}' + self.io = Serial() + self.io.write(self.idn) + self.main = __main__.__dict__ + self.varset = set(kwds) + self.main.update(kwds) + # make methods also available on the class + Pxsi.handle = self.handle + Pxsi.add = self.add + # key access of variables via Pxsi.vars + Pxsi.vars = self + + def add(self, **kwds): + self.varset.update(kwds) + self.main.update(kwds) + + def __enter__(self): + self._previous = set(__main__.__dict__) + + def __exit__(self, exc_type, exc_val, exc_tb): + self.varset.update(set(self.main) - set(self._previous)) + + def __getitem__(self, key): + if key not in self.varset: + raise KeyError(f'{key!r} is not a declared variable') + return self.main[key] + + def __setitem__(self, key, value): + if key not in self.varset: + raise KeyError(f'{key!r} is not a declared variable') + self.main[key] = value + + def handle(self, timeout): + """handle input and output until timeout has expired + + or some parameters were changed + """ + request = self.io.read(timeout) + if not request: + return + while request: + reply = {} + errors = [] + jsonvalue = request + try: + if request.startswith('{'): + todo = json.loads(request).items() + elif request.startswith('['): + todo = json.loads(request) + elif request == '*IDN?': + self.io.write(self.idn) + request = self.io.read(0) + continue + elif request == '*': + todo = self.varset + else: + split = request.split('=', 1) + if len(split) == 2: + jsonvalue = split[1] + todo = [(split[0], json.loads(jsonvalue))] + else: + todo = split + except ValueError: + errors = [f'invalid JSON: {jsonvalue}'] + todo = [] + for cmd in todo: + if isinstance(cmd, str): + key = cmd + value = None + else: + key, value = cmd + if key not in self.varset: + errors.append(f"BadKey({key!r})") + continue + if value is None: + reply[key] = self.main[key] + continue + func_name = f'change_{key}' + change_func = self.main.get(func_name) + if not change_func: + change_func = type(self.main[key]) + try: + result = change_func(value) + if result is not None: + value = result + except Exception as e: + if func_name == change_func.__name__: + errors.append(f'{e!r} in {func_name}') + else: + errors.append(f'{e!r} converting {key}') + continue + self.main[key] = value + reply[key] = value + if errors: + reply['_errors_'] = errors + self.io.write(json.dumps(reply)) + # continue only as long as there is something in the buffer + request = self.io.read(0) + + def __repr__(self): + return f"{type(self).__name__}({self.idn!r})"