migrated secop_psi drivers to new syntax

- includes all changes up to 'fix inheritance order' from git_mlz
  6a32ecf342

Change-Id: Ie3ceee3dbd0a9284b47b1d5b5dbe262eebe8f283
This commit is contained in:
2021-02-24 16:15:23 +01:00
parent bc5edec06f
commit 41baf5805f
79 changed files with 2610 additions and 3952 deletions

View File

@ -30,18 +30,17 @@ MLZ TANGO interface for the respective device classes.
import re
import threading
from time import time as currenttime
from time import sleep
from time import time as currenttime
import PyTango
from secop.datatypes import ArrayOf, EnumType, \
FloatRange, IntRange, StringType, TupleOf, LimitsType
from secop.datatypes import ArrayOf, EnumType, FloatRange, \
IntRange, LimitsType, StringType, TupleOf
from secop.errors import CommunicationFailedError, \
ConfigError, HardwareError, ProgrammingError
from secop.lib import lazy_property
from secop.modules import Command, Drivable, \
Module, Override, Parameter, Readable, BasicPoller
from secop.modules import BasicPoller, Command, \
Drivable, Module, Parameter, Readable
#####
@ -160,24 +159,18 @@ class PyTangoDevice(Module):
pollerClass = BasicPoller
parameters = {
'comtries': Parameter('Maximum retries for communication',
datatype=IntRange(1, 100), default=3, readonly=False,
group='communication'),
'comdelay': Parameter('Delay between retries', datatype=FloatRange(0),
unit='s', default=0.1, readonly=False,
group='communication'),
'tangodevice': Parameter('Tango device name',
datatype=StringType(), readonly=True,
# export=True, # for testing only
export=False,
),
}
commands = {
'reset': Command('Tango reset command', argument=None, result=None),
}
# parameters
comtries = Parameter('Maximum retries for communication',
datatype=IntRange(1, 100), default=3, readonly=False,
group='communication')
comdelay = Parameter('Delay between retries', datatype=FloatRange(0),
unit='s', default=0.1, readonly=False,
group='communication')
tangodevice = Parameter('Tango device name',
datatype=StringType(), readonly=True,
# export=True, # for testing only
export=False,
)
tango_status_mapping = {
PyTango.DevState.ON: Drivable.Status.IDLE,
@ -372,7 +365,9 @@ class PyTangoDevice(Module):
return (myState, tangoStatus)
def do_reset(self):
@Command(argument=None, result=None)
def reset(self):
"""Tango reset command"""
self._dev.Reset()
@ -405,13 +400,9 @@ class Sensor(AnalogInput):
# note: we don't transport the formula to secop....
# we support the adjust method
commands = {
'setposition': Command('Set the position to the given value.',
argument=FloatRange(), result=None,
),
}
def do_setposition(self, value):
@Command(argument=FloatRange(), result=None)
def setposition(self, value):
"""Set the position to the given value."""
self._dev.Adjust(value)
@ -427,29 +418,29 @@ class AnalogOutput(PyTangoDevice, Drivable):
controllers, ...
"""
parameters = {
'userlimits': Parameter('User defined limits of device value',
datatype=LimitsType(FloatRange(unit='$')),
default=(float('-Inf'), float('+Inf')),
readonly=False, poll=10,
),
'abslimits': Parameter('Absolute limits of device value',
# parameters
userlimits = Parameter('User defined limits of device value',
datatype=LimitsType(FloatRange(unit='$')),
),
'precision': Parameter('Precision of the device value (allowed deviation '
'of stable values from target)',
datatype=FloatRange(1e-38, unit='$'),
readonly=False, group='stability',
),
'window': Parameter('Time window for checking stabilization if > 0',
default=60.0, readonly=False,
datatype=FloatRange(0, 900, unit='s'), group='stability',
),
'timeout': Parameter('Timeout for waiting for a stable value (if > 0)',
default=60.0, readonly=False,
datatype=FloatRange(0, 900, unit='s'), group='stability',
),
}
default=(float('-Inf'), float('+Inf')),
readonly=False, poll=10,
)
abslimits = Parameter('Absolute limits of device value',
datatype=LimitsType(FloatRange(unit='$')),
)
precision = Parameter('Precision of the device value (allowed deviation '
'of stable values from target)',
datatype=FloatRange(1e-38, unit='$'),
readonly=False, group='stability',
)
window = Parameter('Time window for checking stabilization if > 0',
default=60.0, readonly=False,
datatype=FloatRange(0, 900, unit='s'), group='stability',
)
timeout = Parameter('Timeout for waiting for a stable value (if > 0)',
default=60.0, readonly=False,
datatype=FloatRange(0, 900, unit='s'), group='stability',
)
_history = ()
_timeout = None
_moving = False
@ -566,7 +557,7 @@ class AnalogOutput(PyTangoDevice, Drivable):
if self.status[0] == self.Status.BUSY:
# changing target value during movement is not allowed by the
# Tango base class state machine. If we are moving, stop first.
self.do_stop()
self.stop()
self._hw_wait()
self._dev.value = value
# set meaningful timeout
@ -587,7 +578,7 @@ class AnalogOutput(PyTangoDevice, Drivable):
while super(AnalogOutput, self).read_status()[0] == self.Status.BUSY:
sleep(0.3)
def do_stop(self):
def stop(self):
self._dev.Stop()
@ -601,21 +592,14 @@ class Actuator(AnalogOutput):
"""
# for secop: support the speed and ramp parameters
parameters = {
'speed': Parameter('The speed of changing the value',
readonly=False, datatype=FloatRange(0, unit='$/s'),
),
'ramp': Parameter('The speed of changing the value',
readonly=False, datatype=FloatRange(0, unit='$/s'),
poll=30,
),
}
commands = {
'setposition': Command('Set the position to the given value.',
argument=FloatRange(), result=None,
),
}
# parameters
speed = Parameter('The speed of changing the value',
readonly=False, datatype=FloatRange(0, unit='$/s'),
)
ramp = Parameter('The speed of changing the value',
readonly=False, datatype=FloatRange(0, unit='$/s'),
poll=30,
)
def read_speed(self):
return self._dev.speed
@ -630,7 +614,9 @@ class Actuator(AnalogOutput):
self.write_speed(value / 60.)
return self.read_speed() * 60
def do_setposition(self, value=FloatRange()):
@Command(FloatRange(), result=None)
def setposition(self, value=FloatRange()):
"""Set the position to the given value."""
self._dev.Adjust(value)
@ -641,21 +627,16 @@ class Motor(Actuator):
It has the ability to move a real object from one place to another place.
"""
parameters = {
'refpos': Parameter('Reference position',
datatype=FloatRange(unit='$'),
),
'accel': Parameter('Acceleration',
datatype=FloatRange(unit='$/s^2'), readonly=False,
),
'decel': Parameter('Deceleration',
datatype=FloatRange(unit='$/s^2'), readonly=False,
),
}
commands = {
'reference': Command('Do a reference run', argument=None, result=None),
}
# parameters
refpos = Parameter('Reference position',
datatype=FloatRange(unit='$'),
)
accel = Parameter('Acceleration',
datatype=FloatRange(unit='$/s^2'), readonly=False,
)
decel = Parameter('Deceleration',
datatype=FloatRange(unit='$/s^2'), readonly=False,
)
def read_refpos(self):
return float(self._getProperty('refpos'))
@ -672,7 +653,9 @@ class Motor(Actuator):
def write_decel(self, value):
self._dev.decel = value
def do_reference(self):
@Command()
def reference(self):
"""Do a reference run"""
self._dev.Reference()
return self.read_value()
@ -681,32 +664,29 @@ class TemperatureController(Actuator):
"""A temperature control loop device.
"""
parameters = {
'p': Parameter('Proportional control Parameter', datatype=FloatRange(),
readonly=False, group='pid',
),
'i': Parameter('Integral control Parameter', datatype=FloatRange(),
readonly=False, group='pid',
),
'd': Parameter('Derivative control Parameter', datatype=FloatRange(),
readonly=False, group='pid',
),
'pid': Parameter('pid control Parameters',
datatype=TupleOf(FloatRange(), FloatRange(), FloatRange()),
readonly=False, group='pid', poll=30,
),
'setpoint': Parameter('Current setpoint', datatype=FloatRange(unit='$'), poll=1,
),
'heateroutput': Parameter('Heater output', datatype=FloatRange(), poll=1,
),
}
# parameters
# pylint: disable=invalid-name
p = Parameter('Proportional control Parameter', datatype=FloatRange(),
readonly=False, group='pid',
)
i = Parameter('Integral control Parameter', datatype=FloatRange(),
readonly=False, group='pid',
)
d = Parameter('Derivative control Parameter', datatype=FloatRange(),
readonly=False, group='pid',
)
pid = Parameter('pid control Parameters',
datatype=TupleOf(FloatRange(), FloatRange(), FloatRange()),
readonly=False, group='pid', poll=30,
)
setpoint = Parameter('Current setpoint', datatype=FloatRange(unit='$'), poll=1,
)
heateroutput = Parameter('Heater output', datatype=FloatRange(), poll=1,
)
overrides = {
# We want this to be freely user-settable, and not produce a warning
# on startup, so select a usually sensible default.
'precision': Override(default=0.1),
'ramp': Override(description='Temperature ramp'),
}
# overrides
precision = Parameter(default=0.1)
ramp = Parameter(description='Temperature ramp')
def read_ramp(self):
return self._dev.ramp
@ -755,15 +735,14 @@ class PowerSupply(Actuator):
"""A power supply (voltage and current) device.
"""
parameters = {
'voltage': Parameter('Actual voltage',
datatype=FloatRange(unit='V'), poll=-5),
'current': Parameter('Actual current',
datatype=FloatRange(unit='A'), poll=-5),
}
overrides = {
'ramp': Override(description='Current/voltage ramp'),
}
# parameters
voltage = Parameter('Actual voltage',
datatype=FloatRange(unit='V'), poll=-5)
current = Parameter('Actual current',
datatype=FloatRange(unit='A'), poll=-5)
# overrides
ramp = Parameter(description='Current/voltage ramp')
def read_ramp(self):
return self._dev.ramp
@ -782,9 +761,8 @@ class DigitalInput(PyTangoDevice, Readable):
"""A device reading a bitfield.
"""
overrides = {
'value': Override(datatype=IntRange()),
}
# overrides
value = Parameter(datatype=IntRange())
def read_value(self):
return self._dev.value
@ -794,10 +772,9 @@ class NamedDigitalInput(DigitalInput):
"""A DigitalInput with numeric values mapped to names.
"""
parameters = {
'mapping': Parameter('A dictionary mapping state names to integers',
datatype=StringType(), export=False), # XXX:!!!
}
# parameters
mapping = Parameter('A dictionary mapping state names to integers',
datatype=StringType(), export=False) # XXX:!!!
def initModule(self):
super(NamedDigitalInput, self).initModule()
@ -821,12 +798,11 @@ class PartialDigitalInput(NamedDigitalInput):
bit width accessed.
"""
parameters = {
'startbit': Parameter('Number of the first bit',
datatype=IntRange(0), default=0),
'bitwidth': Parameter('Number of bits',
datatype=IntRange(0), default=1),
}
# parameters
startbit = Parameter('Number of the first bit',
datatype=IntRange(0), default=0)
bitwidth = Parameter('Number of bits',
datatype=IntRange(0), default=1)
def initModule(self):
super(PartialDigitalInput, self).initModule()
@ -844,10 +820,9 @@ class DigitalOutput(PyTangoDevice, Drivable):
bitfield.
"""
overrides = {
'value': Override(datatype=IntRange()),
'target': Override(datatype=IntRange()),
}
# overrides
value = Parameter(datatype=IntRange())
target = Parameter(datatype=IntRange())
def read_value(self):
return self._dev.value # mapping is done by datatype upon export()
@ -865,10 +840,9 @@ class NamedDigitalOutput(DigitalOutput):
"""A DigitalOutput with numeric values mapped to names.
"""
parameters = {
'mapping': Parameter('A dictionary mapping state names to integers',
datatype=StringType(), export=False),
}
# parameters
mapping = Parameter('A dictionary mapping state names to integers',
datatype=StringType(), export=False)
def initModule(self):
super(NamedDigitalOutput, self).initModule()
@ -894,12 +868,11 @@ class PartialDigitalOutput(NamedDigitalOutput):
bit width accessed.
"""
parameters = {
'startbit': Parameter('Number of the first bit',
datatype=IntRange(0), default=0),
'bitwidth': Parameter('Number of bits',
datatype=IntRange(0), default=1),
}
# parameters
startbit = Parameter('Number of the first bit',
datatype=IntRange(0), default=0)
bitwidth = Parameter('Number of bits',
datatype=IntRange(0), default=1)
def initModule(self):
super(PartialDigitalOutput, self).initModule()
@ -925,17 +898,16 @@ class StringIO(PyTangoDevice, Module):
receives strings.
"""
parameters = {
'bustimeout': Parameter('Communication timeout',
datatype=FloatRange(unit='s'), readonly=False,
group='communication'),
'endofline': Parameter('End of line',
datatype=StringType(), readonly=False,
group='communication'),
'startofline': Parameter('Start of line',
datatype=StringType(), readonly=False,
group='communication'),
}
# parameters
bustimeout = Parameter('Communication timeout',
datatype=FloatRange(unit='s'), readonly=False,
group='communication')
endofline = Parameter('End of line',
datatype=StringType(), readonly=False,
group='communication')
startofline = Parameter('Start of line',
datatype=StringType(), readonly=False,
group='communication')
def read_bustimeout(self):
return self._dev.communicationTimeout
@ -955,53 +927,48 @@ class StringIO(PyTangoDevice, Module):
def write_startofline(self, value):
self._dev.startOfLine = value
commands = {
'communicate': Command('Send a string and return the reply',
argument=StringType(),
result=StringType()),
'flush': Command('Flush output buffer',
argument=None, result=None),
'read': Command('read some characters from input buffer',
argument=IntRange(0), result=StringType()),
'write': Command('write some chars to output',
argument=StringType(), result=None),
'readLine': Command('Read sol - a whole line - eol',
argument=None, result=StringType()),
'writeLine': Command('write sol + a whole line + eol',
argument=StringType(), result=None),
'availableChars': Command('return number of chars in input buffer',
argument=None, result=IntRange(0)),
'availableLines': Command('return number of lines in input buffer',
argument=None, result=IntRange(0)),
'multiCommunicate': Command('perform a sequence of communications',
argument=ArrayOf(
TupleOf(StringType(), IntRange()), 100),
result=ArrayOf(StringType(), 100)),
}
def do_communicate(self, value=StringType()):
@Command(argument=StringType(), result=StringType())
def communicate(self, value=StringType()):
"""Send a string and return the reply"""
return self._dev.Communicate(value)
def do_flush(self):
@Command(argument=None, result=None)
def flush(self):
"""Flush output buffer"""
self._dev.Flush()
def do_read(self, value):
@Command(argument=IntRange(0), result=StringType())
def read(self, value):
"""read some characters from input buffer"""
return self._dev.Read(value)
def do_write(self, value):
@Command(argument=StringType(), result=None)
def write(self, value):
"""write some chars to output"""
return self._dev.Write(value)
def do_readLine(self):
@Command(argument=None, result=StringType())
def readLine(self):
"""Read sol - a whole line - eol"""
return self._dev.ReadLine()
def do_writeLine(self, value):
@Command(argument=StringType(), result=None)
def writeLine(self, value):
"""write sol + a whole line + eol"""
return self._dev.WriteLine(value)
def do_multiCommunicate(self, value):
@Command(argument=ArrayOf(TupleOf(StringType(), IntRange()), 100),
result=ArrayOf(StringType(), 100))
def multiCommunicate(self, value):
"""perform a sequence of communications"""
return self._dev.MultiCommunicate(value)
def do_availableChars(self):
@Command(argument=None, result=IntRange(0))
def availableChars(self):
"""return number of chars in input buffer"""
return self._dev.availableChars
def do_availableLines(self):
@Command(argument=None, result=IntRange(0))
def availableLines(self):
"""return number of lines in input buffer"""
return self._dev.availableLines