remove UNKNOWN, UNSTABLE and DISABLED from Readable.status

- re-add them where needed (epics, entangle ...)

Change-Id: I2b8af9f5f86285f081d5418211f6940e80a1dbd7
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/30718
Reviewed-by: Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
zolliker 2023-03-20 14:14:12 +01:00
parent 11a3bed8b8
commit 361f9ac4fc
8 changed files with 169 additions and 108 deletions

View File

@ -58,3 +58,4 @@ FINALIZING = StatusType.FINALIZING
ERROR = StatusType.ERROR
ERROR_STANDBY = StatusType.ERROR_STANDBY
ERROR_PREPARED = StatusType.ERROR_PREPARED
UNKNOWN = StatusType.UNKNOWN # no SECoP standard (yet)

View File

@ -834,10 +834,7 @@ class Readable(Module):
Status = Enum('Status',
IDLE=StatusType.IDLE,
WARN=StatusType.WARN,
UNSTABLE=270, # not SECoP standard. TODO: remove and adapt entangle
ERROR=StatusType.ERROR,
DISABLED=StatusType.DISABLED,
UNKNOWN=401, # not SECoP standard. TODO: remove and adapt entangle and epics
) #: status code Enum: extended automatically in inherited modules
value = Parameter('current value of the module', FloatRange())
status = Parameter('current status of the module', StatusType(Status),

View File

@ -21,7 +21,7 @@
# *****************************************************************************
from frappy.datatypes import EnumType, FloatRange, StringType
from frappy.datatypes import EnumType, FloatRange, StringType, StatusType
from frappy.modules import Drivable, Parameter, Readable
try:
@ -71,7 +71,7 @@ class EpicsReadable(Readable):
status_pv = Parameter('EPICS pv_name of status',
datatype=StringType(),
default="unset", export=False)
status = Parameter(datatype=StatusType(Readable, 'UNKNOWN'))
# Generic read and write functions
def _read_pv(self, pv_name):
@ -132,6 +132,7 @@ class EpicsDrivable(Drivable):
default="unset", export=False)
status_pv = Parameter('EPICS pv_name of status', datatype=StringType(),
default="unset", export=False)
status = Parameter(datatype=StatusType(Drivable, 'UNKNOWN'))
# Generic read and write functions

View File

@ -41,7 +41,7 @@ from frappy.datatypes import ArrayOf, EnumType, FloatRange, \
from frappy.errors import CommunicationFailedError, \
ConfigError, HardwareError, ProgrammingError
from frappy.lib import lazy_property
from frappy.modules import Command, \
from frappy.modules import Command, StatusType, \
Drivable, Module, Parameter, Readable
#####
@ -173,11 +173,11 @@ class BasePyTangoDevice:
)
tango_status_mapping = {
PyTango.DevState.ON: Drivable.Status.IDLE,
PyTango.DevState.ALARM: Drivable.Status.WARN,
PyTango.DevState.OFF: Drivable.Status.DISABLED,
PyTango.DevState.FAULT: Drivable.Status.ERROR,
PyTango.DevState.MOVING: Drivable.Status.BUSY,
PyTango.DevState.ON: StatusType.IDLE,
PyTango.DevState.ALARM: StatusType.WARN,
PyTango.DevState.OFF: StatusType.DISABLED,
PyTango.DevState.FAULT: StatusType.ERROR,
PyTango.DevState.MOVING: StatusType.BUSY,
}
@lazy_property
@ -364,13 +364,15 @@ class BasePyTangoDevice:
class PyTangoDevice(BasePyTangoDevice):
"""Base for "normal" devices with status."""
status = Parameter(datatype=StatusType(Readable, 'UNKNOWN', 'DISABLED'))
def read_status(self):
# Query status code and string
tangoState = self._dev.State()
tangoStatus = self._dev.Status()
# Map status
myState = self.tango_status_mapping.get(tangoState, Drivable.Status.UNKNOWN)
myState = self.tango_status_mapping.get(tangoState, StatusType.UNKNOWN)
return (myState, tangoStatus)
@ -456,6 +458,7 @@ class AnalogOutput(PyTangoDevice, Drivable):
default=60.0, readonly=False,
datatype=FloatRange(0, 900, unit='s'), group='stability',
)
status = Parameter(datatype=StatusType(PyTangoDevice, 'BUSY', 'UNSTABLE'))
_history = ()
_timeout = None
@ -520,7 +523,7 @@ class AnalogOutput(PyTangoDevice, Drivable):
def read_status(self):
status = super().read_status()
if status[0] in (Readable.Status.DISABLED, Readable.Status.ERROR):
if status[0] in (StatusType.DISABLED, StatusType.ERROR):
self.setFastPoll(False)
return status
if self._isAtTarget():
@ -857,7 +860,7 @@ class PartialDigitalInput(NamedDigitalInput):
return value # mapping is done by datatype upon export()
class DigitalOutput(PyTangoDevice, Drivable):
class DigitalOutput(PyTangoDevice):
"""A device that can set and read a digital value corresponding to a
bitfield.
"""

