provide setup for MLZ_Amagnet to be used @PSI soon
Also implement lots of fixes and improvements. fixes: #3381 Change-Id: Ibe6664da00756ae5813b90f190295045808b2ff0
This commit is contained in:
328
secop_mlz/amagnet.py
Normal file
328
secop_mlz/amagnet.py
Normal file
@ -0,0 +1,328 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# *****************************************************************************
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify it under
|
||||
# the terms of the GNU General Public License as published by the Free Software
|
||||
# Foundation; either version 2 of the License, or (at your option) any later
|
||||
# version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
||||
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
||||
# details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along with
|
||||
# this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
#
|
||||
# Module authors:
|
||||
# Enrico Faulhaber <enrico.faulhaber@frm2.tum.de>
|
||||
#
|
||||
# *****************************************************************************
|
||||
|
||||
"""
|
||||
Supporting classes for FRM2 magnets, currently only Garfield (amagnet).
|
||||
"""
|
||||
|
||||
# partially borrowed from nicos
|
||||
|
||||
import math
|
||||
|
||||
from secop.lib import lazy_property, mkthread
|
||||
from secop.lib.sequence import SequencerMixin, Step
|
||||
from secop.protocol import status
|
||||
from secop.datatypes import *
|
||||
from secop.errors import SECoPServerError, ConfigError, ProgrammingError, CommunicationError, HardwareError, DisabledError
|
||||
from secop.modules import PARAM, CMD, OVERRIDE, Device, Readable, Driveable
|
||||
|
||||
|
||||
class GarfieldMagnet(SequencerMixin, Driveable):
|
||||
"""Garfield Magnet
|
||||
|
||||
uses a polarity switch ('+' or '-') to flip polarity and an onoff switch
|
||||
to cut power (to be able to switch polarity) in addition to an
|
||||
unipolar current source.
|
||||
|
||||
B(I) = Ic0 + c1*erf(c2*I) + c3*atan(c4*I)
|
||||
|
||||
Coefficients c0..c4 are given as 'calibration_table' parameter,
|
||||
the symmetry setting selects which.
|
||||
"""
|
||||
|
||||
PARAMS = {
|
||||
'subdev_currentsource': PARAM('(bipolar) Powersupply', datatype=StringType(), readonly=True, export=False),
|
||||
'subdev_enable': PARAM('Switch to set for on/off', datatype=StringType(), readonly=True, export=False),
|
||||
'subdev_polswitch': PARAM('Switch to set for polarity', datatype=StringType(), readonly=True, export=False),
|
||||
'subdev_symmetry': PARAM('Switch to read for symmetry', datatype=StringType(), readonly=True, export=False),
|
||||
'userlimits': PARAM('User defined limits of device value',
|
||||
unit='main', datatype=TupleOf(FloatRange(), FloatRange()),
|
||||
default=(float('-Inf'), float('+Inf')), readonly=False, poll=10),
|
||||
'abslimits': PARAM('Absolute limits of device value',
|
||||
unit='main', datatype=TupleOf(FloatRange(), FloatRange()),
|
||||
default=(-0.5, 0.5), poll=True,
|
||||
),
|
||||
'precision': PARAM('Precision of the device value (allowed deviation '
|
||||
'of stable values from target)',
|
||||
unit='main', datatype=FloatRange(0.001), default=0.001, readonly=False,
|
||||
),
|
||||
'ramp': PARAM('Target rate of field change per minute', readonly=False,
|
||||
unit='main/min', datatype=FloatRange(), default=1.0),
|
||||
'calibration': PARAM('Coefficients for calibration '
|
||||
'function: [c0, c1, c2, c3, c4] calculates '
|
||||
'B(I) = c0*I + c1*erf(c2*I) + c3*atan(c4*I)'
|
||||
' in T', poll=1,
|
||||
datatype=ArrayOf(FloatRange(), 5, 5),
|
||||
default=(1.0, 0.0, 0.0, 0.0, 0.0)),
|
||||
'calibrationtable': PARAM('Map of Coefficients for calibration per symmetry setting',
|
||||
datatype=StructOf(symmetric=ArrayOf(FloatRange(), 5, 5),
|
||||
short=ArrayOf(FloatRange(), 5, 5),
|
||||
asymmetric=ArrayOf(FloatRange(), 5, 5)), export=False),
|
||||
}
|
||||
|
||||
def _current2field(self, current, *coefficients):
|
||||
"""Return field in T for given current in A.
|
||||
|
||||
Should be monotonic and asymetric or _field2current will fail!
|
||||
|
||||
Note: This may be overridden in derived classes.
|
||||
"""
|
||||
v = coefficients or self.calibration
|
||||
if len(v) != 5:
|
||||
self.log.warning('Wrong number of coefficients in calibration '
|
||||
'data! Need exactly 5 coefficients!')
|
||||
return current * v[0] + v[1] * math.erf(v[2] * current) + \
|
||||
v[3] * math.atan(v[4] * current)
|
||||
|
||||
def _field2current(self, field):
|
||||
"""Return required current in A for requested field in T.
|
||||
|
||||
Default implementation does a binary search using _current2field,
|
||||
which must be monotonic for this to work!
|
||||
|
||||
Note: This may be overridden in derived classes.
|
||||
"""
|
||||
# binary search/bisection
|
||||
maxcurr = self._currentsource.abslimits[1]
|
||||
mincurr = -maxcurr
|
||||
maxfield = self._current2field(maxcurr)
|
||||
minfield = -maxfield
|
||||
if not minfield <= field <= maxfield:
|
||||
raise ValueError(self,
|
||||
'requested field %g T out of range %g..%g T' %
|
||||
(field, minfield, maxfield))
|
||||
while minfield <= field <= maxfield:
|
||||
# binary search
|
||||
trycurr = 0.5 * (mincurr + maxcurr)
|
||||
tryfield = self._current2field(trycurr)
|
||||
if field == tryfield:
|
||||
self.log.debug('current for %g T is %g A', field, trycurr)
|
||||
return trycurr # Gotcha!
|
||||
elif field > tryfield:
|
||||
# retry upper interval
|
||||
mincurr = trycurr
|
||||
minfield = tryfield
|
||||
else:
|
||||
# retry lower interval
|
||||
maxcurr = trycurr
|
||||
maxfield = tryfield
|
||||
# if interval is so small, that any error within is acceptable:
|
||||
if maxfield - minfield < 1e-4:
|
||||
ratio = (field - minfield) / (maxfield - minfield)
|
||||
trycurr = (maxcurr - mincurr) * ratio + mincurr
|
||||
self.log.debug('current for %g T is %g A', field, trycurr)
|
||||
return trycurr # interpolated
|
||||
raise ConfigurationError(self,
|
||||
'_current2field polynome not monotonic!')
|
||||
|
||||
def init(self):
|
||||
super(GarfieldMagnet, self).init()
|
||||
self._enable = self.DISPATCHER.get_module(self.subdev_enable)
|
||||
self._symmetry = self.DISPATCHER.get_module(self.subdev_symmetry)
|
||||
self._polswitch = self.DISPATCHER.get_module(self.subdev_polswitch)
|
||||
self._currentsource = self.DISPATCHER.get_module(
|
||||
self.subdev_currentsource)
|
||||
self.init_sequencer(fault_on_error=False, fault_on_stop=False)
|
||||
self._symmetry.read_value(0)
|
||||
|
||||
def read_calibration(self, maxage=0):
|
||||
try:
|
||||
return self.calibrationtable[self._symmetry.value]
|
||||
except KeyError:
|
||||
minslope = min(entry[0]
|
||||
for entry in self.calibrationtable.values())
|
||||
self.log.error(
|
||||
'unconfigured calibration for symmetry %r' %
|
||||
self._symmetry.value)
|
||||
return [minslope, 0, 0, 0, 0]
|
||||
|
||||
def _checkLimits(self, limits):
|
||||
umin, umax = limits
|
||||
amin, amax = self.abslimits
|
||||
if umin > umax:
|
||||
raise ValueError(
|
||||
self, 'user minimum (%s) above the user '
|
||||
'maximum (%s)' % (umin, umax))
|
||||
if umin < amin - abs(amin * 1e-12):
|
||||
umin = amin
|
||||
if umax > amax + abs(amax * 1e-12):
|
||||
umax = amax
|
||||
return (umin, umax)
|
||||
|
||||
def write_userlimits(self, value):
|
||||
limits = self._checkLimits(value)
|
||||
return limits
|
||||
|
||||
def read_abslimits(self, maxage=0):
|
||||
maxfield = self._current2field(self._currentsource.abslimits[1])
|
||||
# limit to configured value (if any)
|
||||
maxfield = min(maxfield, max(self.PARAMS['abslimits'].default))
|
||||
return -maxfield, maxfield
|
||||
|
||||
def read_ramp(self, maxage=0):
|
||||
# This is an approximation!
|
||||
return self.calibration[0] * abs(self._currentsource.ramp)
|
||||
|
||||
def write_ramp(self, newramp):
|
||||
# This is an approximation!
|
||||
self._currentsource.ramp = newramp / self.calibration[0]
|
||||
|
||||
def _get_field_polarity(self):
|
||||
sign = int(self._polswitch.read_value())
|
||||
if self._enable.read_value():
|
||||
return sign
|
||||
return 0
|
||||
|
||||
def _set_field_polarity(self, polarity):
|
||||
current_pol = self._get_field_polarity()
|
||||
if current_pol == polarity:
|
||||
return
|
||||
if polarity == 0:
|
||||
return
|
||||
if current_pol == 0:
|
||||
# safe to switch
|
||||
self._polswitch.write_target(
|
||||
'+1' if polarity == 1 else str(polarity))
|
||||
return 0
|
||||
if self._currentsource.value < 0.1:
|
||||
self._polswitch.write_target('0')
|
||||
return current_pol
|
||||
# unsafe to switch, go to safe state first
|
||||
self._currentsource.write_target(0)
|
||||
|
||||
def read_value(self, maxage=0):
|
||||
return self._current2field(
|
||||
self._currentsource.read_value(maxage) *
|
||||
self._get_field_polarity())
|
||||
|
||||
def read_hw_status(self, maxage=0):
|
||||
# called from SequencerMixin.read_status if no sequence is running
|
||||
if self._enable.value == 'Off':
|
||||
return status.WARN, 'Disabled'
|
||||
if self._enable.read_status(maxage)[0] != status.OK:
|
||||
return self._enable.status
|
||||
if self._polswitch.value in ['0', 0]:
|
||||
return self._currentsource.status[
|
||||
0], 'Shorted, ' + self._currentsource.status[1]
|
||||
if self._symmetry.value in ['short', 0]:
|
||||
return self._currentsource.status[
|
||||
0], 'Shorted, ' + self._currentsource.status[1]
|
||||
return self._currentsource.read_status(maxage)
|
||||
|
||||
def write_target(self, target):
|
||||
if target != 0 and self._symmetry.read_value(0) in ['short', 0]:
|
||||
raise DisabledError(
|
||||
'Symmetry is shorted, please select another symmetry first!')
|
||||
|
||||
wanted_current = self._field2current(abs(target))
|
||||
wanted_polarity = '-1' if target < 0 else ('+1' if target else '0')
|
||||
current_polarity = self._get_field_polarity()
|
||||
|
||||
# generate Step sequence and start it
|
||||
seq = []
|
||||
seq.append(Step('preparing', 0, self._prepare_ramp))
|
||||
seq.append(Step('recover', 0, self._recover))
|
||||
if current_polarity != wanted_polarity:
|
||||
if self._currentsource.read_value(0) > 0.1:
|
||||
# switching only allowed if current is low enough -> ramp down
|
||||
# first
|
||||
seq.append(
|
||||
Step(
|
||||
'ramping down',
|
||||
0.3,
|
||||
self._ramp_current,
|
||||
0,
|
||||
cleanup=self._ramp_current_cleanup))
|
||||
seq.append(
|
||||
Step(
|
||||
'set polarity %s' %
|
||||
wanted_polarity,
|
||||
0.3,
|
||||
self._set_polarity,
|
||||
wanted_polarity)) # no cleanup
|
||||
seq.append(
|
||||
Step(
|
||||
'ramping to %.3fT (%.2fA)' %
|
||||
(target,
|
||||
wanted_current),
|
||||
0.3,
|
||||
self._ramp_current,
|
||||
wanted_current,
|
||||
cleanup=self._ramp_current_cleanup))
|
||||
seq.append(Step('finalize', 0, self._finish_ramp))
|
||||
|
||||
self.start_sequence(seq)
|
||||
self.status = 'BUSY', 'ramping'
|
||||
|
||||
# steps for the sequencing
|
||||
def _prepare_ramp(self, store, *args):
|
||||
store.old_window = self._currentsource.window
|
||||
self._currentsource.window = 1
|
||||
|
||||
def _finish_ramp(self, store, *args):
|
||||
self._currentsource.window = max(store.old_window, 10)
|
||||
|
||||
def _recover(self, store):
|
||||
# check for interlock
|
||||
if self._currentsource.read_status(0)[0] != status.ERROR:
|
||||
return
|
||||
# recover from interlock
|
||||
ramp = self._currentsource.ramp
|
||||
self._polswitch.write_target('0') # short is safe...
|
||||
self._polswitch._hw_wait()
|
||||
self._enable.write_target('On') # else setting ramp won't work
|
||||
self._enable._hw_wait()
|
||||
self._currentsource.ramp = 60000
|
||||
self._currentsource.target = 0
|
||||
self._currentsource.ramp = ramp
|
||||
# safe state.... if anything of the above fails, the tamperatures may
|
||||
# be too hot!
|
||||
|
||||
def _ramp_current(self, store, target):
|
||||
if abs(self._currentsource.value - target) <= 0.05:
|
||||
# done with this step if no longer BUSY
|
||||
return self._currentsource.read_status(0)[0] == 'BUSY'
|
||||
if self._currentsource.status[0] != 'BUSY':
|
||||
if self._enable.status[0] == 'ERROR':
|
||||
self._enable.do_reset()
|
||||
self._enable.read_status(0)
|
||||
self._enable.write_target('On')
|
||||
self._enable._hw_wait()
|
||||
self._currentsource.write_target(target)
|
||||
return True # repeat
|
||||
|
||||
def _ramp_current_cleanup(self, store, step_was_busy, target):
|
||||
# don't cleanup if step finished
|
||||
if step_was_busy:
|
||||
self._currentsource.write_target(self._currentsource.read_value(0))
|
||||
self._currentsource.window = max(store.old_window, 10)
|
||||
|
||||
def _set_polarity(self, store, target):
|
||||
if self._polswitch.read_status(0)[0] == status.BUSY:
|
||||
return True
|
||||
if self._polswitch.value == target:
|
||||
return False # done with this step
|
||||
if self._polswitch.read_value(0) != 0:
|
||||
self._polswitch.write_target(0)
|
||||
else:
|
||||
self._polswitch.write_target(target)
|
||||
return True # repeat
|
@ -164,7 +164,7 @@ class PyTangoDevice(Device):
|
||||
|
||||
'tangodevice': PARAM('Tango device name',
|
||||
datatype=StringType(), readonly=True,
|
||||
# export=True, # for testing only
|
||||
# export=True, # for testing only
|
||||
export=False,
|
||||
),
|
||||
}
|
||||
@ -215,8 +215,8 @@ class PyTangoDevice(Device):
|
||||
|
||||
def _hw_wait(self):
|
||||
"""Wait until hardware status is not BUSY."""
|
||||
while PyTangoDevice.doStatus(self, 0)[0] == status.BUSY:
|
||||
sleep(self._base_loop_delay)
|
||||
while self.read_status(0)[0] == 'BUSY':
|
||||
sleep(0.3)
|
||||
|
||||
def _getProperty(self, name, dev=None):
|
||||
"""
|
||||
@ -366,8 +366,8 @@ class AnalogInput(PyTangoDevice, Readable):
|
||||
The AnalogInput handles all devices only delivering an analogue value.
|
||||
"""
|
||||
|
||||
def init(self):
|
||||
super(AnalogInput, self).init()
|
||||
def late_init(self):
|
||||
super(AnalogInput, self).late_init()
|
||||
# query unit from tango and update value property
|
||||
attrInfo = self._dev.attribute_query('value')
|
||||
# prefer configured unit if nothing is set on the Tango device, else
|
||||
@ -442,27 +442,15 @@ class AnalogOutput(PyTangoDevice, Driveable):
|
||||
900),
|
||||
readonly=False,
|
||||
),
|
||||
'pollinterval': PARAM(
|
||||
'[min, max] sleeptime between polls',
|
||||
default=[
|
||||
0.5,
|
||||
5],
|
||||
readonly=False,
|
||||
datatype=TupleOf(
|
||||
FloatRange(
|
||||
0,
|
||||
20),
|
||||
FloatRange(
|
||||
0.1,
|
||||
120)),
|
||||
),
|
||||
}
|
||||
OVERRIDES = {
|
||||
'value': OVERRIDE(poll=False),
|
||||
}
|
||||
|
||||
def init(self):
|
||||
super(AnalogInput, self).init()
|
||||
super(AnalogOutput, self).init()
|
||||
# init history
|
||||
self._history = [] # will keep (timestamp, value) tuple
|
||||
|
||||
def late_init(self):
|
||||
super(AnalogOutput, self).late_init()
|
||||
# query unit from tango and update value property
|
||||
attrInfo = self._dev.attribute_query('value')
|
||||
# prefer configured unit if nothing is set on the Tango device, else
|
||||
@ -470,30 +458,14 @@ class AnalogOutput(PyTangoDevice, Driveable):
|
||||
if attrInfo.unit != 'No unit':
|
||||
self.PARAMS['value'].unit = attrInfo.unit
|
||||
|
||||
# init history
|
||||
self._history = [] # will keep (timestamp, value) tuple
|
||||
mkthread(self._history_thread)
|
||||
|
||||
def _history_thread(self):
|
||||
while True:
|
||||
# adaptive sleeping interval
|
||||
if self.status[0] == status.BUSY:
|
||||
sleep(min(self.pollinterval))
|
||||
else:
|
||||
sleep(min(max(self.pollinterval) / 2.,
|
||||
max(self.window / 10., min(pollinterval))))
|
||||
try:
|
||||
self.read_value(0) # also append to self._history
|
||||
# shorten history
|
||||
while len(self._history) > 2:
|
||||
# if history would be too short, break
|
||||
if self._history[-1][0] - \
|
||||
self._history[1][0] < self.window:
|
||||
break
|
||||
# remove a stale point
|
||||
self._history.pop(0)
|
||||
except Exception:
|
||||
pass
|
||||
def poll(self, nr):
|
||||
super(AnalogOutput, self).poll(nr)
|
||||
while len(self._history) > 2:
|
||||
# if history would be too short, break
|
||||
if self._history[-1][0] - self._history[1][0] < self.window:
|
||||
break
|
||||
# else: remove a stale point
|
||||
self._history.pop(0)
|
||||
|
||||
def read_value(self, maxage=0):
|
||||
value = self._dev.value
|
||||
@ -560,17 +532,17 @@ class AnalogOutput(PyTangoDevice, Driveable):
|
||||
return self._checkLimits(value)
|
||||
|
||||
def write_target(self, value=FloatRange()):
|
||||
try:
|
||||
self._dev.value = value
|
||||
except HardwareError:
|
||||
if self.status[0] == status.BUSY:
|
||||
# changing target value during movement is not allowed by the
|
||||
# Tango base class state machine. If we are moving, stop first.
|
||||
if self.read_status(0)[0] == status.BUSY:
|
||||
self.stop()
|
||||
self._hw_wait()
|
||||
self._dev.value = value
|
||||
else:
|
||||
raise
|
||||
self.do_stop()
|
||||
self._hw_wait()
|
||||
self._dev.value = value
|
||||
self.read_status(0) # poll our status to keep it updated
|
||||
|
||||
def _hw_wait(self):
|
||||
while self.read_status(0)[0] == status.BUSY:
|
||||
sleep(0.3)
|
||||
|
||||
def do_stop(self):
|
||||
self._dev.Stop()
|
||||
@ -601,21 +573,21 @@ class Actuator(AnalogOutput):
|
||||
poll=30),
|
||||
}
|
||||
|
||||
def read_speed(self):
|
||||
def read_speed(self, maxage=0):
|
||||
return self._dev.speed
|
||||
|
||||
def write_speed(self, value):
|
||||
self._dev.speed = value
|
||||
|
||||
def read_ramp(self):
|
||||
def read_ramp(self, maxage=0):
|
||||
return self.read_speed() * 60
|
||||
|
||||
def write_ramp(self, value):
|
||||
self.write_speed(value / 60.)
|
||||
return self.speed * 60
|
||||
return self.read_speed(0) * 60
|
||||
|
||||
def do_setposition(self, value):
|
||||
self._dev.Adjust(value)
|
||||
# def do_setposition(self, value=FloatRange()):
|
||||
# self._dev.Adjust(value)
|
||||
|
||||
|
||||
class Motor(Actuator):
|
||||
@ -643,16 +615,16 @@ class Motor(Actuator):
|
||||
unit='main/s^2'),
|
||||
}
|
||||
|
||||
def read_refpos(self):
|
||||
def read_refpos(self, maxage=0):
|
||||
return float(self._getProperty('refpos'))
|
||||
|
||||
def read_accel(self):
|
||||
def read_accel(self, maxage=0):
|
||||
return self._dev.accel
|
||||
|
||||
def write_accel(self, value):
|
||||
self._dev.accel = value
|
||||
|
||||
def read_decel(self):
|
||||
def read_decel(self, maxage=0):
|
||||
return self._dev.decel
|
||||
|
||||
def write_decel(self, value):
|
||||
@ -695,32 +667,32 @@ class TemperatureController(Actuator):
|
||||
'precision': OVERRIDE(default=0.1),
|
||||
}
|
||||
|
||||
def read_ramp(self):
|
||||
def read_ramp(self, maxage=0):
|
||||
return self._dev.ramp
|
||||
|
||||
def write_ramp(self, value):
|
||||
self._dev.ramp = value
|
||||
return self._dev.ramp
|
||||
|
||||
def read_p(self):
|
||||
def read_p(self, maxage=0):
|
||||
return self._dev.p
|
||||
|
||||
def write_p(self, value):
|
||||
self._dev.p = value
|
||||
|
||||
def read_i(self):
|
||||
def read_i(self, maxage=0):
|
||||
return self._dev.i
|
||||
|
||||
def write_i(self, value):
|
||||
self._dev.i = value
|
||||
|
||||
def read_d(self):
|
||||
def read_d(self, maxage=0):
|
||||
return self._dev.d
|
||||
|
||||
def write_d(self, value):
|
||||
self._dev.d = value
|
||||
|
||||
def read_pid(self):
|
||||
def read_pid(self, maxage=0):
|
||||
self.read_p()
|
||||
self.read_i()
|
||||
self.read_d()
|
||||
@ -731,10 +703,10 @@ class TemperatureController(Actuator):
|
||||
self._dev.i = value[1]
|
||||
self._dev.d = value[2]
|
||||
|
||||
def read_setpoint(self):
|
||||
def read_setpoint(self, maxage=0):
|
||||
return self._dev.setpoint
|
||||
|
||||
def read_heateroutput(self):
|
||||
def read_heateroutput(self, maxage=0):
|
||||
return self._dev.heaterOutput
|
||||
|
||||
|
||||
@ -747,21 +719,21 @@ class PowerSupply(Actuator):
|
||||
'ramp': PARAM('Current/voltage ramp', unit='main/min',
|
||||
datatype=FloatRange(), readonly=False, poll=30,),
|
||||
'voltage': PARAM('Actual voltage', unit='V',
|
||||
datatype=FloatRange(), poll=5),
|
||||
datatype=FloatRange(), poll=-5),
|
||||
'current': PARAM('Actual current', unit='A',
|
||||
datatype=FloatRange(), poll=5),
|
||||
datatype=FloatRange(), poll=-5),
|
||||
}
|
||||
|
||||
def read_ramp(self):
|
||||
def read_ramp(self, maxage=0):
|
||||
return self._dev.ramp
|
||||
|
||||
def write_ramp(self, value):
|
||||
self._dev.ramp = value
|
||||
|
||||
def read_voltage(self):
|
||||
def read_voltage(self, maxage=0):
|
||||
return self._dev.voltage
|
||||
|
||||
def read_current(self):
|
||||
def read_current(self, maxage=0):
|
||||
return self._dev.current
|
||||
|
||||
|
||||
@ -771,7 +743,7 @@ class DigitalInput(PyTangoDevice, Readable):
|
||||
"""
|
||||
|
||||
OVERRIDES = {
|
||||
'value': OVERRIDE(datatype=IntRange(0)),
|
||||
'value': OVERRIDE(datatype=IntRange()),
|
||||
}
|
||||
|
||||
def read_value(self, maxage=0):
|
||||
@ -835,8 +807,8 @@ class DigitalOutput(PyTangoDevice, Driveable):
|
||||
"""
|
||||
|
||||
OVERRIDES = {
|
||||
'value': OVERRIDE(datatype=IntRange(0)),
|
||||
'target': OVERRIDE(datatype=IntRange(0)),
|
||||
'value': OVERRIDE(datatype=IntRange()),
|
||||
'target': OVERRIDE(datatype=IntRange()),
|
||||
}
|
||||
|
||||
def read_value(self, maxage=0):
|
||||
@ -856,17 +828,23 @@ class NamedDigitalOutput(DigitalOutput):
|
||||
A DigitalOutput with numeric values mapped to names.
|
||||
"""
|
||||
|
||||
PARAMS = {
|
||||
'mapping': PARAM('A dictionary mapping state names to integers',
|
||||
datatype=StringType(), export=False), # XXX: !!!
|
||||
}
|
||||
# PARAMS = {
|
||||
# 'mapping': PARAM('A dictionary mapping state names to integers',
|
||||
# datatype=EnumType(), export=False), # XXX: !!!
|
||||
# }
|
||||
#
|
||||
# def init(self):
|
||||
# super(NamedDigitalOutput, self).init()
|
||||
# try: # XXX: !!!
|
||||
# self.PARAMS['value'].datatype = EnumType(**eval(self.mapping))
|
||||
# except Exception as e:
|
||||
# raise ValueError('Illegal Value for mapping: %r' % e)
|
||||
|
||||
def init(self):
|
||||
super(NamedDigitalOutput, self).init()
|
||||
try: # XXX: !!!
|
||||
self.PARAMS['value'].datatype = EnumType(**eval(self.mapping))
|
||||
except Exception as e:
|
||||
raise ValueError('Illegal Value for mapping: %r' % e)
|
||||
def write_target(self, target):
|
||||
# map from enum-str to integer value
|
||||
self._dev.value = self.PARAMS[
|
||||
'target'].datatype.reversed.get(target, target)
|
||||
self.read_value()
|
||||
|
||||
|
||||
class PartialDigitalOutput(NamedDigitalOutput):
|
||||
@ -930,19 +908,19 @@ class StringIO(PyTangoDevice, Device):
|
||||
group='communication'),
|
||||
}
|
||||
|
||||
def read_bustimeout(self):
|
||||
def read_bustimeout(self, maxage=0):
|
||||
return self._dev.communicationTimeout
|
||||
|
||||
def write_bustimeout(self, value):
|
||||
self._dev.communicationTimeout = value
|
||||
|
||||
def read_endofline(self):
|
||||
def read_endofline(self, maxage=0):
|
||||
return self._dev.endOfLine
|
||||
|
||||
def write_endofline(self, value):
|
||||
self._dev.endOfLine = value
|
||||
|
||||
def read_startofline(self):
|
||||
def read_startofline(self, maxage=0):
|
||||
return self._dev.startOfLine
|
||||
|
||||
def write_startofline(self, value):
|
||||
|
Reference in New Issue
Block a user