From 53256d1583c15d756b1afc1c78e781b94815dcaa Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Wed, 28 Jan 2026 08:32:59 +0100 Subject: [PATCH] [WIP] work on curses cfg editor state as of 2026-01-28 Change-Id: I73d2fa4e6fda8820a95fe4e7256c7a23bf565f67 --- frappy/datatypes.py | 4 +- frappy/lib/__init__.py | 12 + frappy/tools/__init__.py | 0 frappy/tools/completion.py | 295 +++++++++ frappy/tools/configdata.py | 442 +++++++++++++ frappy/tools/editorutils.py | 106 +++ frappy/tools/terminalgui.py | 1249 +++++++++++++++++++++++++++++++++++ frappy_demo/lakeshore.py | 4 +- frappy_psi/lakeshore.py | 13 +- frappy_psi/picontrol.py | 45 +- paramedit.py | 814 +++++++++++++++++++++++ 11 files changed, 2950 insertions(+), 34 deletions(-) create mode 100644 frappy/tools/__init__.py create mode 100644 frappy/tools/completion.py create mode 100644 frappy/tools/configdata.py create mode 100644 frappy/tools/editorutils.py create mode 100644 frappy/tools/terminalgui.py create mode 100644 paramedit.py diff --git a/frappy/datatypes.py b/frappy/datatypes.py index 7a396e94..16168d9d 100644 --- a/frappy/datatypes.py +++ b/frappy/datatypes.py @@ -99,7 +99,7 @@ class SimpleDataType(HasProperties): - StringType: the bare string is returned - EnumType: the name of the enum is returned """ - return self.format_value(value, False) + return value if isinstance(value, str) else repr(value) def export_value(self, value): """if needed, reformat value for transport""" @@ -1132,7 +1132,7 @@ class CommandType(DataType): # internally used datatypes (i.e. only for programming the SEC-node) -class DefaultType(DataType): +class DefaultType(SimpleDataType): """datatype used as default for parameters needs some minimal interface to avoid errors when diff --git a/frappy/lib/__init__.py b/frappy/lib/__init__.py index b227c13c..79c7fa5f 100644 --- a/frappy/lib/__init__.py +++ b/frappy/lib/__init__.py @@ -492,3 +492,15 @@ def delayed_import(modname): except Exception: return _Raiser(modname) return module + + +class LazyImport: + module = None + + def __init__(self, modulename): + self.modulename = modulename + + def __getattr__(self, name): + if self.module is None: + self.module = __import__(self.modulename) + return getattr(self.module, name) \ No newline at end of file diff --git a/frappy/tools/__init__.py b/frappy/tools/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/frappy/tools/completion.py b/frappy/tools/completion.py new file mode 100644 index 00000000..60959d92 --- /dev/null +++ b/frappy/tools/completion.py @@ -0,0 +1,295 @@ +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# +# ***************************************************************************** + +import inspect +from pathlib import Path +from importlib import import_module +from frappy.core import Module, Parameter, Property +from frappy.datatypes import DataType +from frappy.lib.comparestring import compare +from frappy.tools.configdata import site, ClassChecker, ModuleClass +from frappy.tools.terminalgui import Completion + + +SKIP_PROPS = {'implementation', 'features', + 'interface_classes', 'slowinterval', 'omit_unchanged_within', 'original_id'} + +def recommended_prs(cls): + """get properties and parameters which are useful in configuration + + returns a dict of + """ + if isinstance(cls, str): + try: + cls = ClassCompletion.validate(cls) + except Exception: + return {} + result = {} + for pname, pdict in cls.configurables.items(): + pr = getattr(cls, pname) + if isinstance(pr, Property): + if pname not in SKIP_PROPS: + result[pname] = pr.mandatory + elif isinstance(pr, Parameter): + if pr.needscfg: + result[pname] = True + elif not pr.readonly or hasattr(cls, f'write_{pname}'): + result[pname] = False + if result.get('uri') is False and result.get('io') is False: + result['io'] = True + return result + + + +class FrappyModule(str): + """checker for finding subclasses of Module defined in a python module""" + def check(self, cls): + return isinstance(cls, type) and issubclass(cls, Module) and self.startswith(cls.__module__) + + +def get_suggested(guess, allowed_keys): + """select and sort values from allowed_keys based on similarity to a given value + + :param guess: given value. when empty, the allowed keys are return in the given order + :param allowed_keys: list of values sorted in a given order + :return: a list of values + """ + if len(guess) == 0: + return allowed_keys + low = guess.lower() + if len(guess) == 1: + # return only items starting with a single letter + return [k for k in allowed_keys if k.lower().startswith(low)] + comp = {} + # if there are items containing guess, return them only + result = [k for k in allowed_keys if low in k.lower()] + if result: + return result + # calculate similarity + for key in allowed_keys: + comp[key] = compare(guess, key) + comp = sorted(comp.items(), key=lambda t: t[1], reverse=True) + scorelimit = 2 + result = [] + for i, (key, score) in enumerate(comp): + if score < scorelimit: + break + if i > 2: + scorelimit = max(2, score - 0.05) + result.append(key) + return result or allowed_keys + + +class CheckerObsolete: + root = None + modobj = None + clsobj = None + modname = None + pyfile = None + + def module(self, base, name): + modname = f'{base}.{name}' if base else name + try: + self.modobj = self.root = import_module(modname) + self.modname = modname + self.pyfile = Path(self.modobj.__file__) + return None + except ImportError as e: + return str(e) + except Exception as e: + return f'{modname}: {e!r}' + + def cls(self, base, name): + try: + self.clsobj = getattr(self.root, name) + return None + except Exception: + return f'{base}.{name} does not exist' + + +def class_completion(value): + checker = ClassChecker(value) + if checker.position == len(value): + return checker.position, [] + if checker.root is None: + sdict = {p: f'{p}.' for p in site.packages} + else: + sdict = {} + file = checker.pyfile + if not checker.clsobj: + if file.name == '__init__.py': + sdict = {p.stem: f'{p.stem}.' + for p in sorted(file.parent.glob('*.py')) + if p.stem != '__init__'} + sdict.update((k, k) for k, v in sorted(inspect.getmembers( + checker.root, FrappyModule(checker.modname).check))) + found = sdict.get(checker.name, None) + if found: + selection = [found] + # selection = [found] + list(sdict.values()) + else: + selection = list(get_suggested(checker.name, sdict.values())) + return checker.position, [checker.name] + selection + + + +class Base(Completion, DataType): + def __init__(self, callback): + self.callback = callback + super().__init__() + + def from_string(self, strvalue): + value = self.validate(strvalue) + if self.callback: + self.callback(value) + return value + + def to_string(self, value): + value = self.validate(value) + return f'{value.__module__}.{value.__qualname__}' + + def format_value(self, value, unit=None): + result = repr(self.to_string(value)) + if '<' in result: + raise ValueError(result, value) + return result + + + +class ClassCompletionObsolete(Base): + @staticmethod + def propose(value, get_clsobj=False): + """analyze value to propositions of class path + + returns the length of the valid part and a list of guesses + """ + clspath = value.split('.') + check = CheckerObsolete() + for pathpos, name in enumerate(clspath): + base = '.'.join(clspath[:pathpos]) + clsroot = check.root + if name: + if check.clsobj: + error = check.cls(base, name) + elif name.isupper(): + error = check.cls(base, name) + if error and check.module(base, name) is None: + error = None + else: + error = check.module(base, name) + if error and check.cls(base, name) is None: + error = None + else: + error = 'empty element' + + if get_clsobj: + if error: + return None, error + elif pathpos == len(clspath) - 1 or error: + # get suggestions + # sdict is a dict '' of '' or '.' + # the latter when it is a pymodule + # sdict = {name: None} + if pathpos == 0: + sdict = {p: f'{p}.' for p in site.packages} + else: + sdict = {} + file = check.pyfile + if not check.clsobj: + if file.name == '__init__.py': + sdict = {p.stem: f'{p.stem}.' + for p in sorted(file.parent.glob('*.py')) + if p.stem != '__init__'} + sdict.update((k, k) for k, v in sorted(inspect.getmembers( + clsroot, FrappyModule(check.modname).check))) + found = sdict.get(name, None) + if found: + selection = [found] + # selection = [found] + list(sdict.values()) + else: + selection = list(get_suggested(name, sdict.values())) + position = sum(len(v) for v in clspath[:pathpos]) + pathpos + return position, [name] + selection + check.root = check.clsobj or check.modobj + if get_clsobj: + return check.clsobj, error + return 0, None + + @classmethod + def validate(cls, value, previous=None): + if isinstance(value, type): + if issubclass(value, Module): + return value + raise ValueError('value is a class, but not a frappy module') + clsobj, error = cls.propose(value, True) + if error: + raise ValueError(error) + return clsobj + + +class NameCompletion(Completion, DataType): + # TODO: make obsolete + def __init__(self, callback, get_name_info): + self.callback = callback + self.get_name_info = get_name_info + super().__init__() + + def get_selection(self, param): + cls, used_names = self.get_name_info() + if param: + names = dict(self.cls.configurables) + selection = [] + for name in recommended_prs(self.cls): + names.pop(name, None) + if name not in used_names: + selection.append(name) + selection.extend(k for k in names if k not in used_names) + else: + names = dict(cls.configurables.get(param, {})) + selection = [k for k in names if f'{param}.{k}' not in used_names] + return selection + + def propose(self, value): + """analyze value to propositions of class path + + returns the length of the valid part and a list of guesses + """ + param, dot, prop = value.partition('.') + if (param and not param.isidentifier()) or (prop and not prop.isidentifier()): + return 0, None + cls, used_names = self.get_name_info() + if dot: + print(param, repr(dot), prop, list(cls.configurables.get(param, {}))) + position = len(param) + 1 + selection = [k for k in cls.configurables.get(param, {}) if f'{param}.{k}' not in used_names] + guess = prop + else: + position = 0 + selection = {} + for name in recommended_prs(cls): + if name not in used_names: + selection[name] = 1 + selection.update({k: 1 for k in cls.configurables if k not in used_names}) + for k, v in cls.configurables.items(): + print(k, getattr(v, 'needscfg', '')) + guess = param + selection = get_suggested(guess, selection) + return position, [guess] + selection diff --git a/frappy/tools/configdata.py b/frappy/tools/configdata.py new file mode 100644 index 00000000..88fc8573 --- /dev/null +++ b/frappy/tools/configdata.py @@ -0,0 +1,442 @@ +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# +# ***************************************************************************** + +import re +import frappy +from pathlib import Path +from ast import literal_eval +from importlib import import_module +from frappy.config import process_file, Node +from frappy.core import Module +from frappy.datatypes import DataType + + +class Site: + domain = 'psi.ch' + frappy_subdir = 'frappy_psi' + base = Path(frappy.__file__).parent.parent + + def __init__(self, domain='psi.ch', frappy_subdir='frappy_psi', default_interface='tcp://10767'): + self.init(domain, frappy_subdir, default_interface) + + def init(self, domain=None, frappy_subdir=None, default_interface=None): + if domain: + self.domain = domain + if default_interface: + self.default_interface = default_interface + if frappy_subdir: + self.packages = [v.name for v in self.base.glob('frappy_*')] + try: # psi should be first + self.packages.remove(frappy_subdir) + self.packages.insert(0, frappy_subdir) + except ValueError: + pass + + +site = Site() + + +class NonStringType: + """any type except string""" + def validate(self, value): + self(value) + + def __call__(self, value): + return value + + + def from_string(self, strvalue): + """convert from string """ + try: + return literal_eval(strvalue) + except Exception: + raise ValueError('this is no python value') + + def to_string(self, value): + return repr(value) + + def format_value(self, value, unit=None): + """convert to python code (inverse of ast.literal_eval) + + :param value: the value + :param unit: must be False (needed for compatibility with frappy.datatypes.DataType) + :return: version to used as python code + """ + return repr(value) + + +class SimpleStringType(NonStringType): + def validate(self, value): + pass + + def from_string(self, strvalue): + """convert from string """ + return strvalue + + def to_string(self, value): + return value + + def format_value(self, value, unit=None): + """convert to string + + :param value: + :param unit: must be False (needed for compatibility with frappy datatypes + :return: stringified version (triple quoted when containing line breaks) + """ + if '\n' in value: + value = value.replace('"""', '\\"\\"\\"') + return f'"""{value}"""' + return repr(value) + + +nonstringtype = NonStringType() +stringtype = SimpleStringType() + + +class Value: + """a value with additional info + + - hold a value (self.value) and a stringified value self.strvalue + - set from and get a string representation for use in a input element + - get a python code representation (to be reverted with ast.literal_eval) + - verify if the datatype is valid (this typically needs extension) + - get information for completion + """ + error = None + strvalue = None + modulecls = None + datatype = None + value = None + completion = None + + def __init__(self, value, datatype=None, error=None, from_string=False, callback=None): + if value is None: + raise ValueError(datatype) + self.datatype = datatype + self.error = error + if callback: + self.callback = callback + if from_string: + if datatype is None: + try: + literal_eval(value) + self.datatype = nonstringtype + except Exception: + self.datatype = stringtype + self.set_from_string(value) + else: + if datatype is None: + self.datatype = stringtype if isinstance(value, str) else nonstringtype + self.set_value(value) + + def callback(self, value): + return value + + def set_value(self, value): + self.strvalue = None + try: + dt = self.datatype + value = dt(value) + self.strvalue = dt.to_string(value) + dt.validate(value) + self.value = self.callback(value) + except Exception as e: + self.value = value + self.error = repr(e) + if self.strvalue is None: + self.strvalue = str(value) + + def validate_from_string(self, strvalue): + self.strvalue = strvalue + self.value = self.callback(self.datatype.from_string(strvalue)) + if type(self.datatype).__name__ == 'ClassCompletion': + raise ValueError(self) + + def set_from_string(self, strvalue): + self.strvalue = strvalue + try: + self.validate_from_string(strvalue) + except Exception as e: + self.error = repr(e) + + def get_repr(self): + """convert string value to repr + + :return: repr, to be used when building config code + """ + if self.datatype: + try: + return self.datatype.format_value(self.value, False) + except Exception as e: + pass + return repr(self.strvalue) + + def __repr__(self): + return f'Value({self.value!r}, {self.datatype!r})' + + +def get_datatype(pname, cls, value): + """ + + :param pname: or or . + :param cls: a frappy Module class or None + :param value: the given value (needed only in case the datatype can not be determined) + :return: + """ + param, _, prop = pname.partition('.') + error = None + if cls: + try: + prop_param = cls.configurables[param] + if isinstance(prop_param, dict): + propobj = prop_param.get(prop) if prop else cls.accessibles.get(param) + if propobj is None: + error = f'{cls.__module__}.{cls.__qualname__}.{param}.{prop} is not configurable' + else: + return propobj.datatype, None + elif prop: + error = f'{cls.__module__}.{cls.__qualname__}.{param} is not a parameter' + else: + return prop_param.datatype, None + except AttributeError: + error = f'{cls.__module__}.{cls.__qualname__} is not a Frappy Module' + except KeyError: + error = f'{cls.__module__}.{cls.__qualname__}.{param} is not configurable' + if isinstance(value, str): + return stringtype, error + return nonstringtype, error + + +def make_value(pname, cls, value): + """make value object""" + return Value(value, *get_datatype(pname, cls, value)) + + +class ClassChecker: + root = None + modobj = None + clsobj = None + modname = None + pyfile = None + error = None + + def __init__(self, clsstr): + """analyze clsstr + + returns the length of the valid part and a list of guesses + """ + clspath = clsstr.split('.') + for pathpos, name in enumerate(clspath): + base = '.'.join(clspath[:pathpos]) + if name: + if self.clsobj: + error = self.cls(base, name) + elif name.isupper(): + error = self.cls(base, name) + if error and self.module(base, name) is None: + error = None + else: + error = self.module(base, name) + if error and self.cls(base, name) is None: + error = None + else: + error = 'empty element' + + if error: + self.name = name + self.error = error + self.position = sum(len(v) for v in clspath[:pathpos]) + pathpos + return + self.root = self.clsobj or self.modobj + self.name = None + self.error = None + self.position = len(clsstr) + + def module(self, base, name): + modname = f'{base}.{name}' if base else name + try: + self.modobj = self.root = import_module(modname) + self.modname = modname + self.pyfile = Path(self.modobj.__file__) + return None + except ImportError as e: + return str(e) + except Exception as e: + return f'{modname}: {e!r}' + + def cls(self, base, name): + try: + self.clsobj = getattr(self.root, name) + return None + except Exception: + return f'{base}.{name} does not exist' + + +class ModuleClass(DataType): + @classmethod + def validate(cls, value, previous=None): + if isinstance(value, type): + if issubclass(value, Module): + return value + raise ValueError('value is a class, but not a frappy module') + checker = ClassChecker(value) + if checker.error: + raise ValueError(checker.error) + return checker.clsobj + + @classmethod + def from_string(cls, strvalue): + return cls.validate(strvalue) + + @classmethod + def to_string(cls, value): + value = cls.validate(value) + return f'{value.__module__}.{value.__qualname__}' + + @classmethod + def format_value(cls, value, unit=None): + result = repr(cls.to_string(value)) + if '<' in result: + raise ValueError(result, value) + return result + + +module_class = ModuleClass() + + +def moddata_from_cfgfile(name, cls, **kwds): + if isinstance(cls, str): + clsvalue = Value(cls, module_class, None, from_string=True) + else: + clsvalue = Value(cls, module_class, None) + cls = None if clsvalue.error else clsvalue.value + result = { + 'name': make_value('name', None, name), + 'cls': clsvalue, + } + for param, cfgvalue in kwds.items(): + if isinstance(cfgvalue, dict): + for prop, value in cfgvalue.items(): + pname = param if prop == 'value' else f'{param}.{prop}' + result[pname] = make_value(pname, cls, value) + else: + result[param] = make_value(param, cls, cfgvalue) + return result + + +def moddata_to_py(name, cls, description, **kwds): + if '<' in cls.get_repr(): + raise ValueError(cls) + items = [f'Mod({name!r}', cls.get_repr(), description.get_repr()] + paramdict = {} + for name, valobj in kwds.items(): + param, _, prop = name.partition('.') + paramdict.setdefault(param, {})[prop or 'value'] = valobj + for name, props in paramdict.items(): + valueitem = props.pop('value', None) + if valueitem is None: + args = [] + else: + args = [valueitem.get_repr()] + if not props: + # single value + items.append(f'{name} = {args[0]}') + continue + # args contains value + # extend with keyworded values for parameter properties + args.extend(f'{k}={v.get_repr()}' for k, v in props.items()) + items.append(f"{name} = Param({', '.join(args)})") + items.append(')') + return ',\n '.join(items) + + +def fix_equipment_id(name, equipment_id): + """normalize equipment id""" + if re.match(r'[a-zA-Z0-9_]+(\.[a-zA-Z0-9_]+)*$', equipment_id): + return equipment_id + return f'{name}.{site.domain}' + + +def fix_node_class(cls): + if cls == Node('', '')['cls']: + return '' + return cls + + +def nodedata_from_cfgfile(name, equipment_id='', description='', interface='', cls='', **kwds): + title, _, doc = description.partition('\n') + if doc.startswith('\n'): + doc = doc[1:] + props = { + 'name': name, + 'title': title, + 'doc': doc, + } + eq = fix_equipment_id(name, equipment_id) + if eq != fix_equipment_id(name, ''): + props['equipment_id'] = eq + if interface and interface != site.default_interface: + props['interface'] = interface + cls = fix_node_class(cls) + if cls: + props['cls'] = cls + props.update(kwds) + # TODO: do we have to check the proper datatype for node properties? + return {k: Value(v) for k, v in props.items()} + + +def nodedata_to_py(name, equipment_id, title, doc, interface=None, cls=None, **kwds): + eq_id = fix_equipment_id(name.value, equipment_id.value) + intfc = site.default_interface if interface is None else interface.value + desc = title.value.strip() + doc = doc.value.strip() + if doc: + desc = f'{desc}\n\n{doc}\n' + items = [f"doc={Value(desc).get_repr()}\nNode({eq_id!r}, doc", f'interface={intfc!r}'] + if cls: + clsstr = fix_node_class(cls.value) + if clsstr: + items.append(f'cls={clsstr!r}') + for key, value in kwds.items(): + items.append(f'{key} = {value.get_repr()}') + items.append(')') + return ',\n '.join(items) + + +def cfgdata_to_py(node, **moddata): + """convert cfgdata to python code + + :param node: dict of + :param cfgdata: dict of dict of + :return: python code + """ + items = [nodedata_to_py(**node)] + [moddata_to_py(k, **v) for k, v in moddata.items()] + return '\n\n'.join(items) + + +def cfgdata_from_py(name, cfgpath, filecontent, logger): + if filecontent: + config = process_file(cfgpath, logger, filecontent) + else: + config = {} + nodecfg = config.pop('node', {}) + modcfg = {k: moddata_from_cfgfile(k, **v) for k, v in config.items()} + return nodedata_from_cfgfile(name, **nodecfg), modcfg diff --git a/frappy/tools/editorutils.py b/frappy/tools/editorutils.py new file mode 100644 index 00000000..92bd0c0d --- /dev/null +++ b/frappy/tools/editorutils.py @@ -0,0 +1,106 @@ +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# +# ***************************************************************************** +"""helper functions for configuration editor""" + +import re +from frappy.config import Node +from frappy.lib import get_class + +SITE_TAIL = 'psi.ch' + + +def repr_param(value=object, **kwds): + if value is object: + if not kwds: + return 'Param()' + items = [] + else: + if not kwds: + return value + items = [value] + items.extend(f'{k}={v}' for k, v in kwds.items()) + return f"Param({', '.join(items)})" + + +def repr_module(modcfg, name, cls): + # items = [f'Mod({name}', f'cls={cls}', f'description={repr_param(**description)}'] + [ + # f'{k}={repr_param(**v)}' for k, v in kwds.items()] + [')'] + description = modcfg.pop('description', '') + items = [f'Mod({name}', cls, repr_param(**description)] + [ + f'{k}={repr_param(**v)}' for k, v in modcfg.items()] + [')'] + return ',\n '.join(items) + + +def repr_node(name, description, cls=None, equipment_id='', **kwds): + equipment_id = fix_equipment_id(name, equipment_id) + items = [f'Node({equipment_id!r}', f'description={description}'] + add_node_class(cls, kwds) + items.extend(f'{k}={v}' for k, v in kwds.items()) + items.append(')') + return ',\n '.join(items) + + +def fix_equipment_id(name, equipment_id): + if not re.match(r'[a-zA_Z0-9_]+(\.[a-zA_Z0-9_]+)*$', equipment_id): + equipment_id = f'{name}.{SITE_TAIL}' + return equipment_id + + +def add_node_class(cls, result): + if cls != Node('', '')['cls']: + result['cls'] = cls + + +def normalize_node(name, equipment_id='', description='', cls=None, interface=None, **kwds): + result = {} + eq = fix_equipment_id(name, equipment_id) + if equipment_id and eq != equipment_id: + result['equipment_id'] = eq + result['description'] = description + result['interface'] = interface or 'tcp://5555' + add_node_class(cls, result) + result.update(kwds) + return result + + +def convert_modcfg(cfgdict): + """convert cfgdict + + convert parameter properties to individual items . + """ + result = {} + for key, cfgvalue in cfgdict.items(): + if isinstance(cfgvalue, dict): + result.update((key, v) if k == 'value' else (f'{key}.{k}', v) + for k, v in cfgvalue.items()) + else: + result[key] = cfgvalue + return result + + +def needed_properties(cls): + if isinstance(cls, str): + cls = get_class(cls) + result = [] + for pname, prop in cls.propertyDict.items(): + if prop.mandatory and pname not in {'implementation', 'features'}: + result.append(pname) + return result diff --git a/frappy/tools/terminalgui.py b/frappy/tools/terminalgui.py new file mode 100644 index 00000000..29b23a63 --- /dev/null +++ b/frappy/tools/terminalgui.py @@ -0,0 +1,1249 @@ +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# +# ***************************************************************************** +import sys +import curses +import threading +from select import select + + +class Key(int): + def __new__(cls, name, nr): + if isinstance(nr, str): + if nr.startswith('^'): + nr = ord(nr[1]) & 0x1f + key = super().__new__(cls, nr) + key.name = name + return key + + def __repr__(self): + return self.name + +class Keys: + def __init__(self, **kwds): + # mapping int -> letter + self.bynumber = {k: chr(k) for k in range(32, 127)} + self.keys = {} + self.add(**kwds) + + def add(self, **kwds): + toadd = [(k, Key(k, v)) for k, v in kwds.items()] + self.keys.update(toadd) + self.bynumber.update((int(v), v) for _, v in toadd) + + def __getattr__(self, name): + try: + return self.keys[name] + except KeyError: + nr = getattr(curses, f'KEY_{name}', None) + if nr is None: + raise AttributeError(f'no key K.{name} {self.keys}') + self.keys[name] = key = Key(name, nr) + return key + + + +# unfortunately, we can not use some ctrl keys, as they already have a meaning: +# H: backspace, I: tab, M: ret + +K = Keys(ESC=27, TAB=9, DEL=127, RETURN=13, + QUIT='^x', BEG_LINE = '^a', END_LINE = '^e', MENU = '^c', + CUT = '^k', PASTE = '^v', HELP = '^g', + UNHANDLED=-1, GOTO_MAIN=-2, GO_UP=-3, + ) + +def clamp(*args): + return sorted(args)[len(args) // 2] + + +class Logger: + def __init__(self): + # self.parent = self + self.loglines = [] + + def info(self, fmt, *args): + self.loglines.append(fmt % args) + + def debug(self, fmt, *args): + return + + def getChild(self, *args): + return self + + def addHandler(self, *args): + pass + + warning = exception = error = info + handlers = [] + + +class Widget: + parent = None + parent_cls = None + default_height = 1 + log = Logger() + context_menu = None + + def get_menu(self): + if self.context_menu: + return self.context_menu + self.parent.get_menu() + return self.parent.get_menu() + + def init_parent(self, parent, checkcls=None): + checkcls = checkcls or self.parent_cls + if checkcls and not isinstance(parent, checkcls): + raise RuntimeError(f'parent of {self} is {type(parent).__name__},' + f' but must be {checkcls.__name__}') + self.parent = parent + self.log = parent.log + + def current_row(self): + """returns current row""" + return 0 + + def height(self): + """returns current height""" + return self.default_height + + # def draw(self, wr, in_focus=False): + # raise NotImplementedError + + +class HasWidgets: + focus = 0 + widgets = None # list of subwidgets + + def current_row(self): + height = sum(w.height() for w in self.widgets[:self.focus]) + return height + self.get_focus_widget().current_row() + + def height(self): + return sum(w.height() for w in self.widgets) + + def handle(self, main): + try: + while True: + main.current_widget = self.get_focus_widget() + key = main.current_widget.handle(main) + # if key in (CTRL_X, K.LEFT): + # return key + if key == K.UP: + if self.advance(-1): + continue + elif key in (K.DOWN, K.RETURN, K.ENTER): + if self.advance(1): + continue + return key + finally: + main.current_widget = self + + def set_focus(self, focus, step=1): + """set focus + + :param focus: the new focus index. will be clamped when outside range. + None: step < 0 the last possible index, step > 0 the first possible index + :param step: positive: take next, negative: take previous (for subclasses) + :return: focus is a valid position + """ + if focus is None: + if step < 0: + self.focus = len(self.widgets) - 1 + else: + self.focus = 0 + return True + if focus >= len(self.widgets): + self.focus = len(self.widgets) - 1 + return False + if focus < 0: + self.focus = 0 + return False + self.focus = focus + return True + + def get_focus_widget(self): + return self.widgets[clamp(0, self.focus, len(self.widgets) - 1)] + + def advance(self, step): + return self.set_focus(self.focus + step, step) + + def insert(self, y, widget): + self.widgets.insert(y, widget) + + def draw_widgets(self, wr, in_focus=False): + focus = self.focus if in_focus else None + for nr, widget in enumerate(self.widgets): + widget.draw(wr, nr == focus) + + +class Container(HasWidgets, Widget): + pass + + +class TitleBar(Widget): + default_height = 1 + + def __init__(self, left, right=''): + self.left = left + self.right = right + + def draw(self, wr, in_focus=False): + wr.startrow() + text = self.left.ljust(wr.width) + wr.bar(text) + if self.right: + wr.col = wr.width - len(self.right) - 3 + wr.bar(f' {self.right} ') + + +class StatusBar(Widget): + default_height = 1 + + def __init__(self, main): + main.statusbar = self + self.main = main + self.text = '' + self.warn = None + self.query = False + + def draw(self, wr, in_focus=False): + wid = wr.width - 1 + if self.warn: + wr.startrow() + wr.wr(self.warn[:wid].ljust(wid) + ' ', wr.warnstyle) + wr.startrow() + if self.query: + wr.wr(self.text.rjust(wid)[:wid] + ' ', wr.querystyle) + else: + wr.wr(self.text[:wid].rjust(wid) + ' ', wr.barstyle) + + + def set(self, text, warn=None, query=False): + self.text = text + self.warn = warn + self.query = query + + def height(self): + if self.warn: + return self.default_height + 1 + return self.default_height + + +class TextEdit(Widget): + minwidth = 16 + highlighted = False + + def __init__(self, value, finish_callback=None): + self.pos = None # cursor pos or None when not editing + if value is None: + raise ValueError('textedit') + self.value = value + self.col_offset = 0 + self.finish_callback = finish_callback + + def draw(self, wr, in_focus=False): + text = self.value[self.col_offset:] + if in_focus: + if self.highlighted: + wr.set_cursor_pos() + wr.high(text, self.minwidth) + else: + wr.edit(text, self.minwidth, self.pos) + else: + wr.norm(text) + + def get_key(self, main): + """hook for subclasses""" + return main.get_key() + + def enter(self, main): + pass + + def finish(self, value, main): + """called before saving the value""" + if self.finish_callback: + value = self.finish_callback(value, main) + return value + + def handle(self, main): + self.highlighted = True + self.pos = 0 + self._main = main + self.enter(main) + self.prev_value = self.value # for ESC + save = True + try: + while True: + key = self.get_key(main) + if key == K.LEFT: + if self.highlighted: + return key + self.pos = max(0, self.pos - 1) + continue + if key == K.TAB: + self.highlighted = not self.highlighted + self.pos = 0 + continue + if key == K.BEG_LINE: + self.pos = 0 + elif key == K.END_LINE: + self.pos = len(self.value) + elif key == K.RIGHT: + self.pos = len(self.value) if self.highlighted else min(len(self.value), self.pos + 1) + elif key == K.DEL: + if self.highlighted: + self.value = '' + self.pos = 0 + elif self.pos > 0: + self.value = self.value[:self.pos-1] + self.value[self.pos:] + self.pos -= 1 + elif isinstance(key, str): + if self.highlighted: + self.value = '' + self.value = self.value[:self.pos] + key + self.value[self.pos:] + self.pos += 1 + elif key is not None: + if key == K.ESC: + save = False + return K.DOWN + return key + self.highlighted = False + finally: + if save: + self.value = self.finish(self.value, main) + else: + self.value = self.prev_value + # if save: + # try: + # self.value = self.validator(self.value) + # self.error = None + # except Exception as e: + # self.error = str(e) + # # main.touch() + # else: + # self.value = self.prev_value + self.pos = None + + +class NameEdit(TextEdit): + def get_key(self, main): + key = super().get_key(main) + if key == K.TAB: + key = K.ENTER + return key + + +class Completion: + """mixin for datatype with completion""" + def propose(self, value): + """get proposals for completion + + :param value: a string to complete + :return: tuple(, , = self.completion_pos and + self == main.completion_widget): # widget has not changed + if selection: + main.popup_offset = self.completion_pos - self.pos + main.popupmenu = CompletionMenu(selection) + else: + main.popupmenu = None + self.completion_widget = None + + def get_key(self, main): + # if self.completion is None: + # return super().get_key(main) + if self.highlighted: + return super().get_key(main) + main.completion_widget = self + # mkthread(self.get_selection_menu, main, self.value) + self.get_selection_menu(main, self.value) + key = super().get_key(main) + self.menu = menu = main.popupmenu + if not menu or self.highlighted or key != K.DOWN or self.pos < self.completion_pos: + self.menu = main.popupmenu = None + return key + menu.pos = 1 + key = menu.get_key(main) + # we have left the popup menu with returned key + if key == K.UP: + return None + if key == K.LEFT: + self.pos = self.completion_pos + return key + value = self.value[:self.completion_pos] + if key in (K.TAB, K.RETURN, K.ENTER, K.RIGHT): + selected = menu.get_value() + value += selected + if key == K.TAB or (key in (K.ENTER, K.RETURN) + and self.completion(value)[0] < len(value)): + key = K.RIGHT + self.value = value + self.pos = len(value) + self.menu = main.popupmenu = None + return key + +class TextWidget(Widget): + # minwidth = 16 + + def __init__(self, text): + self.value = text + + def draw(self, wr, in_focus=False): + wr.norm(self.value) + + +class LineEdit(Widget): + error = None + + def __init__(self, labelwidget, valuewidget): + self.labelwidget = labelwidget + self.valuewidget = valuewidget + self.focus = 1 + + def handle(self, main): + name = self.labelwidget.value if isinstance(self.labelwidget, TextEdit) else None + if name == '': + self.focus = 0 # go to name first when empty + while True: + if self.focus: + key = self.valuewidget.handle(main) + if key == K.LEFT: + if name is not None: + self.focus = 0 + continue + return key + key = self.labelwidget.handle(main) + self.focus = 1 + if key in (K.ENTER, K.RETURN, K.TAB): + key = None + return key + + def draw(self, wr, in_focus=False): + wr.startrow() + self.labelwidget.draw(wr, in_focus and self.focus == 0) + wr.norm(': ') + wr.col = max(wr.col, wr.left + wr.leftwidth) + self.valuewidget.draw(wr, in_focus and self.focus == 1) + if self.error: + wr.norm(' ') + wr.error(self.error) + + +class MultiLineEdit(Container): + next_key = None + + def __init__(self, name, value=None, label=None, validator=str): + self.name = name + self.label = f'{self.name}: ' if label is None else label + self.focus = 0 + self.widgets = [LineOfMultiline(self, v) for v in value.split('\n')] + self.validator = validator + self.highlighted = False + self.cut_menuitems = [ + MenuItem('cut line(s)', K.CUT), + MenuItem('paste line(s)', K.PASTE)] + + def get_menu(self): + return self.cut_menuitems + self.parent.get_menu() + + def draw(self, wr, in_focus=False): + indent = None + for widget in self.widgets: + wr.startrow() + if indent is None: + label = self.label.ljust(wr.leftwidth) + wr.norm(label) + indent = len(label) + else: + wr.col += indent + if self.highlighted: + wr.high(widget.value, -1) + else: + widget.draw(wr, in_focus) + + @property + def value(self): + return '\n'.join(w.value for w in self.widgets) + + def handle(self, main): + self.highlighted = True + key = main.get_key() + self.highlighted = False + if key in (K.RIGHT, K.TAB): + self.set_focus(None, -1) + self.next_key = K.RIGHT + elif isinstance(key, str) or key == K.DEL: + self.set_focus(None, -1) + last = self.get_focus_widget() + if self.focus == 0: + # there is only one line: override + last.value = '' + elif last.value: # last line is not empty: add a new one + self.widgets.append(LineOfMultiline(self, '')) + self.advance(1) + self.next_key = key + elif key == K.LEFT: + self.set_focus(None, 0) + self.next_key = K.LEFT + else: + return key + while True: + widget = self.get_focus_widget() + key = widget.handle(main) + if key == K.UP: + if self.advance(-1): + continue + elif key in (K.DOWN, K.RETURN, K.ENTER): + if self.advance(1): + continue + elif key == K.GO_UP: + continue + return key + + +class LineOfMultiline(TextEdit): + minwidth = -1 + change_high = None + + def __init__(self, parent, value): + super().__init__(value) + self.init_parent(parent, MultiLineEdit) + self.cursor_col = None + + def handle(self, main): + self.change_high = self.parent.highlighted + self.cursor_col = None + return super().handle(main) + + def get_key(self, main): + self.highlighted = False + if self.cursor_col is not None: + main.cursor_col = self.cursor_col + self.cursor_col = None + key = self.parent.next_key + if key is None: + key = main.get_key() + else: + self.parent.next_key = None + multiline = self.parent + if key == K.RETURN: + self.value, nextline = self.value[:self.pos], self.value[self.pos:] + multiline.widgets.insert(multiline.focus + 1, LineOfMultiline(multiline, nextline)) + main.cursor_col = 0 + return K.DOWN + if key == K.DEL: + if self.pos == 0: + if multiline.focus > 0: + thisline = self.value + del multiline.widgets[multiline.focus] + prev = multiline.widgets[multiline.focus - 1] + pos = len(prev.value) + prev.value += thisline + main.cursor_col = pos + return K.UP + elif key == K.LEFT: + if self.pos == 0 and multiline.focus > 0: + prev = multiline.widgets[multiline.focus - 1] + main.cursor_col = len(prev.value) + # self.log.info('LEFT %r prev=%r', main.cursor_col, prev.value) + return K.UP + elif key == K.RIGHT: + if self.pos == len(self.value) and multiline.focus < len(multiline.widgets)-1: + # self.log.info('RIGHT %r focus=%r', main.cursor_col, multiline.focus) + main.cursor_col = 0 + return K.DOWN + elif key in (K.UP, K.DOWN): + return key + elif key == K.CUT: + if not main.cut_lines: + main.cut_extend = False + if not main.cut_extend: + main.cut_lines = [] + main.cut_lines.append(self.value) + i = multiline.focus + multiline.widgets[i:i+1] = [] + main.cut_extend = True + main.status(f'{len(main.cut_lines)} lines buffered') + return K.GO_UP + elif key == K.PASTE: + multiline.widgets[multiline.focus: multiline.focus] = [ + LineOfMultiline(multiline, v) for v in main.cut_lines] + return K.GO_UP + # self.log.info('SET COL %r', self.pos) + self.cursor_col = self.pos + return key + + def enter(self, main): + self.pos = min(main.cursor_col, len(self.value)) + + +class PopUpMenu: + offset = 0 + pos = 0 + height = 0 + + def __repr__(self): + return f'PopUp(pos={self.pos}/{len(self.selection)}, selected={self.selection[self.pos]!r})' + + def advance(self, step): + pos = self.pos + step + if 0 <= pos < self.height: + self.pos = pos + return True + return False + + def draw(self, wr): + raise NotImplementedError + + def get_key(self, main): + while True: + key = main.get_key() + if key == K.DOWN: + self.advance(1) + elif key == K.UP: + self.advance(-1) + else: + return key + + +class CompletionMenu(PopUpMenu): + def __init__(self, selection): + self.selection = selection + self.height = len(selection) + self.width = max(len(v) for v in self.selection) + + def get_value(self): + return self.selection[self.pos] + + def draw(self, wr): + row = wr.nextrow + wr.left = max(1, wr.left) + col = wr.left - 1 + wr.nextrow -= self.pos + height = self.height + 1 + if self.pos == 0: + # rectangle is open on top + wr.rectangle(row - self.pos - 1, col, height, self.width + 1, row + 1) + else: + wr.rectangle(row - self.pos - 1, col, height, self.width + 1) + for i, value in enumerate(self.selection): + wr.startrow() + if i == self.pos: + if self.pos: + wr.high(value.ljust(self.width)) + else: + wr.menu(value.ljust(self.width)) + + +class ConfirmDialog(Widget): + def __init__(self, query, positive_answers=('Y', 'y')): + self.query = query + self.answers = positive_answers + self.width = len(query) + + def draw(self, wr): + col = wr.col - 1 + wr.high(self.query) + wr.rectangle(wr.row - 1, col, 2, self.width + 1) + + def handle(self, main): + try: + main.refresh() + key = Main.get_key(main) + if key in self.answers: + return key + self.key = key + return False + finally: + main.popupmenu = None + + +class MenuItem: + def __init__(self, name, shortcut=None, action=None, returnkey=K.GOTO_MAIN): + """a menu item for a context menu + + :param name: the displayed command name + :param shortcut: a letter. if '^' is preceeded, it means a control-char + :param action: the action to be performed. when None, the control key (or letter) is returned + :param returnkey: key to return after action. ignored if action is None + """ + self.name = name + self.action = action + self.shortcut = shortcut or '' + if shortcut: + if isinstance(shortcut, Key): + self.shortcut = f'^{chr(shortcut+96)}' + key = shortcut + letter = chr(96 + shortcut) + self.keys = (shortcut, letter) + else: + self.keys = tuple(shortcut) + key = shortcut[0] + else: + self.keys = () + key = None + if action: + self.returnkey = returnkey + else: + self.returnkey = key + self.width = len(name) + + def do_action(self): + if self.action: + self.action() + return self.returnkey + + def __repr__(self): + return f'MenuItem({self.name})' + + +class ContextMenu(PopUpMenu): + def __init__(self, menuitems): + """creates a context menu + + :param menuitems: list of (, , ) + """ + self.menuitems = [MenuItem('')] + menuitems + self.height = len(self.menuitems) + self.width = max(v.width for v in menuitems) + # self.selection = [v.name for v in menuitems] + self.hotkeys = {k: m for m in menuitems for k in m.keys} + + def do_hotkey(self, key): + menuitem = self.hotkeys.get(key) + if not menuitem: + return K.UNHANDLED + return menuitem.do_action() + + def do_action(self): + menuitem = self.menuitems[self.pos] + return menuitem.do_action() + + def draw(self, wr): + height = self.height + 1 + row = wr.nextrow + toprow = row - wr.offset + fixrow = max(0, min(toprow, wr.height - height)) + wr.left = max(1, wr.left) + wr.nextrow = row = row + fixrow - toprow + col = wr.left - 1 + for i, menuitem in enumerate(self.menuitems): + wr.startrow() + wr.menu(menuitem.shortcut.ljust(2)) + wr.norm(' ') + if i == self.pos: + wr.high(menuitem.name, self.width) + else: + wr.menu(menuitem.name, self.width) + wr.rectangle(row, col, height - 1, self.width + 4) + + +class BaseWriter: + """base for writer. does nothing else than keeping track of position""" + highstyle = editstyle = errorstyle = barstyle = menustyle = querystyle = warnstyle = None + errorflag = '! ' + + def __init__(self): + self.width = None + self.nextrow = 0 + self.left = 0 + + def move(self, row, col): + self.row = self.nextrow = row + self.col = self.left = col + + def startrow(self): + self.row = self.nextrow + self.col = self.left + self.nextrow = self.row + 1 + + def norm(self, text, width=0): + self.wr(text, extend_to=width) + + def bar(self, text, width=0): + self.wr(text, self.barstyle, extend_to=width) + + def edit(self, text, width, pos): + self.bright(text, width) + + def bright(self, text, width=0): + self.wr(text.ljust(width), self.editstyle, extend_to=width) + + def high(self, text, width=0): + self.wr(text.ljust(width), self.highstyle, extend_to=width) + + def menu(self, text, width=0): + self.wr(text, self.menustyle, extend_to=width) + + def error(self, text, width=0): + self.wr(f'{self.errorflag}{text}', self.errorstyle, extend_to=width) + + def write_raw(self, row, text, *attr): + self.col += len(text) + + def wr(self, text, *attr, extend_to=0): + """write text on screen + + :param text: the text + :param attr: attributes + :param extend_to: extend_to >= 0: fill up to given value + extend_to < 0: extend to right margin + extend_to + """ + if self.width: + limit = self.width - self.col + if limit <= 0: + return + if extend_to < 0: + limit += extend_to + text = text.ljust(limit) + else: # elif extend_to >= limit + text = text.ljust(extend_to)[:limit] + else: + text = text.ljust(extend_to) + self.write_raw(self.row, text, *attr) + + +class Writer(BaseWriter): + highstyle = curses.A_REVERSE + menustyle = curses.A_BOLD + barstyle = curses.A_REVERSE + editstyle = 0 + errorstyle = 0 + errorflag = '! ' + newoffset = None + querystyle = curses.A_BOLD + warnstyle = curses.A_REVERSE + pairs = {} + colors = {} + nextcolor = 16 + lock = threading.RLock() + leftwidth = 8 # minimum left column width + + def __init__(self, stdscr, main): + super().__init__() + self.main = main + self.scr = stdscr + self.scr.clear() + self.height, self.width = stdscr.getmaxyx() + self.popup = None + self.top = 0 + self.bot = self.height + self.adjust_offset_to = None + self.preferred_window_row = None + self.offset = 0 + + def set_leftwidth(self, leftwidth): + if leftwidth < 1: + self.leftwidth = int(self.width * leftwidth + 1) + else: + self.leftwidth = leftwidth + + def write_raw(self, row, text, *attr): + row = self.row - self.offset # screen row + if self.top <= row < self.bot and self.col < self.width: + col = self.col + try: + self.scr.addstr(row, self.col, text, *attr) + self.col += len(text) + except curses.error: + self.col += len(text) + if self.col < self.width or row < self.height - 1: + raise + @classmethod + def make_color(cls, rgb): + if isinstance(rgb, int): + # a color number was given + return rgb + idx = cls.colors.get(rgb) + if idx is None: + idx = cls.nextcolor + cls.nextcolor += 1 + cls.colors[idx] = rgb + curses.init_color(idx, *rgb) + return idx + + @classmethod + def make_pair(cls, fg, bg): + fg = cls.make_color(fg) + bg = cls.make_color(bg) + pair = cls.pairs.get((fg, bg)) + if pair is not None: + return pair + idx = len(cls.pairs) + 1 + curses.init_pair(idx, fg, bg) + cls.pairs[(fg, bg)] = pair = curses.color_pair(idx) + return pair + + @classmethod + def init_colors(cls, stdscr): + curses.start_color() + for nr in range(cls.nextcolor): + rgb = curses.color_content(nr) + cls.colors.setdefault(rgb, nr) + black = cls.make_color((0, 0, 0)) + dim_white = (680, 680, 680) + stdscr.bkgd(' ', cls.make_pair(black, dim_white)) + bright_white = 1000, 1000, 1000 + cls.editstyle = cls.make_pair(black, bright_white) + red = cls.make_color((680, 0, 0)) + cls.errorstyle = cls.make_pair(red, dim_white) + cls.errorflag = '' + very_light_blue = (800, 900, 1000) + cls.highstyle = cls.make_pair(black, very_light_blue) + light_white = (900, 900, 900) + cls.menustyle = cls.make_pair(black, light_white) + light_green = 0, 1000, 0 + cls.querystyle = cls.make_pair(black, light_green) + yellow = 1000, 1000, 0 + cls.warnstyle = cls.make_pair(black, yellow) + + def edit(self, text, width, pos): + """write text and set cursor at given pos + + also scroll horizontally if cursor would be outside screen + """ + if pos is not None: + maxwidth = self.width - self.col + scrollrange = len(text) - maxwidth + 1 + if scrollrange <= 0 or pos < scrollrange: + offset = 0 + else: + offset = min(pos - scrollrange, scrollrange) + text = text[offset:] + if len(text) - offset < maxwidth: + text += ' ' + self.set_cursor_pos(pos - offset, True) + self.bright(text, width) + + def set_cursor_pos(self, pos=0, visible=False): + self.cursor_visible = visible + self.main.cursor_pos = self.row - self.offset, self.col + pos + + def vline(self, row, col, length, top, bottom, left, right): + """draw a vertical line + + :param row, col: upper start point + :param length: unclipped length + :param top, bottom, left, right: clipping + :return: , (None is returned, when clipped on this corner) + """ + beg = None + end = None + if left <= col < right and row < bottom: + end = row + length + if end >= bottom: + length -= end - bottom + end = None + beg = row + if beg < top: + row = top + length -= top - beg + beg = None + if length > 0: + self.scr.vline(row, col, curses.ACS_VLINE, length) + return beg, end + + def hline(self, row, col, length, top, bottom, left, right): + """draw a horizontal line + + :param row, col: left start point + :param length: unclipped length + :param top, bottom, left, right: clipping + :return: , (None is returned, when clipped on this corner) + """ + beg = None + end = None + if top <= row < bottom and col < right: + end = col + length + if end >= right: + length -= end - right + end = None + beg = col + if beg < left: + col = left + length -= left - beg + beg = None + if length > 0: + self.scr.hline(row, col, curses.ACS_HLINE, length) + else: + raise ValueError(length) + return None, None + if end is None and beg is not None: + raise ValueError('end None') + return beg, end + + def rectangle(self, row, col, height, width, top=None, bottom=None, left=None, right=None): + """clipped rectangle""" + row = row - self.offset + clip = ( + max(0, (top or 0) - self.offset), + self.height if bottom is None else min(self.height, bottom - self.offset), + max(0, (left or 0)), + self.width if right is None else min(self.width, right) + ) + self.vline(row, col, height, *clip) + self.vline(row, col + width, height, *clip) + left, right = self.hline(row, col, width, *clip) + if left is not None: + self.scr.addch(row, left, curses.ACS_ULCORNER) + if right is not None: + self.scr.addch(row, right, curses.ACS_URCORNER) + row += height + left, right = self.hline(row, col, width, *clip) + if left is not None: + self.scr.addch(row, left, curses.ACS_LLCORNER) + if right is not None: + self.scr.addch(row, right, curses.ACS_LRCORNER) + + +class Empty(Widget): + default_height = 0 + + def handle(self, main): + return None + + +HELP_TEXT = """ +ctrl-X Exit +ctrl-C context menu +""" + +class Main(HasWidgets): + parent = None + popupmenu = None + statusbar = None + menubar = None + help_text = HELP_TEXT + log = Widget.log + leftwidth = 0.2 # if < 1: a fraction + + def __init__(self, widgets, /, headers=(), footers=()): + self.focus = 0 + self.headers = headers + self.footers = footers + self.current_widget = self + self.status(None) # default text + self.widgets = widgets + self.screen_row = 0 + self.offset = None + self.scr = None # inititalized in run() + self.help_mode = False + self.help_offset = 0 + self.cursor_col = 0 # position of cursor in MultiLineEdit + self.cursor_pos = (0, 0) + self.popup_offset = 0 + self.quit = False + self.cut_lines = [] + self.cut_extend = False + self.context_menu = [ + MenuItem('help', K.HELP, self.handle_help, None), + MenuItem('exit', K.QUIT, self.do_quit), + ] + + def do_quit(self): + self.quit = True + + def get_topmargin(self): + return sum(v.height() for v in self.headers) + + def get_menu(self): + return self.context_menu + + def get_key(self, timeout=None): + while True: + self.refresh() + if timeout is None: + key = self.scr.getch() + else: + self.scr.refresh() + if select([sys.stdin], [], [], timeout)[0]: + key = self.scr.getch() + else: + continue + key = K.bynumber.get(key, key) + self.status(None) + if isinstance(key, str): + self.log.info('key letter %r', key) + self.cut_extend = False + return key + if 0 <= key < 32: # control keys + self.log.info('key ctrl %r', key) + if key != K.CUT: + self.cut_extend = False + self.log.info('extend False') + menu = ContextMenu(self.current_widget.get_menu()) + if key == K.MENU: + try: + if self.popupmenu: + return key + self.popupmenu = menu + key = menu.get_key(self) + if key == K.MENU: + continue + if key in (K.TAB, K.RETURN, K.ENTER): + return menu.do_action() + key = menu.do_hotkey(key) + if key != K.UNHANDLED: + self.log.info('got key %r', key) + return key + self.status(f'can not handle key {key}') + raise ValueError(menu.menuitems) + finally: + self.popupmenu = None + result = menu.do_hotkey(key) + if result != K.UNHANDLED: + if result is None: + continue + else: + if key != K.GOTO_MAIN: + if self.cut_extend: + self.log.info('extend False %r', key) + self.cut_extend = False + self.log.info('key special %r', key) + return key + + def handle_help(self): + self.help_mode = True + self.popupmenu = None + key = self.get_key() + self.help_mode = False + return key if key == K.QUIT else None + + def persistent_status(self): + return 'ctrl-C: context menu/get help ctrl-X: exit' + + def current_row(self): + if self.help_mode: + # TODO: scroll help + return 0 + return super().current_row() + + def refresh(self): + with self.writercls.lock: + wr = self.writercls(self.scr, self) + wr.set_leftwidth(self.leftwidth) + topmargin = self.get_topmargin() + botmargin = sum(v.height() for v in self.footers) + + bot = wr.height - botmargin + end_scroll = bot - topmargin - 1 + + current_row = self.current_row() + + offsetlimit = max(0, self.height() - end_scroll) + bot_limit = max(self.screen_row, int(end_scroll * 0.7)) + top_limit = min(self.screen_row, int(end_scroll * 0.2) + 1, bot_limit) + if self.offset is None: + self.offset = current_row - self.screen_row + offset = clamp(self.offset, current_row - top_limit, current_row - bot_limit) + self.offset = clamp(offset, 0, offsetlimit) + self.screen_row = current_row - self.offset + + wr.cursor_visible = False + + wr.move(bot, 0) + for widget in self.footers: + widget.draw(wr) + wr.move(0, 0) + + for widget in self.headers: + widget.draw(wr) + wr.offset = self.offset + wr.top, wr.bot = topmargin, bot + if self.help_mode: + for line in self.help_text.split('\n'): + wr.startrow() + wr.norm(line) + else: + self.draw_widgets(wr, True) + wr.top, wr.bot = 0, wr.height + if self.popupmenu: + col = self.popup_offset + col += self.cursor_pos[1] + wr.move(current_row, col) + # wr.move(*self.popupmenu.row_col) + self.popupmenu.draw(wr) + if wr.cursor_visible: + self.scr.move(*self.cursor_pos) + curses.curs_set(1) + else: + curses.curs_set(0) + + def status(self, text, warn=None): + if self.statusbar: + st = self.persistent_status() + if text is None: # reset + self.statusbar.set(st) + else: + self.statusbar.set(f'{text} {st}', warn) + + def finish(self, exc): + pass + + def run(self, writercls): + try: + self.scr = curses.initscr() + self.writercls = writercls + try: + writercls.init_colors(self.scr) + except Exception: + raise RuntimeError('it seems you have a terminal without colors. for a test, remove this line') + pass + curses.noecho() + curses.raw() # disable ctrl-C interrupt and ctrl-Q/S flow control + curses.nonl() # accept ctrl-J + self.scr.keypad(1) + while not self.quit: + key = self.handle(self) + if key not in (K.GOTO_MAIN, None, K.UP, K.DOWN): + self.status(f'unknown key {key}') + self.finish(None) + except Exception as e: + self.finish(e) + raise + finally: + if self.scr: + self.scr.keypad(0) + curses.echo() + curses.nocbreak() + curses.endwin() + for logline in self.log.loglines: + print(logline) diff --git a/frappy_demo/lakeshore.py b/frappy_demo/lakeshore.py index 77d8f050..b97bd9c6 100644 --- a/frappy_demo/lakeshore.py +++ b/frappy_demo/lakeshore.py @@ -71,7 +71,7 @@ class TemperatureLoop(TemperatureSensor, Drivable): # lakeshore loop number to be used for this module loop = Property('lakeshore loop', IntRange(1, 2), default=1) target = Parameter(datatype=FloatRange(unit='K', min=0, max=1500)) - heater_range = Property('heater power range', IntRange(0, 3), readonly=False) + heater_range = Parameter('heater power range', IntRange(0, 3), readonly=False) tolerance = Parameter('convergence criterion', FloatRange(0), default=0.1, readonly=False) _driving = False @@ -103,7 +103,7 @@ class TemperatureLoop(TemperatureSensor, Drivable): class TemperatureLoop340(TemperatureLoop): # slightly different behaviour for model 340 - heater_range = Property('heater power range', IntRange(0, 5)) + heater_range = Parameter('heater power range', IntRange(0, 5)) def write_heater_range(self, value): self.communicate(f'RANGE {value};RANGE?') diff --git a/frappy_psi/lakeshore.py b/frappy_psi/lakeshore.py index 87fad4a2..5db8fe44 100644 --- a/frappy_psi/lakeshore.py +++ b/frappy_psi/lakeshore.py @@ -22,8 +22,6 @@ import time import math import random import threading -import numpy as np -from numpy.testing import assert_approx_equal from frappy.core import Module, Readable, Parameter, Property, \ HasIO, StringIO, Writable, IDLE, ERROR, BUSY, DISABLED, nopoll, Attached @@ -32,11 +30,14 @@ from frappy.datatypes import IntRange, FloatRange, StringType, \ from frappy.errors import CommunicationFailedError, ConfigError, \ HardwareError, DisabledError, ImpossibleError, secop_error, SECoPError from frappy.lib.units import NumberWithUnit, format_with_unit -from frappy.lib import formatStatusBits +from frappy.lib import formatStatusBits, LazyImport from frappy_psi.convergence import HasConvergence from frappy.mixins import HasOutputModule, HasControlledBy from frappy.extparams import StructParam -from frappy_psi.calcurve import CalCurve + +np = LazyImport('numpy') +np_testing = LazyImport('numpy.testing') +calcurve_module = LazyImport('frappy_psi.calcurve') def string_to_num(string): @@ -419,7 +420,7 @@ class Device(HasLscIO, Module): """check whether a returned calibration point is equal within curve point precision""" for v1, v2, eps in zip(left, right, fixeps): try: - assert_approx_equal(v1, v2, significant, verbose=False) + np_testing.assert_approx_equal(v1, v2, significant, verbose=False) except AssertionError: return abs(v1 - v2) < eps return True @@ -464,7 +465,7 @@ class CurveRequest: self.action = device.find_curve self.new_sensors = set() self.sensors = {sensor.channel: sensor} - calcurve = CalCurve(sensor.calcurve) + calcurve = calcurve_module.CalCurve(sensor.calcurve) equipment_id = device.propertyValues.get('original_id') or device.secNode.equipment_id name = f"{equipment_id.split('.')[0]}.{sensor.name}" sn = calcurve.calibname diff --git a/frappy_psi/picontrol.py b/frappy_psi/picontrol.py index f7877068..ab55d397 100644 --- a/frappy_psi/picontrol.py +++ b/frappy_psi/picontrol.py @@ -101,9 +101,8 @@ class PImixin(HasOutputModule, Writable): _lastdiff = None _lasttime = 0 _get_range = None # a function get output range from output_module - _overflow = 0 + _overflow = None # history of overflow (is not zero when integration overflows output range) _itime_set = None # True: 'itime' was set, False: 'i' was set - _history = None __errcnt = 0 __inside_poll = False __cache = None @@ -114,6 +113,7 @@ class PImixin(HasOutputModule, Writable): def initModule(self): self.__cache = {} + self._overflow = np.zeros(10) super().initModule() if self.output_range != (0, 0): # legacy ! self.output_min, self.output_max = self.output_range @@ -131,13 +131,6 @@ class PImixin(HasOutputModule, Writable): self.__cache = {} now = time.time() value = self.read_value() - if self._history is None: - # initialize a fixed size array, with fake time axis to avoid errors in np.polyfit - self._history = np.array([(now+i, self.value) for i in range(-9,1)]) - else: - # shift fixed size array, and change last point - self._history[:-1] = self._history[1:] - self._history[-1] = (now, value) if not self.control_active: self._lastdiff = 0 return @@ -150,30 +143,34 @@ class PImixin(HasOutputModule, Writable): self._lastdiff = diff deltadiff = diff - self._lastdiff self._lastdiff = diff - if diff: - ref = self.itime / diff - (slope, _), cov = np.polyfit(self._history[:, 0] - now, self._history[:, 1], 1, cov=True) - slope_stddev = np.sqrt(max(0, cov[0, 0])) - if slope * ref > 1 + 2 * slope_stddev * abs(ref): - # extrapolated value will cross target in less than itime - if self._overflow: - self._overflow = 0 - self.log.info('clear overflow') output, omin, omax = self.cvt2int(out.target) - output += self._overflow + ( + output += self._overflow[-1] + ( self.p * deltadiff + self.i * deltat * diff / self.time_scale) / self.input_scale if omin <= output <= omax: - self._overflow = 0 + overflow = 0 else: # save overflow for next step if output < omin: - self._overflow = output - omin + overflow = output - omin output = omin else: - self._overflow = output - omax + overflow = output - omax output = omax + if overflow: + # fit a straight line + (slope, beg), cov = np.polyfit(range(self._overflow), self._overflow, 1, cov=True) + sign = np.copysign(1, overflow) + end = beg + slope * len(self._overflow) + # reduce the absolute value of overflow by the minimum distance of the fitted + # line to zero, with a margin of 3 * stddev + shift = max(0, min(overflow * sign, min(beg * sign, end * sign) - 3 * np.sqrt(cov[1, 1]))) * sign + if shift: + overflow -= shift + self._overflow -= shift + self._overflow[:-1] = self._overflow[1:] + self._overflow[-1] = overflow out.update_target(self.name, self.cvt2ext(output)) self.__errcnt = 0 except Exception as e: @@ -187,10 +184,10 @@ class PImixin(HasOutputModule, Writable): finally: self.__inside_poll = False self.__cache = {} - self.overflow = self._overflow + self.overflow = self._overflow[-1] def write_overflow(self, value): - self._overflow = value + self._overflow.fill(value) def internal_poll(self): super().doPoll() diff --git a/paramedit.py b/paramedit.py new file mode 100644 index 00000000..e35a23b5 --- /dev/null +++ b/paramedit.py @@ -0,0 +1,814 @@ +import sys +import os +import time +from subprocess import Popen, PIPE +from pathlib import Path +from psutil import pid_exists +from frappy.core import Module +from frappy.errors import ConfigError +from frappy.lib import generalConfig +from frappy.lib.comparestring import compare +from frappy.lib import mkthread, formatExtendedTraceback +from frappy.config import process_file, to_config_path +from frappy.tools.configdata import Value, cfgdata_to_py, cfgdata_from_py, get_datatype, site, ModuleClass, stringtype +from frappy.tools.completion import class_completion, recommended_prs +from frappy.tools.terminalgui import Main, Container, LineEdit, MultiLineEdit, MenuItem, Writer, ContextMenu, \ + TextWidget, TextEdit, NameEdit, TextEditCompl, Completion, TitleBar, StatusBar, Widget, ConfirmDialog, \ + K + +K.add(TOGGLE_DETAILED='^t', NEXT_VERSION='^n', PREV_VERSION='^b') + +VERSION_SEPARATOR = "\n''''" +TIMESTAMP_FMT = '%Y-%m-%d-%H%M%S' +# TODO: +# - ctrl-K / ctrlV/U for cutting/pasting module(s) or parameter(2) +# * separate buffer for modules and parameters? +# * when inserting a parameter which already exists, overwrite the value only +# - implement IO modules +# - version control: create one file, with a separator (save disk space, no numbering housekeeping needed) +# - use Exceptions for ctrl-X and ctrl-C +# - use also shift-Tab for level up? +# - directory to save to + +def unix_cmd(cmd, *args): + out = Popen(cmd.split() + list(args), stdout=PIPE).communicate()[0] + return list(out.decode().split('\n')) + + +class StringValue: # TODO: unused? + error = None + + def __init__(self, value, from_string=False, datatype=None): + self.strvalue = value + + def set_value(self, value): + self.strvalue = value + + def set_from_string(self, strvalue): + self.strvalue = strvalue + + def get_repr(self): + return repr(self.strvalue) + + +class TopWidget: + parent_cls = Main + + +class Child(Widget): + """child widget of NodeWidget ot ModuleWidget""" + parent = TopWidget + + def get_name(self): + return None + + def collect(self, cfgdict): + pass + + def check_data(self): + pass + + def is_valid(self): + return True + + +class HasValue(Child): + clsobj = None + + def init_value_widget(self, parent, valobj): + self.init_parent(parent) + self.valobj = valobj + + def validate(self, strvalue, main=None): + pname = self.get_name() + try: + if pname != 'cls': + if self.clsobj != self.parent.clsobj: + self.clsobj = self.parent.clsobj + self.log.info('validate %r %r %r %r', self.parent.get_name(), self.get_name(), self.clsobj, self.valobj) + self.valobj.datatype, self.valobj.error = get_datatype( + self.get_name(), self.clsobj, self.valobj.value) + self.log.info('validate %r %r %r %r', self.parent.get_name(), self.get_name(), self.clsobj, self.valobj) + self.valobj.validate_from_string(strvalue) + self.error = None + except Exception as e: + self.error = str(e) + if self.get_name() == 'tolerance': + self.log.info('checked %r %r', self.valobj, self.valobj.error) + if main: + main.touch() + return strvalue + + def check_data(self): + self.validate(self.valobj.strvalue) + + def is_valid(self): + return self.get_name() and self.valobj.strvalue + + +class ValueWidget(HasValue, LineEdit): + fixedname = None + + def __init__(self, parent, name, valobj, label=None): + """init a value widget + + :param parent: the parent widget + :param name: the initial name + :param valobj: the object containing value and datatype + :param label: None: the name is changeable, else: a label (which might be different from name) + """ + self.init_value_widget(parent, valobj) + if label is not None: + labelwidget = TextWidget(label) + self.fixedname = name + else: + labelwidget = NameEdit(name, self.validate_name) + # self.log.info('value widget %r %r', name, self.fixedname) + if valobj.completion: + valueedit = TextEditCompl(valobj.strvalue, self.validate, valobj.completion) + else: + valueedit = TextEdit(valobj.strvalue, self.validate) + super().__init__(labelwidget, valueedit) + + def validate_name(self, name, main): + widget_dict = self.parent.widget_dict + if name.isidentifier(): + other = widget_dict.get(name) + if other and other != self: + self.error = f'duplicate name {name!r}' + return self.get_name() + self.clsobj = None + self.error = None + widget = widget_dict.pop(self.get_name(), None) + if widget: + widget_dict[name] = widget + else: + self.error = f'illegal name {name!r}' + return self.get_name() + return name + + def get_name(self): + if self.fixedname: + return self.fixedname + return self.labelwidget.value + + def collect(self, as_dict): + """collect data""" + name = self.get_name() + if name: + as_dict[name] = self.valobj + + +class DocWidget(HasValue, MultiLineEdit): + parent_cls = TopWidget + + def __init__(self, parent, name, valobj): + self.init_value_widget(parent, valobj) + self.valobj = valobj + self.name = name + super().__init__(name, valobj.strvalue, 'doc: ') + + def get_name(self): + return self.name + + def collect(self, config): + self.valobj.set_value(self.value) + config[self.name] = self.valobj + + +class BaseWidget(TopWidget, Container): + """base for Module or Node""" + clsobj = None + header = 'Module' + special_names = 'name', 'cls', 'description' + + def init(self, parent): + self.widgets = [] + self.init_parent(parent, EditorMain) + self.focus = 0 + self.widget_dict = {} + self.fixed_names = self.get_fixed_names() + + def get_menu(self): + main = self.parent + if main.version_view: + return main.get_menu() + return self.context_menu + main.get_menu() + + def get_fixed_names(self): + result = {k: k for k in self.special_names} + result['name'] = self.header + return result + + def add_widget(self, name, valobj, pos=None): + label = self.fixed_names.get(name) + widget = ValueWidget(self, name, valobj, label) + self.widget_dict[name] = widget + # self.log.info('add widget %r: label=%r name=%r', name, label, widget.get_name()) + if pos is None: + self.widgets.append(widget) + else: + if pos < 0: + pos += len(self.widgets) + self.widgets.insert(pos, widget) + return widget + + def new_widget(self): + raise NotImplementedError + + def add_module(self, after_current=False): + modcfg = {'name': Value(''), 'cls': Value(f'{site.frappy_subdir}.'), 'description': Value('')} + main = self.parent + module = ModuleWidget(main, '', modcfg) + main.insert(main.focus + after_current, module) + main.set_focus(main.focus + 1) + if not after_current: + self.set_focus(1) + main.advance(-1) + module.set_focus(1) # go to cls widget + + def get_widget_value(self, key): + try: + return self.widget_dict[key].valobj.strvalue + except KeyError: + return '' + + def get_name(self): + return self.get_widget_value('name') + + def draw_summary(self, wr, in_focus): + raise NotImplementedError + + def draw(self, wr, in_focus=False): + main = self.parent + wr.set_leftwidth(main.leftwidth) + + if main.detailed: + self.draw_widgets(wr, in_focus) + else: + self.draw_summary(wr, in_focus) + + def current_row(self): + main = self.parent + return super().current_row() if main.detailed else 0 + + def height(self): + main = self.parent + return super().height() if main.detailed else 1 + + def collect(self, result): + name = self.get_name() + if name: + result[name] = modcfg = {} + for w in self.widgets: + w.collect(modcfg) + + +class ModuleName(Value): + def __init__(self, main, name): + self.main = main + super().__init__(name) + + def validate_from_string(self, value): + if not value: + self.strvalue = self.value = '' + raise ValueError('empty name') + if value != self.value and value in self.main.widget_dict: + self.strvalue = self.value = '' + raise ValueError(f'duplicate name {value!r}') + self.value = self.strvalue = value + + +class ModuleWidget(BaseWidget): + def __init__(self, parent, name, modulecfg): + assert name == modulecfg['name'].value + modulecfg['name'] = ModuleName(parent, name) + self.init(parent) + self.context_menu = [ + MenuItem('add parameter/property', 'p', self.new_widget), + MenuItem('add module', 'm', self.add_module), + MenuItem('purge empty prs', 'e', self.purge_prs), + MenuItem('add recommended prs', '+', self.complete_prs), + MenuItem('cut module', K.CUT, parent.cut_module), + ] + + clsvalue = modulecfg['cls'] + clsvalue.callback = self.update_cls + clsvalue.completion = class_completion + clsobj = clsvalue.value + if clsobj: + self.fixed_names.update({k: k for k, v in recommended_prs(clsobj).items() if v}) + for name, valobj in modulecfg.items(): + self.add_widget(name, valobj) + self.widgets.append(EndModule(self)) + + def new_widget(self, name='', pos=None): + self.add_widget(name, Value('', *get_datatype('', self.clsobj, ''), from_string=True), self.focus) + + def update_widget_dict(self): + self.widget_dict = {w.get_name(): w for w in self.widgets} + + def get_name_info(self): + return self.clsobj, self.widget_dict + + def handle(self, main): + while True: + key = super().handle(main) if main.detailed else main.get_key() + if key in (K.RIGHT, K.TAB): + main.detailed = True + main.status('') + main.offset = None # recalculate offset from screen pos + else: + return key + + def check_data(self): + """check clsobj is valid and check all params and props""" + # clswidget, = self.find_widgets('cls') + # clsobj = clswidget.valobj.value + for widget in self.widgets: + widget.check_data() + + def update_cls(self, cls): + if cls != self.clsobj: + self.complete_prs(True) + self.clsobj = cls + self.check_data() + return cls + + def complete_prs(self, only_mandatory=False): + if self.clsobj: + fixed_names = self.get_fixed_names() + names = set(w.get_name() for w in self.widgets) + for name, mandatory in recommended_prs(self.clsobj).items(): + if mandatory: + fixed_names[name] = name + if name not in names and mandatory >= only_mandatory: + valobj = Value('', *get_datatype(name, self.clsobj, '')) + if name == 'cls': + self.log.info('add needed %r', valobj) + widget = self.add_widget(name, valobj, -1) + if mandatory: + widget.error = 'please set this mandatory property' + self.fixed_names = fixed_names + self.update_widget_dict() + + def purge_prs(self): + self.widgets = [w for w in self.widgets if w.get_name() not in self.fixed_names and w.is_valid()] + self.update_widget_dict() + + def draw_summary(self, wr, in_focus): + wr.startrow() + wr.norm('Module ') + name = self.get_widget_value('name') + if in_focus: + wr.set_cursor_pos() + wr.bright(name, round(wr.width * 0.2)) + else: + wr.norm(name.ljust(round(wr.width * 0.2))) + half = (wr.width - wr.col) // 2 + wr.norm(f"{self.get_widget_value('description').ljust(half)} {self.get_widget_value('cls')} ") + + def collect(self, result): + super().collect(result) + name = self.get_name() + if name: + assert result[name].pop('name').value == name + + +class EndNode(Child): + parent_cls = TopWidget + helptext = 'RET: add module p: add parameter or property' + + def __init__(self, parent): + self.init_parent(parent) + super().__init__() + + def draw(self, wr, in_focus=False): + wr.startrow() + if in_focus: + wr.set_cursor_pos(wr.leftwidth) + wr.col = wr.leftwidth + wr.bright(' ') + wr.norm(' ' + self.helptext) + + def collect(self, result): + pass + + def check_data(self): + pass + + def get_name(self): + return None + + def handle(self, main): + self.showhelp = False + while True: + key = main.get_key() + if key in (K.RETURN, K.ENTER): + self.parent.add_module(True) + elif key in (K.UP, K.DOWN, K.QUIT): + return key + elif key == 'p': + self.parent.new_widget() + return K.GOTO_MAIN + return None + + +class EndModule(EndNode): + parent_cls = ModuleWidget + + +class IOWidget(ModuleWidget): + def __init__(self, parent, name, modulecfg): + assert name == modulecfg['name'].value + modulecfg['name'] = ModuleName(parent, name) + self.init(parent) + self.context_menu = [ + MenuItem('add module', 'm', self.add_module), + MenuItem('cut module', K.CUT, parent.cut_module), + ] + urivalue = modulecfg.get('uri') + if urivalue is None: + urivalue = Value('uri', stringtype) + self.add_widget('uri', urivalue) + self.widgets.append(EndModule(self)) + + +class EndIO(EndNode): + parent_cls = IOWidget + helptext = 'RET: add module' + + +class NodeName(Value): + def __init__(self, main, name): + self.main = main + super().__init__(name) + + def set_from_string(self, value): + self.error = self.main.set_node_name(value) + if self.error: + self.value = self.strvalue = self.main.cfgname + else: + self.value = self.strvalue = value + + +class NodeWidget(BaseWidget): + header = 'Node' + special_names = 'name', 'equipment_id', 'interface', 'title', 'doc' + summ_edit = {'name', 'title', 'doc'} # editable widgets in summary + + def __init__(self, parent, name, nodecfg): + nodecfg['name'] = NodeName(parent, name) + self.init(parent) + self.context_menu = [ + MenuItem('add parameter/property', 'p', self.new_widget), + # MenuItem('select line', '^K', self.select, None), + ] + for name, valobj in nodecfg.items(): + if name == 'doc': + docwidget = DocWidget(self, name, valobj) + self.widgets.append(docwidget) + self.widget_dict['doc'] = docwidget + else: + self.add_widget(name, valobj) + self.widgets.append(EndNode(self)) + + def new_widget(self, name=''): + """insert new widget at focus pos""" + self.add_widget(name, Value('', None, from_string=True), self.focus) + + def get_name(self): + return 'node' + + def set_focus(self, focus, step=1): + self.log.info('node focus %r %r %r', self.focus, focus, step) + while super().set_focus(focus, step): + self.log.info('node find %r name %r', self.focus, self.get_focus_widget().get_name()) + if self.parent.detailed or self.get_focus_widget().get_name() in self.summ_edit: + self.log.info('found') + return True + focus = self.focus + step + self.log.info('node focus end %r', self.focus) + + def height(self): + main = self.parent + return sum(w.height() for w in self.widgets if main.detailed or w.get_name() in self.summ_edit) + + + # def handle(self, main): + # if main.detailed: + # return super().handle(main) + # advance = self.next + # key = None + # itry = 10 + # while itry > 0: + # widget = self.get_focus_widget() + # if widget.get_name() in ('title', 'doc'): + # key = widget.handle(main) + # itry = 10 + # # if key in (CTRL_X, K.LEFT): + # # return key + # if key == K.UP: + # advance = self.prev + # elif key in (K.DOWN, K.RETURN, K.ENTER): + # advance = self.next + # else: + # return key + # if not advance(): + # return key + # itry -= 1 + # raise ValueError('x') + + def draw_summary(self, wr, in_focus): + # wr.startrow() + # wr.norm('Node ') + wr.set_leftwidth(7) + focus = self.focus if in_focus else None + for nr, widget in enumerate(self.widgets): + name = widget.get_name() + if name in self.summ_edit: + widget.draw(wr, nr == focus) + + +HELP_TEXT = """ +ctrl-X Exit +ctrl-T Toggle view mode (summary <-> detailed) +""" + + +class EditorMain(Main): + name = 'Main' + detailed = False + tmppath = None + help_text = HELP_TEXT + version_view = 0 # current version or when > 0 previous versions (not editable) + completion_widget = None # widget currently running a thread for guesses + leftwidth = 0.15 + cut_modules = () + cut_extend = False + + def __init__(self, cfg): + self.titlebar = TitleBar('Frappy Cfg Editor') + super().__init__([], [self.titlebar], [StatusBar(self)]) + # self.select_menu = MenuItem('select module', CUT_KEY) + self.version_menu = [ + MenuItem('previous version', K.PREV_VERSION, self.prev_version), + MenuItem('next version', K.NEXT_VERSION, self.next_version), + MenuItem('restore this version', 'r', self.restore_version), + ] + self.main_menu = [ + MenuItem('show previous version', K.PREV_VERSION, self.prev_version), + ] + self.detailed_menuitem = MenuItem('toggle detailed', K.TOGGLE_DETAILED, self.toggle_detailed) + self.cut_menuitem = MenuItem('insert cut modules', K.PASTE, self.insert_cut) + self.version_dir = Path('~/.local/share/frappy_config_editor').expanduser() + self.version_dir.mkdir(mode=0o700, parents=True, exist_ok=True) + self.cfgname = None + self.pidfile = None + self.dirty = False + # cleanup pidfiles + for file in self.version_dir.glob('*.pid'): + pidstr = file.read_text() + if not pid_exists(int(pidstr)): + file.unlink() + cfgpath = Path(cfg) + if cfg != cfgpath.stem: # this is a filename + if cfg.endswith('_cfg.py'): + cfg = cfgpath.name[:-7] + else: + cfg = self.cfgpath.stem + else: + cfgpath = None + self.filecontent = None + error = self.set_node_name(cfg, cfgpath) + if error: + raise RuntimeError(error) + self.init_from_content(self.filecontent) + self.module_clipboard = {} + self.pr_clipboard = {} + + def get_menu(self): + self.detailed_menuitem.name = 'summary view' if self.detailed else 'detailed view' + cmenu = [self.cut_menuitem] if self.cut_modules else [] + vmenu = self.version_menu if self.version_view else self.main_menu + return cmenu + vmenu + [self.detailed_menuitem] + self.context_menu + + def init_from_content(self, filecontent): + self.log.info('init from content %r', len(filecontent)) + widgets = [] + nodedata, moddata = cfgdata_from_py(self.cfgname, self.cfgpath, filecontent, self.log) + widgets.append(NodeWidget(self, self.cfgname, nodedata)) + for key, modcfg in moddata.items(): + clsvalue = modcfg.get('cls') + if clsvalue: + if clsvalue.value == '': + widgets.append(IOWidget(self, key, modcfg)) + continue + else: + modcfg['cls'] = Value('', ModuleClass, from_string=True) + widgets.append(ModuleWidget(self, key, modcfg)) + self.log.info('widgets %r', len(self.widgets)) + self.widgets = widgets + + def toggle_detailed(self): + self.detailed = not self.detailed + self.offset = None # recalculate offset from screen pos + self.status(None) + + def get_key(self): + if self.dirty: + if not self.version_view: + self.save() + self.dirty = False + while True: + if self.completion_widget: + key = super().get_key(0.1) + if key is None: + continue + else: + key = super().get_key() + if self.version_view: + if isinstance(key, str) or key in [K.DEL]: + self.status('', 'can not edit previous version') + else: + break + else: + break + return key + + def cut_module(self): + if not self.cut_modules: + self.cut_extend = False + module = self.get_focus_widget() + if not isinstance(module, ModuleWidget): + self.status('', warn='can not cut node') + return + self.widgets[self.focus:self.focus+1] = [] + if not self.cut_extend: + self.cut_modules = [] + self.log.info('start cut modules') + self.cut_modules.append(module) + self.cut_extend = True + self.status(f'{len(self.cut_modules)} modules buffered') + + def insert_cut(self): + if self.cut_modules: + self.widgets[self.focus:self.focus] = self.cut_modules + self.cut_modules = [] + + def set_node_name(self, name, cfgpath=None): + if name == self.cfgname: + return None + if not name: + return f'{name!r} is not a valid node name' + if self.pidfile: + self.pidfile.unlink() + self.pidfile = self.version_dir / f'{name}.pid' + try: + pidstr = self.pidfile.read_text() + if pid_exists(int(pidstr)): + return f'{name} is already edited by process {pidstr}' + # overwrite pidfile from dead process + except FileNotFoundError: + pass + self.cfgname = name + versions_path = self.version_dir / f'{name}.versions' + try: + sections = versions_path.read_text().split(VERSION_SEPARATOR) + assert sections.pop(0) == '' + except FileNotFoundError: + sections = [] + self.versions = dict((v.split('\n', 1) for v in sections)) + self.tmppath = self.version_dir / f'{name}.current' + if cfgpath: + cfgpaths = [cfgpath] + else: + try: + cfgpaths = [to_config_path(name, self.log)] + except ConfigError: + cfgpaths = [] + cfgpaths.append(self.tmppath) + for cfgpath in cfgpaths: + try: + filecontent = cfgpath.read_text() + self.cfgpath = cfgpath + timestamp = time.strftime(TIMESTAMP_FMT, time.localtime(cfgpath.stat().st_mtime)) + break + except FileNotFoundError: + pass + else: + filecontent = None + timestamp = time.strftime(TIMESTAMP_FMT) + self.cfgpath = cfgpaths[0] + self.filecontent = filecontent + self.add_version(filecontent, timestamp) + return None + + def add_version(self, filecontent, timestamp): + if self.versions: + # remove last version if unchanged + key, content = next(reversed(self.versions.items())) + if content == filecontent: + self.versions.pop(key) + if filecontent: + self.versions[timestamp] = filecontent + sep = VERSION_SEPARATOR + versions_path = self.version_dir / f'{self.cfgname}.versions' + tmpname = versions_path.with_suffix('.tmp') + with open(tmpname, 'w') as f: + for ts, section in self.versions.items(): + f.write(sep) + f.write(f'{ts}\n') + f.write(section) + os.rename(tmpname, versions_path) + + def restore_version(self): + if not self.version_view: + self.status('this is already the current version') + return + self.popupmenu = menu = ConfirmDialog('restore this version? [N]') + if not self.popupmenu.handle(self): + self.status('cancelled restore') + return + version = list(self.versions)[-self.version_view] + self.status(f'restored from {version}') + content = self.versions.pop(version) + self.add_version(self.filecontent, time.strftime(TIMESTAMP_FMT)) + self.filecontent = content + self.version_view = 0 + self.titlebar.right = '' + self.init_from_content(self.filecontent) + self.save() + + def set_version(self): + if self.version_view: + version = list(self.versions)[-self.version_view] + self.titlebar.right = f'version {version}' + try: + self.init_from_content(self.versions[version]) + self.status(None) + except Exception as e: + self.status('', f'bad version: {e}') + else: + self.init_from_content(self.filecontent) + self.titlebar.right = '' + + def prev_version(self): + maxv = len(self.versions) + self.version_view += 1 + self.log.info('back to version %r', self.version_view) + if self.version_view > maxv: + self.status('this is the oldest version') + self.version_view = maxv + else: + self.set_version() + + def next_version(self): + if self.version_view: + self.version_view -= 1 + self.set_version() + else: + self.status('this is the current version') + + def current_row(self): + return super().current_row() + self.get_topmargin() + + def touch(self): + self.dirty = True + + def save(self): + cfgdata = {} + for widget in self.widgets: + widget.collect(cfgdata) + if 'node' not in cfgdata: + raise ValueError(list(cfgdata), len(self.widgets)) + config_code = cfgdata_to_py(**cfgdata) + # if self.cfgpath: + # self.cfgpath.write_text(config_code) + self.tmppath.write_text(config_code) + return config_code + + def finish(self, exc): + # TODO: ask where to save tmp file + self.save() + + def advance(self, step): + done = super().advance(step) + if done: + self.get_focus_widget().set_focus(None, step) + return done + + def run(self): + try: + self.pidfile.write_text(str(os.getpid())) + super().run(Writer) + except Exception: + print(formatExtendedTraceback()) + finally: + self.pidfile.unlink() + + +if __name__ == "__main__": + os.environ['FRAPPY_CONFDIR'] = 'cfg:cfg/main:cfg/stick:cfg/addons' + generalConfig.init() + EditorMain(sys.argv[1]).run()