driver for mercury IPS

OI mercury series magnet power supply

Change-Id: I1ec20435a9e585c543afc736355e39da72eff879
Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/31009
Tested-by: Jenkins Automated Tests <pedersen+jenkins@frm2.tum.de>
Reviewed-by: Markus Zolliker <markus.zolliker@psi.ch>
This commit is contained in:
2023-05-03 14:59:53 +02:00
parent dfe0038494
commit 37e18a8a5b
2 changed files with 740 additions and 0 deletions

340
frappy_psi/ips_mercury.py Normal file
View File

@ -0,0 +1,340 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""oxford instruments mercury IPS power supply"""
import time
from frappy.core import Parameter, EnumType, FloatRange, BoolType, IntRange, Property, Module
from frappy.lib.enum import Enum
from frappy.errors import BadValueError, HardwareError
from frappy_psi.magfield import Magfield, SimpleMagfield, Status
from frappy_psi.mercury import MercuryChannel, off_on, Mapped
from frappy.states import Retry
Action = Enum(hold=0, run_to_set=1, run_to_zero=2, clamped=3)
hold_rtoz_rtos_clmp = Mapped(HOLD=Action.hold, RTOS=Action.run_to_set,
RTOZ=Action.run_to_zero, CLMP=Action.clamped)
CURRENT_CHECK_SIZE = 2
class SimpleField(MercuryChannel, SimpleMagfield):
nunits = Property('number of IPS subunits', IntRange(1, 6), default=1)
action = Parameter('action', EnumType(Action), readonly=False)
setpoint = Parameter('field setpoint', FloatRange(unit='T'), default=0)
voltage = Parameter('leads voltage', FloatRange(unit='V'), default=0)
atob = Parameter('field to amp', FloatRange(0, unit='A/T'), default=0)
working_ramp = Parameter('effective ramp', FloatRange(0, unit='T/min'), default=0)
kind = 'PSU'
slave_currents = None
classdict = {}
def __new__(cls, name, logger, cfgdict, srv): # pylint: disable=arguments-differ
nunits = cfgdict.get('nunits', 1)
if isinstance(nunits, dict):
nunits = nunits['value']
if nunits == 1:
return Module.__new__(cls, name, logger, cfgdict, srv)
classname = cls.__name__ + str(nunits)
newclass = cls.classdict.get(classname)
if not newclass:
# create individual current and voltage parameters dynamically
attrs = {}
for i in range(1, nunits + 1):
attrs['I%d' % i] = Parameter('slave %s current' % i, FloatRange(unit='A'), default=0)
attrs['V%d' % i] = Parameter('slave %s voltage' % i, FloatRange(unit='V'), default=0)
newclass = type(classname, (cls,), attrs)
cls.classdict[classname] = newclass
return Module.__new__(newclass, name, logger, cfgdict, srv)
def initModule(self):
super().initModule()
try:
self.write_action(Action.hold)
except Exception as e:
self.log.error('can not set to hold %r', e)
def read_value(self):
return self.query('DEV::PSU:SIG:FLD')
def read_ramp(self):
return self.query('DEV::PSU:SIG:RFST')
def write_ramp(self, value):
return self.change('DEV::PSU:SIG:RFST', value)
def read_action(self):
return self.query('DEV::PSU:ACTN', hold_rtoz_rtos_clmp)
def write_action(self, value):
return self.change('DEV::PSU:ACTN', value, hold_rtoz_rtos_clmp)
def read_atob(self):
return self.query('DEV::PSU:ATOB')
def read_voltage(self):
return self.query('DEV::PSU:SIG:VOLT')
def read_working_ramp(self):
return self.query('DEV::PSU:SIG:RFLD')
def read_setpoint(self):
return self.query('DEV::PSU:SIG:FSET')
def set_and_go(self, value):
self.setpoint = self.change('DEV::PSU:SIG:FSET', value)
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_set) == Action.run_to_set
def start_ramp_to_target(self, sm):
# if self.action != Action.hold:
# assert self.write_action(Action.hold) == Action.hold
# return Retry
self.set_and_go(sm.target)
sm.try_cnt = 5
return self.ramp_to_target
def ramp_to_target(self, sm):
try:
return super().ramp_to_target(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
self.set_and_go(sm.target)
return Retry
def final_status(self, *args, **kwds):
self.write_action(Action.hold)
return super().final_status(*args, **kwds)
def on_restart(self, sm):
self.write_action(Action.hold)
return super().on_restart(sm)
class Field(SimpleField, Magfield):
persistent_field = Parameter(
'persistent field at last switch off', FloatRange(unit='$'), readonly=False)
wait_switch_on = Parameter(
'wait time to ensure switch is on', FloatRange(0, unit='s'), readonly=True, default=60)
wait_switch_off = Parameter(
'wait time to ensure switch is off', FloatRange(0, unit='s'), readonly=True, default=60)
forced_persistent_field = Parameter(
'manual indication that persistent field is bad', BoolType(), readonly=False, default=False)
_field_mismatch = None
__persistent_field = None # internal value of persistent field
__switch_fixed_until = 0
def doPoll(self):
super().doPoll()
self.read_current()
def startModule(self, start_events):
# on restart, assume switch is changed long time ago, if not, the mercury
# will complain and this will be handled in start_ramp_to_field
self.switch_on_time = 0
self.switch_off_time = 0
self.switch_heater = self.query('DEV::PSU:SIG:SWHT', off_on)
super().startModule(start_events)
def read_value(self):
current = self.query('DEV::PSU:SIG:FLD')
if self.switch_heater == self.switch_heater.on:
self.__persistent_field = current
self.forced_persistent_field = False
return current
pf = self.query('DEV::PSU:SIG:PFLD')
if self.__persistent_field is None:
self.__persistent_field = pf
self._field_mismatch = False
else:
self._field_mismatch = abs(self.__persistent_field - pf) > self.tolerance * 10
self.persistent_field = self.__persistent_field
return self.__persistent_field
def _check_adr(self, adr):
"""avoid complains about bad slot"""
if adr.startswith('DEV:PSU.M'):
return
super()._check_adr(adr)
def read_current(self):
if self.slave_currents is None:
self.slave_currents = [[] for _ in range(self.nunits + 1)]
if self.nunits > 1:
for i in range(1, self.nunits + 1):
curri = self.query(f'DEV:PSU.M{i}:PSU:SIG:CURR')
volti = self.query(f'DEV:PSU.M{i}:PSU:SIG:VOLT')
setattr(self, f'I{i}', curri)
setattr(self, f'V{i}', volti)
self.slave_currents[i].append(curri)
current = self.query('DEV::PSU:SIG:CURR')
self.slave_currents[0].append(current)
min_ = min(self.slave_currents[0]) / self.nunits
max_ = max(self.slave_currents[0]) / self.nunits
# keep one element more for the total current (first and last measurement is a total)
self.slave_currents[0] = self.slave_currents[0][-CURRENT_CHECK_SIZE-1:]
for i in range(1, self.nunits + 1):
min_i = min(self.slave_currents[i])
max_i = max(self.slave_currents[i])
if len(self.slave_currents[i]) > CURRENT_CHECK_SIZE:
self.slave_currents[i] = self.slave_currents[i][-CURRENT_CHECK_SIZE:]
if min_i - 0.1 > max_ or min_ > max_i + 0.1: # use an arbitrary 0.1 A tolerance
self.log.warning('individual currents mismatch %r', self.slave_currents)
else:
current = self.query('DEV::PSU:SIG:CURR')
if self.atob:
return current / self.atob
return 0
def write_persistent_field(self, value):
if self.forced_persistent_field or abs(self.__persistent_field - value) <= self.tolerance * 10:
self._field_mismatch = False
self.__persistent_field = value
return value
raise BadValueError('changing persistent field needs forced_persistent_field=True')
def write_target(self, target):
if self._field_mismatch:
self.forced_persistent_field = True
raise BadValueError('persistent field does not match - set persistent field to guessed value first')
return super().write_target(target)
def read_switch_heater(self):
value = self.query('DEV::PSU:SIG:SWHT', off_on)
now = time.time()
if value != self.switch_heater:
if now < self.__switch_fixed_until:
self.log.debug('correct fixed switch time')
# probably switch heater was changed, but IPS reply is not yet updated
if self.switch_heater:
self.switch_on_time = time.time()
else:
self.switch_off_time = time.time()
return self.switch_heater
return value
def read_wait_switch_on(self):
return self.query('DEV::PSU:SWONT') * 0.001
def read_wait_switch_off(self):
return self.query('DEV::PSU:SWOFT') * 0.001
def write_switch_heater(self, value):
if value == self.read_switch_heater():
self.log.info('switch heater already %r', value)
# we do not want to restart the timer
return value
self.__switch_fixed_until = time.time() + 10
self.log.debug('switch time fixed for 10 sec')
result = self.change('DEV::PSU:SIG:SWHT', value, off_on, n_retry=0) # no readback check
return result
def start_ramp_to_field(self, sm):
if abs(self.current - self.__persistent_field) <= self.tolerance:
self.log.info('leads %g are already at %g', self.current, self.__persistent_field)
return self.ramp_to_field
try:
self.set_and_go(self.__persistent_field)
except (HardwareError, AssertionError) as e:
if self.switch_heater:
self.log.warn('switch is already on!')
return self.ramp_to_field
self.log.warn('wait first for switch off current=%g pf=%g %r', self.current, self.__persistent_field, e)
sm.after_wait = self.ramp_to_field
return self.wait_for_switch
return self.ramp_to_field
def start_ramp_to_target(self, sm):
sm.try_cnt = 5
try:
self.set_and_go(sm.target)
except (HardwareError, AssertionError) as e:
self.log.warn('switch not yet ready %r', e)
self.status = Status.PREPARING, 'wait for switch on'
sm.after_wait = self.ramp_to_target
return self.wait_for_switch
return self.ramp_to_target
def ramp_to_field(self, sm):
try:
return super().ramp_to_field(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
self.set_and_go(self.__persistent_field)
return Retry
def wait_for_switch(self, sm):
if not sm.delta(10):
return Retry
try:
self.log.warn('try again')
# try again
self.set_and_go(self.__persistent_field)
except (HardwareError, AssertionError):
return Retry
return sm.after_wait
def wait_for_switch_on(self, sm):
self.read_switch_heater() # trigger switch_on/off_time
if self.switch_heater == self.switch_heater.off:
if sm.init: # avoid too many states chained
return Retry
self.log.warning('switch turned off manually?')
return self.start_switch_on
return super().wait_for_switch_on(sm)
def wait_for_switch_off(self, sm):
self.read_switch_heater()
if self.switch_heater == self.switch_heater.on:
if sm.init: # avoid too many states chained
return Retry
self.log.warning('switch turned on manually?')
return self.start_switch_off
return super().wait_for_switch_off(sm)
def start_ramp_to_zero(self, sm):
pf = self.query('DEV::PSU:SIG:PFLD')
if abs(pf - self.value) > self.tolerance * 10:
self.log.warning('persistent field %g does not match %g after switch off', pf, self.value)
try:
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_zero) == Action.run_to_zero
except (HardwareError, AssertionError) as e:
self.log.warn('switch not yet ready %r', e)
self.status = Status.PREPARING, 'wait for switch off'
sm.after_wait = self.ramp_to_zero
return self.wait_for_switch
return self.ramp_to_zero
def ramp_to_zero(self, sm):
try:
return super().ramp_to_zero(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
assert self.write_action(Action.hold) == Action.hold
assert self.write_action(Action.run_to_zero) == Action.run_to_zero
return Retry

400
frappy_psi/magfield.py Normal file
View File

@ -0,0 +1,400 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""generic persistent magnet driver"""
import time
from frappy.core import Drivable, Parameter, BUSY, Limit
from frappy.datatypes import FloatRange, EnumType, TupleOf, StatusType
from frappy.errors import ConfigError, HardwareError, DisabledError
from frappy.lib.enum import Enum
from frappy.states import Retry, HasStates, status_code
UNLIMITED = FloatRange()
Mode = Enum(
DISABLED=0,
PERSISTENT=30,
DRIVEN=50,
)
Status = Enum(
Drivable.Status,
PREPARED=150,
PREPARING=340,
RAMPING=370,
STABILIZING=380,
FINALIZING=390,
)
OFF = 0
ON = 1
class SimpleMagfield(HasStates, Drivable):
value = Parameter('magnetic field', datatype=FloatRange(unit='T'))
target_min = Limit()
target_max = Limit()
ramp = Parameter(
'wanted ramp rate for field', FloatRange(unit='$/min'), readonly=False)
# export only when different from ramp:
workingramp = Parameter(
'effective ramp rate for field', FloatRange(unit='$/min'), export=False)
tolerance = Parameter(
'tolerance', FloatRange(0, unit='$'), readonly=False, default=0.0002)
trained = Parameter(
'trained field (positive)',
TupleOf(FloatRange(-99, 0, unit='$'), FloatRange(0, unit='$')),
readonly=False, default=(0, 0))
wait_stable_field = Parameter(
'wait time to ensure field is stable', FloatRange(0, unit='s'), readonly=False, default=31)
ramp_tmo = Parameter(
'timeout for field ramp progress',
FloatRange(0, unit='s'), readonly=False, default=30)
_last_target = None
_symmetric_limits = False
def earlyInit(self):
super().earlyInit()
# when limits are symmetric at init, we want auto symmetric limits
self._symmetric_limits = self.target_min == -self.target_max
def write_target_max(self, value):
if self._symmetric_limits:
self.target_min = -value
return value
def write_target_min(self, value):
"""when modified to other than a symmetric value, we assume the user does not want auto symmetric limits"""
self._symmetric_limits = value == -self.target_max
return value
def checkProperties(self):
dt = self.parameters['target'].datatype
max_ = dt.max
if max_ == UNLIMITED.max:
raise ConfigError('target.max not configured')
if dt.min == UNLIMITED.min: # not given: assume bipolar symmetric
dt.min = -max_
self.target_min = max(dt.min, self.target_min)
if 'target_max' in self.writeDict:
self.writeDict.setdefault('target_min', -self.writeDict['target_max'])
super().checkProperties()
def stop(self):
"""keep field at current value"""
# let the state machine do the needed steps to finish
self.write_target(self.value)
def last_target(self):
"""get best known last target
as long as the guessed last target is within tolerance
with repsect to the main value, it is used, as in general
it has better precision
"""
last = self._last_target
if last is None:
try:
last = self.setpoint # get read back from HW, if available
except Exception:
pass
if last is None or abs(last - self.value) > self.tolerance:
return self.value
return last
def write_target(self, target):
self.start_machine(self.start_field_change, target=target)
return target
def init_progress(self, sm, value):
sm.prev_point = sm.now, value
def get_progress(self, sm, value):
"""return the time passed for at least one tolerance step"""
t, v = sm.prev_point
dif = abs(v - value)
tdif = sm.now - t
if dif > self.tolerance:
sm.prev_point = sm.now, value
return tdif
@status_code(BUSY, 'start ramp to target')
def start_field_change(self, sm):
self.setFastPoll(True, 1.0)
return self.start_ramp_to_target
@status_code(BUSY, 'ramping field')
def ramp_to_target(self, sm):
if sm.init:
self.init_progress(sm, self.value)
# Remarks: assume there is a ramp limiting feature
if abs(self.value - sm.target) > self.tolerance:
if self.get_progress(sm, self.value) > self.ramp_tmo:
raise HardwareError('no progress')
sm.stabilize_start = None # force reset
return Retry
sm.stabilize_start = time.time()
return self.stabilize_field
@status_code(BUSY, 'stabilizing field')
def stabilize_field(self, sm):
if sm.now - sm.stabilize_start < self.wait_stable_field:
return Retry
return self.final_status()
def read_workingramp(self):
return self.ramp
class Magfield(SimpleMagfield):
status = Parameter(datatype=StatusType(Status))
mode = Parameter('persistent mode', EnumType(Mode), readonly=False, default=Mode.PERSISTENT)
switch_heater = Parameter('switch heater', EnumType(off=OFF, on=ON),
readonly=False, default=0)
current = Parameter(
'leads current (in units of field)', FloatRange(unit='$'))
# TODO: time_to_target
# profile = Parameter(
# 'ramp limit table', ArrayOf(TupleOf(FloatRange(unit='$'), FloatRange(unit='$/min'))),
# readonly=False)
# profile_training = Parameter(
# 'ramp limit table when in training',
# ArrayOf(TupleOf(FloatRange(unit='$'), FloatRange(unit='$/min'))), readonly=False)
# TODO: the following parameters should be changed into properties after tests
wait_switch_on = Parameter(
'wait time to ensure switch is on', FloatRange(0, unit='s'), readonly=False, default=61)
wait_switch_off = Parameter(
'wait time to ensure switch is off', FloatRange(0, unit='s'), readonly=False, default=61)
wait_stable_leads = Parameter(
'wait time to ensure current is stable', FloatRange(0, unit='s'), readonly=False, default=6)
persistent_limit = Parameter(
'above this limit, lead currents are not driven to 0',
FloatRange(0, unit='$'), readonly=False, default=99)
leads_ramp_tmo = Parameter(
'timeout for leads ramp progress',
FloatRange(0, unit='s'), readonly=False, default=30)
init_persistency = True
switch_on_time = None
switch_off_time = None
def doPoll(self):
if self.init_persistency:
if self.read_switch_heater() and self.mode != Mode.DRIVEN:
# switch heater is on on startup: got to persistent mode
# do this after some delay, so the user might continue
# driving without delay after a restart
self.start_machine(self.go_persistent_soon, mode=self.mode)
self.init_persistency = False
super().doPoll()
def initModule(self):
super().initModule()
self.registerCallbacks(self) # for update_switch_heater
def write_mode(self, value):
self.init_persistency = False
target = self.last_target()
func = self.start_field_change
if value == Mode.DISABLED:
target = 0
if abs(self.value) < self.tolerance:
func = self.start_switch_off
elif value == Mode.PERSISTENT:
func = self.start_switch_off
self.target = target
self.start_machine(func, target=target, mode=value)
return value
def write_target(self, target):
self.init_persistency = False
if self.mode == Mode.DISABLED:
if target == 0:
return 0
raise DisabledError('disabled')
self.start_machine(self.start_field_change, target=target, mode=self.mode)
return target
def on_error(self, sm): # sm is short for statemachine
if self.switch_heater == ON:
self.read_value()
if sm.mode != Mode.DRIVEN:
self.log.warning('turn switch heater off')
self.write_switch_heater(OFF)
return super().on_error(sm)
@status_code(Status.WARN)
def go_persistent_soon(self, sm):
if sm.delta(60):
self.target = sm.target = self.last_target()
return self.start_field_change
return Retry
@status_code(Status.PREPARING)
def start_field_change(self, sm):
self.setFastPoll(True, 1.0)
if (sm.target == self.last_target() and
abs(sm.target - self.value) <= self.tolerance and
abs(self.current - self.value) < self.tolerance and
(self.mode != Mode.DRIVEN or self.switch_heater == ON)): # short cut
return self.check_switch_off
if self.switch_heater == ON:
return self.start_switch_on
return self.start_ramp_to_field
@status_code(Status.PREPARING)
def start_ramp_to_field(self, sm):
"""start ramping current to persistent field
initiate ramp to persistent field (with corresponding ramp rate)
the implementation should return ramp_to_field
"""
raise NotImplementedError
@status_code(Status.PREPARING, 'ramp leads to match field')
def ramp_to_field(self, sm):
if sm.init:
sm.stabilize_start = 0 # in case current is already at field
self.init_progress(sm, self.current)
dif = abs(self.current - self.value)
if dif > self.tolerance:
tdif = self.get_progress(sm, self.current)
if tdif > self.leads_ramp_tmo:
raise HardwareError('no progress')
sm.stabilize_start = None # force reset
return Retry
if sm.stabilize_start is None:
sm.stabilize_start = sm.now
return self.stabilize_current
@status_code(Status.PREPARING)
def stabilize_current(self, sm):
if sm.now - sm.stabilize_start < self.wait_stable_leads:
return Retry
return self.start_switch_on
def update_switch_heater(self, value):
"""is called whenever switch heater was changed"""
if value == 0:
if self.switch_off_time is None:
self.log.info('restart switch_off_time')
self.switch_off_time = time.time()
self.switch_on_time = None
else:
if self.switch_on_time is None:
self.log.info('restart switch_on_time')
self.switch_on_time = time.time()
self.switch_off_time = None
@status_code(Status.PREPARING)
def start_switch_on(self, sm):
if (sm.target == self.last_target() and
abs(sm.target - self.value) <= self.tolerance): # short cut
return self.check_switch_off
if self.read_switch_heater() == OFF:
try:
self.write_switch_heater(ON)
except Exception as e:
self.log.warning('write_switch_heater %r', e)
return Retry
return self.wait_for_switch_on
@status_code(Status.PREPARING)
def wait_for_switch_on(self, sm):
if sm.now - self.switch_on_time < self.wait_switch_on:
if sm.delta(10):
self.log.info('waited for %g sec', sm.now - self.switch_on_time)
return Retry
self._last_target = sm.target
return self.start_ramp_to_target
@status_code(Status.RAMPING)
def start_ramp_to_target(self, sm):
"""start ramping current to target field
initiate ramp to target
the implementation should return ramp_to_target
"""
raise NotImplementedError
@status_code(Status.RAMPING)
def ramp_to_target(self, sm):
dif = abs(self.value - sm.target)
if sm.init:
sm.stabilize_start = 0 # in case current is already at target
self.init_progress(sm, self.value)
if dif > self.tolerance:
sm.stabilize_start = sm.now
tdif = self.get_progress(sm, self.value)
if tdif > self.workingramp / self.tolerance * 60 + self.ramp_tmo:
self.log.warn('no progress')
raise HardwareError('no progress')
sm.stabilize_start = None
return Retry
if sm.stabilize_start is None:
sm.stabilize_start = sm.now
return self.stabilize_field
@status_code(Status.STABILIZING)
def stabilize_field(self, sm):
if sm.now < sm.stabilize_start + self.wait_stable_field:
return Retry
return self.check_switch_off
def check_switch_off(self, sm):
if sm.mode == Mode.DRIVEN:
return self.final_status(Status.PREPARED, 'driven')
return self.start_switch_off
@status_code(Status.FINALIZING)
def start_switch_off(self, sm):
if self.switch_heater == ON:
self.write_switch_heater(OFF)
return self.wait_for_switch_off
@status_code(Status.FINALIZING)
def wait_for_switch_off(self, sm):
if sm.now - self.switch_off_time < self.wait_switch_off:
return Retry
if abs(self.value) > self.persistent_limit:
return self.final_status(Status.IDLE, 'leads current at field, switch off')
return self.start_ramp_to_zero
@status_code(Status.FINALIZING)
def start_ramp_to_zero(self, sm):
"""start ramping current to zero
initiate ramp to zero (with corresponding ramp rate)
the implementation should return ramp_to_zero
"""
raise NotImplementedError
@status_code(Status.FINALIZING)
def ramp_to_zero(self, sm):
"""ramp field to zero"""
if sm.init:
self.init_progress(sm, self.current)
if abs(self.current) > self.tolerance:
if self.get_progress(sm, self.value) > self.leads_ramp_tmo:
raise HardwareError('no progress')
return Retry
if sm.mode == Mode.DISABLED and abs(self.value) < self.tolerance:
return self.final_status(Status.DISABLED, 'disabled')
return self.final_status(Status.IDLE, 'persistent mode')