From bd246c5ca74a88141c9a730dffe5b95a5b63fc12 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Mon, 7 Mar 2022 17:49:08 +0100 Subject: [PATCH] result from merge with gerrit secop subdir only Change-Id: I65ab7049719b374ae3ec0259483e7e7d16aafcd1 --- secop/core.py | 5 +- secop/datatypes.py | 86 +++++--- secop/gui/cfg_editor/utils.py | 6 +- secop/io.py | 102 ++++++--- secop/iohandler.py | 14 +- secop/lib/__init__.py | 110 ++++++---- secop/lib/classdoc.py | 21 +- secop/lib/multievent.py | 120 ++++++++--- secop/lib/sequence.py | 6 +- secop/logging.py | 192 ++++++++++------- secop/modules.py | 388 ++++++++++++++++++++-------------- secop/params.py | 67 ++---- secop/persistent.py | 7 +- secop/properties.py | 21 +- secop/protocol/__init__.py | 2 +- secop/protocol/dispatcher.py | 46 ++-- secop/protocol/messages.py | 14 +- secop/proxy.py | 23 +- secop/server.py | 103 ++++----- secop/simulation.py | 10 +- 20 files changed, 760 insertions(+), 583 deletions(-) diff --git a/secop/core.py b/secop/core.py index 986f57c..be3d338 100644 --- a/secop/core.py +++ b/secop/core.py @@ -33,8 +33,9 @@ from secop.lib.enum import Enum from secop.modules import Attached, Communicator, \ Done, Drivable, Module, Readable, Writable from secop.params import Command, Parameter -from secop.poller import AUTO, DYNAMIC, REGULAR, SLOW from secop.properties import Property from secop.proxy import Proxy, SecNode, proxy_class -from secop.io import HasIodev, StringIO, BytesIO +from secop.io import HasIO, StringIO, BytesIO, HasIodev # TODO: remove HasIodev (legacy stuff) from secop.persistent import PersistentMixin, PersistentParam +from secop.rwhandler import ReadHandler, WriteHandler, CommonReadHandler, \ + CommonWriteHandler, nopoll diff --git a/secop/datatypes.py b/secop/datatypes.py index ef11d07..ada53f6 100644 --- a/secop/datatypes.py +++ b/secop/datatypes.py @@ -30,29 +30,25 @@ from base64 import b64decode, b64encode from secop.errors import BadValueError, \ ConfigError, ProgrammingError, ProtocolError -from secop.lib import clamp +from secop.lib import clamp, generalConfig from secop.lib.enum import Enum from secop.parse import Parser from secop.properties import HasProperties, Property -# Only export these classes for 'from secop.datatypes import *' -__all__ = [ - 'DataType', 'get_datatype', - 'FloatRange', 'IntRange', 'ScaledInteger', - 'BoolType', 'EnumType', - 'BLOBType', 'StringType', 'TextType', - 'TupleOf', 'ArrayOf', 'StructOf', - 'CommandType', 'StatusType', -] - # *DEFAULT* limits for IntRange/ScaledIntegers transport serialisation DEFAULT_MIN_INT = -16777216 DEFAULT_MAX_INT = 16777216 UNLIMITED = 1 << 64 # internal limit for integers, is probably high enough for any datatype size +generalConfig.defaults['lazy_number_validation'] = False Parser = Parser() +class DiscouragedConversion(BadValueError): + """the discouraged conversion string - > float happened""" + log_message = True + + # base class for all DataTypes class DataType(HasProperties): """base class for all data types""" @@ -63,7 +59,7 @@ class DataType(HasProperties): def __call__(self, value): """check if given value (a python obj) is valid for this datatype - returns the value or raises an appropriate exception""" + returns the (possibly converted) value or raises an appropriate exception""" raise NotImplementedError def from_string(self, text): @@ -192,9 +188,15 @@ class FloatRange(DataType): def __call__(self, value): try: - value = float(value) + value += 0.0 # do not accept strings here except Exception: - raise BadValueError('Can not convert %r to float' % value) from None + try: + value = float(value) + except Exception: + raise BadValueError('Can not convert %r to float' % value) from None + if not generalConfig.lazy_number_validation: + raise DiscouragedConversion('automatic string to float conversion no longer supported') from None + # map +/-infty to +/-max possible number value = clamp(-sys.float_info.max, value, sys.float_info.max) @@ -232,6 +234,18 @@ class FloatRange(DataType): return ' '.join([self.fmtstr % value, unit]) return self.fmtstr % value + def problematic_range(self, target_type): + """check problematic range + + returns True when self.min or self.max is given, not 0 and equal to the same limit on target_type. + """ + value_info = self.get_info() + target_info = target_type.get_info() + minval = value_info.get('min') # None when -infinite + maxval = value_info.get('max') # None when +infinite + return ((minval and minval == target_info.get('min')) or + (maxval and maxval == target_info.get('max'))) + def compatible(self, other): if not isinstance(other, (FloatRange, ScaledInteger)): raise BadValueError('incompatible datatypes') @@ -265,10 +279,16 @@ class IntRange(DataType): def __call__(self, value): try: - fvalue = float(value) + fvalue = value + 0.0 # do not accept strings here value = int(value) except Exception: - raise BadValueError('Can not convert %r to int' % value) from None + try: + fvalue = float(value) + value = int(value) + except Exception: + raise BadValueError('Can not convert %r to int' % value) from None + if not generalConfig.lazy_number_validation: + raise DiscouragedConversion('automatic string to float conversion no longer supported') from None if not self.min <= value <= self.max or round(fvalue) != fvalue: raise BadValueError('%r should be an int between %d and %d' % (value, self.min, self.max)) @@ -298,13 +318,15 @@ class IntRange(DataType): return '%d' % value def compatible(self, other): - if isinstance(other, IntRange): + if isinstance(other, (IntRange, FloatRange, ScaledInteger)): other(self.min) other(self.max) return - # this will accept some EnumType, BoolType - for i in range(self.min, self.max + 1): - other(i) + if isinstance(other, (EnumType, BoolType)): + # the following loop will not cycle more than the number of Enum elements + for i in range(self.min, self.max + 1): + other(i) + raise BadValueError('incompatible datatypes') class ScaledInteger(DataType): @@ -369,9 +391,14 @@ class ScaledInteger(DataType): def __call__(self, value): try: - value = float(value) + value += 0.0 # do not accept strings here except Exception: - raise BadValueError('Can not convert %r to float' % value) from None + try: + value = float(value) + except Exception: + raise BadValueError('Can not convert %r to float' % value) from None + if not generalConfig.lazy_number_validation: + raise DiscouragedConversion('automatic string to float conversion no longer supported') from None prec = max(self.scale, abs(value * self.relative_resolution), self.absolute_resolution) if self.min - prec <= value <= self.max + prec: @@ -849,7 +876,7 @@ class ImmutableDict(dict): class StructOf(DataType): """data structure with named fields - :param optional: a list of optional members (default None: all members optional) + :param optional: a list of optional members :param members: names as keys and types as values for all members """ def __init__(self, optional=None, **members): @@ -857,7 +884,7 @@ class StructOf(DataType): self.members = members if not members: raise BadValueError('Empty structs are not allowed!') - self.optional = list(members) if optional is None else list(optional) + self.optional = list(optional or []) for name, subtype in list(members.items()): if not isinstance(subtype, DataType): raise ProgrammingError( @@ -1118,37 +1145,26 @@ def floatargs(kwds): DATATYPES = dict( bool = lambda **kwds: BoolType(), - int = lambda min, max, **kwds: IntRange(minval=min, maxval=max), - scaled = lambda scale, min, max, **kwds: ScaledInteger(scale=scale, minval=min*scale, maxval=max*scale, **floatargs(kwds)), - double = lambda min=None, max=None, **kwds: FloatRange(minval=min, maxval=max, **floatargs(kwds)), - blob = lambda maxbytes, minbytes=0, **kwds: BLOBType(minbytes=minbytes, maxbytes=maxbytes), - string = lambda minchars=0, maxchars=None, isUTF8=False, **kwds: StringType(minchars=minchars, maxchars=maxchars, isUTF8=isUTF8), - array = lambda maxlen, members, minlen=0, pname='', **kwds: ArrayOf(get_datatype(members, pname), minlen=minlen, maxlen=maxlen), - tuple = lambda members, pname='', **kwds: TupleOf(*tuple((get_datatype(t, pname) for t in members))), - enum = lambda members, pname='', **kwds: EnumType(pname, members=members), - struct = lambda members, optional=None, pname='', **kwds: StructOf(optional, **dict((n, get_datatype(t, pname)) for n, t in list(members.items()))), - command = lambda argument=None, result=None, pname='', **kwds: CommandType(get_datatype(argument, pname), get_datatype(result)), - limit = lambda members, pname='', **kwds: LimitsType(get_datatype(members, pname)), ) diff --git a/secop/gui/cfg_editor/utils.py b/secop/gui/cfg_editor/utils.py index d85ee96..5e3b80a 100644 --- a/secop/gui/cfg_editor/utils.py +++ b/secop/gui/cfg_editor/utils.py @@ -29,7 +29,7 @@ from secop.modules import Module from secop.params import Parameter from secop.properties import Property from secop.protocol.interface.tcp import TCPServer -from secop.server import getGeneralConfig +from secop.server import generalConfig uipath = path.dirname(__file__) @@ -106,7 +106,7 @@ def get_file_paths(widget, open_file=True): def get_modules(): modules = {} - base_path = getGeneralConfig()['basedir'] + base_path = generalConfig.basedir # pylint: disable=too-many-nested-blocks for dirname in listdir(base_path): if dirname.startswith('secop_'): @@ -156,7 +156,7 @@ def get_interface_class_from_name(name): def get_interfaces(): # TODO class must be found out like for modules interfaces = [] - interface_path = path.join(getGeneralConfig()['basedir'], 'secop', + interface_path = path.join(generalConfig.basedir, 'secop', 'protocol', 'interface') for filename in listdir(interface_path): if path.isfile(path.join(interface_path, filename)) and \ diff --git a/secop/io.py b/secop/io.py index 4a4203d..48edf0c 100644 --- a/secop/io.py +++ b/secop/io.py @@ -29,50 +29,78 @@ import time import threading from secop.lib.asynconn import AsynConn, ConnectionClosed -from secop.datatypes import ArrayOf, BLOBType, BoolType, FloatRange, IntRange, StringType, TupleOf, ValueType -from secop.errors import CommunicationFailedError, CommunicationSilentError, ConfigError +from secop.datatypes import ArrayOf, BLOBType, BoolType, FloatRange, IntRange, \ + StringType, TupleOf, ValueType +from secop.errors import CommunicationFailedError, CommunicationSilentError, \ + ConfigError, ProgrammingError from secop.modules import Attached, Command, \ Communicator, Done, Module, Parameter, Property -from secop.poller import REGULAR +from secop.lib import generalConfig +generalConfig.defaults['legacy_hasiodev'] = False HEX_CODE = re.compile(r'[0-9a-fA-F][0-9a-fA-F]$') -class HasIodev(Module): +class HasIO(Module): """Mixin for modules using a communicator""" - iodev = Attached() + io = Attached() uri = Property('uri for automatic creation of the attached communication module', StringType(), default='') - iodevDict = {} + ioDict = {} + ioClass = None def __init__(self, name, logger, opts, srv): - iodev = opts.get('iodev') - Module.__init__(self, name, logger, opts, srv) + io = opts.get('io') + super().__init__(name, logger, opts, srv) if self.uri: opts = {'uri': self.uri, 'description': 'communication device for %s' % name, 'export': False} - ioname = self.iodevDict.get(self.uri) + ioname = self.ioDict.get(self.uri) if not ioname: - ioname = iodev or name + '_iodev' - iodev = self.iodevClass(ioname, srv.log.getChild(ioname), opts, srv) - srv.modules[ioname] = iodev - self.iodevDict[self.uri] = ioname - self.iodev = ioname - elif not self.iodev: - raise ConfigError("Module %s needs a value for either 'uri' or 'iodev'" % name) + ioname = io or name + '_io' + io = self.ioClass(ioname, srv.log.getChild(ioname), opts, srv) # pylint: disable=not-callable + io.callingModule = [] + srv.modules[ioname] = io + self.ioDict[self.uri] = ioname + self.io = ioname + elif not io: + raise ConfigError("Module %s needs a value for either 'uri' or 'io'" % name) def initModule(self): try: - self._iodev.read_is_connected() + self.io.read_is_connected() except (CommunicationFailedError, AttributeError): - # AttributeError: for missing _iodev? + # AttributeError: read_is_connected is not required for an io object pass super().initModule() - def sendRecv(self, command): - return self._iodev.communicate(command) + def communicate(self, *args): + return self.io.communicate(*args) + + def multicomm(self, *args): + return self.io.multicomm(*args) + + +class HasIodev(HasIO): + # TODO: remove this legacy mixin + iodevClass = None + + @property + def _iodev(self): + return self.io + + def __init__(self, name, logger, opts, srv): + self.ioClass = self.iodevClass + super().__init__(name, logger, opts, srv) + if generalConfig.legacy_hasiodev: + self.log.warn('using the HasIodev mixin is deprecated - use HasIO instead') + else: + self.log.error('legacy HasIodev no longer supported') + self.log.error('you may suppress this error message by running the server with --relaxed') + raise ProgrammingError('legacy HasIodev no longer supported') + self.sendRecv = self.communicate class IOBase(Communicator): @@ -80,7 +108,7 @@ class IOBase(Communicator): uri = Property('hostname:portnumber', datatype=StringType()) timeout = Parameter('timeout', datatype=FloatRange(0), default=2) wait_before = Parameter('wait time before sending', datatype=FloatRange(), default=0) - is_connected = Parameter('connection state', datatype=BoolType(), readonly=False, poll=REGULAR) + is_connected = Parameter('connection state', datatype=BoolType(), readonly=False, default=False) pollinterval = Parameter('reconnect interval', datatype=FloatRange(0), readonly=False, default=10) _reconnectCallbacks = None @@ -89,8 +117,8 @@ class IOBase(Communicator): _lock = None def earlyInit(self): - self._lock = threading.RLock() super().earlyInit() + self._lock = threading.RLock() def connectStart(self): raise NotImplementedError @@ -104,6 +132,9 @@ class IOBase(Communicator): self._conn = None self.is_connected = False + def doPoll(self): + self.read_is_connected() + def read_is_connected(self): """try to reconnect, when not connected @@ -155,6 +186,9 @@ class IOBase(Communicator): if removeme: self._reconnectCallbacks.pop(key) + def communicate(self, command): + return NotImplementedError + class StringIO(IOBase): """line oriented communicator @@ -219,7 +253,6 @@ class StringIO(IOBase): if not self.is_connected: self.read_is_connected() # try to reconnect if not self._conn: - self.log.debug('can not connect to %r' % self.uri) raise CommunicationSilentError('can not connect to %r' % self.uri) try: with self._lock: @@ -236,15 +269,15 @@ class StringIO(IOBase): if garbage is None: # read garbage only once garbage = self._conn.flush_recv() if garbage: - self.log.debug('garbage: %r' % garbage) + self.comLog('garbage: %r', garbage) self._conn.send(cmd + self._eol_write) - self.log.debug('> %s' % cmd.decode(self.encoding)) + self.comLog('> %s', cmd.decode(self.encoding)) reply = self._conn.readline(self.timeout) except ConnectionClosed as e: self.closeConnection() raise CommunicationFailedError('disconnected') from None reply = reply.decode(self.encoding) - self.log.debug('< %s' % reply) + self.comLog('< %s', reply) return reply except Exception as e: if str(e) == self._last_error: @@ -336,14 +369,14 @@ class BytesIO(IOBase): time.sleep(self.wait_before) garbage = self._conn.flush_recv() if garbage: - self.log.debug('garbage: %s', hexify(garbage)) + self.comLog('garbage: %r', garbage) self._conn.send(request) - self.log.debug('> %s', hexify(request)) + self.comLog('> %s', hexify(request)) reply = self._conn.readbytes(replylen, self.timeout) except ConnectionClosed as e: self.closeConnection() raise CommunicationFailedError('disconnected') from None - self.log.debug('< %s', hexify(reply)) + self.comLog('< %s', hexify(reply)) return self.getFullReply(request, reply) except Exception as e: if str(e) == self._last_error: @@ -352,6 +385,15 @@ class BytesIO(IOBase): self.log.error(self._last_error) raise + @Command((ArrayOf(TupleOf(BLOBType(), IntRange(0)))), result=ArrayOf(BLOBType())) + def multicomm(self, requests): + """communicate multiple request/replies in one row""" + replies = [] + with self._lock: + for request in requests: + replies.append(self.communicate(*request)) + return replies + def readBytes(self, nbytes): """read bytes @@ -368,7 +410,7 @@ class BytesIO(IOBase): :return: the full reply (replyheader + additional bytes) When the reply length is variable, :meth:`communicate` should be called - with the `replylen` argument set to the minimum expected length of the reply. + with the `replylen` argument set to minimum expected length of the reply. Typically this method determines then the length of additional bytes from the already received bytes (replyheader) and/or the request and calls :meth:`readBytes` to get the remaining bytes. diff --git a/secop/iohandler.py b/secop/iohandler.py index 072c5e5..770f2c1 100644 --- a/secop/iohandler.py +++ b/secop/iohandler.py @@ -126,7 +126,7 @@ class CmdParser: try: argformat % ((0,) * len(casts)) # validate argformat except ValueError as e: - raise ValueError("%s in %r" % (e, argformat)) + raise ValueError("%s in %r" % (e, argformat)) from None def format(self, *values): return self.fmt % values @@ -153,7 +153,7 @@ class Change: self._reply = None def __getattr__(self, key): - """return attribute from module key when not in self._valuedict""" + """return attribute from module key is not in self._valuedict""" if key in self._valuedict: return self._valuedict[key] return getattr(self._module, key) @@ -174,9 +174,6 @@ class Change: self._valuedict.update(result) return self._reply - def __repr__(self): - return 'Change<%s>' % ', '.join('%s=%r' % kv for kv in self._valuedict.items()) - class IOHandlerBase: """abstract IO handler @@ -245,7 +242,7 @@ class IOHandler(IOHandlerBase): contain the command separator at the end. """ querycmd = self.make_query(module) - reply = module.sendRecv(changecmd + querycmd) + reply = module.communicate(changecmd + querycmd) return self.parse_reply(reply) def send_change(self, module, *values): @@ -256,7 +253,7 @@ class IOHandler(IOHandlerBase): """ changecmd = self.make_change(module, *values) if self.CMDSEPARATOR is None: - module.sendRecv(changecmd) # ignore result + module.communicate(changecmd) # ignore result return self.send_command(module) return self.send_command(module, changecmd + self.CMDSEPARATOR) @@ -283,8 +280,6 @@ class IOHandler(IOHandlerBase): reply = self.send_command(module) # convert them to parameters result = self.analyze(module, *reply) - module.log.debug('result of analyze_%s: %s', self.group, - ', '.join('%s=%r' % kv for kv in result.items())) for pname, value in result.items(): setattr(module, pname, value) for pname in self.parameters: @@ -327,7 +322,6 @@ class IOHandler(IOHandlerBase): change = Change(self, module, valuedict) if force_read: change.readValues() - module.log.debug('call change_%s(%r)', self.group, change) values = self.change(module, change) if values is None: # this indicates that nothing has to be written return diff --git a/secop/lib/__init__.py b/secop/lib/__init__.py index 99826bb..0c2c59d 100644 --- a/secop/lib/__init__.py +++ b/secop/lib/__init__.py @@ -30,57 +30,95 @@ import traceback from configparser import ConfigParser from os import environ, path -CONFIG = {} -unset_value = object() +class GeneralConfig: -def getGeneralConfig(confdir=None): - global CONFIG # pylint: disable=global-statement + def __init__(self): + self._config = None + self.defaults = {} #: default values. may be set before or after :meth:`init` - if CONFIG: - if confdir: - raise ValueError('getGeneralConfig with argument must be called first') - else: + def init(self, configfile=None): + cfg = {} + mandatory = 'piddir', 'logdir', 'confdir' repodir = path.abspath(path.join(path.dirname(__file__), '..', '..')) + # create default paths if path.splitext(sys.executable)[1] == ".exe" and not path.basename(sys.executable).startswith('python'): # special MS windows environment - CONFIG = { - 'piddir': './', - 'logdir': './log', - 'confdir': './', - } - elif not path.exists(path.join(repodir, '.git')): - CONFIG = { - 'piddir': '/var/run/secop', - 'logdir': '/var/log', - 'confdir': '/etc/secop', - } + cfg.update(piddir='./', logdir='./log', confdir='./') + elif path.exists(path.join(repodir, '.git')): + # running from git repo + cfg['confdir'] = path.join(repodir, 'cfg') + # take logdir and piddir from /cfg/generalConfig.cfg else: - CONFIG = { - 'piddir': path.join(repodir, 'pid'), - 'logdir': path.join(repodir, 'log'), - 'confdir': path.join(repodir, 'cfg'), - } - gen_config_path = confdir or environ.get('FRAPPY_CONFIG_FILE', - path.join(CONFIG['confdir'], 'generalConfig.cfg')) - if gen_config_path and path.exists(gen_config_path): + # running on installed system (typically with systemd) + cfg.update(piddir='/var/run/frappy', logdir='/var/log', confdir='/etc/frappy') + if configfile is None: + configfile = environ.get('FRAPPY_CONFIG_FILE', + path.join(cfg['confdir'], 'generalConfig.cfg')) + if configfile and path.exists(configfile): parser = ConfigParser() parser.optionxform = str - parser.read([gen_config_path]) - CONFIG = {} + parser.read([configfile]) + # mandatory in a general config file: + cfg['logdir'] = cfg['piddir'] = None + cfg['confdir'] = path.dirname(configfile) # only the FRAPPY section is relevant, other sections might be used by others for key, value in parser['FRAPPY'].items(): if value.startswith('./'): - CONFIG[key] = path.abspath(path.join(repodir, value)) + cfg[key] = path.abspath(path.join(repodir, value)) else: # expand ~ to username, also in path lists separated with ':' - CONFIG[key] = ':'.join(path.expanduser(v) for v in value.split(':')) + cfg[key] = ':'.join(path.expanduser(v) for v in value.split(':')) else: - for dirname in CONFIG: - CONFIG[dirname] = environ.get('SECOP_%s' % dirname.upper(), CONFIG[dirname]) + for key in mandatory: + cfg[key] = environ.get('FRAPPY_%s' % key.upper(), cfg[key]) + missing_keys = [key for key in mandatory if cfg[key] is None] + if missing_keys: + if path.exists(configfile): + raise KeyError('missing value for %s in %s' % (' and '.join(missing_keys), configfile)) + raise FileNotFoundError(configfile) # this is not customizable - CONFIG['basedir'] = repodir - return CONFIG + cfg['basedir'] = repodir + self._config = cfg + + def __getitem__(self, key): + try: + return self._config[key] + except KeyError: + return self.defaults[key] + except TypeError: + if key in self.defaults: + # accept retrieving defaults before init + # e.g. 'lazy_number_validation' in secop.datatypes + return self.defaults[key] + raise TypeError('generalConfig.init() has to be called first') from None + + def get(self, key, default=None): + try: + return self.__getitem__(key) + except KeyError: + return default + + def getint(self, key, default=None): + try: + return int(self.__getitem__(key)) + except KeyError: + return default + + def __getattr__(self, key): + """goodie: use generalConfig. instead of generalConfig.get('')""" + return self.get(key) + + @property + def initialized(self): + return bool(self._config) + + def testinit(self, **kwds): + """for test purposes""" + self._config = kwds + + +generalConfig = GeneralConfig() class lazy_property: @@ -289,4 +327,4 @@ class UniqueObject: self.name = name def __repr__(self): - return 'UniqueObject(%r)' % self.name + return self.name diff --git a/secop/lib/classdoc.py b/secop/lib/classdoc.py index 4cece4b..18392d7 100644 --- a/secop/lib/classdoc.py +++ b/secop/lib/classdoc.py @@ -74,29 +74,29 @@ SIMPLETYPES = { } -def short_doc(datatype): +def short_doc(datatype, internal=False): # pylint: disable=possibly-unused-variable def doc_EnumType(dt): return 'one of %s' % str(tuple(dt._enum.keys())) def doc_ArrayOf(dt): - return 'array of %s' % short_doc(dt.members) + return 'array of %s' % short_doc(dt.members, True) def doc_TupleOf(dt): - return 'tuple of (%s)' % ', '.join(short_doc(m) for m in dt.members) + return 'tuple of (%s)' % ', '.join(short_doc(m, True) for m in dt.members) def doc_CommandType(dt): - argument = short_doc(dt.argument) if dt.argument else '' - result = ' -> %s' % short_doc(dt.result) if dt.result else '' + argument = short_doc(dt.argument, True) if dt.argument else '' + result = ' -> %s' % short_doc(dt.result, True) if dt.result else '' return '(%s)%s' % (argument, result) # return argument list only def doc_NoneOr(dt): - other = short_doc(dt.other) + other = short_doc(dt.other, True) return '%s or None' % other if other else None def doc_OrType(dt): - types = [short_doc(t) for t in dt.types] + types = [short_doc(t, True) for t in dt.types] if None in types: # type is anyway broad: no doc return None return ' or '.join(types) @@ -104,14 +104,17 @@ def short_doc(datatype): def doc_Stub(dt): return dt.name.replace('Type', '').replace('Range', '').lower() - clsname = datatype.__class__.__name__ + def doc_BLOBType(dt): + return 'byte array' + + clsname = type(datatype).__name__ result = SIMPLETYPES.get(clsname) if result: return result fun = locals().get('doc_' + clsname) if fun: return fun(datatype) - return None # broad type like ValueType: no doc + return clsname if internal else None # broad types like ValueType: no doc def append_to_doc(cls, lines, itemcls, name, attrname, fmtfunc): diff --git a/secop/lib/multievent.py b/secop/lib/multievent.py index 6b3337e..a009c4b 100644 --- a/secop/lib/multievent.py +++ b/secop/lib/multievent.py @@ -21,41 +21,51 @@ # ***************************************************************************** import threading +import time + + +ETERNITY = 1e99 + + +class _SingleEvent: + """Single Event + + remark: :meth:`wait` is not implemented on purpose + """ + def __init__(self, multievent, timeout, name=None): + self.multievent = multievent + self.multievent.clear_(self) + self.name = name + if timeout is None: + self.deadline = ETERNITY + else: + self.deadline = time.monotonic() + timeout + + def clear(self): + self.multievent.clear_(self) + + def set(self): + self.multievent.set_(self) + + def is_set(self): + return self in self.multievent.events class MultiEvent(threading.Event): - """Class implementing multi event objects. + """Class implementing multi event objects.""" - meth:`new` creates Event like objects - meth:'wait` waits for all of them being set - """ - - class SingleEvent: - """Single Event - - remark: :meth:`wait` is not implemented on purpose - """ - def __init__(self, multievent): - self.multievent = multievent - self.multievent._clear(self) - - def clear(self): - self.multievent._clear(self) - - def set(self): - self.multievent._set(self) - - def is_set(self): - return self in self.multievent.events - - def __init__(self): + def __init__(self, default_timeout=None): self.events = set() self._lock = threading.Lock() + self.default_timeout = default_timeout or None # treat 0 as None + self.name = None # default event name + self._actions = [] # actions to be executed on trigger super().__init__() - def new(self): - """create a new SingleEvent""" - return self.SingleEvent(self) + def new(self, timeout=None, name=None): + """create a single event like object""" + return _SingleEvent(self, timeout or self.default_timeout, + name or self.name or '') def set(self): raise ValueError('a multievent must not be set directly') @@ -63,21 +73,69 @@ class MultiEvent(threading.Event): def clear(self): raise ValueError('a multievent must not be cleared directly') - def _set(self, event): + def is_set(self): + return not self.events + + def set_(self, event): """internal: remove event from the event list""" with self._lock: self.events.discard(event) if self.events: return + try: + for action in self._actions: + action() + except Exception: + pass # we silently ignore errors here + self._actions = [] super().set() - def _clear(self, event): + def clear_(self, event): """internal: add event to the event list""" with self._lock: self.events.add(event) super().clear() + def deadline(self): + deadline = 0 + for event in self.events: + deadline = max(event.deadline, deadline) + return None if deadline == ETERNITY else deadline + def wait(self, timeout=None): + """wait for all events being set or timed out""" if not self.events: # do not wait if events are empty - return - super().wait(timeout) + return True + deadline = self.deadline() + if deadline is not None: + deadline -= time.monotonic() + timeout = deadline if timeout is None else min(deadline, timeout) + if timeout <= 0: + return False + return super().wait(timeout) + + def waiting_for(self): + return set(event.name for event in self.events) + + def get_trigger(self, timeout=None, name=None): + """create a new single event and return its set method + + as a convenience method + """ + return self.new(timeout, name).set + + def queue(self, action): + """add an action to the queue of actions to be executed at end + + :param action: a function, to be executed after the last event is triggered, + and before the multievent is set + + - if no events are waiting, the actions are executed immediately + - if an action raises an exception, it is silently ignore and further + actions in the queue are skipped + - if this is not desired, the action should handle errors by itself + """ + with self._lock: + self._actions.append(action) + if self.is_set(): + self.set_(None) diff --git a/secop/lib/sequence.py b/secop/lib/sequence.py index 08e047a..5d416ee 100644 --- a/secop/lib/sequence.py +++ b/secop/lib/sequence.py @@ -137,8 +137,8 @@ class SequencerMixin: if self._seq_fault_on_stop: return self.Status.ERROR, self._seq_stopped return self.Status.WARN, self._seq_stopped - if hasattr(self, 'read_hw_status'): - return self.read_hw_status() + if hasattr(self, 'readHwStatus'): + return self.readHwStatus() return self.Status.IDLE, '' def stop(self): @@ -153,7 +153,7 @@ class SequencerMixin: self._seq_error = str(e) finally: self._seq_thread = None - self.pollParams(0) + self.doPoll() def _seq_thread_inner(self, seq, store_init): store = Namespace() diff --git a/secop/logging.py b/secop/logging.py index 92f009f..2b3291e 100644 --- a/secop/logging.py +++ b/secop/logging.py @@ -1,6 +1,6 @@ +#!/usr/bin/env python # -*- coding: utf-8 -*- # ***************************************************************************** -# # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later @@ -17,24 +17,27 @@ # # Module authors: # Markus Zolliker -# # ***************************************************************************** + import os from os.path import dirname, join +from logging import DEBUG, INFO, addLevelName import mlzlog -from secop.lib import getGeneralConfig +from secop.lib import generalConfig +from secop.datatypes import BoolType +from secop.properties import Property OFF = 99 - -LOG_LEVELS = dict(mlzlog.LOGLEVELS, off=OFF) +COMLOG = 15 +addLevelName(COMLOG, 'COMLOG') +assert DEBUG < COMLOG < INFO +LOG_LEVELS = dict(mlzlog.LOGLEVELS, off=OFF, comlog=COMLOG) LEVEL_NAMES = {v: k for k, v in LOG_LEVELS.items()} -log = None -rootlogdir = None -def checkLevel(level): +def check_level(level): try: if isinstance(level, str): return LOG_LEVELS[level.lower()] @@ -45,83 +48,120 @@ def checkLevel(level): raise ValueError('%r is not a valid level' % level) -def initLogging(loglevel='info'): - global log, rootlogdir # pylint: disable-global-statement +class RemoteLogHandler(mlzlog.Handler): + """handler for remote logging""" + def __init__(self): + super().__init__() + self.subscriptions = {} # dict[modname] of tuple(mobobj, dict [conn] of level) - loglevel = checkLevel(loglevel) - genConfig = getGeneralConfig() - rootname = genConfig.get('rootname', 'secop') - logdir = genConfig.get('logdir') - rootlogdir = join(logdir, rootname) - mlzlog.initLogging(rootname, 'debug', logdir) - for hdl in mlzlog.log.handlers: - hdl.setLevel(loglevel) - return mlzlog.log + def emit(self, record): + """unused""" + + def handle(self, record): + modname = record.name.split('.')[-1] + try: + modobj, subscriptions = self.subscriptions[modname] + except KeyError: + return + for conn, lev in subscriptions.items(): + if record.levelno >= lev: + modobj.DISPATCHER.send_log_msg( + conn, modobj.name, LEVEL_NAMES[record.levelno], + record.getMessage()) + + def set_conn_level(self, modobj, conn, level): + level = check_level(level) + modobj, subscriptions = self.subscriptions.setdefault(modobj.name, (modobj, {})) + if level == OFF: + subscriptions.pop(conn, None) + else: + subscriptions[conn] = level + + def __repr__(self): + return 'RemoteLogHandler()' -class ComlogHandler(mlzlog.LogfileHandler): - """handler for logging communication +class LogfileHandler(mlzlog.LogfileHandler): - communication is - """ + def __init__(self, logdir, rootname, max_days=0): + self.logdir = logdir + self.rootname = rootname + self.max_days = max_days + super().__init__(logdir, rootname) + + def emit(self, record): + if record.levelno != COMLOG: + super().emit(record) + + def doRollover(self): + super().doRollover() + if self.max_days: + # keep only the last max_days files + with os.scandir(dirname(self.baseFilename)) as it: + files = sorted(entry.path for entry in it if entry.name != 'current') + for filepath in files[-self.max_days:]: + os.remove(filepath) + + +class ComLogfileHandler(LogfileHandler): + """handler for logging communication""" def format(self, record): return '%s %s' % (self.formatter.formatTime(record), record.getMessage()) - def doRollover(self): - super().doRollover() - max_days = getGeneralConfig().get('comlog_days', 31) - # keep only the last max_days files - with os.scandir(dirname(self.baseFilename)) as it: - files = sorted(entry.path for entry in it if entry.name != 'current') - for filepath in files[-max_days:]: - os.remove(filepath) + +class HasComlog: + """mixin for modules with comlog""" + comlog = Property('whether communication is logged ', BoolType(), + default=True, export=False) + _comLog = None + + def earlyInit(self): + super().earlyInit() + if self.comlog and generalConfig.initialized and generalConfig.comlog: + self._comLog = mlzlog.Logger('COMLOG.%s' % self.name) + self._comLog.handlers[:] = [] + directory = join(logger.logdir, logger.rootname, 'comlog', self.DISPATCHER.name) + self._comLog.addHandler(ComLogfileHandler( + directory, self.name, max_days=generalConfig.getint('comlog_days', 7))) + return + + def comLog(self, msg, *args, **kwds): + self.log.log(COMLOG, msg, *args, **kwds) + if self._comLog: + self._comLog.info(msg, *args) -def add_comlog_handler(modobj): - global rootlogdir # pylint: disable-global-statement - comlog = getGeneralConfig().get('comlog') - if comlog: - comlog = join(rootlogdir, comlog) - modobj.log.addHandler(ComlogHandler(comlog, modobj.name)) +class MainLogger: + def __init__(self): + self.log = None + self.logdir = None + self.rootname = None + self.console_handler = None + + def init(self, console_level='info'): + self.rootname = generalConfig.get('logger_root', 'frappy') + # set log level to minimum on the logger, effective levels on the handlers + # needed also for RemoteLogHandler + # modified from mlzlog.initLogging + mlzlog.setLoggerClass(mlzlog.MLZLogger) + assert self.log is None + self.log = mlzlog.log = mlzlog.MLZLogger(self.rootname) + + self.log.setLevel(DEBUG) + self.log.addHandler(mlzlog.ColoredConsoleHandler()) + + self.logdir = generalConfig.get('logdir', '/tmp/log') + if self.logdir: + logfile_days = generalConfig.getint('logfile_days') + logfile_handler = LogfileHandler(self.logdir, self.rootname, max_days=logfile_days) + if generalConfig.logfile_days: + logfile_handler.max_days = int(generalConfig.logfile_days) + logfile_handler.setLevel(LOG_LEVELS[generalConfig.get('logfile_level', 'info')]) + self.log.addHandler(logfile_handler) + + self.log.addHandler(RemoteLogHandler()) + self.log.handlers[0].setLevel(LOG_LEVELS[console_level]) -class RemoteLogHandler(mlzlog.Handler): - """handler for remote logging""" - def __init__(self, modobj): - super().__init__() - self.subscriptions = {} # dict [conn] of level - self.modobj = modobj - self.modobj.log.addHandler(self) - self.used_by = set() - - def handle(self, record, name=None): - result = False - for conn, lev in self.subscriptions.items(): - if record.levelno >= lev: - msg = record.getMessage() - if self.modobj.DISPATCHER.send_log_msg( - conn, name or self.modobj.name, LEVEL_NAMES[record.levelno], msg): - result = True - if result: - return True - for master in self.used_by: - # this is an iodev, try to handle by one of our masters - if master.remoteLogHandler.handle(record, self.modobj.name): - return True - return False - - def set_conn_level(self, conn, level): - level = checkLevel(level) - if level == mlzlog.DEBUG: - iodev = getattr(self.modobj, '_iodev', None) - if iodev: - # we want also to see debug messages of iodev - if iodev.remoteLogHandler is None: - iodev.remoteLogHandler = RemoteLogHandler(self) - iodev.remoteLogHandler.used_by.add(self.modobj) - level = checkLevel(level) - if level == OFF: - self.subscriptions.pop(conn, None) - else: - self.subscriptions[conn] = level +logger = MainLogger() diff --git a/secop/modules.py b/secop/modules.py index c2ffb42..fc1929c 100644 --- a/secop/modules.py +++ b/secop/modules.py @@ -23,29 +23,28 @@ """Define base classes for real Modules implemented in the server""" -import sys import time +from queue import Queue, Empty from collections import OrderedDict +from functools import wraps from secop.datatypes import ArrayOf, BoolType, EnumType, FloatRange, \ - IntRange, StatusType, StringType, TextType, TupleOf + IntRange, StatusType, StringType, TextType, TupleOf, DiscouragedConversion from secop.errors import BadValueError, ConfigError, \ ProgrammingError, SECoPError, SilentError, secop_error -from secop.lib import formatException, mkthread, UniqueObject +from secop.lib import formatException, mkthread, UniqueObject, generalConfig from secop.lib.enum import Enum from secop.params import Accessible, Command, Parameter -from secop.poller import BasicPoller, Poller from secop.properties import HasProperties, Property -from secop.logging import RemoteLogHandler, add_comlog_handler +from secop.logging import RemoteLogHandler, HasComlog -class DoneClass: - @classmethod - def __repr__(cls): - return 'Done' +generalConfig.defaults['disable_value_range_check'] = False # check for problematic value range by default +Done = UniqueObject('already set') +"""a special return value for a read/write function -Done = UniqueObject('Done') +indicating that the setter is triggered already""" class HasAccessibles(HasProperties): @@ -90,15 +89,18 @@ class HasAccessibles(HasProperties): else: aobj.merge(merged_properties[aname]) accessibles[aname] = aobj + # rebuild order: (1) inherited items, (2) items from paramOrder, (3) new accessibles # move (2) to the end - for aname in list(cls.__dict__.get('paramOrder', ())): + paramOrder = cls.__dict__.get('paramOrder', ()) + for aname in paramOrder: if aname in accessibles: accessibles.move_to_end(aname) # ignore unknown names # move (3) to the end for aname in new_names: - accessibles.move_to_end(aname) + if aname not in paramOrder: + accessibles.move_to_end(aname) # note: for python < 3.6 the order of inherited items is not ensured between # declarations within the same class cls.accessibles = accessibles @@ -111,12 +113,14 @@ class HasAccessibles(HasProperties): # XXX: create getters for the units of params ?? # wrap of reading/writing funcs - if isinstance(pobj, Command): - # nothing to do for now + if not isinstance(pobj, Parameter): + # nothing to do for Commands continue + rfunc = getattr(cls, 'read_' + pname, None) + # TODO: remove handler stuff here rfunc_handler = pobj.handler.get_read_func(cls, pname) if pobj.handler else None - wrapped = hasattr(rfunc, '__wrapped__') + wrapped = getattr(rfunc, 'wrapped', False) # meaning: wrapped or auto generated if rfunc_handler: if 'read_' + pname in cls.__dict__: if pname in cls.__dict__: @@ -130,72 +134,81 @@ class HasAccessibles(HasProperties): # create wrapper except when read function is already wrapped if not wrapped: - def wrapped_rfunc(self, pname=pname, rfunc=rfunc): - if rfunc: - self.log.debug("call read_%s" % pname) + if rfunc: + + @wraps(rfunc) # handles __wrapped__ and __doc__ + def new_rfunc(self, pname=pname, rfunc=rfunc): try: value = rfunc(self) - self.log.debug("read_%s returned %r" % (pname, value)) - if value is Done: # the setter is already triggered - return getattr(self, pname) + self.log.debug("read_%s returned %r", pname, value) except Exception as e: - self.log.debug("read_%s failed %r" % (pname, e)) - self.announceUpdate(pname, None, e) + self.log.debug("read_%s failed with %r", pname, e) raise - else: - # return cached value - value = self.accessibles[pname].value - self.log.debug("return cached %s = %r" % (pname, value)) - setattr(self, pname, value) # important! trigger the setter - return value + if value is Done: + return getattr(self, pname) + setattr(self, pname, value) # important! trigger the setter + return value - if rfunc: - wrapped_rfunc.__doc__ = rfunc.__doc__ - setattr(cls, 'read_' + pname, wrapped_rfunc) - wrapped_rfunc.__wrapped__ = True + new_rfunc.poll = getattr(rfunc, 'poll', True) + else: + + def new_rfunc(self, pname=pname): + return getattr(self, pname) + + new_rfunc.poll = False + new_rfunc.__doc__ = 'auto generated read method for ' + pname + + new_rfunc.wrapped = True # indicate to subclasses that no more wrapping is needed + setattr(cls, 'read_' + pname, new_rfunc) if not pobj.readonly: wfunc = getattr(cls, 'write_' + pname, None) - wrapped = hasattr(wfunc, '__wrapped__') + wrapped = getattr(wfunc, 'wrapped', False) # meaning: wrapped or auto generated if (wfunc is None or wrapped) and pobj.handler: # ignore the handler, if a write function is present + # TODO: remove handler stuff here wfunc = pobj.handler.get_write_func(pname) wrapped = False # create wrapper except when write function is already wrapped if not wrapped: - def wrapped_wfunc(self, value, pname=pname, wfunc=wfunc): - pobj = self.accessibles[pname] - if wfunc: - self.log.debug("check and call write_%s(%r)" % (pname, value)) + if wfunc: + + @wraps(wfunc) # handles __wrapped__ and __doc__ + def new_wfunc(self, value, pname=pname, wfunc=wfunc): + pobj = self.accessibles[pname] + self.log.debug('validate %r for %r', value, pname) + # we do not need to handle errors here, we do not + # want to make a parameter invalid, when a write failed value = pobj.datatype(value) returned_value = wfunc(self, value) - self.log.debug('write_%s returned %r' % (pname, returned_value)) - if returned_value is Done: # the setter is already triggered + self.log.debug('write_%s(%r) returned %r', pname, value, returned_value) + if returned_value is Done: + # setattr(self, pname, getattr(self, pname)) return getattr(self, pname) - if returned_value is not None: # goodie: accept missing return value - value = returned_value - else: - self.log.debug("check %s = %r" % (pname, value)) - value = pobj.datatype(value) - setattr(self, pname, value) - return value + setattr(self, pname, value) # important! trigger the setter + return value + else: - if wfunc: - wrapped_wfunc.__doc__ = wfunc.__doc__ - setattr(cls, 'write_' + pname, wrapped_wfunc) - wrapped_wfunc.__wrapped__ = True + def new_wfunc(self, value, pname=pname): + setattr(self, pname, value) + return value + + new_wfunc.__doc__ = 'auto generated write method for ' + pname + + new_wfunc.wrapped = True # indicate to subclasses that no more wrapping is needed + setattr(cls, 'write_' + pname, new_wfunc) # check for programming errors - for attrname in cls.__dict__: + for attrname, attrvalue in cls.__dict__.items(): prefix, _, pname = attrname.partition('_') if not pname: continue if prefix == 'do': raise ProgrammingError('%r: old style command %r not supported anymore' % (cls.__name__, attrname)) - elif prefix in ('read', 'write') and not isinstance(accessibles.get(pname), Parameter): + if prefix in ('read', 'write') and not getattr(attrvalue, 'wrapped', False): raise ProgrammingError('%s.%s defined, but %r is no parameter' % (cls.__name__, attrname, pname)) @@ -257,6 +270,9 @@ class Module(HasAccessibles): extname='implementation') interface_classes = Property('offical highest interface-class of the module', ArrayOf(StringType()), extname='interface_classes') + pollinterval = Property('poll interval for parameters handled by doPoll', FloatRange(0.1, 120), default=5) + slowinterval = Property('poll interval for other parameters', FloatRange(0.1, 120), default=15) + enablePoll = True # properties, parameters and commands are auto-merged upon subclassing parameters = {} @@ -264,7 +280,6 @@ class Module(HasAccessibles): # reference to the dispatcher (used for sending async updates) DISPATCHER = None - pollerClass = Poller #: default poller used def __init__(self, name, logger, cfgdict, srv): # remember the dispatcher object (for the async callbacks) @@ -277,6 +292,8 @@ class Module(HasAccessibles): self.earlyInitDone = False self.initModuleDone = False self.startModuleDone = False + self.remoteLogHandler = None + self.changePollinterval = Queue() # used for waiting between polls and transmit info to the thread errors = [] # handle module properties @@ -321,13 +338,6 @@ class Module(HasAccessibles): for aname, aobj in self.accessibles.items(): # make a copy of the Parameter/Command object aobj = aobj.copy() - if isinstance(aobj, Parameter): - # fix default properties poll and needscfg - if aobj.poll is None: - aobj.poll = bool(aobj.handler) - if aobj.needscfg is None: - aobj.needscfg = not aobj.poll - if not self.export: # do not export parameters of a module not exported aobj.export = False if aobj.export: @@ -396,7 +406,7 @@ class Module(HasAccessibles): 'value and was not given in config!' % pname) # we do not want to call the setter for this parameter for now, # this should happen on the first read - pobj.readerror = ConfigError('not initialized') + pobj.readerror = ConfigError('parameter %r not initialized' % pname) # above error will be triggered on activate after startup, # when not all hardware parameters are read because of startup timeout pobj.value = pobj.datatype(pobj.datatype.default) @@ -451,10 +461,6 @@ class Module(HasAccessibles): errors.append('%s: %s' % (aname, e)) if errors: raise ConfigError(errors) - self.remoteLogHandler = None - self._earlyInitDone = False - self._initModuleDone = False - self._startModuleDone = False # helper cfg-editor def __iter__(self): @@ -465,6 +471,8 @@ class Module(HasAccessibles): def announceUpdate(self, pname, value=None, err=None, timestamp=None): """announce a changed value or readerror""" + + # TODO: remove readerror 'property' and replace value with exception pobj = self.parameters[pname] timestamp = timestamp or time.time() changed = pobj.value != value @@ -472,6 +480,11 @@ class Module(HasAccessibles): # store the value even in case of error pobj.value = pobj.datatype(value) except Exception as e: + if isinstance(e, DiscouragedConversion): + if DiscouragedConversion.log_message: + self.log.error(str(e)) + self.log.error('you may disable this behaviour by running the server with --relaxed') + DiscouragedConversion.log_message = False if not err: # do not overwrite given error err = e if err: @@ -554,10 +567,42 @@ class Module(HasAccessibles): """initialise module with stuff to be done after all modules are created""" self.initModuleDone = True - def pollOneParam(self, pname): - """poll parameter with proper error handling""" + def startModule(self, start_events): + """runs after init of all modules + + when a thread is started, a trigger function may signal that it + has finished its initial work + start_events.get_trigger() creates such a trigger and + registers it in the server for waiting + defaults to 30 seconds + """ + if self.enablePoll or self.writeDict: + # enablePoll == False: start poll thread for writing values from writeDict only + mkthread(self.__pollThread, start_events.get_trigger()) + self.startModuleDone = True + + def doPoll(self): + """polls important parameters like value and status + + all other parameters are polled automatically + """ + + def setFastPoll(self, pollinterval): + """change poll interval + + :param pollinterval: a new (typically lower) pollinterval + special values: True: set to 0.25 (default fast poll interval) + False: set to self.pollinterval (value for idle) + """ + if pollinterval is False: + self.changePollinterval.put(self.pollinterval) + return + self.changePollinterval.put(0.25 if pollinterval is True else pollinterval) + + def callPollFunc(self, rfunc): + """call read method with proper error handling""" try: - getattr(self, 'read_' + pname)() + rfunc() except SilentError: pass except SECoPError as e: @@ -565,6 +610,65 @@ class Module(HasAccessibles): except Exception: self.log.error(formatException()) + def __pollThread(self, started_callback): + self.writeInitParams() + if not self.enablePoll: + return + polled_parameters = [] + # collect and call all read functions a first time + for pname, pobj in self.parameters.items(): + rfunc = getattr(self, 'read_' + pname) + if rfunc.poll: + polled_parameters.append((rfunc, pobj)) + self.callPollFunc(rfunc) + started_callback() + pollinterval = self.pollinterval + last_slow = last_main = 0 + last_error = None + error_count = 0 + to_poll = () + while True: + now = time.time() + wait_main = last_main + pollinterval - now + wait_slow = last_slow + self.slowinterval - now + wait_time = min(wait_main, wait_slow) + if wait_time > 0: + try: + result = self.changePollinterval.get(timeout=wait_time) + except Empty: + result = None + if result is not None: + pollinterval = result + continue + # call doPoll, if due + if wait_main <= 0: + last_main = (now // pollinterval) * pollinterval + try: + self.doPoll() + if last_error and error_count > 1: + self.log.info('recovered after %d calls to doPoll (%r)', error_count, last_error) + last_error = None + except Exception as e: + if type(e) != last_error: + error_count = 0 + self.log.error('error in doPoll: %r', e) + error_count += 1 + last_error = e + now = time.time() + # find ONE due slow poll and call it + loop = True + while loop: # loops max. 2 times, when to_poll is at end + for rfunc, pobj in to_poll: + if now > pobj.timestamp + self.slowinterval * 0.5: + self.callPollFunc(rfunc) + loop = False + break + else: + if now < last_slow + self.slowinterval: + break + last_slow = (now // self.slowinterval) * self.slowinterval + to_poll = iter(polled_parameters) + def writeInitParams(self, started_callback=None): """write values for parameters with configured values @@ -587,23 +691,15 @@ class Module(HasAccessibles): if started_callback: started_callback() - def startModule(self, started_callback): - """runs after init of all modules - - started_callback to be called when the thread spawned by startModule - has finished its initial work - might return a timeout value, if different from default - """ - if self.writeDict: - mkthread(self.writeInitParams, started_callback) - else: - started_callback() - self.startModuleDone = True - def setRemoteLogging(self, conn, level): if self.remoteLogHandler is None: - self.remoteLogHandler = RemoteLogHandler(self) - self.remoteLogHandler.set_conn_level(conn, level) + for handler in self.log.handlers: + if isinstance(handler, RemoteLogHandler): + self.remoteLogHandler = handler + break + else: + raise ValueError('remote handler not found') + self.remoteLogHandler.set_conn_level(self, conn, level) class Readable(Module): @@ -618,63 +714,49 @@ class Readable(Module): UNKNOWN=401, ) #: status codes - value = Parameter('current value of the module', FloatRange(), poll=True) + value = Parameter('current value of the module', FloatRange()) status = Parameter('current status of the module', TupleOf(EnumType(Status), StringType()), - default=(Status.IDLE, ''), poll=True) - pollinterval = Parameter('sleeptime between polls', FloatRange(0.1, 120), - default=5, readonly=False) + default=(Status.IDLE, '')) + pollinterval = Parameter('default poll interval', FloatRange(0.1, 120), + default=5, readonly=False, export=True) - def startModule(self, started_callback): - """start basic polling thread""" - if self.pollerClass and issubclass(self.pollerClass, BasicPoller): - # use basic poller for legacy code - mkthread(self.__pollThread, started_callback) - else: - super().startModule(started_callback) + def earlyInit(self): + super().earlyInit() + # trigger a poll interval change when self.pollinterval changes. + # self.setFastPoll with a float argument does the job here + self.valueCallbacks['pollinterval'].append(self.setFastPoll) - def __pollThread(self, started_callback): - while True: - try: - self.__pollThread_inner(started_callback) - except Exception as e: - self.log.exception(e) - self.status = (self.Status.ERROR, 'polling thread could not start') - started_callback() - print(formatException(0, sys.exc_info(), verbose=True)) - time.sleep(10) - - def __pollThread_inner(self, started_callback): - """super simple and super stupid per-module polling thread""" - self.writeInitParams() - i = 0 - fastpoll = self.pollParams(i) - started_callback() - while True: - i += 1 - try: - time.sleep(self.pollinterval * (0.1 if fastpoll else 1)) - except TypeError: - time.sleep(min(self.pollinterval) - if fastpoll else max(self.pollinterval)) - fastpoll = self.pollParams(i) - - def pollParams(self, nr=0): - # Just poll all parameters regularly where polling is enabled - for pname, pobj in self.parameters.items(): - if not pobj.poll: - continue - if nr % abs(int(pobj.poll)) == 0: - # pollParams every 'pobj.pollParams' iteration - self.pollOneParam(pname) - return False + def doPoll(self): + self.read_value() + self.read_status() class Writable(Readable): """basic writable module""" - + disable_value_range_check = Property('disable value range check', BoolType(), default=False) target = Parameter('target value of the module', default=0, readonly=False, datatype=FloatRange(unit='$')) + def __init__(self, name, logger, cfgdict, srv): + super().__init__(name, logger, cfgdict, srv) + value_dt = self.parameters['value'].datatype + target_dt = self.parameters['target'].datatype + try: + # this handles also the cases where the limits on the value are more + # restrictive than on the target + target_dt.compatible(value_dt) + except Exception: + if type(value_dt) == type(target_dt): + raise ConfigError('the target range extends beyond the value range') from None + raise ProgrammingError('the datatypes of target and value are not compatible') from None + if isinstance(value_dt, FloatRange): + if (not self.disable_value_range_check and not generalConfig.disable_value_range_check + and value_dt.problematic_range(target_dt)): + self.log.error('the value range must be bigger than the target range!') + self.log.error('you may disable this error message by running the server with --relaxed') + self.log.error('or by setting the disable_value_range_check property of the module to True') + raise ConfigError('the value range must be bigger than the target range') + class Drivable(Writable): """basic drivable module""" @@ -697,36 +779,14 @@ class Drivable(Writable): """ return 300 <= (status or self.status)[0] < 390 - # improved polling: may poll faster if module is BUSY - def pollParams(self, nr=0): - # poll status first - self.read_status() - fastpoll = self.isBusy() - for pname, pobj in self.parameters.items(): - if not pobj.poll: - continue - if pname == 'status': - # status was already polled above - continue - if ((int(pobj.poll) < 0) and fastpoll) or ( - nr % abs(int(pobj.poll))) == 0: - # poll always if pobj.poll is negative and fastpoll (i.e. Module is busy) - # otherwise poll every 'pobj.poll' iteration - self.pollOneParam(pname) - return fastpoll - @Command(None, result=None) def stop(self): """cease driving, go to IDLE state""" -class Communicator(Module): +class Communicator(HasComlog, Module): """basic abstract communication module""" - def initModule(self): - super().initModule() - add_comlog_handler(self) - @Command(StringType(), result=StringType()) def communicate(self, command): """communicate command @@ -742,15 +802,15 @@ class Attached(Property): assign a module name to this property in the cfg file, and the server will create an attribute with this module - - :param attrname: the name of the to be created attribute. if not given - the attribute name is the property name prepended by an underscore. """ - # we can not put this to properties.py, as it needs datatypes - def __init__(self, attrname=None): - self.attrname = attrname - # we can not make it mandatory, as the check in Module.__init__ will be before auto-assign in HasIodev - super().__init__('attached module', StringType(), mandatory=False) + module = None - def __repr__(self): - return 'Attached(%s)' % (repr(self.attrname) if self.attrname else '') + def __init__(self, description='attached module'): + super().__init__(description, StringType(), mandatory=False) + + def __get__(self, obj, owner): + if obj is None: + return self + if self.module is None: + self.module = obj.DISPATCHER.get_module(super().__get__(obj, owner)) + return self.module diff --git a/secop/params.py b/secop/params.py index 52da84f..c592153 100644 --- a/secop/params.py +++ b/secop/params.py @@ -26,12 +26,11 @@ import inspect from secop.datatypes import BoolType, CommandType, DataType, \ - DataTypeType, EnumType, IntRange, NoneOr, OrType, FloatRange, \ - StringType, StructOf, TextType, TupleOf, ValueType, ArrayOf + DataTypeType, EnumType, NoneOr, OrType, \ + StringType, StructOf, TextType, TupleOf, ValueType from secop.errors import BadValueError, ProgrammingError from secop.properties import HasProperties, Property - -UNSET = object() # an argument not given, not even None +from secop.lib import generalConfig class Accessible(HasProperties): @@ -94,11 +93,6 @@ class Accessible(HasProperties): return '%s(%s)' % (self.__class__.__name__, ', '.join(props)) -historyStruct = StructOf(category=StringType(), label=StringType(), group=StringType(), - stepped=OrType(BoolType(), StringType()), timestep=FloatRange(0, 1), - record_unchanged=BoolType()) - - class Parameter(Accessible): """defines a parameter @@ -139,24 +133,9 @@ class Parameter(Accessible): * True: exported, name automatic. * a string: exported with custom name''', OrType(BoolType(), StringType()), export=False, default=True) - poll = Property( - '''[internal] polling indicator - - may be: - - * None (omitted): will be converted to True/False if handler is/is not None - * False or 0 (never poll this parameter) - * True or 1 (AUTO), converted to SLOW (readonly=False) - DYNAMIC (*status* and *value*) or REGULAR (else) - * 2 (SLOW), polled with lower priority and a multiple of pollinterval - * 3 (REGULAR), polled with pollperiod - * 4 (DYNAMIC), if BUSY, with a fraction of pollinterval, - else polled with pollperiod - ''', NoneOr(IntRange()), - export=False, default=None) needscfg = Property( '[internal] needs value in config', NoneOr(BoolType()), - export=False, default=None) + export=False, default=False) optional = Property( '[internal] is this parameter optional?', BoolType(), export=False, settable=False, default=False) @@ -168,35 +147,6 @@ class Parameter(Accessible): default None: write if given in config''', NoneOr(BoolType()), export=False, default=None, settable=False) - history = Property( - '''[custom] options for history - - for structured types, this is an array of options, to be applied in the order - of the created elements. - - list of options: - - category - - major: should be shown by default in a history chart, default for value and target - - minor: to be shown optionally in a history chart, default for other parameters - - no: history is not saved. default for TextType and ArrayOf - - category is ignored (forced to no) for BlobType - - label - default: : or for main value - - group: - default: unit - - stepped: - whether a curve has to be drawn stepped or connected. - default: True when readonly=False, else False - - timestep: - the desired time step for the curve storage. maximum and default value is 1 sec - ''', - OrType(historyStruct, ArrayOf(historyStruct)), export=True, default={}, settable=False) # used on the instance copy only value = None @@ -205,6 +155,8 @@ class Parameter(Accessible): def __init__(self, description=None, datatype=None, inherit=True, **kwds): super().__init__() + if 'poll' in kwds and generalConfig.tolerate_poll_property: + kwds.pop('poll') if datatype is None: # collect datatype properties. these are not applied, as we have no datatype self.ownProperties = {k: kwds.pop(k) for k in list(kwds) if k not in self.propertyDict} @@ -232,7 +184,6 @@ class Parameter(Accessible): self.ownProperties = {k: getattr(self, k) for k in self.propertyDict} def __get__(self, instance, owner): - # not used yet if instance is None: return self return instance.parameters[self.name].value @@ -253,6 +204,9 @@ class Parameter(Accessible): self.export = '_' + self.name else: raise ProgrammingError('can not use %r as name of a Parameter' % self.name) + if 'export' in self.ownProperties: + # avoid export=True overrides export= + self.ownProperties['export'] = self.export def copy(self): """return a (deep) copy of ourselfs""" @@ -418,6 +372,9 @@ class Command(Accessible): self.export = '_' + name else: raise ProgrammingError('can not use %r as name of a Command' % name) from None + if 'export' in self.ownProperties: + # avoid export=True overrides export= + self.ownProperties['export'] = self.export if not self._inherit: for key, pobj in self.properties.items(): if key not in self.propertyValues: diff --git a/secop/persistent.py b/secop/persistent.py index d426463..0360b84 100644 --- a/secop/persistent.py +++ b/secop/persistent.py @@ -55,7 +55,7 @@ class MyClass(PersistentMixin, ...): import os import json -from secop.lib import getGeneralConfig +from secop.lib import generalConfig from secop.datatypes import EnumType from secop.params import Parameter, Property, Command from secop.modules import HasAccessibles @@ -69,7 +69,7 @@ class PersistentParam(Parameter): class PersistentMixin(HasAccessibles): def __init__(self, *args, **kwds): super().__init__(*args, **kwds) - persistentdir = os.path.join(getGeneralConfig()['logdir'], 'persistent') + persistentdir = os.path.join(generalConfig.logdir, 'persistent') os.makedirs(persistentdir, exist_ok=True) self.persistentFile = os.path.join(persistentdir, '%s.%s.json' % (self.DISPATCHER.equipment_id, self.name)) self.initData = {} @@ -103,6 +103,7 @@ class PersistentMixin(HasAccessibles): try: value = pobj.datatype.import_value(self.persistentData[pname]) pobj.value = value + pobj.readerror = None if not pobj.readonly: writeDict[pname] = value except Exception as e: @@ -144,6 +145,6 @@ class PersistentMixin(HasAccessibles): @Command() def factory_reset(self): - """reset to initial values, forget persistent data""" + """reset to values from config / default values""" self.writeDict.update(self.initData) self.writeInitParams() diff --git a/secop/properties.py b/secop/properties.py index 047c147..f12d5a9 100644 --- a/secop/properties.py +++ b/secop/properties.py @@ -26,8 +26,11 @@ import inspect from secop.errors import BadValueError, ConfigError, ProgrammingError +from secop.lib import UniqueObject from secop.lib.py35compat import Object +UNSET = UniqueObject('undefined value') #: an unset value, not even None + class HasDescriptors(Object): @classmethod @@ -39,9 +42,6 @@ class HasDescriptors(Object): raise ProgrammingError('misplaced trailing comma after %s.%s' % (cls.__name__, '/'.join(bad))) -UNSET = object() # an unset value, not even None - - # storage for 'properties of a property' class Property: """base class holding info about a property @@ -138,17 +138,18 @@ class HasProperties(HasDescriptors): # treat overriding properties with bare values for pn, po in properties.items(): value = getattr(cls, pn, po) - if not isinstance(value, Property): # attribute is a bare value + if not isinstance(value, (Property, HasProperties)): # attribute may be a bare value + # HasProperties is a base class of Parameter -> allow a Parameter to override a Property () po = po.copy() try: + # try to apply bare value to Property po.value = po.datatype(value) except BadValueError: - if pn in properties: - if callable(value): - raise ProgrammingError('method %s.%s collides with property of %s' % - (cls.__name__, pn, base.__name__)) from None - raise ProgrammingError('can not set property %s.%s to %r' % - (cls.__name__, pn, value)) from None + if callable(value): + raise ProgrammingError('method %s.%s collides with property of %s' % + (cls.__name__, pn, base.__name__)) from None + raise ProgrammingError('can not set property %s.%s to %r' % + (cls.__name__, pn, value)) from None cls.propertyDict[pn] = po def checkProperties(self): diff --git a/secop/protocol/__init__.py b/secop/protocol/__init__.py index a9803ba..25e867f 100644 --- a/secop/protocol/__init__.py +++ b/secop/protocol/__init__.py @@ -19,4 +19,4 @@ # Enrico Faulhaber # # ***************************************************************************** -"""SECoP protocl specific stuff""" +"""SECoP protocol specific stuff""" diff --git a/secop/protocol/dispatcher.py b/secop/protocol/dispatcher.py index fc392e4..311f1d8 100644 --- a/secop/protocol/dispatcher.py +++ b/secop/protocol/dispatcher.py @@ -48,7 +48,7 @@ from secop.params import Parameter from secop.protocol.messages import COMMANDREPLY, DESCRIPTIONREPLY, \ DISABLEEVENTSREPLY, ENABLEEVENTSREPLY, ERRORPREFIX, EVENTREPLY, \ HEARTBEATREPLY, IDENTREPLY, IDENTREQUEST, READREPLY, WRITEREPLY, \ - ERRORCLOSED + LOGGING_REPLY, LOG_EVENT def make_update(modulename, pobj): @@ -83,6 +83,7 @@ class Dispatcher: # eventname is or : self._subscriptions = {} self._lock = threading.RLock() + self.name = name self.restart = srv.restart self.shutdown = srv.shutdown @@ -215,9 +216,14 @@ class Dispatcher: if cobj is None: raise NoSuchCommandError('Module %r has no command %r' % (modulename, cname or exportedname)) + if cobj.argument: + argument = cobj.argument.import_value(argument) # now call func # note: exceptions are handled in handle_request, not here! - return cobj.do(moduleobj, argument), dict(t=currenttime()) + result = cobj.do(moduleobj, argument) + if cobj.result: + result = cobj.result.export_value(result) + return result, dict(t=currenttime()) def _setParameterValue(self, modulename, exportedname, value): moduleobj = self.get_module(modulename) @@ -237,13 +243,9 @@ class Dispatcher: # validate! value = pobj.datatype(value) - writefunc = getattr(moduleobj, 'write_%s' % pname, None) # note: exceptions are handled in handle_request, not here! - if writefunc: - # return value is ignored here, as it is automatically set on the pobj and broadcast - writefunc(value) - else: - setattr(moduleobj, pname, value) + getattr(moduleobj, 'write_' + pname)(value) + # return value is ignored here, as already handled return pobj.export_value(), dict(t=pobj.timestamp) if pobj.timestamp else {} def _getParameterValue(self, modulename, exportedname): @@ -260,11 +262,9 @@ class Dispatcher: # raise ReadOnlyError('This parameter is constant and can not be accessed remotely.') return pobj.datatype.export_value(pobj.constant) - readfunc = getattr(moduleobj, 'read_%s' % pname, None) - if readfunc: - # should also update the pobj (via the setter from the metaclass) - # note: exceptions are handled in handle_request, not here! - readfunc() + # note: exceptions are handled in handle_request, not here! + getattr(moduleobj, 'read_' + pname)() + # return value is ignored here, as already handled return pobj.export_value(), dict(t=pobj.timestamp) if pobj.timestamp else {} # @@ -299,7 +299,6 @@ class Dispatcher: self.log.error('should have been handled in the interface!') def handle__ident(self, conn, specifier, data): - self._active_connections.discard(conn) return (IDENTREPLY, None, None) def handle_describe(self, conn, specifier, data): @@ -378,10 +377,7 @@ class Dispatcher: def send_log_msg(self, conn, modname, level, msg): """send log message """ - if conn in self._connections: - conn.send_reply(('log', '%s:%s' % (modname, level), msg)) - return True - return False + conn.send_reply((LOG_EVENT, '%s:%s' % (modname, level), msg)) def set_all_log_levels(self, conn, level): for modobj in self._modules.values(): @@ -390,19 +386,7 @@ class Dispatcher: def handle_logging(self, conn, specifier, level): if specifier and specifier != '.': modobj = self._modules[specifier] - iodev = getattr(modobj, '_iodev', None) - if iodev and iodev.remoteLogHandler is None: - iodev.setRemoteLogging(conn, 'off') - iodev.remoteLogHandler.used_by.add(modobj) modobj.setRemoteLogging(conn, level) else: self.set_all_log_levels(conn, level) - return 'logging', specifier, level - - def close(self): - for conn in self._connections: - try: - # - may be used for the 'closed' message in serial interface - conn.close_message((ERRORCLOSED, None, None)) - except AttributeError: - pass + return LOGGING_REPLY, specifier, level diff --git a/secop/protocol/messages.py b/secop/protocol/messages.py index b6d08f4..44edfc4 100644 --- a/secop/protocol/messages.py +++ b/secop/protocol/messages.py @@ -62,11 +62,16 @@ HEARTBEATREPLY = 'pong' # +nonce_without_space ERRORPREFIX = 'error_' # + specifier + json_extended_info(error_report) -ERRORCLOSED = 'error_closed' - HELPREQUEST = 'help' # literal HELPREPLY = 'helping' # +line number +json_text +LOGGING_REQUEST = 'logging' +LOGGING_REPLY = 'logging' +# + [module] + json string (loglevel) + +LOG_EVENT = 'log' +# + [module:level] + json_string (message) + # helper mapping to find the REPLY for a REQUEST # do not put IDENTREQUEST/IDENTREPLY here, as this needs anyway extra treatment REQUEST2REPLY = { @@ -79,6 +84,7 @@ REQUEST2REPLY = { READREQUEST: READREPLY, HEARTBEATREQUEST: HEARTBEATREPLY, HELPREQUEST: HELPREPLY, + LOGGING_REQUEST: LOGGING_REPLY, } @@ -91,6 +97,8 @@ HelpMessage = """Try one of the following: '%s ' to request a heartbeat response '%s' to activate async updates '%s' to deactivate updates + '%s [] ' to activate logging events """ % (IDENTREQUEST, DESCRIPTIONREQUEST, READREQUEST, WRITEREQUEST, COMMANDREQUEST, HEARTBEATREQUEST, - ENABLEEVENTSREQUEST, DISABLEEVENTSREQUEST) + ENABLEEVENTSREQUEST, DISABLEEVENTSREQUEST, + LOGGING_REQUEST) diff --git a/secop/proxy.py b/secop/proxy.py index 5d01401..7189fa8 100644 --- a/secop/proxy.py +++ b/secop/proxy.py @@ -29,17 +29,17 @@ from secop.lib import get_class from secop.modules import Drivable, Module, Readable, Writable from secop.params import Command, Parameter from secop.properties import Property -from secop.io import HasIodev +from secop.io import HasIO -class ProxyModule(HasIodev, Module): +class ProxyModule(HasIO, Module): module = Property('remote module name', datatype=StringType(), default='') - pollerClass = None _consistency_check_done = False _secnode = None + enablePoll = False - def iodevClass(self, name, logger, opts, srv): + def ioClass(self, name, logger, opts, srv): opts['description'] = 'secnode %s on %s' % (opts.get('module', name), opts['uri']) return SecNode(name, logger, opts, srv) @@ -54,7 +54,7 @@ class ProxyModule(HasIodev, Module): def initModule(self): if not self.module: self.module = self.name - self._secnode = self._iodev.secnode + self._secnode = self.io.secnode self._secnode.register_callback(self.module, self.updateEvent, self.descriptiveDataChange, self.nodeStateChange) super().initModule() @@ -123,7 +123,8 @@ class ProxyModule(HasIodev, Module): self.announceUpdate('status', newstatus) def checkProperties(self): - pass # skip + pass # skip + class ProxyReadable(ProxyModule, Readable): pass @@ -144,10 +145,12 @@ class SecNode(Module): uri = Property('uri of a SEC node', datatype=StringType()) def earlyInit(self): + super().earlyInit() self.secnode = SecopClient(self.uri, self.log) - def startModule(self, started_callback): - self.secnode.spawn_connect(started_callback) + def startModule(self, start_events): + super().startModule(start_events) + self.secnode.spawn_connect(start_events.get_trigger()) @Command(StringType(), result=StringType()) def request(self, msg): @@ -182,7 +185,7 @@ def proxy_class(remote_class, name=None): for aname, aobj in rcls.accessibles.items(): if isinstance(aobj, Parameter): - pobj = aobj.merge(dict(poll=False, handler=None, needscfg=False)) + pobj = aobj.merge(dict(handler=None, needscfg=False)) attrs[aname] = pobj def rfunc(self, pname=aname): @@ -225,5 +228,5 @@ def Proxy(name, logger, cfgdict, srv): remote_class = cfgdict.pop('remote_class') if 'description' not in cfgdict: cfgdict['description'] = 'remote module %s on %s' % ( - cfgdict.get('module', name), cfgdict.get('iodev', '?')) + cfgdict.get('module', name), cfgdict.get('io', '?')) return proxy_class(remote_class)(name, logger, cfgdict, srv) diff --git a/secop/server.py b/secop/server.py index c09ac3b..d6253f8 100644 --- a/secop/server.py +++ b/secop/server.py @@ -27,14 +27,13 @@ import ast import configparser import os import sys -import threading -import time import traceback from collections import OrderedDict -from secop.errors import ConfigError, SECoPError -from secop.lib import formatException, get_class, getGeneralConfig -from secop.modules import Attached +from secop.errors import ConfigError +from secop.lib import formatException, get_class, generalConfig +from secop.lib.multievent import MultiEvent +from secop.params import PREDEFINED_ACCESSIBLES try: from daemon import DaemonContext @@ -88,7 +87,6 @@ class Server: ... """ self._testonly = testonly - cfg = getGeneralConfig() self.log = parent_logger.getChild(name, True) if not cfgfiles: @@ -113,23 +111,21 @@ class Server: if ambiguous_sections: self.log.warning('ambiguous sections in %s: %r' % (cfgfiles, tuple(ambiguous_sections))) self._cfgfiles = cfgfiles - self._pidfile = os.path.join(cfg['piddir'], name + '.pid') - self.close_callbacks = [] + self._pidfile = os.path.join(generalConfig.piddir, name + '.pid') def loadCfgFile(self, cfgfile): if not cfgfile.endswith('.cfg'): cfgfile += '.cfg' - cfg = getGeneralConfig() if os.sep in cfgfile: # specified as full path filename = cfgfile if os.path.exists(cfgfile) else None else: - for filename in [os.path.join(d, cfgfile) for d in cfg['confdir'].split(os.pathsep)]: + for filename in [os.path.join(d, cfgfile) for d in generalConfig.confdir.split(os.pathsep)]: if os.path.exists(filename): break else: filename = None if filename is None: - raise ConfigError("Couldn't find cfg file %r in %s" % (cfgfile, cfg['confdir'])) + raise ConfigError("Couldn't find cfg file %r in %s" % (cfgfile, generalConfig.confdir)) self.log.debug('Parse config file %s ...' % filename) result = OrderedDict() parser = configparser.ConfigParser() @@ -207,11 +203,8 @@ class Server: self.log.info('startup done, handling transport messages') if systemd: systemd.daemon.notify("READY=1\nSTATUS=accepting requests") - try: - self.interface.serve_forever() - except KeyboardInterrupt as e: - self._restart = False - self.close() + self.interface.serve_forever() + self.interface.server_close() if self._restart: self.restart_hook() self.log.info('restart') @@ -227,12 +220,6 @@ class Server: self._restart = False self.interface.shutdown() - def close(self): - self.dispatcher.close() - self.interface.server_close() - for cb in self.close_callbacks: - cb() - def _processCfg(self): errors = [] opts = dict(self.node_cfg) @@ -277,40 +264,41 @@ class Server: failure_traceback = traceback.format_exc() errors.append('error creating %s' % modname) - poll_table = dict() + missing_super = set() # all objs created, now start them up and interconnect for modname, modobj in self.modules.items(): self.log.info('registering module %r' % modname) self.dispatcher.register_module(modobj, modname, modobj.export) - if modobj.pollerClass is not None: - # a module might be explicitly excluded from polling by setting pollerClass to None - modobj.pollerClass.add_to_table(poll_table, modobj) # also call earlyInit on the modules modobj.earlyInit() if not modobj.earlyInitDone: - modobj.log.warning('missing supercall to earlyInit') - - # handle attached modules - for modname, modobj in self.modules.items(): - for propname, propobj in modobj.propertyDict.items(): - if isinstance(propobj, Attached): - try: - setattr(modobj, propobj.attrname or '_' + propname, - self.dispatcher.get_module(getattr(modobj, propname))) - except SECoPError as e: - errors.append('module %s, attached %s: %s' % (modname, propname, str(e))) + missing_super.add('%s was not called, probably missing super call' + % modobj.earlyInit.__qualname__) # call init on each module after registering all for modname, modobj in self.modules.items(): try: modobj.initModule() if not modobj.initModuleDone: - modobj.log.warning('missing supercall to initModule') + missing_super.add('%s was not called, probably missing super call' + % modobj.initModule.__qualname__) except Exception as e: if failure_traceback is None: failure_traceback = traceback.format_exc() errors.append('error initializing %s: %r' % (modname, e)) + if self._testonly: + return + start_events = MultiEvent(default_timeout=30) + for modname, modobj in self.modules.items(): + # startModule must return either a timeout value or None (default 30 sec) + start_events.name = 'module %s' % modname + modobj.startModule(start_events) + if not modobj.startModuleDone: + missing_super.add('%s was not called, probably missing super call' + % modobj.startModule.__qualname__) + + errors.extend(missing_super) if errors: for errtxt in errors: for line in errtxt.split('\n'): @@ -322,35 +310,20 @@ class Server: sys.stderr.write(failure_traceback) sys.exit(1) - if self._testonly: - return - start_events = [] - for modname, modobj in self.modules.items(): - event = threading.Event() - # startModule must return either a timeout value or None (default 30 sec) - timeout = modobj.startModule(started_callback=event.set) or 30 - if not modobj.startModuleDone: - modobj.log.warning('missing supercall to startModule') - start_events.append((time.time() + timeout, 'module %s' % modname, event)) - for poller in poll_table.values(): - event = threading.Event() - # poller.start must return either a timeout value or None (default 30 sec) - timeout = poller.start(started_callback=event.set) or 30 - start_events.append((time.time() + timeout, repr(poller), event)) - self.log.info('waiting for modules and pollers being started') - for deadline, name, event in sorted(start_events): - if not event.wait(timeout=max(0, deadline - time.time())): - self.log.info('WARNING: timeout when starting %s' % name) - self.log.info('all modules and pollers started') + self.log.info('waiting for modules being started') + start_events.name = None + if not start_events.wait(): + # some timeout happened + for name in start_events.waiting_for(): + self.log.warning('timeout when starting %s' % name) + self.log.info('all modules started') history_path = os.environ.get('FRAPPY_HISTORY') if history_path: - try: - from secop.historywriter import FrappyHistory # pylint: disable=import-outside-toplevel - history = FrappyHistory(history_path, self.modules, self.log.getChild('history')) - self.close_callbacks.append(history.close) - except ImportError: - raise - self.log.warning('FRAPPY_HISTORY is defined, but frappyhistory package not available') + from secop_psi.historywriter import FrappyHistoryWriter # pylint: disable=import-outside-toplevel + writer = FrappyHistoryWriter(history_path, PREDEFINED_ACCESSIBLES.keys(), self.dispatcher) + # treat writer as a connection + self.dispatcher.add_connection(writer) + writer.init(self.dispatcher.handle_describe(writer, None, None)) # TODO: if ever somebody wants to implement an other history writer: # - a general config file /etc/secp/secop.conf or /etc/secop.conf # might be introduced, which contains the log, pid and cfg directory path and diff --git a/secop/simulation.py b/secop/simulation.py index c4b220d..c974ff9 100644 --- a/secop/simulation.py +++ b/secop/simulation.py @@ -27,13 +27,10 @@ from time import sleep from secop.datatypes import FloatRange from secop.lib import mkthread -from secop.modules import BasicPoller, Drivable, \ - Module, Parameter, Readable, Writable, Command +from secop.modules import Drivable, Module, Parameter, Readable, Writable, Command class SimBase: - pollerClass = BasicPoller - def __new__(cls, devname, logger, cfgdict, dispatcher): extra_params = cfgdict.pop('extra_params', '') or cfgdict.pop('.extra_params', '') attrs = {} @@ -60,6 +57,7 @@ class SimBase: return object.__new__(type('SimBase_%s' % devname, (cls,), attrs)) def initModule(self): + super().initModule() self._sim_thread = mkthread(self._sim) def _sim(self): @@ -119,7 +117,7 @@ class SimDrivable(SimReadable, Drivable): self._value = self.target speed *= self.interval try: - self.pollParams(0) + self.doPoll() except Exception: pass @@ -132,7 +130,7 @@ class SimDrivable(SimReadable, Drivable): self._value = self.target sleep(self.interval) try: - self.pollParams(0) + self.doPoll() except Exception: pass self.status = self.Status.IDLE, ''