View File

@ -23,7 +23,7 @@
"""drivers for CCU4, the cryostat control unit at SINQ"""
# the most common Frappy classes can be imported from frappy.core
from frappy.core import EnumType, FloatRange, \
HasIO, Parameter, Readable, StringIO
HasIO, Parameter, Readable, StringIO, StatusType
class CCU4IO(StringIO):
@ -51,16 +51,16 @@ class HeLevel(HasIO, Readable):
readonly=False)
sample_rate = Parameter('sample rate', EnumType(slow=0, fast=1), readonly=False)
Status = Readable.Status
status = Parameter(datatype=StatusType(Readable, 'DISABLED'))
# conversion of the code from the CCU4 parameter 'hsf'
STATUS_MAP = {
0: (Status.IDLE, 'sensor ok'),
1: (Status.ERROR, 'sensor warm'),
2: (Status.ERROR, 'no sensor'),
3: (Status.ERROR, 'timeout'),
4: (Status.ERROR, 'not yet read'),
5: (Status.DISABLED, 'disabled'),
0: (StatusType.IDLE, 'sensor ok'),
1: (StatusType.ERROR, 'sensor warm'),
2: (StatusType.ERROR, 'no sensor'),
3: (StatusType.ERROR, 'timeout'),
4: (StatusType.ERROR, 'not yet read'),
5: (StatusType.DISABLED, 'disabled'),
}
def query(self, cmd):

View File

@ -31,7 +31,7 @@ import time
from ast import literal_eval
import frappy.io
from frappy.datatypes import BoolType, EnumType, FloatRange, IntRange
from frappy.datatypes import BoolType, EnumType, FloatRange, IntRange, StatusType
from frappy.lib import formatStatusBits
from frappy.core import Done, Drivable, Parameter, Property, CommonReadHandler, CommonWriteHandler
from frappy.io import HasIO
@ -158,6 +158,7 @@ class ResChannel(Channel):
channel = Property('the Lakeshore channel', datatype=IntRange(1, 16), export=False)
value = Parameter(datatype=FloatRange(unit='Ohm'))
status = Parameter(datatype=StatusType(Drivable, 'DISABLED'))
pollinterval = Parameter(visibility=3, default=1)
range = Parameter('reading range', readonly=False,
datatype=EnumType(**RES_RANGE))

View File

