# -*- coding: utf-8 -*- # ***************************************************************************** # # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Enrico Faulhaber # Markus Zolliker # # ***************************************************************************** """Define validated data types.""" # pylint: disable=abstract-method import sys from base64 import b64decode, b64encode from secop.errors import ProgrammingError, ProtocolError, BadValueError, ConfigError from secop.lib import clamp from secop.lib.enum import Enum from secop.parse import Parser from secop.properties import HasProperties, Property # Only export these classes for 'from secop.datatypes import *' __all__ = [ 'DataType', 'get_datatype', 'FloatRange', 'IntRange', 'ScaledInteger', 'BoolType', 'EnumType', 'BLOBType', 'StringType', 'TextType', 'TupleOf', 'ArrayOf', 'StructOf', 'CommandType', 'StatusType', ] # *DEFAULT* limits for IntRange/ScaledIntegers transport serialisation DEFAULT_MIN_INT = -16777216 DEFAULT_MAX_INT = 16777216 UNLIMITED = 1 << 64 # internal limit for integers, is probably high enough for any datatype size Parser = Parser() # base class for all DataTypes class DataType(HasProperties): """base class for all data types""" IS_COMMAND = False unit = '' default = None def __call__(self, value): """check if given value (a python obj) is valid for this datatype returns the value or raises an appropriate exception""" raise NotImplementedError def from_string(self, text): """interprets a given string and returns a validated (internal) value""" # to evaluate values from configfiles, ui, etc... raise NotImplementedError def export_datatype(self): """return a python object which after jsonifying identifies this datatype""" raise NotImplementedError def export_value(self, value): """if needed, reformat value for transport""" return value def import_value(self, value): """opposite of export_value, reformat from transport to internal repr note: for importing from gui/configfile/commandline use :meth:`from_string` instead. """ return value def format_value(self, value, unit=None): """format a value of this type into a str string This is intended for 'nice' formatting for humans and is NOT the opposite of :meth:`from_string` if unit is given, use it, else use the unit of the datatype (if any)""" raise NotImplementedError def set_properties(self, **kwds): """init datatype properties""" try: for k, v in kwds.items(): self.setProperty(k, v) self.checkProperties() except Exception as e: raise ProgrammingError(str(e)) def get_info(self, **kwds): """prepare dict for export or repr get a dict with all items different from default plus mandatory keys from kwds""" result = self.exportProperties() result.update(kwds) return result def copy(self): """make a deep copy of the datatype""" # looks like the simplest way to make a deep copy return get_datatype(self.export_datatype()) def compatible(self, other): """check other for compatibility raise an exception if is not compatible, i.e. there exists a value which is valid for ourselfs, but not for """ raise NotImplementedError class Stub(DataType): """incomplete datatype, to be replaced with a proper one later during module load this workaround because datatypes need properties with datatypes defined later """ def __init__(self, datatype_name, *args): super().__init__() self.name = datatype_name self.args = args def __call__(self, value): """validate""" return value @classmethod def fix_datatypes(cls): """replace stubs with real datatypes for all DataType classes in this module to be called after all involved datatypes are defined """ for dtcls in globals().values(): if isinstance(dtcls, type) and issubclass(dtcls, DataType): for prop in dtcls.propertyDict.values(): stub = prop.datatype if isinstance(stub, cls): prop.datatype = globals()[stub.name](*stub.args) # SECoP types: class FloatRange(DataType): """(restricted) float type :param minval: (property **min**) :param maxval: (property **max**) :param kwds: any of the properties below """ min = Property('low limit', Stub('FloatRange'), extname='min', default=-sys.float_info.max) max = Property('high limit', Stub('FloatRange'), extname='max', default=sys.float_info.max) unit = Property('physical unit', Stub('StringType'), extname='unit', default='') fmtstr = Property('format string', Stub('StringType'), extname='fmtstr', default='%g') absolute_resolution = Property('absolute resolution', Stub('FloatRange', 0), extname='absolute_resolution', default=0.0) relative_resolution = Property('relative resolution', Stub('FloatRange', 0), extname='relative_resolution', default=1.2e-7) def __init__(self, minval=None, maxval=None, **kwds): super().__init__() kwds['min'] = minval if minval is not None else -sys.float_info.max kwds['max'] = maxval if maxval is not None else sys.float_info.max self.set_properties(**kwds) def checkProperties(self): self.default = 0 if self.min <= 0 <= self.max else self.min super().checkProperties() if '%' not in self.fmtstr: raise ConfigError('Invalid fmtstr!') def export_datatype(self): return self.get_info(type='double') def __call__(self, value): try: value = float(value) except Exception: raise BadValueError('Can not __call__ %r to float' % value) # map +/-infty to +/-max possible number value = clamp(-sys.float_info.max, value, sys.float_info.max) # now check the limits prec = max(abs(value * self.relative_resolution), self.absolute_resolution) if self.min - prec <= value <= self.max + prec: return min(max(value, self.min), self.max) raise BadValueError('%.14g should be a float between %.14g and %.14g' % (value, self.min, self.max)) def __repr__(self): hints = self.get_info() if 'min' in hints: hints['minval'] = hints.pop('min') if 'max' in hints: hints['maxval'] = hints.pop('max') return 'FloatRange(%s)' % (', '.join('%s=%r' % (k, v) for k, v in hints.items())) def export_value(self, value): """returns a python object fit for serialisation""" return float(value) def import_value(self, value): """returns a python object from serialisation""" return float(value) def from_string(self, text): value = float(text) return self(value) def format_value(self, value, unit=None): if unit is None: unit = self.unit if unit: return ' '.join([self.fmtstr % value, unit]) return self.fmtstr % value def compatible(self, other): if not isinstance(other, (FloatRange, ScaledInteger)): raise BadValueError('incompatible datatypes') # avoid infinity other(max(sys.float_info.min, self.min)) other(min(sys.float_info.max, self.max)) class IntRange(DataType): """restricted int type :param minval: (property **min**) :param maxval: (property **max**) """ min = Property('minimum value', Stub('IntRange', -UNLIMITED, UNLIMITED), extname='min', mandatory=True) max = Property('maximum value', Stub('IntRange', -UNLIMITED, UNLIMITED), extname='max', mandatory=True) # a unit on an int is now allowed in SECoP, but do we need them in Frappy? # unit = Property('physical unit', StringType(), extname='unit', default='') def __init__(self, minval=None, maxval=None): super().__init__() self.set_properties(min=DEFAULT_MIN_INT if minval is None else minval, max=DEFAULT_MAX_INT if maxval is None else maxval) def checkProperties(self): self.default = 0 if self.min <= 0 <= self.max else self.min super().checkProperties() def export_datatype(self): return self.get_info(type='int') def __call__(self, value): try: fvalue = float(value) value = int(value) if not self.min <= value <= self.max or round(fvalue) != fvalue: raise BadValueError('%r should be an int between %d and %d' % (value, self.min, self.max)) return value except Exception: raise BadValueError('Can not convert %r to int' % value) def __repr__(self): args = (self.min, self.max) if args[1] == DEFAULT_MAX_INT: args = args[:1] if args[0] == DEFAULT_MIN_INT: args = () return 'IntRange%s' % repr(args) def export_value(self, value): """returns a python object fit for serialisation""" return int(value) def import_value(self, value): """returns a python object from serialisation""" return int(value) def from_string(self, text): value = int(text) return self(value) def format_value(self, value, unit=None): return '%d' % value def compatible(self, other): if isinstance(other, IntRange): other(self.min) other(self.max) return # this will accept some EnumType, BoolType for i in range(self.min, self.max + 1): other(i) class ScaledInteger(DataType): """scaled integer (= fixed resolution float) type :param minval: (property **min**) :param maxval: (property **max**) :param kwds: any of the properties below note: limits are for the scaled float value the scale is only used for calculating to/from transport serialisation """ scale = Property('scale factor', FloatRange(sys.float_info.min), extname='scale', mandatory=True) min = Property('low limit', FloatRange(), extname='min', mandatory=True) max = Property('high limit', FloatRange(), extname='max', mandatory=True) unit = Property('physical unit', Stub('StringType'), extname='unit', default='') fmtstr = Property('format string', Stub('StringType'), extname='fmtstr', default='%g') absolute_resolution = Property('absolute resolution', FloatRange(0), extname='absolute_resolution', default=0.0) relative_resolution = Property('relative resolution', FloatRange(0), extname='relative_resolution', default=1.2e-7) def __init__(self, scale, minval=None, maxval=None, absolute_resolution=None, **kwds): super().__init__() scale = float(scale) if absolute_resolution is None: absolute_resolution = scale self.set_properties( scale=scale, min=DEFAULT_MIN_INT * scale if minval is None else float(minval), max=DEFAULT_MAX_INT * scale if maxval is None else float(maxval), absolute_resolution=absolute_resolution, **kwds) def checkProperties(self): self.default = 0 if self.min <= 0 <= self.max else self.min super().checkProperties() # check values if '%' not in self.fmtstr: raise BadValueError('Invalid fmtstr!') # Remark: Datatype.copy() will round min, max to a multiple of self.scale # this should be o.k. def exportProperties(self): result = super().exportProperties() if self.absolute_resolution == 0: result['absolute_resolution'] = 0 elif self.absolute_resolution == self.scale: result.pop('absolute_resolution', 0) return result def setProperty(self, key, value): if key == 'scale' and self.absolute_resolution == self.scale: super().setProperty('absolute_resolution', value) super().setProperty(key, value) def export_datatype(self): return self.get_info(type='scaled', min=int((self.min + self.scale * 0.5) // self.scale), max=int((self.max + self.scale * 0.5) // self.scale)) def __call__(self, value): try: value = float(value) except Exception: raise BadValueError('Can not convert %r to float' % value) prec = max(self.scale, abs(value * self.relative_resolution), self.absolute_resolution) if self.min - prec <= value <= self.max + prec: value = min(max(value, self.min), self.max) else: raise BadValueError('%g should be a float between %g and %g' % (value, self.min, self.max)) intval = int((value + self.scale * 0.5) // self.scale) value = float(intval * self.scale) return value # return 'actual' value (which is more discrete than a float) def __repr__(self): hints = self.get_info(scale=float('%g' % self.scale), min=int((self.min + self.scale * 0.5) // self.scale), max=int((self.max + self.scale * 0.5) // self.scale)) return 'ScaledInteger(%s)' % (', '.join('%s=%r' % kv for kv in hints.items())) def export_value(self, value): """returns a python object fit for serialisation""" # note: round behaves different in Py2 vs. Py3, so use floor division return int((value + self.scale * 0.5) // self.scale) def import_value(self, value): """returns a python object from serialisation""" return self.scale * int(value) def from_string(self, text): value = float(text) return self(value) def format_value(self, value, unit=None): if unit is None: unit = self.unit if unit: return ' '.join([self.fmtstr % value, unit]) return self.fmtstr % value def compatible(self, other): if not isinstance(other, (FloatRange, ScaledInteger)): raise BadValueError('incompatible datatypes') other(self.min) other(self.max) class EnumType(DataType): """enumeration :param enum_or_name: the name of the Enum or an Enum to inherit from :param members: members dict or None when using kwds only :param kwds: (additional) members """ def __init__(self, enum_or_name='', *, members=None, **kwds): super().__init__() if members is not None: kwds.update(members) self._enum = Enum(enum_or_name, **kwds) self.default = self._enum[self._enum.members[0]] def copy(self): # as the name is not exported, we have to implement copy ourselfs return EnumType(self._enum) def export_datatype(self): return {'type': 'enum', 'members': dict((m.name, m.value) for m in self._enum.members)} def __repr__(self): return "EnumType(%r, %s)" % (self._enum.name, ', '.join('%s=%d' % (m.name, m.value) for m in self._enum.members)) def export_value(self, value): """returns a python object fit for serialisation""" return int(self(value)) def import_value(self, value): """returns a python object from serialisation""" return self(value) def __call__(self, value): """return the validated (internal) value or raise""" try: return self._enum[value] except (KeyError, TypeError): # TypeError will be raised when value is not hashable raise BadValueError('%r is not a member of enum %r' % (value, self._enum)) def from_string(self, text): return self(text) def format_value(self, value, unit=None): return '%s<%s>' % (self._enum[value].name, self._enum[value].value) def set_name(self, name): self._enum.name = name def compatible(self, other): for m in self._enum.members: other(m) class BLOBType(DataType): """binary large object internally treated as bytes """ minbytes = Property('minimum number of bytes', IntRange(0), extname='minbytes', default=0) maxbytes = Property('maximum number of bytes', IntRange(0), extname='maxbytes', mandatory=True) def __init__(self, minbytes=0, maxbytes=None): super().__init__() # if only one argument is given, use exactly that many bytes # if nothing is given, default to 255 if maxbytes is None: maxbytes = minbytes or 255 self.set_properties(minbytes=minbytes, maxbytes=maxbytes) def checkProperties(self): self.default = b'\0' * self.minbytes super().checkProperties() def export_datatype(self): return self.get_info(type='blob') def __repr__(self): return 'BLOBType(%d, %d)' % (self.minbytes, self.maxbytes) def __call__(self, value): """return the validated (internal) value or raise""" if not isinstance(value, bytes): raise BadValueError('%s has the wrong type!' % repr(value)) size = len(value) if size < self.minbytes: raise BadValueError( '%r must be at least %d bytes long!' % (value, self.minbytes)) if size > self.maxbytes: raise BadValueError( '%r must be at most %d bytes long!' % (value, self.maxbytes)) return value def export_value(self, value): """returns a python object fit for serialisation""" return b64encode(value).decode('ascii') def import_value(self, value): """returns a python object from serialisation""" return b64decode(value) def from_string(self, text): value = text # XXX: what should we do here? return self(value) def format_value(self, value, unit=None): return repr(value) def compatible(self, other): try: if self.minbytes < other.minbytes or self.maxbytes > other.maxbytes: raise BadValueError('incompatible datatypes') except AttributeError: raise BadValueError('incompatible datatypes') class StringType(DataType): """string for parameters see properties below """ minchars = Property('minimum number of character points', IntRange(0, UNLIMITED), extname='minchars', default=0) maxchars = Property('maximum number of character points', IntRange(0, UNLIMITED), extname='maxchars', default=UNLIMITED) isUTF8 = Property('flag telling whether encoding is UTF-8 instead of ASCII', Stub('BoolType'), extname='isUTF8', default=False) def __init__(self, minchars=0, maxchars=None, **kwds): super().__init__() if maxchars is None: maxchars = minchars or UNLIMITED self.set_properties(minchars=minchars, maxchars=maxchars, **kwds) def checkProperties(self): self.default = ' ' * self.minchars super().checkProperties() def export_datatype(self): return self.get_info(type='string') def __repr__(self): return 'StringType(%s)' % (', '.join('%s=%r' % kv for kv in self.get_info().items())) def __call__(self, value): """return the validated (internal) value or raise""" if not isinstance(value, str): raise BadValueError('%s has the wrong type!' % repr(value)) if not self.isUTF8: try: value.encode('ascii') except UnicodeEncodeError: raise BadValueError('%r contains non-ascii character!' % value) size = len(value) if size < self.minchars: raise BadValueError( '%r must be at least %d bytes long!' % (value, self.minchars)) if size > self.maxchars: raise BadValueError( '%r must be at most %d bytes long!' % (value, self.maxchars)) if '\0' in value: raise BadValueError( 'Strings are not allowed to embed a \\0! Use a Blob instead!') return value def export_value(self, value): """returns a python object fit for serialisation""" return '%s' % value def import_value(self, value): """returns a python object from serialisation""" return str(value) def from_string(self, text): value = str(text) return self(value) def format_value(self, value, unit=None): return repr(value) def compatible(self, other): try: if self.minchars < other.minchars or self.maxchars > other.maxchars or \ self.isUTF8 > other.isUTF8: raise BadValueError('incompatible datatypes') except AttributeError: raise BadValueError('incompatible datatypes') # TextType is a special StringType intended for longer texts (i.e. embedding \n), # whereas StringType is supposed to not contain '\n' # unfortunately, SECoP makes no distinction here.... # note: content is supposed to follow the format of a git commit message, # i.e. a line of text, 2 '\n' + a longer explanation class TextType(StringType): def __init__(self, maxchars=None): if maxchars is None: maxchars = UNLIMITED super(TextType, self).__init__(0, maxchars) def __repr__(self): if self.maxchars == UNLIMITED: return 'TextType()' return 'TextType(%d)' % self.maxchars def copy(self): # DataType.copy will not work, because it is exported as 'string' return TextType(self.maxchars) class BoolType(DataType): """boolean""" default = False def export_datatype(self): return {'type': 'bool'} def __repr__(self): return 'BoolType()' def __call__(self, value): """return the validated (internal) value or raise""" if value in [0, '0', 'False', 'false', 'no', 'off', False]: return False if value in [1, '1', 'True', 'true', 'yes', 'on', True]: return True raise BadValueError('%r is not a boolean value!' % value) def export_value(self, value): """returns a python object fit for serialisation""" return self(value) def import_value(self, value): """returns a python object from serialisation""" return self(value) def from_string(self, text): value = text return self(value) def format_value(self, value, unit=None): return repr(bool(value)) def compatible(self, other): other(False) other(True) Stub.fix_datatypes() # # nested types # class ArrayOf(DataType): """data structure with fields of homogeneous type :param members: the datatype of the elements """ minlen = Property('minimum number of elements', IntRange(0), extname='minlen', default=0) maxlen = Property('maximum number of elements', IntRange(0), extname='maxlen', mandatory=True) def __init__(self, members, minlen=0, maxlen=None): super().__init__() if not isinstance(members, DataType): raise BadValueError( 'ArrayOf only works with a DataType as first argument!') # one argument -> exactly that size # argument default to 100 if maxlen is None: maxlen = minlen or 100 self.members = members self.set_properties(minlen=minlen, maxlen=maxlen) def copy(self): """DataType.copy does not work when members are enums""" return ArrayOf(self.members.copy(), self.minlen, self.maxlen) def checkProperties(self): self.default = [self.members.default] * self.minlen super().checkProperties() def getProperties(self): """get also properties of members""" res = {} res.update(super().getProperties()) res.update(self.members.getProperties()) return res def setProperty(self, key, value): """set also properties of members""" if key in self.propertyDict: super().setProperty(key, value) else: self.members.setProperty(key, value) def export_datatype(self): return dict(type='array', minlen=self.minlen, maxlen=self.maxlen, members=self.members.export_datatype()) def __repr__(self): return 'ArrayOf(%s, %s, %s)' % ( repr(self.members), self.minlen, self.maxlen) def __call__(self, value): """validate an external representation to an internal one""" if isinstance(value, (tuple, list)): # check number of elements if self.minlen is not None and len(value) < self.minlen: raise BadValueError( 'Array too small, needs at least %d elements!' % self.minlen) if self.maxlen is not None and len(value) > self.maxlen: raise BadValueError( 'Array too big, holds at most %d elements!' % self.minlen) # apply subtype valiation to all elements and return as list return tuple(self.members(elem) for elem in value) raise BadValueError( 'Can not convert %s to ArrayOf DataType!' % repr(value)) def export_value(self, value): """returns a python object fit for serialisation""" return [self.members.export_value(elem) for elem in value] def import_value(self, value): """returns a python object from serialisation""" return tuple(self.members.import_value(elem) for elem in value) def from_string(self, text): value, rem = Parser.parse(text) if rem: raise ProtocolError('trailing garbage: %r' % rem) return self(value) def format_value(self, value, unit=None): if unit is None: unit = self.unit or self.members.unit res = '[%s]' % (', '.join([self.members.format_value(elem, '') for elem in value])) if unit: return ' '.join([res, unit]) return res def compatible(self, other): try: if self.minlen < other.minlen or self.maxlen > other.maxlen: raise BadValueError('incompatible datatypes') self.members.compatible(other.members) except AttributeError: raise BadValueError('incompatible datatypes') class TupleOf(DataType): """data structure with fields of inhomogeneous type types are given as positional arguments """ def __init__(self, *members): super().__init__() if not members: raise BadValueError('Empty tuples are not allowed!') for subtype in members: if not isinstance(subtype, DataType): raise BadValueError( 'TupleOf only works with DataType objs as arguments!') self.members = members self.default = tuple(el.default for el in members) def copy(self): """DataType.copy does not work when members contain enums""" return TupleOf(*(m.copy() for m in self.members)) def export_datatype(self): return dict(type='tuple', members=[subtype.export_datatype() for subtype in self.members]) def __repr__(self): return 'TupleOf(%s)' % ', '.join([repr(st) for st in self.members]) def __call__(self, value): """return the validated value or raise""" # keep the ordering! try: if len(value) != len(self.members): raise BadValueError( 'Illegal number of Arguments! Need %d arguments.' % len(self.members)) # validate elements and return as list return tuple(sub(elem) for sub, elem in zip(self.members, value)) except Exception as exc: raise BadValueError('Can not validate:', str(exc)) def export_value(self, value): """returns a python object fit for serialisation""" return [sub.export_value(elem) for sub, elem in zip(self.members, value)] def import_value(self, value): """returns a python object from serialisation""" return tuple(sub.import_value(elem) for sub, elem in zip(self.members, value)) def from_string(self, text): value, rem = Parser.parse(text) if rem: raise ProtocolError('trailing garbage: %r' % rem) return self(value) def format_value(self, value, unit=None): return '(%s)' % (', '.join([sub.format_value(elem) for sub, elem in zip(self.members, value)])) def compatible(self, other): if not isinstance(other, TupleOf): raise BadValueError('incompatible datatypes') if len(self.members) != len(other.members): raise BadValueError('incompatible datatypes') for a, b in zip(self.members, other.members): a.compatible(b) class ImmutableDict(dict): def _no(self, *args, **kwds): raise TypeError('a struct can not be modified, please overwrite instead') __setitem__ = __delitem__ = clear = pop = popitem = setdefault = update = _no class StructOf(DataType): """data structure with named fields :param optional: a list of optional members :param members: names as keys and types as values for all members """ def __init__(self, optional=None, **members): super().__init__() self.members = members if not members: raise BadValueError('Empty structs are not allowed!') self.optional = list(optional or []) for name, subtype in list(members.items()): if not isinstance(subtype, DataType): raise ProgrammingError( 'StructOf only works with named DataType objs as keyworded arguments!') for name in self.optional: if name not in members: raise ProgrammingError( 'Only members of StructOf may be declared as optional!') self.default = dict((k, el.default) for k, el in members.items()) def copy(self): """DataType.copy does not work when members contain enums""" return StructOf(self.optional, **{k: v.copy() for k, v in self.members.items()}) def export_datatype(self): res = dict(type='struct', members=dict((n, s.export_datatype()) for n, s in list(self.members.items()))) if self.optional: res['optional'] = self.optional return res def __repr__(self): opt = ', optional=%r' % self.optional if self.optional else '' return 'StructOf(%s%s)' % (', '.join( ['%s=%s' % (n, repr(st)) for n, st in list(self.members.items())]), opt) def __call__(self, value): """return the validated value or raise""" try: missing = set(self.members) - set(value) - set(self.optional) if missing: raise BadValueError('missing values for keys %r' % list(missing)) # validate elements and return as dict return ImmutableDict((str(k), self.members[k](v)) for k, v in list(value.items())) except Exception as exc: raise BadValueError('Can not validate %s: %s' % (repr(value), str(exc))) def export_value(self, value): """returns a python object fit for serialisation""" self(value) # check validity return dict((str(k), self.members[k].export_value(v)) for k, v in list(value.items())) def import_value(self, value): """returns a python object from serialisation""" return self({str(k): self.members[k].import_value(v) for k, v in value.items()}) def from_string(self, text): value, rem = Parser.parse(text) if rem: raise ProtocolError('trailing garbage: %r' % rem) return self(dict(value)) def format_value(self, value, unit=None): return '{%s}' % (', '.join(['%s=%s' % (k, self.members[k].format_value(v)) for k, v in sorted(value.items())])) def compatible(self, other): try: mandatory = set(other.members) - set(other.optional) for k, m in self.members.items(): m.compatible(other.members[k]) mandatory.discard(k) if mandatory: raise BadValueError('incompatible datatypes') except (AttributeError, TypeError, KeyError): raise BadValueError('incompatible datatypes') class CommandType(DataType): """command a pseudo datatype for commands with arguments and return values """ IS_COMMAND = True def __init__(self, argument=None, result=None): super().__init__() if argument is not None: if not isinstance(argument, DataType): raise BadValueError('CommandType: Argument type must be a DataType!') if result is not None: if not isinstance(result, DataType): raise BadValueError('CommandType: Result type must be a DataType!') self.argument = argument self.result = result def export_datatype(self): a, r = self.argument, self.result props = {'type': 'command'} if a is not None: props['argument'] = a.export_datatype() if r is not None: props['result'] = r.export_datatype() return props def __repr__(self): argstr = repr(self.argument) if self.argument else '' if self.result is None: return 'CommandType(%s)' % argstr return 'CommandType(%s)->%s' % (argstr, repr(self.result)) def __call__(self, value): """return the validated argument value or raise""" return self.argument(value) def export_value(self, value): raise ProgrammingError('values of type command can not be transported!') def import_value(self, value): raise ProgrammingError('values of type command can not be transported!') def from_string(self, text): value, rem = Parser.parse(text) if rem: raise ProtocolError('trailing garbage: %r' % rem) return self(value) def format_value(self, value, unit=None): # actually I have no idea what to do here! raise NotImplementedError def compatible(self, other): try: if self.argument != other.argument: # not both are None self.argument.compatible(other.argument) if self.result != other.result: # not both are None other.result.compatible(self.result) except AttributeError: raise BadValueError('incompatible datatypes') # internally used datatypes (i.e. only for programming the SEC-node) class DataTypeType(DataType): def __call__(self, value): """check if given value (a python obj) is a valid datatype returns the value or raises an appropriate exception""" if isinstance(value, DataType): return value raise ProgrammingError('%r should be a DataType!' % value) def export_value(self, value): """if needed, reformat value for transport""" return value.export_datatype() def import_value(self, value): """opposite of export_value, reformat from transport to internal repr note: for importing from gui/configfile/commandline use :meth:`from_string` instead. """ raise NotImplementedError class ValueType(DataType): """validates any python value""" def __call__(self, value): """check if given value (a python obj) is valid for this datatype returns the value or raises an appropriate exception""" return value def export_value(self, value): """if needed, reformat value for transport""" return value def import_value(self, value): """opposite of export_value, reformat from transport to internal repr note: for importing from gui/configfile/commandline use :meth:`from_string` instead. """ raise NotImplementedError class NoneOr(DataType): """validates a None or smth. else""" default = None def __init__(self, other): super().__init__() self.other = other def __call__(self, value): return None if value is None else self.other(value) def export_value(self, value): if value is None: return None return self.other.export_value(value) class OrType(DataType): def __init__(self, *types): super().__init__() self.types = types self.default = self.types[0].default def __call__(self, value): for t in self.types: try: return t(value) except Exception: pass raise BadValueError("Invalid Value, must conform to one of %s" % (', '.join((str(t) for t in self.types)))) Int8 = IntRange(-(1 << 7), (1 << 7) - 1) Int16 = IntRange(-(1 << 15), (1 << 15) - 1) Int32 = IntRange(-(1 << 31), (1 << 31) - 1) Int64 = IntRange(-(1 << 63), (1 << 63) - 1) UInt8 = IntRange(0, (1 << 8) - 1) UInt16 = IntRange(0, (1 << 16) - 1) UInt32 = IntRange(0, (1 << 32) - 1) UInt64 = IntRange(0, (1 << 64) - 1) # Goodie: Convenience Datatypes for Programming class LimitsType(TupleOf): def __init__(self, members): TupleOf.__init__(self, members, members) def __call__(self, value): limits = TupleOf.__call__(self, value) if limits[1] < limits[0]: raise BadValueError('Maximum Value %s must be greater than minimum value %s!' % (limits[1], limits[0])) return limits class StatusType(TupleOf): # shorten initialisation and allow access to status enumMembers from status values def __init__(self, enum): TupleOf.__init__(self, EnumType(enum), StringType()) self._enum = enum def __getattr__(self, key): return getattr(self._enum, key) def floatargs(kwds): return {k: v for k, v in kwds.items() if k in {'unit', 'fmtstr', 'absolute_resolution', 'relative_resolution'}} # argumentnames to lambda from spec! # **kwds at the end are for must-ignore policy DATATYPES = dict( bool = lambda **kwds: BoolType(), int = lambda min, max, **kwds: IntRange(minval=min, maxval=max), scaled = lambda scale, min, max, **kwds: ScaledInteger(scale=scale, minval=min*scale, maxval=max*scale, **floatargs(kwds)), double = lambda min=None, max=None, **kwds: FloatRange(minval=min, maxval=max, **floatargs(kwds)), blob = lambda maxbytes, minbytes=0, **kwds: BLOBType(minbytes=minbytes, maxbytes=maxbytes), string = lambda minchars=0, maxchars=None, isUTF8=False, **kwds: StringType(minchars=minchars, maxchars=maxchars, isUTF8=isUTF8), array = lambda maxlen, members, minlen=0, pname='', **kwds: ArrayOf(get_datatype(members, pname), minlen=minlen, maxlen=maxlen), tuple = lambda members, pname='', **kwds: TupleOf(*tuple((get_datatype(t, pname) for t in members))), enum = lambda members, pname='', **kwds: EnumType(pname, members=members), struct = lambda members, optional=None, pname='', **kwds: StructOf(optional, **dict((n, get_datatype(t, pname)) for n, t in list(members.items()))), command = lambda argument=None, result=None, pname='', **kwds: CommandType(get_datatype(argument, pname), get_datatype(result)), limit = lambda members, pname='', **kwds: LimitsType(get_datatype(members, pname)), ) # important for getting the right datatype from formerly jsonified descr. def get_datatype(json, pname=''): """returns a DataType object from description inverse of .export_datatype() :param json: the datainfo object as returned from json.loads :param pname: if given, used to name EnumTypes from the parameter name :return: the datatype (instance of DataType) """ if json is None: return json if isinstance(json, list) and len(json) == 2: base, kwargs = json # still allow old syntax else: try: kwargs = json.copy() base = kwargs.pop('type') except (TypeError, KeyError, AttributeError): raise BadValueError('a data descriptor must be a dict containing a "type" key, not %r' % json) try: return DATATYPES[base](pname=pname, **kwargs) except Exception as e: raise BadValueError('invalid data descriptor: %r (%s)' % (json, str(e)))