improved commandhandler

- analyze_<group> returns now a dict
- change_<group> has no more values arguments.
  values may be read with change.readValues(), which leaves more
  freedom to the programmer, especially if the values do not need
  to be read before a change

Change-Id: I67b70302eaf9bbdac107df61123062fa133d501d
Reviewed-on: https://forge.frm2.tum.de/review/c/sine2020/secop/playground/+/22047
Tested-by: JenkinsCodeReview <bjoern_pedersen@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
zolliker 2019-12-16 17:23:20 +01:00
parent fcad78a682
commit a876eba5f6
5 changed files with 213 additions and 219 deletions

View File

@ -28,23 +28,29 @@ for all parameters with the same handler. Before analyze_<group> is called, the
reply is parsed and converted to values, which are then given as arguments.
def analyze_<group>(self, value1, value2, ...):
# <here> we have to calculate parameters from the values (value1, value2 ...)
# and assign them to self.<parameter>
# no return value is expected
# here we have to calculate parameters from the values (value1, value2 ...)
# and return a dict with parameter names as keys and new values.
It is an error to have a read_<parameter> method implemented on a parameter with a
handler.
For write, instead of the methods write_<parameter>" we write one method change_<group>
for all parameters with the same handler.
def change_<group>(self, new, value1, value2, ...):
# <new> is a wrapper object around the module, containing already the new values.
# if READ_BEFORE_WRITE is True (the default), the additional arguments (value1, ...)
# must be in the argument list. They contain the values read from the hardware.
# If they are not needed, set READ_BEFORE_WRITE to False, or declare them as '*args'.
# The expression ('<parameter>' in new) returns a boolean indicating, whether
# this parameter is subject to change.
def change_<group>(self, change):
# Change contains the to be changed parameters as attributes, and also the unchanged
# parameters taking part to the handler group. If the method needs the current values
# from the hardware, it can read them with change.getValues(). This call does also
# update the values of the attributes of change, which are not subject to change.
# In addtion, the method may call change.toBeChanged(<parameter name>) to determine,
# whether a specific parameter is subject to change.
# The return value must be either a sequence of values to be written to the hardware,
# which will be formatted by the handler, or None. The latter is used only in some
# special cases, when nothing has to be written.
A write_<parameter> method may be implemented in addition. In that case, it is executed
before change_<group>. write_<parameters> may return None or Done, in these cases
change_<group> is not called.
"""
import re
@ -56,8 +62,8 @@ from secop.errors import ProgrammingError
class CmdParser:
"""helper for parsing replies
using a subset of old style python formatting
the same format can be used or formatting command arguments
using a subset of old style python formatting.
The same format can be used or formatting command arguments
"""
# make a map of cast functions
@ -139,34 +145,43 @@ class CmdParser:
class Change:
"""contains new values for the call to change_<group>
A Change instance is used as the 'new' argument for the change_<group> method.
new.<parameter> is either the new, changed value or the old value from the module.
In addition '<parameter>' in new indicates, whether <parameter> is to be changed.
new.<parameter> can not be changed
A Change instance is used as an argument for the change_<group> method.
Getting the value of change.<parameter> returns either the new, changed value or the
current one from the module, if there is no new value.
"""
def __init__(self, module, parameters, valuedict):
self.__dict__.update(valuedict, _module=module, _parameters=parameters)
def __init__(self, handler, module, valuedict):
self._handler = handler
self._module = module
self._valuedict = valuedict
self._to_be_changed = set(self._valuedict)
self._do_read = True
def __getattr__(self, pname):
"""get current values from _module for unchanged parameters"""
if not pname in self._parameters:
raise AttributeError("parameter '%s' is not within the handlers group"
% pname)
return getattr(self._module, pname)
def __getattr__(self, key):
"""return attribute from module key is not in self._valuedict"""
if key in self._valuedict:
return self._valuedict[key]
return getattr(self._module, key)
def __setattr__(self, pname, value):
raise AttributeError("can't set attribute ")
def doesInclude(self, *args):
"""check whether one of the specified parameters is to be changed"""
return bool(set(args) & self._to_be_changed)
def __contains__(self, pname):
"""check whether a specific parameter is to be changed"""
return pname in self.__dict__
def readValues(self):
"""read values from the hardware
and update our parameter attributes accordingly (i.e. do not touch the new values)
"""
if self._do_read:
self._do_read = False
self._reply = self._handler.send_command(self._module)
result = self._handler.analyze(self._module, *self._reply)
result.update(self._valuedict)
self._valuedict.update(result)
return self._reply
class CmdHandlerBase:
"""generic command handler"""
READ_BEFORE_WRITE = True
# if READ_BEFORE_WRITE is True, a read is performed before a write, and the parsed
# additional parameters are added to the argument list of change_<group>.
def __init__(self, group):
# group is used for calling the proper analyze_<group> and change_<group> methods
@ -216,19 +231,19 @@ class CmdHandlerBase:
raise ProgrammingError("the handler '%s' for '%s.%s' is already used in module '%s'"
% (self.group, modclass.__name__, pname, self._module_class.__name__))
self.parameters.add(pname)
self.analyze = getattr(modclass, 'analyze_' + self.group)
return self.read
def read(self, module):
"""the read function passed to the metaclass
overwrite with None if not used
"""
"""write values from module"""
assert module.__class__ == self._module_class
try:
# do a read of the current hw values
reply = self.send_command(module)
# convert them to parameters
getattr(module, 'analyze_' + self.group)(*reply)
assert module.__class__ == self._module_class
result = self.analyze(module, *reply)
for pname, value in result.items():
setattr(module, pname, value)
for pname in self.parameters:
if module.parameters[pname].readerror:
# clear errors on parameters, which were not updated.
@ -239,72 +254,62 @@ class CmdHandlerBase:
for pname in self.parameters:
module.setError(pname, e)
raise
return Done # parameters should be updated already
return Done
def get_write_func(self, pname, wfunc):
def get_write_func(self, pname, pre_wfunc=None):
"""returns the write function passed to the metaclass
may be overriden to return None, if not used
If pre_wfunc is given, it is to be called before change_<group>.
May be overriden to return None, if not used
"""
self.change = getattr(self._module_class, 'change_' + self.group)
if wfunc:
def new_wfunc(module, value, cmd=self, pname=pname, wfunc=wfunc):
if pre_wfunc:
def wfunc(module, value, hdl=self, pname=pname, wfunc=pre_wfunc):
value = wfunc(module, value)
if value is None or value is Done:
return value
cmd.write(module, {pname: value})
hdl.write(module, pname, value)
return Done
else:
def new_wfunc(module, value, cmd=self, pname=pname):
cmd.write(module, {pname: value})
def wfunc(module, value, hdl=self, pname=pname):
hdl.write(module, pname, value)
return Done
return new_wfunc
return wfunc
def write(self, module, valuedict, force_read=False):
"""write values to the module
When called from write_<param>, valuedict contains only one item,
the single parameter to be changed.
If called directly, valuedict may have more items.
"""
analyze = getattr(module, 'analyze_' + self.group)
def write(self, module, pname, value):
"""write value to the module"""
assert module.__class__ == self._module_class
force_read = False
valuedict = {pname: value}
if module.writeDict: # collect other parameters to be written
valuedict = dict(valuedict)
for p in self.parameters:
if p in self.writeDict:
valuedict[p] = self.writeDict.pop(p)
if p in module.writeDict:
valuedict[p] = module.writeDict.pop(p)
elif p not in valuedict:
force_read = True
if self.READ_BEFORE_WRITE or force_read:
# do a read of the current hw values
values = self.send_command(module)
# convert them to parameters
analyze(*values)
if not self.READ_BEFORE_WRITE:
values = () # values are not expected for change_<group>
new = Change(module, self.parameters, valuedict)
# call change_* for calculation new hw values
values = getattr(module, 'change_' + self.group)(new, *values)
change = Change(self, module, valuedict)
if force_read:
change.readValues()
values = self.change(module, change)
if values is None: # this indicates that nothing has to be written
return
# send the change command and a query command
analyze(*self.send_change(module, *values))
def change(self, module, *values):
"""write and read back values
might be called from a write method, if change_<group> is not implemented
"""
getattr(module, 'analyze_' + self.group)(*self.send_change(module, *values))
reply = self.send_change(module, *values)
result = self.analyze(module, *reply)
for k, v in result.items():
setattr(module, k, v)
class CmdHandler(CmdHandlerBase):
"""more evolved command handler
this command handler works for a syntax, where the change command syntax can be
build from the query command syntax, with the to be changed items at the second
part of the command, using the same format as for the reply.
This command handler works for a syntax, where the reply of a query command has
the same format as the arguments for the change command.
Examples: devices from LakeShore, PPMS
implementing classes have to define/override the following:

View File

@ -424,7 +424,7 @@ class Drivable(Writable):
def isDriving(self, status=None):
'''helper function (finalize is busy, not driving)'''
return 300 <= (status or self.status)[0] < 380
return 300 <= (status or self.status)[0] < 390
# improved polling: may poll faster if module is BUSY
def pollParams(self, nr=0):

View File

@ -45,7 +45,7 @@ class CmdHandler(secop.commandhandler.CmdHandler):
rdgrng = CmdHandler('rdgrng', 'RDGRNG?%(channel)d', '%d,%d,%d,%d,%d')
inset = CmdHandler('inset', 'INSET?%(channel)d', '%d,%d,%d,%d,%d')
filterhdl = CmdHandler('filt', 'FILTER?%(channel)d', '%d,%d,%d')
filterhdl = CmdHandler('filter', 'FILTER?%(channel)d', '%d,%d,%d')
scan = CmdHandler('scan', 'SCAN?', '%d,%d')
@ -74,10 +74,11 @@ class Main(HasIodev, Module):
pollerClass = Poller
def analyze_scan(self, channel, autoscan):
self.channel, self.autoscan = channel, autoscan
return dict(channel=channel, autoscan=autoscan)
def change_scan(self, new, *args):
return new.channel, new.autoscan
def change_scan(self, change):
change.readValues()
return change.channel, change.autoscan
class ResChannel(HasIodev, Readable):
@ -121,7 +122,7 @@ class ResChannel(HasIodev, Readable):
Parameter('current excitation', datatype=EnumType(off=0, **CUR_RANGE), readonly=False, handler=rdgrng),
'vexc':
Parameter('voltage excitation', datatype=EnumType(off=0, **VOLT_RANGE), readonly=False, handler=rdgrng),
'enable':
'enabled':
Parameter('is this channel enabled?', datatype=BoolType(), readonly=False, handler=inset),
'pause':
Parameter('pause after channel change', datatype=IntRange(), readonly=False, handler=inset),
@ -139,6 +140,9 @@ class ResChannel(HasIodev, Readable):
def read_value(self):
if self.channel != self._main.channel:
return Done
if not self.enabled:
self.status = [self.Status.DISABLED, 'disabled']
return Done
result = self.sendRecv('RDGR?%d' % self.channel)
result = float(result)
if self.autorange == 'soft':
@ -172,6 +176,8 @@ class ResChannel(HasIodev, Readable):
def read_status(self):
if self.channel != self._main.channel:
return Done
if not self.enabled:
return [self.Status.DISABLED, 'disabled']
result = int(self.sendRecv('RDGST?%d' % self.channel))
result &= 0x37 # mask T_OVER and T_UNDER (change this when implementing temperatures instead of resistivities)
statustext = STATUS_TEXT[result]
@ -180,54 +186,56 @@ class ResChannel(HasIodev, Readable):
return [self.Status.IDLE, '']
def analyze_rdgrng(self, iscur, exc, rng, autorange, excoff):
if excoff:
self.iexc, self.vexc = 0,0
elif iscur:
self.iexc, self.vexc = exc, 0
else:
self.iexc, self.vexc = 0, exc
result = dict(range=rng)
if autorange:
self.autorange = 'hard'
result['auotrange'] = 'hard'
elif self.autorange == 'hard':
result['autorange'] = 'soft'
# else: do not change autorange
if excoff:
result.update(iexc=0, vexc=0)
elif iscur:
result.update(iexc=exc, vexc=0)
else:
if self.autorange == 'hard':
self.autorange = 'soft'
else:
self.autorange = self.autorange
self.range = rng
result.update(iexc=0, vexc=exc)
return result
def change_rdgrng(self, new, iscur, exc, rng, autorange, excoff):
if new.vexc != self.vexc: # in case vext is changed, do not consider iexc
new.iexc = 0
if new.iexc != 0: # we need '!= 0' here, as bool(enum) is always True!
def change_rdgrng(self, change):
iscur, exc, rng, autorange, excoff = change.readValues()
if change.doesInclude('vexc'): # in case vext is changed, do not consider iexc
change.iexc = 0
if change.iexc != 0: # we need '!= 0' here, as bool(enum) is always True!
iscur = 1
exc = new.iexc
exc = change.iexc
excoff = 0
elif new.vexc != 0: # we need '!= 0' here, as bool(enum) is always True!
elif change.vexc != 0: # we need '!= 0' here, as bool(enum) is always True!
iscur = 0
exc = new.vexc
exc = change.vexc
excoff = 0
else:
excoff = 1
rng = new.range
if new.autorange == 'hard':
rng = change.range
if change.autorange == 'hard':
autorange = 1
else:
autorange = 0
if new.autorange == 'soft':
if rng < new.minrange:
rng = new.minrange
if change.autorange == 'soft':
if rng < self.minrange:
rng = self.minrange
return iscur, exc, rng, autorange, excoff
def analyze_inset(self, on, dwell, pause, curve, tempco):
self.enabled, self.dwell, self.pause = on, dwell, pause
return dict(enabled=on, dwell=dwell, pause=pause)
def change_inset(self, new, on, dwell, pause, curve, tempco):
return new.enable, new.dwell, new.pause, curve, tempco
def change_inset(self, change):
_, _, _, curve, tempco = change.readValues()
return change.enabled, change.dwell, change.pause, curve, tempco
def analyze_filt(self, on, settle, window):
self.filter = settle if on else 0
def analyze_filter(self, on, settle, window):
return dict(filter=settle if on else 0)
def change_filt(self, new, on, settle, window):
if new.filter:
return 1, new.filter, 80 # always use 80% filter
def change_filter(self, change):
_, settle, window = change.readValues()
if change.filter:
return 1, change.filter, 80 # always use 80% filter
return 0, settle, window

View File

@ -29,12 +29,6 @@ The PPMS hardware has some special requirements:
Polling of value and status is done commonly for all modules. For each registered module
<module>.update_value_status() is called in order to update their value and status.
Polling of module settings is using the same poller (secop.Poller is checking iodev).
Only the hidden (not exported) parameter 'settings' is polled, all the others are updated
by read_settings. The modules parameters related to the settings are updated only on change.
This allows for example for the field module to buffer ramp and approachmode until the
next target or persistent_mode change happens, because sending the common command for
settings and target would do a useless cycle of ramping up leads, heating switch etc.
"""
import time
@ -137,7 +131,6 @@ class PpmsMixin(HasIodev, Module):
pollerClass = Poller
enabled = True # default, if no parameter enable is defined
# STATUS_MAP = {} # a mapping converting ppms status codes into SECoP status values
_last_target_change = 0 # used by several modules
_last_settings = None # used by several modules
slow_pollfactor = 1
@ -150,22 +143,19 @@ class PpmsMixin(HasIodev, Module):
started_callback()
def read_value(self):
"""effective polling is done by the main module"""
if not self.enabled:
return Done
if self.parameters['value'].timestamp == 0:
# make sure that the value is read at least after init
self._iodev.read_data()
return self.value
"""polling is done by the main module
and PPMS does not deliver really more fresh values when polled more often
"""
return Done
def read_status(self):
"""effective polling is done by the main module"""
if not self.enabled:
return Done
if self.parameters['value'].timestamp == 0:
# make sure that the value is read at least after init
self._iodev.read_data()
return self.status
"""polling is done by the main module
and PPMS does not deliver really fresh status values anyway: the status is not
changed immediately after a target change!
"""
return Done
def update_value_status(self, value, packed_status):
"""update value and status
@ -239,11 +229,11 @@ class DriverChannel(Channel):
def analyze_drvout(self, no, current, powerlimit):
if self.no != no:
raise HardwareError('DRVOUT command: channel number in reply does not match')
self.current = current
self.powerlimit = powerlimit
return dict(current=current, powerlimit=powerlimit)
def change_drvout(self, new):
return new.current, new.powerlimit
def change_drvout(self, change):
self.readValues()
return change.current, change.powerlimit
class BridgeChannel(Channel):
@ -272,22 +262,23 @@ class BridgeChannel(Channel):
Override(visibility=3),
}
_settingnames = ['no', 'excitation', 'powerlimit', 'dcflag', 'readingmode', 'voltagelimit']
def analyze_bridge(self, no, excitation, powerlimit, dcflag, readingmode, voltagelimit):
if self.no != no:
raise HardwareError('DRVOUT command: channel number in reply does not match')
self.enabled = excitation != 0 and powerlimit != 0 and voltagelimit != 0
self.excitation = excitation or self.excitation
self.powerlimit = powerlimit or self.powerlimit
self.dcflag = dcflag
self.readingmode = readingmode
self.voltagelimit = voltagelimit or self.voltagelimit
return dict(
enabled=excitation != 0 and powerlimit != 0 and voltagelimit != 0,
excitation=excitation or self.excitation,
powerlimit=powerlimit or self.powerlimit,
dcflag=dcflag,
readingmode=readingmode,
voltagelimit=voltagelimit or self.voltagelimit,
)
def change_bridge(self, new):
if new.enabled:
return self.no, new.excitation, new.powerlimit, new.dcflag, new.readingmode, new.voltagelimit
return self.no, 0, 0, new.dcflag, new.readingmode, 0
def change_bridge(self, change):
self.readValues()
if change.enabled:
return self.no, change.excitation, change.powerlimit, change.dcflag, change.readingmode, change.voltagelimit
return self.no, 0, 0, change.dcflag, change.readingmode, 0
class Level(PpmsMixin, Readable):
@ -316,7 +307,7 @@ class Level(PpmsMixin, Readable):
self.status = [self.Status.IDLE, '']
else:
self.status = [self.Status.ERROR, 'old reading']
self.value = level
return dict(value = level)
class Chamber(PpmsMixin, Drivable):
@ -377,7 +368,6 @@ class Chamber(PpmsMixin, Drivable):
}
channel = 'chamber'
_settingnames = ['target']
def update_value_status(self, value, packed_status):
"""update value and status"""
@ -385,16 +375,16 @@ class Chamber(PpmsMixin, Drivable):
self.status = self.STATUS_MAP[self.value]
def analyze_chamber(self, target):
self.target = target
return dict(target=target)
def change_chamber(self, new):
def change_chamber(self, change):
"""write settings, combining <pname>=<value> and current attributes
and request updated settings
"""
if new.target == self.Operation.noop:
if change.target == self.Operation.noop:
return None
return (new.target,)
return (change.target,)
class Temp(PpmsMixin, Drivable):
@ -446,7 +436,7 @@ class Temp(PpmsMixin, Drivable):
STATUS_MAP = {
0: [Status.ERROR, 'unknown'],
1: [Status.IDLE, 'stable at target'],
2: [Status.RAMPING, 'changing'],
2: [Status.RAMPING, 'ramping'],
5: [Status.STABILIZING, 'within tolerance'],
6: [Status.STABILIZING, 'outside tolerance'],
10: [Status.WARN, 'standby'],
@ -456,7 +446,6 @@ class Temp(PpmsMixin, Drivable):
}
channel = 'temp'
_settingnames = ['target', 'ramp', 'approachmode']
_stopped = False
_expected_target = 0
_last_change = 0 # 0 means no target change is pending
@ -481,15 +470,13 @@ class Temp(PpmsMixin, Drivable):
self.status = [self.Status.IDLE, 'stopped(%s)' % status[1]]
return
if self._last_change: # there was a change, which is not yet confirmed by hw
if self.isDriving(status):
if now > self._last_change + 15 or status != self._status_before_change:
self._last_change = 0
self.log.debug('time needed to change to busy: %.3g', now - self._last_change)
if now > self._last_change + 5:
self._last_change = 0 # give up waiting for busy
elif self.isDriving(status) and status != self._status_before_change:
self.log.debug('time needed to change to busy: %.3g', now - self._last_change)
self._last_change = 0
else:
if now < self._last_change + 15:
status = [self.Status.BUSY, 'changed target while %s' % status[1]]
else:
status = [self.Status.WARN, 'temperature status (%r) does not change to BUSY' % status]
status = [self.Status.BUSY, 'changed target']
if self._expected_target:
# handle timeout
if self.isDriving(status):
@ -500,24 +487,23 @@ class Temp(PpmsMixin, Drivable):
self.status = status
def analyze_temp(self, target, ramp, approachmode):
if (target, ramp, approachmode) != self._last_settings:
if (target, ramp, approachmode) == self._last_settings:
# we update parameters only on change, as 'approachmode'
# is not always sent to the hardware
self._last_settings = target, ramp, approachmode
self.target = target
self.ramp =ramp
self.approachmode = approachmode
return {}
self._last_settings = target, ramp, approachmode
return dict(target=target, ramp=ramp, approachmode=approachmode)
def change_temp(self, new):
self.calc_expected(new.target, self.ramp)
return new.target, new.ramp, new.approachmode
def change_temp(self, change):
self.calc_expected(change.target, self.ramp)
return change.target, change.ramp, change.approachmode
def write_target(self, target):
self._stopped = False
if abs(self.target - self.value) < 2e-5 and target == self.target:
return None
self._status_before_change = self.status
self.status = [self.Status.BUSY, 'changed_target']
self.status = [self.Status.BUSY, 'changed target']
self._last_change = time.time()
return target
@ -623,31 +609,27 @@ class Field(PpmsMixin, Drivable):
status = [self.Status.PREPARING, 'ramping leads']
else:
status = [self.Status.WARN, 'timeout when ramping leads']
elif self.isDriving(status):
if now > self._last_change + 5 or status != self._status_before_change:
self._last_change = 0
self.log.debug('time needed to change to busy: %.3g', now - self._last_change)
elif now > self._last_change + 5:
self._last_change = 0 # give up waiting for driving
elif self.isDriving(status) and status != self._status_before_change:
self._last_change = 0
self.log.debug('time needed to change to busy: %.3g', now - self._last_change)
else:
if now < self._last_change + 5:
status = [self.Status.BUSY, 'changed target while %s' % status[1]]
else:
status = [self.Status.WARN, 'field status (%r) does not change to BUSY' % status]
status = [self.Status.BUSY, 'changed target']
self.status = status
def analyze_field(self, target, ramp, approachmode, persistentmode):
if (target, ramp, approachmode, persistentmode) != self._last_settings:
if (target, ramp, approachmode, persistentmode) == self._last_settings:
# we update parameters only on change, as 'ramp' and 'approachmode' are
# not always sent to the hardware
self._last_settings = target, ramp, approachmode, persistentmode
self.target = target * 1e-4
self.ramp = ramp * 6e-3
self.approachmode = approachmode
self.persistentmode = persistentmode
return {}
self._last_settings = target, ramp, approachmode, persistentmode
return dict(target=target * 1e-4, ramp=ramp * 6e-3, approachmode=approachmode,
persistentmode=persistentmode)
def change_field(self, new):
if 'target' in new or 'persistentmode' in new:
# changed target or persistentmode
if 'target' in new:
def change_field(self, change):
if change.doesInclude('target', 'persistentmode'):
if change.doesInclude('target'):
self._last_target = self.target # save for stop command
self._stopped = False
self._last_change = time.time()
@ -656,7 +638,7 @@ class Field(PpmsMixin, Drivable):
# changed ramp or approachmode
if not self.isDriving():
return None # nothing to be written, as this would trigger a ramp up of leads current
return new.target * 1e+4, new.ramp / 6e-3, new.approachmode, new.persistentmode
return change.target * 1e+4, change.ramp / 6e-3, change.approachmode, change.persistentmode
def do_stop(self):
if not self.isDriving():
@ -701,7 +683,6 @@ class Position(PpmsMixin, Drivable):
}
channel = 'position'
_settingnames = ['target', 'mode', 'speed']
_stopped = False
_last_target = 0
_last_change = 0 # means no target change is pending
@ -724,28 +705,26 @@ class Position(PpmsMixin, Drivable):
status = [self.Status.IDLE, 'stopped(%s)' % status[1]]
if self._last_change: # there was a change, which is not yet confirmed by hw
now = time.time()
if self.isDriving():
if now > self._last_change + 15 or status != self._status_before_change:
self._last_change = 0
self.log.debug('time needed to change to busy: %.3g', now - self._last_change)
if now > self._last_change + 5:
self._last_change = 0 # give up waiting for busy
elif self.isDriving() and status != self._status_before_change:
self._last_change = 0
self.log.debug('time needed to change to busy: %.3g', now - self._last_change)
else:
if now < self._last_change + 15:
status = [self.Status.BUSY, 'changed target while %s' % status[1]]
else:
status = [self.Status.WARN, 'temperature status (%r) does not change to BUSY' % status]
status = [self.Status.BUSY, 'changed target']
self.status = status
def analyze_move(self, target, mode, speed):
if (target, speed) != self._last_settings:
if (target, speed) == self._last_settings:
# we update parameters only on change, as 'speed' is
# not always sent to the hardware
self._last_settings = target, speed
self.target = target
self.speed = (15 - speed) * 0.8
return {}
self._last_settings = target, speed
return dict(target=target, speed=(15 - speed) * 0.8)
def change_move(self, new):
speed = int(round(min(14, max(0, 15 - new.speed / 0.8)), 0))
return new.target, 0, speed
def change_move(self, change):
speed = int(round(min(14, max(0, 15 - change.speed / 0.8)), 0))
return change.target, 0, speed
def write_target(self, target):
self._last_target = self.target # save for stop command

View File

@ -62,7 +62,7 @@ class Data:
tag, data = self.data.pop(0)
print('pop(%s) %r' % (tag, data))
if tag != expected:
raise ValueError('expected tag %s' % expected)
raise ValueError('expected tag %s, not %s' % (expected, tag))
return data
def empty(self):
@ -123,16 +123,18 @@ def test_CmdHandler():
def analyze_group1(self, val):
assert data.pop('val') == val
self.simple = data.pop('simple')
return dict(simple=data.pop('simple'))
def analyze_group2(self, gval, sval, dval):
assert data.pop('gsv') == (gval, sval, dval)
self.real, self.text = data.pop('rt')
real, text = data.pop('rt')
return dict(real=real, text=text)
def change_group2(self, new, gval, sval, dval):
def change_group2(self, change):
gval, sval, dval = change.readValues()
assert data.pop('old') == (gval, sval, dval)
assert data.pop('self') == (self.real, self.text)
assert data.pop('new') == (new.real, new.text)
assert data.pop('new') == (change.real, change.text)
return data.pop('changed')
data = Data()