@ -28,7 +28,7 @@ The PPMS hardware has some special requirements:
needing a mechanism to treat a single parameter change correctly.
Polling of value and status is done commonly for all modules. For each registered module
<module>.update_value_status() is called in order to update their value and status.
<module>.update_value_status() is called in order to update their value and StatusType.
"""
import threading
@ -39,7 +39,6 @@ from frappy.datatypes import BoolType, EnumType, \
FloatRange, IntRange, StatusType, StringType
from frappy.errors import HardwareError
from frappy.lib import clamp
from frappy.lib.enum import Enum
from frappy.modules import Communicator, \
Drivable, Parameter, Property, Readable
from frappy.io import HasIO
@ -117,7 +116,7 @@ class Main(Communicator):
class PpmsBase(HasIO, Readable):
"""common base for all ppms modules"""
value = Parameter(needscfg=False)
status = Parameter(needscfg=False)
status = Parameter(datatype=StatusType(Readable, 'DISABLED'), needscfg=False)
enabled = True # default, if no parameter enable is defined
_last_settings = None # used by several modules
@ -140,13 +139,13 @@ class PpmsBase(HasIO, Readable):
# update value and status
# to be reimplemented for modules looking at packed_status
if not self.enabled:
self.status = (self.Status.DISABLED, 'disabled')
self.status = (StatusType.DISABLED, 'disabled')
return
if value is None:
self.status = (self.Status.ERROR, 'invalid value')
self.status = (StatusType.ERROR, 'invalid value')
else:
self.value = value
self.status = (self.Status.IDLE, '')
self.status = (StatusType.IDLE, '')
def comm_write(self, command):
"""write command and check if result is OK"""
@ -293,19 +292,18 @@ class Chamber(PpmsDrivable):
value is an Enum, which is redundant with the status text
"""
Status = Drivable.Status
code_table = [
# valuecode, status, statusname, opcode, targetname
(0, Status.IDLE, 'unknown', 10, 'noop'),
(1, Status.IDLE, 'purged_and_sealed', 1, 'purge_and_seal'),
(2, Status.IDLE, 'vented_and_sealed', 2, 'vent_and_seal'),
(3, Status.WARN, 'sealed_unknown', 0, 'seal_immediately'),
(4, Status.BUSY, 'purge_and_seal', None, None),
(5, Status.BUSY, 'vent_and_seal', None, None),
(6, Status.BUSY, 'pumping_down', None, None),
(8, Status.IDLE, 'pumping_continuously', 3, 'pump_continuously'),
(9, Status.IDLE, 'venting_continuously', 4, 'vent_continuously'),
(15, Status.ERROR, 'general_failure', None, None),
(0, StatusType.IDLE, 'unknown', 10, 'noop'),
(1, StatusType.IDLE, 'purged_and_sealed', 1, 'purge_and_seal'),
(2, StatusType.IDLE, 'vented_and_sealed', 2, 'vent_and_seal'),
(3, StatusType.WARN, 'sealed_unknown', 0, 'seal_immediately'),
(4, StatusType.BUSY, 'purge_and_seal', None, None),
(5, StatusType.BUSY, 'vent_and_seal', None, None),
(6, StatusType.BUSY, 'pumping_down', None, None),
(8, StatusType.IDLE, 'pumping_continuously', 3, 'pump_continuously'),
(9, StatusType.IDLE, 'venting_continuously', 4, 'vent_continuously'),
(15, StatusType.ERROR, 'general_failure', None, None),
]
value_codes = {k: v for v, _, k, _, _ in code_table}
target_codes = {k: v for v, _, _, _, k in code_table if k}
@ -324,7 +322,7 @@ class Chamber(PpmsDrivable):
self.status = self.status_map[status_code]
else:
self.value = self.value_map['unknown']
self.status = (self.Status.ERROR, 'unknown status code %d' % status_code)
self.status = (StatusType.ERROR, 'unknown status code %d' % status_code)
def read_target(self):
opcode = int(self.communicate('CHAMBER?'))
@ -341,13 +339,8 @@ class Chamber(PpmsDrivable):
class Temp(PpmsDrivable):
"""temperature"""
Status = Enum(
Drivable.Status,
RAMPING=370,
STABILIZING=380,
)
value = Parameter(datatype=FloatRange(unit='K'))
status = Parameter(datatype=StatusType(Status))
status = Parameter(datatype=StatusType(Drivable, 'RAMPING', 'STABILIZING'))
target = Parameter(datatype=FloatRange(1.7, 402.0, unit='K'), needscfg=False)
setpoint = Parameter('intermediate set point',
datatype=FloatRange(1.7, 402.0, unit='K'))
@ -362,15 +355,15 @@ class Temp(PpmsDrivable):
general_stop = Property('respect general stop', datatype=BoolType(),
default=True, value=False)
STATUS_MAP = {
1: (Status.IDLE, 'stable at target'),
2: (Status.RAMPING, 'ramping'),
5: (Status.STABILIZING, 'within tolerance'),
6: (Status.STABILIZING, 'outside tolerance'),
7: (Status.STABILIZING, 'filling/emptying reservoir'),
10: (Status.WARN, 'standby'),
13: (Status.WARN, 'control disabled'),
14: (Status.ERROR, 'can not complete'),
15: (Status.ERROR, 'general failure'),
1: (StatusType.IDLE, 'stable at target'),
2: (StatusType.RAMPING, 'ramping'),
5: (StatusType.STABILIZING, 'within tolerance'),
6: (StatusType.STABILIZING, 'outside tolerance'),
7: (StatusType.STABILIZING, 'filling/emptying reservoir'),
10: (StatusType.WARN, 'standby'),
13: (StatusType.WARN, 'control disabled'),
14: (StatusType.ERROR, 'can not complete'),
15: (StatusType.ERROR, 'general failure'),
}
channel = 'temp'
@ -420,11 +413,11 @@ class Temp(PpmsDrivable):
def update_value_status(self, value, packed_status):
if value is None:
self.status = (self.Status.ERROR, 'invalid value')
self.status = (StatusType.ERROR, 'invalid value')
return
self.value = value
status_code = packed_status & 0xf
status = self.STATUS_MAP.get(status_code, (self.Status.ERROR, 'unknown status code %d' % status_code))
status = self.STATUS_MAP.get(status_code, (StatusType.ERROR, 'unknown status code %d' % status_code))
now = time.time()
if value > 11:
# when starting from T > 50, this will be 15 min.
@ -436,7 +429,7 @@ class Temp(PpmsDrivable):
self._wait_at10 = False
self._last_change = now
self._write_params(self.target, self.ramp, self.approachmode)
status = (self.Status.STABILIZING, 'waiting at 10 K')
status = (StatusType.STABILIZING, 'waiting at 10 K')
if self._last_change: # there was a change, which is not yet confirmed by hw
if now > self._last_change + 5:
self._last_change = 0 # give up waiting for busy
@ -444,14 +437,14 @@ class Temp(PpmsDrivable):
self.log.debug('time needed to change to busy: %.3g', now - self._last_change)
self._last_change = 0
else:
status = (self.Status.BUSY, 'changed target')
status = (StatusType.BUSY, 'changed target')
if abs(self.value - self.target) < self.target * 0.01:
self._last_target = self.target
elif self._last_target is None:
self._last_target = self.value
if self._stopped:
# combine 'stopped' with current status text
if status[0] == self.Status.IDLE:
if status[0] == StatusType.IDLE:
status = (status[0], 'stopped')
else:
status = (status[0], 'stopping (%s)' % status[1])
@ -459,7 +452,7 @@ class Temp(PpmsDrivable):
# handle timeout
if self.isDriving(status):
if now > self._expected_target_time + self.timeout:
status = (self.Status.WARN, 'timeout while %s' % status[1])
status = (StatusType.WARN, 'timeout while %s' % status[1])
else:
self._expected_target_time = 0
self.status = status
@ -469,7 +462,7 @@ class Temp(PpmsDrivable):
if abs(self.target - self.value) <= 2e-5 * target and target == self.target:
return None
self._status_before_change = self.status
self.status = (self.Status.BUSY, 'changed target')
self.status = (StatusType.BUSY, 'changed target')
self._last_change = time.time()
self._write_params(target, self.ramp, self.approachmode)
self.log.debug('write_target %s' % repr((self.setpoint, target, self._wait_at10)))
@ -493,7 +486,7 @@ class Temp(PpmsDrivable):
def stop(self):
if not self.isDriving():
return
if self.status[0] != self.Status.STABILIZING:
if self.status[0] != StatusType.STABILIZING:
# we are not near target
newtarget = clamp(self._last_target, self.value, self.target)
if newtarget != self.target:
@ -506,16 +499,8 @@ class Temp(PpmsDrivable):
class Field(PpmsDrivable):
"""magnetic field"""
Status = Enum(
Drivable.Status,
PREPARED=150,
PREPARING=340,
RAMPING=370,
STABILIZING=380,
FINALIZING=390,
)
value = Parameter(datatype=FloatRange(unit='T'))
status = Parameter(datatype=StatusType(Status))
status = Parameter(datatype=StatusType(Drivable, 'PREPARED', 'PREPARING', 'RAMPING', 'STABILIZING', 'FINALIZING'))
target = Parameter(datatype=FloatRange(-15, 15, unit='T')) # poll only one parameter
ramp = Parameter('ramping speed', readonly=False,
datatype=FloatRange(0.064, 1.19, unit='T/min'), default=0.19)
@ -525,16 +510,16 @@ class Field(PpmsDrivable):
datatype=EnumType(persistent=0, driven=1), default=0)
STATUS_MAP = {
1: (Status.IDLE, 'persistent mode'),
2: (Status.PREPARING, 'switch warming'),
3: (Status.FINALIZING, 'switch cooling'),
4: (Status.IDLE, 'driven stable'),
5: (Status.STABILIZING, 'driven final'),
6: (Status.RAMPING, 'charging'),
7: (Status.RAMPING, 'discharging'),
8: (Status.ERROR, 'current error'),
11: (Status.ERROR, 'probably quenched'),
15: (Status.ERROR, 'general failure'),
1: (StatusType.IDLE, 'persistent mode'),
2: (StatusType.PREPARING, 'switch warming'),
3: (StatusType.FINALIZING, 'switch cooling'),
4: (StatusType.IDLE, 'driven stable'),
5: (StatusType.STABILIZING, 'driven final'),
6: (StatusType.RAMPING, 'charging'),
7: (StatusType.RAMPING, 'discharging'),
8: (StatusType.ERROR, 'current error'),
11: (StatusType.ERROR, 'probably quenched'),
15: (StatusType.ERROR, 'general failure'),
}
channel = 'field'
@ -563,33 +548,33 @@ class Field(PpmsDrivable):
def update_value_status(self, value, packed_status):
if value is None:
self.status = (self.Status.ERROR, 'invalid value')
self.status = (StatusType.ERROR, 'invalid value')
return
self.value = round(value * 1e-4, 7)
status_code = (packed_status >> 4) & 0xf
status = self.STATUS_MAP.get(status_code, (self.Status.ERROR, 'unknown status code %d' % status_code))
status = self.STATUS_MAP.get(status_code, (StatusType.ERROR, 'unknown status code %d' % status_code))
now = time.time()
if self._last_change: # there was a change, which is not yet confirmed by hw
if status_code == 1: # persistent mode
# leads are ramping (ppms has no extra status code for this!)
if now < self._last_change + 30:
status = (self.Status.PREPARING, 'ramping leads')
status = (StatusType.PREPARING, 'ramping leads')
else:
status = (self.Status.WARN, 'timeout when ramping leads')
status = (StatusType.WARN, 'timeout when ramping leads')
elif now > self._last_change + 5:
self._last_change = 0 # give up waiting for driving
elif self.isDriving(status) and status != self._status_before_change:
self._last_change = 0
self.log.debug('time needed to change to busy: %.3g', now - self._last_change)
else:
status = (self.Status.BUSY, 'changed target')
status = (StatusType.BUSY, 'changed target')
if abs(self.target - self.value) <= 1e-4:
self._last_target = self.target
elif self._last_target is None:
self._last_target = self.value
if self._stopped:
# combine 'stopped' with current status text
if status[0] == self.Status.IDLE:
if status[0] == StatusType.IDLE:
status = (status[0], 'stopped')
else:
status = (status[0], 'stopping (%s)' % status[1])
@ -602,7 +587,7 @@ class Field(PpmsDrivable):
self._status_before_change = self.status
self._stopped = False
self._last_change = time.time()
self.status = (self.Status.BUSY, 'changed target')
self.status = (StatusType.BUSY, 'changed target')
self._write_params(target, self.ramp, self.approachmode, self.persistentmode)
return self.target
@ -613,7 +598,7 @@ class Field(PpmsDrivable):
self._last_change = time.time()
self._status_before_change = self.status
self._stopped = False
self.status = (self.Status.BUSY, 'changed persistent mode')
self.status = (StatusType.BUSY, 'changed persistent mode')
self._write_params(self.target, self.ramp, self.approachmode, mode)
return self.persistentmode
@ -642,8 +627,6 @@ class Field(PpmsDrivable):
class Position(PpmsDrivable):
"""rotator position"""
Status = Drivable.Status
value = Parameter(datatype=FloatRange(unit='deg'))
target = Parameter(datatype=FloatRange(-720., 720., unit='deg'))
enabled = Parameter('is this channel used?', readonly=False,
@ -651,11 +634,11 @@ class Position(PpmsDrivable):
speed = Parameter('motor speed', readonly=False, default=12,
datatype=FloatRange(0.8, 12, unit='deg/sec'))
STATUS_MAP = {
1: (Status.IDLE, 'at target'),
5: (Status.BUSY, 'moving'),
8: (Status.IDLE, 'at limit'),
9: (Status.IDLE, 'at index'),
15: (Status.ERROR, 'general failure'),
1: (StatusType.IDLE, 'at target'),
5: (StatusType.BUSY, 'moving'),
8: (StatusType.IDLE, 'at limit'),
9: (StatusType.IDLE, 'at index'),
15: (StatusType.ERROR, 'general failure'),
}
channel = 'position'
@ -683,14 +666,14 @@ class Position(PpmsDrivable):
def update_value_status(self, value, packed_status):
if not self.enabled:
self.status = (self.Status.DISABLED, 'disabled')
self.status = (StatusType.DISABLED, 'disabled')
return
if value is None:
self.status = (self.Status.ERROR, 'invalid value')
self.status = (StatusType.ERROR, 'invalid value')
return
self.value = value
status_code = (packed_status >> 12) & 0xf
status = self.STATUS_MAP.get(status_code, (self.Status.ERROR, 'unknown status code %d' % status_code))
status = self.STATUS_MAP.get(status_code, (StatusType.ERROR, 'unknown status code %d' % status_code))
if self._last_change: # there was a change, which is not yet confirmed by hw
now = time.time()
if now > self._last_change + 5:
@ -699,20 +682,20 @@ class Position(PpmsDrivable):
self.log.debug('time needed to change to busy: %.3g', now - self._last_change)
self._last_change = 0
else:
status = (self.Status.BUSY, 'changed target')
status = (StatusType.BUSY, 'changed target')
# BUSY can not reliably be determined from the status code, we have to do it on our own
if abs(value - self.target) < 0.1:
self._last_target = self.target
if not self._within_target:
self._within_target = time.time()
if time.time() > self._within_target + 1:
if status[0] != self.Status.IDLE:
status = (self.Status.IDLE, status[1])
elif status[0] != self.Status.BUSY:
status = (self.Status.BUSY, status[1])
if status[0] != StatusType.IDLE:
status = (StatusType.IDLE, status[1])
elif status[0] != StatusType.BUSY:
status = (StatusType.BUSY, status[1])
if self._stopped:
# combine 'stopped' with current status text
if status[0] == self.Status.IDLE:
if status[0] == StatusType.IDLE:
status = (status[0], 'stopped')
else:
status = (status[0], 'stopping (%s)' % status[1])
@ -722,7 +705,7 @@ class Position(PpmsDrivable):
self._stopped = False
self._last_change = 0
self._status_before_change = self.status
self.status = (self.Status.BUSY, 'changed target')
self.status = (StatusType.BUSY, 'changed target')
self._write_params(target, self.speed)
return self.target

View File

@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
from frappy.core import Readable, Drivable, StatusType
from frappy_ess.epics import EpicsReadable, EpicsDrivable
readable_codes = {m.name: m.value for m in Readable.Status.members}
drivable_codes = {m.name: m.value for m in Drivable.Status.members}
def test_entangle_status():
try:
# pylint: disable=import-outside-toplevel
# pylint: disable=unused-import
import PyTango
except ImportError:
return
# pylint: disable=import-outside-toplevel
from frappy_mlz.entangle import AnalogInput, AnalogOutput, TemperatureController
assert AnalogInput.status.datatype.members[0].export_datatype() == {
"type": "enum",
"members": {"UNKNOWN": StatusType.UNKNOWN,
"DISABLED": StatusType.DISABLED,
**readable_codes},
}
assert AnalogOutput.status.datatype.members[0].export_datatype() == {
"type": "enum",
"members": {"UNKNOWN": StatusType.UNKNOWN,
"DISABLED": StatusType.DISABLED,
"UNSTABLE": StatusType.UNSTABLE,
**drivable_codes},
}
assert TemperatureController.status.datatype.members[0].export_datatype() == {
"type": "enum",
"members": {"UNKNOWN": StatusType.UNKNOWN,
"DISABLED": StatusType.DISABLED,
"UNSTABLE": StatusType.UNSTABLE,
**drivable_codes},
}
def test_epics_status():
assert EpicsReadable.status.datatype.members[0].export_datatype() == {
"type": "enum",
"members": {"UNKNOWN": StatusType.UNKNOWN,
**readable_codes},
}
assert EpicsDrivable.status.datatype.members[0].export_datatype() == {
"type": "enum",
"members": {"UNKNOWN": StatusType.UNKNOWN,
**drivable_codes},
}