From 37e18a8a5b5024a2275560b6c4dff408045eea29 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Wed, 3 May 2023 14:59:53 +0200 Subject: [PATCH] driver for mercury IPS OI mercury series magnet power supply Change-Id: I1ec20435a9e585c543afc736355e39da72eff879 Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/31009 Tested-by: Jenkins Automated Tests Reviewed-by: Markus Zolliker --- frappy_psi/ips_mercury.py | 340 ++++++++++++++++++++++++++++++++ frappy_psi/magfield.py | 400 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 740 insertions(+) create mode 100644 frappy_psi/ips_mercury.py create mode 100644 frappy_psi/magfield.py diff --git a/frappy_psi/ips_mercury.py b/frappy_psi/ips_mercury.py new file mode 100644 index 00000000..467f9eb3 --- /dev/null +++ b/frappy_psi/ips_mercury.py @@ -0,0 +1,340 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# ***************************************************************************** +"""oxford instruments mercury IPS power supply""" + +import time +from frappy.core import Parameter, EnumType, FloatRange, BoolType, IntRange, Property, Module +from frappy.lib.enum import Enum +from frappy.errors import BadValueError, HardwareError +from frappy_psi.magfield import Magfield, SimpleMagfield, Status +from frappy_psi.mercury import MercuryChannel, off_on, Mapped +from frappy.states import Retry + +Action = Enum(hold=0, run_to_set=1, run_to_zero=2, clamped=3) +hold_rtoz_rtos_clmp = Mapped(HOLD=Action.hold, RTOS=Action.run_to_set, + RTOZ=Action.run_to_zero, CLMP=Action.clamped) +CURRENT_CHECK_SIZE = 2 + + +class SimpleField(MercuryChannel, SimpleMagfield): + nunits = Property('number of IPS subunits', IntRange(1, 6), default=1) + action = Parameter('action', EnumType(Action), readonly=False) + setpoint = Parameter('field setpoint', FloatRange(unit='T'), default=0) + voltage = Parameter('leads voltage', FloatRange(unit='V'), default=0) + atob = Parameter('field to amp', FloatRange(0, unit='A/T'), default=0) + working_ramp = Parameter('effective ramp', FloatRange(0, unit='T/min'), default=0) + kind = 'PSU' + slave_currents = None + classdict = {} + + def __new__(cls, name, logger, cfgdict, srv): # pylint: disable=arguments-differ + nunits = cfgdict.get('nunits', 1) + if isinstance(nunits, dict): + nunits = nunits['value'] + if nunits == 1: + return Module.__new__(cls, name, logger, cfgdict, srv) + classname = cls.__name__ + str(nunits) + newclass = cls.classdict.get(classname) + if not newclass: + # create individual current and voltage parameters dynamically + attrs = {} + for i in range(1, nunits + 1): + attrs['I%d' % i] = Parameter('slave %s current' % i, FloatRange(unit='A'), default=0) + attrs['V%d' % i] = Parameter('slave %s voltage' % i, FloatRange(unit='V'), default=0) + + newclass = type(classname, (cls,), attrs) + cls.classdict[classname] = newclass + return Module.__new__(newclass, name, logger, cfgdict, srv) + + def initModule(self): + super().initModule() + try: + self.write_action(Action.hold) + except Exception as e: + self.log.error('can not set to hold %r', e) + + def read_value(self): + return self.query('DEV::PSU:SIG:FLD') + + def read_ramp(self): + return self.query('DEV::PSU:SIG:RFST') + + def write_ramp(self, value): + return self.change('DEV::PSU:SIG:RFST', value) + + def read_action(self): + return self.query('DEV::PSU:ACTN', hold_rtoz_rtos_clmp) + + def write_action(self, value): + return self.change('DEV::PSU:ACTN', value, hold_rtoz_rtos_clmp) + + def read_atob(self): + return self.query('DEV::PSU:ATOB') + + def read_voltage(self): + return self.query('DEV::PSU:SIG:VOLT') + + def read_working_ramp(self): + return self.query('DEV::PSU:SIG:RFLD') + + def read_setpoint(self): + return self.query('DEV::PSU:SIG:FSET') + + def set_and_go(self, value): + self.setpoint = self.change('DEV::PSU:SIG:FSET', value) + assert self.write_action(Action.hold) == Action.hold + assert self.write_action(Action.run_to_set) == Action.run_to_set + + def start_ramp_to_target(self, sm): + # if self.action != Action.hold: + # assert self.write_action(Action.hold) == Action.hold + # return Retry + self.set_and_go(sm.target) + sm.try_cnt = 5 + return self.ramp_to_target + + def ramp_to_target(self, sm): + try: + return super().ramp_to_target(sm) + except HardwareError: + sm.try_cnt -= 1 + if sm.try_cnt < 0: + raise + self.set_and_go(sm.target) + return Retry + + def final_status(self, *args, **kwds): + self.write_action(Action.hold) + return super().final_status(*args, **kwds) + + def on_restart(self, sm): + self.write_action(Action.hold) + return super().on_restart(sm) + + +class Field(SimpleField, Magfield): + persistent_field = Parameter( + 'persistent field at last switch off', FloatRange(unit='$'), readonly=False) + wait_switch_on = Parameter( + 'wait time to ensure switch is on', FloatRange(0, unit='s'), readonly=True, default=60) + wait_switch_off = Parameter( + 'wait time to ensure switch is off', FloatRange(0, unit='s'), readonly=True, default=60) + forced_persistent_field = Parameter( + 'manual indication that persistent field is bad', BoolType(), readonly=False, default=False) + + _field_mismatch = None + __persistent_field = None # internal value of persistent field + __switch_fixed_until = 0 + + def doPoll(self): + super().doPoll() + self.read_current() + + def startModule(self, start_events): + # on restart, assume switch is changed long time ago, if not, the mercury + # will complain and this will be handled in start_ramp_to_field + self.switch_on_time = 0 + self.switch_off_time = 0 + self.switch_heater = self.query('DEV::PSU:SIG:SWHT', off_on) + super().startModule(start_events) + + def read_value(self): + current = self.query('DEV::PSU:SIG:FLD') + if self.switch_heater == self.switch_heater.on: + self.__persistent_field = current + self.forced_persistent_field = False + return current + pf = self.query('DEV::PSU:SIG:PFLD') + if self.__persistent_field is None: + self.__persistent_field = pf + self._field_mismatch = False + else: + self._field_mismatch = abs(self.__persistent_field - pf) > self.tolerance * 10 + self.persistent_field = self.__persistent_field + return self.__persistent_field + + def _check_adr(self, adr): + """avoid complains about bad slot""" + if adr.startswith('DEV:PSU.M'): + return + super()._check_adr(adr) + + def read_current(self): + if self.slave_currents is None: + self.slave_currents = [[] for _ in range(self.nunits + 1)] + if self.nunits > 1: + for i in range(1, self.nunits + 1): + curri = self.query(f'DEV:PSU.M{i}:PSU:SIG:CURR') + volti = self.query(f'DEV:PSU.M{i}:PSU:SIG:VOLT') + setattr(self, f'I{i}', curri) + setattr(self, f'V{i}', volti) + self.slave_currents[i].append(curri) + current = self.query('DEV::PSU:SIG:CURR') + self.slave_currents[0].append(current) + min_ = min(self.slave_currents[0]) / self.nunits + max_ = max(self.slave_currents[0]) / self.nunits + # keep one element more for the total current (first and last measurement is a total) + self.slave_currents[0] = self.slave_currents[0][-CURRENT_CHECK_SIZE-1:] + for i in range(1, self.nunits + 1): + min_i = min(self.slave_currents[i]) + max_i = max(self.slave_currents[i]) + if len(self.slave_currents[i]) > CURRENT_CHECK_SIZE: + self.slave_currents[i] = self.slave_currents[i][-CURRENT_CHECK_SIZE:] + if min_i - 0.1 > max_ or min_ > max_i + 0.1: # use an arbitrary 0.1 A tolerance + self.log.warning('individual currents mismatch %r', self.slave_currents) + else: + current = self.query('DEV::PSU:SIG:CURR') + if self.atob: + return current / self.atob + return 0 + + def write_persistent_field(self, value): + if self.forced_persistent_field or abs(self.__persistent_field - value) <= self.tolerance * 10: + self._field_mismatch = False + self.__persistent_field = value + return value + raise BadValueError('changing persistent field needs forced_persistent_field=True') + + def write_target(self, target): + if self._field_mismatch: + self.forced_persistent_field = True + raise BadValueError('persistent field does not match - set persistent field to guessed value first') + return super().write_target(target) + + def read_switch_heater(self): + value = self.query('DEV::PSU:SIG:SWHT', off_on) + now = time.time() + if value != self.switch_heater: + if now < self.__switch_fixed_until: + self.log.debug('correct fixed switch time') + # probably switch heater was changed, but IPS reply is not yet updated + if self.switch_heater: + self.switch_on_time = time.time() + else: + self.switch_off_time = time.time() + return self.switch_heater + return value + + def read_wait_switch_on(self): + return self.query('DEV::PSU:SWONT') * 0.001 + + def read_wait_switch_off(self): + return self.query('DEV::PSU:SWOFT') * 0.001 + + def write_switch_heater(self, value): + if value == self.read_switch_heater(): + self.log.info('switch heater already %r', value) + # we do not want to restart the timer + return value + self.__switch_fixed_until = time.time() + 10 + self.log.debug('switch time fixed for 10 sec') + result = self.change('DEV::PSU:SIG:SWHT', value, off_on, n_retry=0) # no readback check + return result + + def start_ramp_to_field(self, sm): + if abs(self.current - self.__persistent_field) <= self.tolerance: + self.log.info('leads %g are already at %g', self.current, self.__persistent_field) + return self.ramp_to_field + try: + self.set_and_go(self.__persistent_field) + except (HardwareError, AssertionError) as e: + if self.switch_heater: + self.log.warn('switch is already on!') + return self.ramp_to_field + self.log.warn('wait first for switch off current=%g pf=%g %r', self.current, self.__persistent_field, e) + sm.after_wait = self.ramp_to_field + return self.wait_for_switch + return self.ramp_to_field + + def start_ramp_to_target(self, sm): + sm.try_cnt = 5 + try: + self.set_and_go(sm.target) + except (HardwareError, AssertionError) as e: + self.log.warn('switch not yet ready %r', e) + self.status = Status.PREPARING, 'wait for switch on' + sm.after_wait = self.ramp_to_target + return self.wait_for_switch + return self.ramp_to_target + + def ramp_to_field(self, sm): + try: + return super().ramp_to_field(sm) + except HardwareError: + sm.try_cnt -= 1 + if sm.try_cnt < 0: + raise + self.set_and_go(self.__persistent_field) + return Retry + + def wait_for_switch(self, sm): + if not sm.delta(10): + return Retry + try: + self.log.warn('try again') + # try again + self.set_and_go(self.__persistent_field) + except (HardwareError, AssertionError): + return Retry + return sm.after_wait + + def wait_for_switch_on(self, sm): + self.read_switch_heater() # trigger switch_on/off_time + if self.switch_heater == self.switch_heater.off: + if sm.init: # avoid too many states chained + return Retry + self.log.warning('switch turned off manually?') + return self.start_switch_on + return super().wait_for_switch_on(sm) + + def wait_for_switch_off(self, sm): + self.read_switch_heater() + if self.switch_heater == self.switch_heater.on: + if sm.init: # avoid too many states chained + return Retry + self.log.warning('switch turned on manually?') + return self.start_switch_off + return super().wait_for_switch_off(sm) + + def start_ramp_to_zero(self, sm): + pf = self.query('DEV::PSU:SIG:PFLD') + if abs(pf - self.value) > self.tolerance * 10: + self.log.warning('persistent field %g does not match %g after switch off', pf, self.value) + try: + assert self.write_action(Action.hold) == Action.hold + assert self.write_action(Action.run_to_zero) == Action.run_to_zero + except (HardwareError, AssertionError) as e: + self.log.warn('switch not yet ready %r', e) + self.status = Status.PREPARING, 'wait for switch off' + sm.after_wait = self.ramp_to_zero + return self.wait_for_switch + return self.ramp_to_zero + + def ramp_to_zero(self, sm): + try: + return super().ramp_to_zero(sm) + except HardwareError: + sm.try_cnt -= 1 + if sm.try_cnt < 0: + raise + assert self.write_action(Action.hold) == Action.hold + assert self.write_action(Action.run_to_zero) == Action.run_to_zero + return Retry diff --git a/frappy_psi/magfield.py b/frappy_psi/magfield.py new file mode 100644 index 00000000..71f128bb --- /dev/null +++ b/frappy_psi/magfield.py @@ -0,0 +1,400 @@ +# -*- coding: utf-8 -*- +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# ***************************************************************************** +"""generic persistent magnet driver""" + +import time +from frappy.core import Drivable, Parameter, BUSY, Limit +from frappy.datatypes import FloatRange, EnumType, TupleOf, StatusType +from frappy.errors import ConfigError, HardwareError, DisabledError +from frappy.lib.enum import Enum +from frappy.states import Retry, HasStates, status_code + +UNLIMITED = FloatRange() + +Mode = Enum( + DISABLED=0, + PERSISTENT=30, + DRIVEN=50, +) + +Status = Enum( + Drivable.Status, + PREPARED=150, + PREPARING=340, + RAMPING=370, + STABILIZING=380, + FINALIZING=390, +) + +OFF = 0 +ON = 1 + + +class SimpleMagfield(HasStates, Drivable): + value = Parameter('magnetic field', datatype=FloatRange(unit='T')) + target_min = Limit() + target_max = Limit() + ramp = Parameter( + 'wanted ramp rate for field', FloatRange(unit='$/min'), readonly=False) + # export only when different from ramp: + workingramp = Parameter( + 'effective ramp rate for field', FloatRange(unit='$/min'), export=False) + tolerance = Parameter( + 'tolerance', FloatRange(0, unit='$'), readonly=False, default=0.0002) + trained = Parameter( + 'trained field (positive)', + TupleOf(FloatRange(-99, 0, unit='$'), FloatRange(0, unit='$')), + readonly=False, default=(0, 0)) + wait_stable_field = Parameter( + 'wait time to ensure field is stable', FloatRange(0, unit='s'), readonly=False, default=31) + ramp_tmo = Parameter( + 'timeout for field ramp progress', + FloatRange(0, unit='s'), readonly=False, default=30) + + _last_target = None + _symmetric_limits = False + + def earlyInit(self): + super().earlyInit() + # when limits are symmetric at init, we want auto symmetric limits + self._symmetric_limits = self.target_min == -self.target_max + + def write_target_max(self, value): + if self._symmetric_limits: + self.target_min = -value + return value + + def write_target_min(self, value): + """when modified to other than a symmetric value, we assume the user does not want auto symmetric limits""" + self._symmetric_limits = value == -self.target_max + return value + + def checkProperties(self): + dt = self.parameters['target'].datatype + max_ = dt.max + if max_ == UNLIMITED.max: + raise ConfigError('target.max not configured') + if dt.min == UNLIMITED.min: # not given: assume bipolar symmetric + dt.min = -max_ + self.target_min = max(dt.min, self.target_min) + if 'target_max' in self.writeDict: + self.writeDict.setdefault('target_min', -self.writeDict['target_max']) + super().checkProperties() + + def stop(self): + """keep field at current value""" + # let the state machine do the needed steps to finish + self.write_target(self.value) + + def last_target(self): + """get best known last target + + as long as the guessed last target is within tolerance + with repsect to the main value, it is used, as in general + it has better precision + """ + last = self._last_target + if last is None: + try: + last = self.setpoint # get read back from HW, if available + except Exception: + pass + if last is None or abs(last - self.value) > self.tolerance: + return self.value + return last + + def write_target(self, target): + self.start_machine(self.start_field_change, target=target) + return target + + def init_progress(self, sm, value): + sm.prev_point = sm.now, value + + def get_progress(self, sm, value): + """return the time passed for at least one tolerance step""" + t, v = sm.prev_point + dif = abs(v - value) + tdif = sm.now - t + if dif > self.tolerance: + sm.prev_point = sm.now, value + return tdif + + @status_code(BUSY, 'start ramp to target') + def start_field_change(self, sm): + self.setFastPoll(True, 1.0) + return self.start_ramp_to_target + + @status_code(BUSY, 'ramping field') + def ramp_to_target(self, sm): + if sm.init: + self.init_progress(sm, self.value) + # Remarks: assume there is a ramp limiting feature + if abs(self.value - sm.target) > self.tolerance: + if self.get_progress(sm, self.value) > self.ramp_tmo: + raise HardwareError('no progress') + sm.stabilize_start = None # force reset + return Retry + sm.stabilize_start = time.time() + return self.stabilize_field + + @status_code(BUSY, 'stabilizing field') + def stabilize_field(self, sm): + if sm.now - sm.stabilize_start < self.wait_stable_field: + return Retry + return self.final_status() + + def read_workingramp(self): + return self.ramp + + +class Magfield(SimpleMagfield): + status = Parameter(datatype=StatusType(Status)) + mode = Parameter('persistent mode', EnumType(Mode), readonly=False, default=Mode.PERSISTENT) + switch_heater = Parameter('switch heater', EnumType(off=OFF, on=ON), + readonly=False, default=0) + current = Parameter( + 'leads current (in units of field)', FloatRange(unit='$')) + # TODO: time_to_target + # profile = Parameter( + # 'ramp limit table', ArrayOf(TupleOf(FloatRange(unit='$'), FloatRange(unit='$/min'))), + # readonly=False) + # profile_training = Parameter( + # 'ramp limit table when in training', + # ArrayOf(TupleOf(FloatRange(unit='$'), FloatRange(unit='$/min'))), readonly=False) + # TODO: the following parameters should be changed into properties after tests + wait_switch_on = Parameter( + 'wait time to ensure switch is on', FloatRange(0, unit='s'), readonly=False, default=61) + wait_switch_off = Parameter( + 'wait time to ensure switch is off', FloatRange(0, unit='s'), readonly=False, default=61) + wait_stable_leads = Parameter( + 'wait time to ensure current is stable', FloatRange(0, unit='s'), readonly=False, default=6) + persistent_limit = Parameter( + 'above this limit, lead currents are not driven to 0', + FloatRange(0, unit='$'), readonly=False, default=99) + leads_ramp_tmo = Parameter( + 'timeout for leads ramp progress', + FloatRange(0, unit='s'), readonly=False, default=30) + init_persistency = True + switch_on_time = None + switch_off_time = None + + def doPoll(self): + if self.init_persistency: + if self.read_switch_heater() and self.mode != Mode.DRIVEN: + # switch heater is on on startup: got to persistent mode + # do this after some delay, so the user might continue + # driving without delay after a restart + self.start_machine(self.go_persistent_soon, mode=self.mode) + self.init_persistency = False + super().doPoll() + + def initModule(self): + super().initModule() + self.registerCallbacks(self) # for update_switch_heater + + def write_mode(self, value): + self.init_persistency = False + target = self.last_target() + func = self.start_field_change + if value == Mode.DISABLED: + target = 0 + if abs(self.value) < self.tolerance: + func = self.start_switch_off + elif value == Mode.PERSISTENT: + func = self.start_switch_off + self.target = target + self.start_machine(func, target=target, mode=value) + return value + + def write_target(self, target): + self.init_persistency = False + if self.mode == Mode.DISABLED: + if target == 0: + return 0 + raise DisabledError('disabled') + self.start_machine(self.start_field_change, target=target, mode=self.mode) + return target + + def on_error(self, sm): # sm is short for statemachine + if self.switch_heater == ON: + self.read_value() + if sm.mode != Mode.DRIVEN: + self.log.warning('turn switch heater off') + self.write_switch_heater(OFF) + return super().on_error(sm) + + @status_code(Status.WARN) + def go_persistent_soon(self, sm): + if sm.delta(60): + self.target = sm.target = self.last_target() + return self.start_field_change + return Retry + + @status_code(Status.PREPARING) + def start_field_change(self, sm): + self.setFastPoll(True, 1.0) + if (sm.target == self.last_target() and + abs(sm.target - self.value) <= self.tolerance and + abs(self.current - self.value) < self.tolerance and + (self.mode != Mode.DRIVEN or self.switch_heater == ON)): # short cut + return self.check_switch_off + if self.switch_heater == ON: + return self.start_switch_on + return self.start_ramp_to_field + + @status_code(Status.PREPARING) + def start_ramp_to_field(self, sm): + """start ramping current to persistent field + + initiate ramp to persistent field (with corresponding ramp rate) + the implementation should return ramp_to_field + """ + raise NotImplementedError + + @status_code(Status.PREPARING, 'ramp leads to match field') + def ramp_to_field(self, sm): + if sm.init: + sm.stabilize_start = 0 # in case current is already at field + self.init_progress(sm, self.current) + dif = abs(self.current - self.value) + if dif > self.tolerance: + tdif = self.get_progress(sm, self.current) + if tdif > self.leads_ramp_tmo: + raise HardwareError('no progress') + sm.stabilize_start = None # force reset + return Retry + if sm.stabilize_start is None: + sm.stabilize_start = sm.now + return self.stabilize_current + + @status_code(Status.PREPARING) + def stabilize_current(self, sm): + if sm.now - sm.stabilize_start < self.wait_stable_leads: + return Retry + return self.start_switch_on + + def update_switch_heater(self, value): + """is called whenever switch heater was changed""" + if value == 0: + if self.switch_off_time is None: + self.log.info('restart switch_off_time') + self.switch_off_time = time.time() + self.switch_on_time = None + else: + if self.switch_on_time is None: + self.log.info('restart switch_on_time') + self.switch_on_time = time.time() + self.switch_off_time = None + + @status_code(Status.PREPARING) + def start_switch_on(self, sm): + if (sm.target == self.last_target() and + abs(sm.target - self.value) <= self.tolerance): # short cut + return self.check_switch_off + if self.read_switch_heater() == OFF: + try: + self.write_switch_heater(ON) + except Exception as e: + self.log.warning('write_switch_heater %r', e) + return Retry + return self.wait_for_switch_on + + @status_code(Status.PREPARING) + def wait_for_switch_on(self, sm): + if sm.now - self.switch_on_time < self.wait_switch_on: + if sm.delta(10): + self.log.info('waited for %g sec', sm.now - self.switch_on_time) + return Retry + self._last_target = sm.target + return self.start_ramp_to_target + + @status_code(Status.RAMPING) + def start_ramp_to_target(self, sm): + """start ramping current to target field + + initiate ramp to target + the implementation should return ramp_to_target + """ + raise NotImplementedError + + @status_code(Status.RAMPING) + def ramp_to_target(self, sm): + dif = abs(self.value - sm.target) + if sm.init: + sm.stabilize_start = 0 # in case current is already at target + self.init_progress(sm, self.value) + if dif > self.tolerance: + sm.stabilize_start = sm.now + tdif = self.get_progress(sm, self.value) + if tdif > self.workingramp / self.tolerance * 60 + self.ramp_tmo: + self.log.warn('no progress') + raise HardwareError('no progress') + sm.stabilize_start = None + return Retry + if sm.stabilize_start is None: + sm.stabilize_start = sm.now + return self.stabilize_field + + @status_code(Status.STABILIZING) + def stabilize_field(self, sm): + if sm.now < sm.stabilize_start + self.wait_stable_field: + return Retry + return self.check_switch_off + + def check_switch_off(self, sm): + if sm.mode == Mode.DRIVEN: + return self.final_status(Status.PREPARED, 'driven') + return self.start_switch_off + + @status_code(Status.FINALIZING) + def start_switch_off(self, sm): + if self.switch_heater == ON: + self.write_switch_heater(OFF) + return self.wait_for_switch_off + + @status_code(Status.FINALIZING) + def wait_for_switch_off(self, sm): + if sm.now - self.switch_off_time < self.wait_switch_off: + return Retry + if abs(self.value) > self.persistent_limit: + return self.final_status(Status.IDLE, 'leads current at field, switch off') + return self.start_ramp_to_zero + + @status_code(Status.FINALIZING) + def start_ramp_to_zero(self, sm): + """start ramping current to zero + + initiate ramp to zero (with corresponding ramp rate) + the implementation should return ramp_to_zero + """ + raise NotImplementedError + + @status_code(Status.FINALIZING) + def ramp_to_zero(self, sm): + """ramp field to zero""" + if sm.init: + self.init_progress(sm, self.current) + if abs(self.current) > self.tolerance: + if self.get_progress(sm, self.value) > self.leads_ramp_tmo: + raise HardwareError('no progress') + return Retry + if sm.mode == Mode.DISABLED and abs(self.value) < self.tolerance: + return self.final_status(Status.DISABLED, 'disabled') + return self.final_status(Status.IDLE, 'persistent mode')