From 5c33cbf7a5faad5630e3eae12f2927e7f1a3da07 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Fri, 15 May 2020 09:30:52 +0200 Subject: [PATCH] introduce update callbacks includes a use case: - a software calibration, to be applied to any Readable. - calibration could be changed on the fly + refactored a little bit update events mechanism Change-Id: Ifa340770caa9eb2185fe7e912c51bd9ddb411ece Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/23093 Tested-by: JenkinsCodeReview Reviewed-by: Markus Zolliker --- cfg/softcal.cfg | 12 +++ secop/core.py | 1 + secop/iohandler.py | 2 +- secop/metaclass.py | 15 +-- secop/modules.py | 74 ++++++++++++-- secop/protocol/dispatcher.py | 23 +---- secop/proxy.py | 55 ++++++----- secop_psi/softcal.py | 185 +++++++++++++++++++++++++++++++++++ test/test_iohandler.py | 12 +-- test/test_modules.py | 14 +-- 10 files changed, 315 insertions(+), 78 deletions(-) create mode 100644 cfg/softcal.cfg create mode 100644 secop_psi/softcal.py diff --git a/cfg/softcal.cfg b/cfg/softcal.cfg new file mode 100644 index 0000000..c6f2439 --- /dev/null +++ b/cfg/softcal.cfg @@ -0,0 +1,12 @@ +[r3] +class = secop.core.Proxy +remote_class = secop.core.Readable +description = temp sensor on 3He system +uri = tcp://pc12694:5000 +export = False + +[t3] +class = secop_psi.softcal.Sensor +rawsensor = r3 +calib = X131346 +value.unit = K diff --git a/secop/core.py b/secop/core.py index 534868a..d792c4c 100644 --- a/secop/core.py +++ b/secop/core.py @@ -30,6 +30,7 @@ from secop.datatypes import FloatRange, IntRange, ScaledInteger, \ BoolType, EnumType, BLOBType, StringType, TupleOf, ArrayOf, StructOf from secop.lib.enum import Enum from secop.modules import Module, Readable, Writable, Drivable, Communicator, Attached +from secop.properties import Property from secop.params import Parameter, Command, Override from secop.metaclass import Done from secop.iohandler import IOHandler, IOHandlerBase diff --git a/secop/iohandler.py b/secop/iohandler.py index e88dba7..eb6908e 100644 --- a/secop/iohandler.py +++ b/secop/iohandler.py @@ -286,7 +286,7 @@ class IOHandler(IOHandlerBase): except Exception as e: # set all parameters of this handler to error for pname in self.parameters: - module.setError(pname, e) + module.announceUpdate(pname, None, e) raise return Done diff --git a/secop/metaclass.py b/secop/metaclass.py index 548c121..becdb8d 100644 --- a/secop/metaclass.py +++ b/secop/metaclass.py @@ -23,7 +23,6 @@ """Define Metaclass for Modules/Features""" -import time from collections import OrderedDict from secop.errors import ProgrammingError, BadValueError @@ -31,8 +30,6 @@ from secop.params import Command, Override, Parameter from secop.datatypes import EnumType from secop.properties import PropertyMeta -EVENT_ONLY_ON_CHANGED_VALUES = False - class Done: """a special return value for a read/write function @@ -149,7 +146,7 @@ class ModuleMeta(PropertyMeta): return getattr(self, pname) except Exception as e: self.log.debug("rfunc(%s) failed %r" % (pname, e)) - self.setError(pname, e) + self.announceUpdate(pname, None, e) raise else: # return cached value @@ -198,15 +195,7 @@ class ModuleMeta(PropertyMeta): return self.accessibles[pname].value def setter(self, value, pname=pname): - pobj = self.accessibles[pname] - value = pobj.datatype(value) - pobj.timestamp = time.time() - if (not EVENT_ONLY_ON_CHANGED_VALUES) or (value != pobj.value): - pobj.value = value - # also send notification - if pobj.export: - self.log.debug('%s is now %r' % (pname, value)) - self.DISPATCHER.announce_update(self, pname, pobj) + self.announceUpdate(pname, value) setattr(newtype, pname, property(getter, setter)) diff --git a/secop/modules.py b/secop/modules.py index e04fc8d..0940ba5 100644 --- a/secop/modules.py +++ b/secop/modules.py @@ -29,7 +29,8 @@ from collections import OrderedDict from secop.datatypes import EnumType, FloatRange, BoolType, IntRange, \ StringType, TupleOf, get_datatype, ArrayOf, TextType, StatusType -from secop.errors import ConfigError, ProgrammingError, SECoPError, BadValueError, SilentError +from secop.errors import ConfigError, ProgrammingError, SECoPError, BadValueError,\ + SilentError, InternalError, secop_error from secop.lib import formatException, formatExtendedStack, mkthread from secop.lib.enum import Enum from secop.metaclass import ModuleMeta @@ -94,6 +95,8 @@ class Module(HasProperties, metaclass=ModuleMeta): self.DISPATCHER = srv.dispatcher self.log = logger self.name = name + self.valueCallbacks = {} + self.errorCallbacks = {} # handle module properties # 1) make local copies of properties @@ -199,6 +202,9 @@ class Module(HasProperties, metaclass=ModuleMeta): # is not specified in cfgdict and deal with parameters to be written. self.writeDict = {} # values of parameters to be written for pname, pobj in self.parameters.items(): + self.valueCallbacks[pname] = [] + self.errorCallbacks[pname] = [] + if pname in cfgdict: if not pobj.readonly and pobj.initwrite is not False: # parameters given in cfgdict have to call write_ @@ -265,14 +271,68 @@ class Module(HasProperties, metaclass=ModuleMeta): def __getitem__(self, item): return self.accessibles.__getitem__(item) - def setError(self, pname, exception): - """sets a parameter to a read error state - - the error will be cleared when the parameter is set - """ + def announceUpdate(self, pname, value=None, err=None, timestamp=None): + """announce a changed value or readerror""" pobj = self.parameters[pname] + if value is not None: + pobj.value = value # store the value even in case of error + if err: + if not isinstance(err, SECoPError): + err = InternalError(err) + if str(err) == str(pobj.readerror): + return # do call updates for repeated errors + else: + try: + pobj.value = pobj.datatype(value) + except Exception as e: + err = secop_error(e) + pobj.timestamp = timestamp or time.time() + pobj.readerror = err if pobj.export: - self.DISPATCHER.announce_update_error(self, pname, pobj, exception) + self.DISPATCHER.announce_update(self.name, pname, pobj) + if err: + callbacks = self.errorCallbacks + arg = err + else: + callbacks = self.valueCallbacks + arg = value + cblist = callbacks[pname] + for cb in cblist: + try: + cb(arg) + except Exception as e: + # print(formatExtendedTraceback()) + pass + + def registerCallbacks(self, modobj, autoupdate=()): + for pname in self.parameters: + errfunc = getattr(modobj, 'error_update_' + pname, None) + if errfunc: + def errcb(err, p=pname, efunc=errfunc): + try: + efunc(err) + except Exception as e: + modobj.announceUpdate(p, err=e) + self.errorCallbacks[pname].append(errcb) + else: + def errcb(err, p=pname): + modobj.announceUpdate(p, err=err) + if pname in autoupdate: + self.errorCallbacks[pname].append(errcb) + + updfunc = getattr(modobj, 'update_' + pname, None) + if updfunc: + def cb(value, ufunc=updfunc, efunc=errcb): + try: + ufunc(value) + except Exception as e: + efunc(e) + self.valueCallbacks[pname].append(cb) + elif pname in autoupdate: + def cb(value, p=pname): + modobj.announceUpdate(p, value) + self.valueCallbacks[pname].append(cb) + def isBusy(self, status=None): """helper function for treating substates of BUSY correctly""" diff --git a/secop/protocol/dispatcher.py b/secop/protocol/dispatcher.py index cd2360e..1cff14d 100644 --- a/secop/protocol/dispatcher.py +++ b/secop/protocol/dispatcher.py @@ -43,8 +43,7 @@ from collections import OrderedDict from time import time as currenttime from secop.errors import BadValueError, NoSuchCommandError, NoSuchModuleError, \ - NoSuchParameterError, ProtocolError, ReadOnlyError, SECoPServerError, InternalError,\ - SECoPError + NoSuchParameterError, ProtocolError, ReadOnlyError, SECoPServerError from secop.params import Parameter from secop.protocol.messages import COMMANDREPLY, DESCRIPTIONREPLY, \ DISABLEEVENTSREPLY, ENABLEEVENTSREPLY, ERRORPREFIX, EVENTREPLY, \ @@ -101,26 +100,10 @@ class Dispatcher: for conn in listeners: conn.queue_async_reply(msg) - def announce_update(self, moduleobj, pname, pobj): + def announce_update(self, modulename, pname, pobj): """called by modules param setters to notify subscribers of new values """ - # argument pname is no longer used here - should we remove it? - pobj.readerror = None - self.broadcast_event(make_update(moduleobj.name, pobj)) - - def announce_update_error(self, moduleobj, pname, pobj, err): - """called by modules param setters/getters to notify subscribers - - of problems - """ - # argument pname is no longer used here - should we remove it? - if not isinstance(err, SECoPError): - err = InternalError(err) - if str(err) == str(pobj.readerror): - return # do not send updates for repeated errors - pobj.readerror = err - pobj.timestamp = currenttime() # indicates the first time this error appeared - self.broadcast_event(make_update(moduleobj.name, pobj)) + self.broadcast_event(make_update(modulename, pobj)) def subscribe(self, conn, eventname): self._subscriptions.setdefault(eventname, set()).add(conn) diff --git a/secop/proxy.py b/secop/proxy.py index a66c867..b494839 100644 --- a/secop/proxy.py +++ b/secop/proxy.py @@ -21,41 +21,37 @@ # ***************************************************************************** """SECoP proxy modules""" -from secop.lib import get_class -from secop.modules import Module, Writable, Readable, Drivable, Attached -from secop.datatypes import StringType -from secop.protocol.dispatcher import make_update -from secop.properties import Property -from secop.client import SecopClient, decode_msg, encode_msg_frame from secop.params import Parameter, Command -from secop.errors import ConfigError, make_secop_error, secop_error +from secop.modules import Module, Writable, Readable, Drivable +from secop.datatypes import StringType +from secop.properties import Property +from secop.stringio import HasIodev +from secop.lib import get_class +from secop.client import SecopClient, decode_msg, encode_msg_frame +from secop.errors import ConfigError, make_secop_error, CommunicationFailedError - -class ProxyModule(Module): +class ProxyModule(HasIodev, Module): properties = { - 'iodev': Attached(), 'module': Property('remote module name', datatype=StringType(), default=''), } + pollerClass = None _consistency_check_done = False _secnode = None + def iodevClass(self, name, logger, opts, srv): + opts['description'] = 'secnode %s on %s' % (opts.get('module', name), opts['uri']) + return SecNode(name, logger, opts, srv) + def updateEvent(self, module, parameter, value, timestamp, readerror): - pobj = self.parameters[parameter] - pobj.timestamp = timestamp + if parameter not in self.parameters: + return # ignore unknown parameters # should be done here: deal with clock differences if readerror: readerror = make_secop_error(*readerror) - if not readerror: - try: - pobj.value = value # store the value even in case of a validation error - pobj.value = pobj.datatype(value) - except Exception as e: - readerror = secop_error(e) - pobj.readerror = readerror - self.DISPATCHER.broadcast_event(make_update(self.name, pobj)) + self.announceUpdate(parameter, value, readerror, timestamp) def initModule(self): if not self.module: @@ -116,9 +112,17 @@ class ProxyModule(Module): # for now, the error message must be enough def nodeStateChange(self, online, state): - if online and not self._consistency_check_done: - self._check_descriptive_data() - self._consistency_check_done = True + if online: + if not self._consistency_check_done: + self._check_descriptive_data() + self._consistency_check_done = True + else: + newstatus = Readable.Status.ERROR, 'disconnected' + readerror = CommunicationFailedError('disconnected') + if self.status != newstatus: + for pname in set(self.parameters) - set(('module', 'status')): + self.announceUpdate(pname, None, readerror) + self.announceUpdate('status', newstatus) class ProxyReadable(ProxyModule, Readable): @@ -164,7 +168,7 @@ def proxy_class(remote_class, name=None): remote class is . of a class used on the remote node if name is not given, 'Proxy' + is used """ - if issubclass(remote_class, Module): + if isinstance(remote_class, type) and issubclass(remote_class, Module): rcls = remote_class remote_class = rcls.__name__ else: @@ -231,4 +235,7 @@ def Proxy(name, logger, cfgdict, srv): title cased as it acts like a class """ remote_class = cfgdict.pop('remote_class') + if 'description' not in cfgdict: + cfgdict['description'] = 'remote module %s on %s' % ( + cfgdict.get('module', name), cfgdict.get('iodev', '?')) return proxy_class(remote_class)(name, logger, cfgdict, srv) diff --git a/secop_psi/softcal.py b/secop_psi/softcal.py new file mode 100644 index 0000000..f8c0d78 --- /dev/null +++ b/secop_psi/softcal.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# ***************************************************************************** +"""Software calibration""" + +import os +from os.path import join, exists, basename +import math +import numpy as np +from scipy.interpolate import splrep, splev # pylint: disable=import-error + +from secop.core import Readable, Parameter, Override, Attached, StringType + + +def linear(x): + return x + + +nplog = np.vectorize(math.log10) +npexp = np.vectorize(lambda x: 10 ** x) + + +class StdParser: + """parser used for reading columns""" + def __init__(self, **kwds): + """keys may be other 'x' or 'logx' and either 'y' or 'logy' + + default is x=0, y=1 + """ + self.xcol = int(kwds.get('x', kwds.get('logx', 0))) + self.ycol = int(kwds.get('y', kwds.get('logy', 1))) + self.logx = 'logx' in kwds + self.logy = 'logy' in kwds + self.xdata, self.ydata = [], [] + + def parse(self, line): + """get numbers from a line and put them to self.output""" + row = line.split() + try: + self.xdata.append(float(row[self.xcol])) + self.ydata.append(float(row[self.ycol])) + except (IndexError, ValueError): + # skip bad lines + return + + +class Parser340(StdParser): + """parser for LakeShore *.340 files""" + + def __init__(self): + super().__init__() + self.header = True + self.xcol, self.ycol = 1, 2 + self.logx, self.logy = False, False + + def parse(self, line): + """scan header for data format""" + if self.header: + if line.startswith("Data Format"): + dataformat = line.split(":")[1].strip()[0] + if dataformat == '4': + self.logx, self.logy = True, False # logOhm + elif dataformat == '5': + self.logx, self.logy = True, True # logOhm, logK + elif line.startswith("No."): + self.header = False + return + super().parse(line) + + +KINDS = { + "340": (Parser340, {}), # lakeshore 340 format + "inp": (StdParser, {}), # M. Zollikers *.inp calcurve format + "caldat": (StdParser, dict(x=1, y=2)), # format from sea/tcl/startup/calib_ext.tcl + "dat": (StdParser, {}), # lakeshore raw data *.dat format +} + + +class CalCurve: + def __init__(self, calibspec): + """calibspec format: + [ | ][,= ...] + for / as in parser arguments + """ + sensopt = calibspec.split(',') + calibname = sensopt.pop(0) + _, dot, ext = basename(calibname).rpartition('.') + for path in os.environ.get('FRAPPY_CALIB_PATH', '').split(','): + # first try without adding kind + filename = join(path.strip(), calibname) + if exists(filename): + kind = ext if dot else None + break + # then try adding all kinds as extension + for kind in KINDS: + filename = join(path.strip(), '%s.%s' % (calibname, kind)) + if exists(filename): + break + else: + continue + break + else: + raise FileNotFoundError(calibname) + optargs = {} + for opts in sensopt: + key, _, value = opts.lower().rpartition('=') + value = value.strip() + if value: + optargs[key.strip()] = value + kind = optargs.pop('kind', kind) + cls, args = KINDS.get(kind, (StdParser, {})) + args.update(optargs) + + parser = cls(**args) + with open(filename) as f: + for line in f: + parser.parse(line) + self.convert_x = nplog if parser.logx else linear + self.convert_y = npexp if parser.logy else linear + self.spline = splrep(np.asarray(parser.xdata), np.asarray(parser.ydata), s=0) + + def __call__(self, value): + """convert value + + value might be a single value or an numpy array + """ + result = splev(self.convert_x(value), self.spline) + return self.convert_y(result) + + +class Sensor(Readable): + properties = { + 'rawsensor': Attached(), + } + parameters = { + 'calib': Parameter('calibration name', datatype=StringType(), readonly=False), + 'value': Override(unit='K'), + 'pollinterval': Override(export=False), + 'status': Override(default=(Readable.Status.ERROR, 'unintialized')) + } + pollerClass = None + description = 'a calibrated sensor value' + _value_error = None + + def initModule(self): + self._rawsensor.registerCallbacks(self, ['status']) # auto update status + self._calib = CalCurve(self.calib) + + def write_calib(self, value): + self._calib = CalCurve(value) + return value + + def update_value(self, value): + self.value = self._calib(value) + self._value_error = None + + def error_update_value(self, err): + self._value_error = repr(err) + raise err + + def update_status(self, value): + if self._value_error is None: + self.status = value + else: + self.status = self.Status.ERROR, self._value_error + + def read_value(self): + return self._calib(self._rawsensor.read_value()) diff --git a/test/test_iohandler.py b/test/test_iohandler.py index 162204a..18ff740 100644 --- a/test/test_iohandler.py +++ b/test/test_iohandler.py @@ -73,17 +73,17 @@ class DispatcherStub: def __init__(self, updates): self.updates = updates - def announce_update(self, moduleobj, pname, pobj): - self.updates[pname] = pobj.value - - def announce_update_error(self, moduleobj, pname, pobj, err): - self.updates[('error', pname)] = str(err) + def announce_update(self, module, pname, pobj): + if pobj.readerror: + self.updates['error', pname] = str(pobj.readerror) + else: + self.updates[pname] = pobj.value class LoggerStub: def debug(self, *args): pass - info = exception = debug + info = warning = exception = debug class ServerStub: diff --git a/test/test_modules.py b/test/test_modules.py index 900eae6..9d4ae97 100644 --- a/test/test_modules.py +++ b/test/test_modules.py @@ -36,18 +36,18 @@ class DispatcherStub: def __init__(self, updates): self.updates = updates - def announce_update(self, moduleobj, pname, pobj): - self.updates.setdefault(moduleobj.name, {}) - self.updates[moduleobj.name][pname] = pobj.value - - def announce_update_error(self, moduleobj, pname, pobj, err): - self.updates['error', moduleobj.name, pname] = str(err) + def announce_update(self, modulename, pname, pobj): + self.updates.setdefault(modulename, {}) + if pobj.readerror: + self.updates[modulename]['error', pname] = str(pobj.readerror) + else: + self.updates[modulename][pname] = pobj.value class LoggerStub: def debug(self, *args): print(*args) - info = exception = debug + info = warning = exception = debug class ServerStub: