simplify status type declaration
- StatusType: simpler inheritance (inherit from module instead of Enum) - StatusType: more robust for standard codes, give names only - <Module>.Status is automatically extended - Enum: accept duplicates with same name and value Change-Id: Iad1dacf14c31fe6f4ae48e7560b29e49838e4f23 Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/30716 Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de> Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de> Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
parent
dccd329435
commit
5db84b3fa1
@ -1242,7 +1242,12 @@ class LimitsType(TupleOf):
|
|||||||
|
|
||||||
|
|
||||||
class StatusType(TupleOf):
|
class StatusType(TupleOf):
|
||||||
# shorten initialisation and allow access to status enumMembers from status values
|
"""convenience type for status
|
||||||
|
|
||||||
|
:param first: an Enum or Module to inherit from, or the first member name
|
||||||
|
:param args: member names (codes will be taken from class attributes below)
|
||||||
|
:param kwds: additional members not matching the standard
|
||||||
|
"""
|
||||||
DISABLED = 0
|
DISABLED = 0
|
||||||
IDLE = 100
|
IDLE = 100
|
||||||
STANDBY = 130
|
STANDBY = 130
|
||||||
@ -1250,7 +1255,7 @@ class StatusType(TupleOf):
|
|||||||
WARN = 200
|
WARN = 200
|
||||||
WARN_STANDBY = 230
|
WARN_STANDBY = 230
|
||||||
WARN_PREPARED = 250
|
WARN_PREPARED = 250
|
||||||
UNSTABLE = 270 # no SECoP standard (yet)
|
UNSTABLE = 270 # not in SECoP standard (yet)
|
||||||
BUSY = 300
|
BUSY = 300
|
||||||
DISABLING = 310
|
DISABLING = 310
|
||||||
INITIALIZING = 320
|
INITIALIZING = 320
|
||||||
@ -1262,13 +1267,30 @@ class StatusType(TupleOf):
|
|||||||
ERROR = 400
|
ERROR = 400
|
||||||
ERROR_STANDBY = 430
|
ERROR_STANDBY = 430
|
||||||
ERROR_PREPARED = 450
|
ERROR_PREPARED = 450
|
||||||
|
UNKNOWN = 401 # not in SECoP standard (yet)
|
||||||
|
|
||||||
def __init__(self, enum):
|
def __init__(self, first, *args, **kwds):
|
||||||
super().__init__(EnumType(enum), StringType())
|
if first:
|
||||||
self._enum = enum
|
if isinstance(first, str):
|
||||||
|
args = (first,) + args
|
||||||
|
first = 'Status' # enum name
|
||||||
|
else:
|
||||||
|
if not isinstance(first, Enum):
|
||||||
|
# assume first is a Module with a status parameter
|
||||||
|
try:
|
||||||
|
first = first.status.datatype.members[0]._enum
|
||||||
|
except AttributeError:
|
||||||
|
raise ProgrammingError('first argument must be either str, Enum or a module') from None
|
||||||
|
else:
|
||||||
|
first = 'Status' # enum name
|
||||||
|
bad = {n for n in args if n not in StatusType.__dict__ or n.startswith('_')} # avoid built-in attributes
|
||||||
|
if bad:
|
||||||
|
raise ProgrammingError('positional arguments %r must be standard status code names' % bad)
|
||||||
|
self.enum = Enum(Enum(first, **{n: StatusType.__dict__[n] for n in args}), **kwds)
|
||||||
|
super().__init__(EnumType(self.enum), StringType())
|
||||||
|
|
||||||
def __getattr__(self, key):
|
def __getattr__(self, key):
|
||||||
return getattr(self._enum, key)
|
return self.enum[key]
|
||||||
|
|
||||||
|
|
||||||
def floatargs(kwds):
|
def floatargs(kwds):
|
||||||
|
@ -264,8 +264,7 @@ class Enum(dict):
|
|||||||
names = set()
|
names = set()
|
||||||
values = set()
|
values = set()
|
||||||
|
|
||||||
# pylint: disable=dangerous-default-value
|
def add(self, k, v):
|
||||||
def add(self, k, v, names=names, value=values):
|
|
||||||
"""helper for creating the enum members"""
|
"""helper for creating the enum members"""
|
||||||
if v is None:
|
if v is None:
|
||||||
# sugar: take the next free number if value was None
|
# sugar: take the next free number if value was None
|
||||||
@ -285,10 +284,10 @@ class Enum(dict):
|
|||||||
v = _v
|
v = _v
|
||||||
|
|
||||||
# check for duplicates
|
# check for duplicates
|
||||||
if k in names:
|
if self.get(k, v) != v:
|
||||||
raise TypeError('duplicate name %r' % k)
|
raise TypeError('%s=%d conflicts with %s=%d' % (k, v, k, self[k]))
|
||||||
if v in values:
|
if self.get(v, k) != k:
|
||||||
raise TypeError('duplicate value %d (key=%r)' % (v, k))
|
raise TypeError('%s=%d conflicts with %s=%d' % (k, v, self[v].name, v))
|
||||||
|
|
||||||
# remember it
|
# remember it
|
||||||
self[v] = self[k] = EnumMember(self, k, v)
|
self[v] = self[k] = EnumMember(self, k, v)
|
||||||
|
@ -220,6 +220,11 @@ class HasAccessibles(HasProperties):
|
|||||||
raise ProgrammingError('%s.%s defined, but %r is no parameter'
|
raise ProgrammingError('%s.%s defined, but %r is no parameter'
|
||||||
% (cls.__name__, attrname, pname))
|
% (cls.__name__, attrname, pname))
|
||||||
|
|
||||||
|
try:
|
||||||
|
# update Status type
|
||||||
|
cls.Status = cls.status.datatype.members[0]._enum
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
res = {}
|
res = {}
|
||||||
# collect info about properties
|
# collect info about properties
|
||||||
for pn, pv in cls.propertyDict.items():
|
for pn, pv in cls.propertyDict.items():
|
||||||
@ -826,7 +831,6 @@ class Module(HasAccessibles):
|
|||||||
|
|
||||||
class Readable(Module):
|
class Readable(Module):
|
||||||
"""basic readable module"""
|
"""basic readable module"""
|
||||||
# pylint: disable=invalid-name
|
|
||||||
Status = Enum('Status',
|
Status = Enum('Status',
|
||||||
IDLE=StatusType.IDLE,
|
IDLE=StatusType.IDLE,
|
||||||
WARN=StatusType.WARN,
|
WARN=StatusType.WARN,
|
||||||
@ -834,11 +838,10 @@ class Readable(Module):
|
|||||||
ERROR=StatusType.ERROR,
|
ERROR=StatusType.ERROR,
|
||||||
DISABLED=StatusType.DISABLED,
|
DISABLED=StatusType.DISABLED,
|
||||||
UNKNOWN=401, # not SECoP standard. TODO: remove and adapt entangle and epics
|
UNKNOWN=401, # not SECoP standard. TODO: remove and adapt entangle and epics
|
||||||
) #: status codes
|
) #: status code Enum: extended automatically in inherited modules
|
||||||
|
|
||||||
value = Parameter('current value of the module', FloatRange())
|
value = Parameter('current value of the module', FloatRange())
|
||||||
status = Parameter('current status of the module', StatusType(Status),
|
status = Parameter('current status of the module', StatusType(Status),
|
||||||
default=(Status.IDLE, ''))
|
default=(StatusType.IDLE, ''))
|
||||||
pollinterval = Parameter('default poll interval', FloatRange(0.1, 120),
|
pollinterval = Parameter('default poll interval', FloatRange(0.1, 120),
|
||||||
default=5, readonly=False, export=True)
|
default=5, readonly=False, export=True)
|
||||||
|
|
||||||
@ -869,9 +872,7 @@ class Writable(Readable):
|
|||||||
class Drivable(Writable):
|
class Drivable(Writable):
|
||||||
"""basic drivable module"""
|
"""basic drivable module"""
|
||||||
|
|
||||||
Status = Enum(Readable.Status, BUSY=StatusType.BUSY) #: status codes
|
status = Parameter(datatype=StatusType(Readable, 'BUSY')) # extend Readable.status
|
||||||
|
|
||||||
status = Parameter(datatype=StatusType(Status)) # override Readable.status
|
|
||||||
|
|
||||||
def isBusy(self, status=None):
|
def isBusy(self, status=None):
|
||||||
"""check for busy, treating substates correctly
|
"""check for busy, treating substates correctly
|
||||||
|
@ -26,7 +26,7 @@
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from frappy.datatypes import ArrayOf, BLOBType, BoolType, \
|
from frappy.datatypes import ArrayOf, BLOBType, BoolType, \
|
||||||
CommandType, ConfigError, DataType, Enum, EnumType, FloatRange, \
|
CommandType, ConfigError, DataType, EnumType, FloatRange, \
|
||||||
IntRange, ProgrammingError, ScaledInteger, StatusType, \
|
IntRange, ProgrammingError, ScaledInteger, StatusType, \
|
||||||
StringType, StructOf, TextType, TupleOf, get_datatype, \
|
StringType, StructOf, TextType, TupleOf, get_datatype, \
|
||||||
DiscouragedConversion
|
DiscouragedConversion
|
||||||
@ -495,11 +495,23 @@ def test_Command():
|
|||||||
|
|
||||||
|
|
||||||
def test_StatusType():
|
def test_StatusType():
|
||||||
status_codes = Enum('Status', IDLE=100, WARN=200, BUSY=300, ERROR=400)
|
dt = StatusType('IDLE', 'WARN', 'ERROR', 'DISABLED')
|
||||||
dt = StatusType(status_codes)
|
assert dt.IDLE == StatusType.IDLE == 100
|
||||||
assert dt.IDLE == status_codes.IDLE
|
assert dt.ERROR == StatusType.ERROR == 400
|
||||||
assert dt.ERROR == status_codes.ERROR
|
|
||||||
assert dt._enum == status_codes
|
dt2 = StatusType(None, IDLE=100, WARN=200, ERROR=400, DISABLED=0)
|
||||||
|
assert dt2.export_datatype() == dt.export_datatype()
|
||||||
|
|
||||||
|
dt3 = StatusType(dt.enum)
|
||||||
|
assert dt3.export_datatype() == dt.export_datatype()
|
||||||
|
|
||||||
|
with pytest.raises(ProgrammingError):
|
||||||
|
StatusType('__init__') # built in attribute of StatusType
|
||||||
|
|
||||||
|
with pytest.raises(ProgrammingError):
|
||||||
|
StatusType(dt.enum, 'custom') # not a standard attribute
|
||||||
|
|
||||||
|
StatusType(dt.enum, custom=499) # o.k., if value is given
|
||||||
|
|
||||||
|
|
||||||
def test_get_datatype():
|
def test_get_datatype():
|
||||||
|
@ -83,3 +83,12 @@ def test_Enum_bool():
|
|||||||
e = Enum('OffOn', off=0, on=1)
|
e = Enum('OffOn', off=0, on=1)
|
||||||
assert bool(e(0)) is False
|
assert bool(e(0)) is False
|
||||||
assert bool(e(1)) is True
|
assert bool(e(1)) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_Enum_duplicate():
|
||||||
|
e = Enum('x', a=1, b=2)
|
||||||
|
Enum(e, b=2, c=3) # matching duplicate
|
||||||
|
with pytest.raises(TypeError):
|
||||||
|
Enum(e, b=3, c=4) # duplicate name with value mismatch
|
||||||
|
with pytest.raises(TypeError):
|
||||||
|
Enum(e, c=1) # duplicate value with name mismatch
|
||||||
|
Loading…
x
Reference in New Issue
Block a user