# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
#   Markus Zolliker
# *****************************************************************************
"""generic persistent magnet driver"""

import time

from secop.core import Drivable, Parameter, Done
from secop.datatypes import FloatRange, EnumType, ArrayOf, TupleOf, StatusType
from secop.features import HasLimits
from secop.errors import ConfigError, ProgrammingError
from secop.lib.enum import Enum
from secop.lib.statemachine import Retry, StateMachine

# FloatRange created without arguments; checkProperties compares the target
# datatype limits against its min/max to detect limits that were not configured
UNLIMITED = FloatRange()

Mode = Enum(
    DISABLED=0,
    PERSISTENT=30,
    DRIVEN=50,
)

# Drivable.Status extended by the intermediate stages of a field change
Status = Enum(
    Drivable.Status,
    PREPARED=150,
    PREPARING=340,
    RAMPING=370,
    STABILIZING=380,
    FINALIZING=390,
)


# Generic driver for a magnet with a persistent switch.
#
# A field change is executed by a state machine, the states being methods of
# this class. The chain of states, as wired by their return values, is:
#
#   start_field_change -> start_ramp_to_field -> ramp_to_field
#     -> stabilize_current -> start_switch_on -> switch_on
#     -> start_ramp_to_target -> ramp_to_target -> stabilize_field
#     -> check_switch_off -> start_switch_off -> switch_off
#     -> start_ramp_to_zero -> ramp_to_zero -> finish_state
#
# start_ramp_to_field, start_ramp_to_target and start_ramp_to_zero raise
# NotImplementedError here and have to be provided by a subclass.
class Magfield(HasLimits, Drivable):
    value = Parameter('magnetic field', datatype=FloatRange(unit='T'))
    status = Parameter(datatype=StatusType(Status))
    mode = Parameter(
        'persistent mode', EnumType(Mode),
        readonly=False, default=Mode.PERSISTENT)
    tolerance = Parameter(
        'tolerance', FloatRange(0, unit='$'),
        readonly=False, default=0.0002)
    switch_heater = Parameter(
        'switch heater', EnumType(off=0, on=1),
        readonly=False, default=0)
    persistent_field = Parameter(
        'persistent field', FloatRange(unit='$'), readonly=False)
    current = Parameter(
        'leads current (in units of field)', FloatRange(unit='$'))
    ramp = Parameter(
        'ramp rate for field', FloatRange(unit='$/min'), readonly=False)
    trained = Parameter(
        'trained field (positive)',
        TupleOf(FloatRange(-99, 0, unit='$'), FloatRange(0, unit='$')),
        readonly=False, default=(0, 0))
    # TODO: time_to_target
    # profile = Parameter(
    #     'ramp limit table', ArrayOf(TupleOf(FloatRange(unit='$'), FloatRange(unit='$/min'))),
    #     readonly=False)
    # profile_training = Parameter(
    #     'ramp limit table when in training',
    #     ArrayOf(TupleOf(FloatRange(unit='$'), FloatRange(unit='$/min'))), readonly=False)

    # TODO: the following parameters should be changed into properties after tests
    wait_switch_on = Parameter(
        'wait time to ensure switch is on', FloatRange(0, unit='s'),
        readonly=False, default=61)
    wait_switch_off = Parameter(
        'wait time to ensure switch is off', FloatRange(0, unit='s'),
        readonly=False, default=61)
    wait_stable_leads = Parameter(
        'wait time to ensure current is stable', FloatRange(0, unit='s'),
        readonly=False, default=6)
    wait_stable_field = Parameter(
        'wait time to ensure field is stable', FloatRange(0, unit='s'),
        readonly=False, default=31)
    persistent_limit = Parameter(
        'above this limit, lead currents are not driven to 0',
        FloatRange(0, unit='$'), readonly=False, default=99)

    _state = None  # the StateMachine, created in initModule
    __init = True  # True until the first call of doPoll
    _last_target = None  # target the state machine is currently working on
    switch_on_time = None  # time when the heater was first seen on, else None
    switch_off_time = None  # time when the heater was first seen off, else None

    def doPoll(self):
        """poll the hardware and advance the state machine by one cycle

        The very first poll is special: when the switch heater is found on
        while the mode is PERSISTENT, a field change to the persistent field
        is triggered, otherwise the persistent field is remembered as the
        last target.
        """
        if not self.__init:
            self.read_value()
        else:
            self.__init = False
            if self.read_switch_heater() and self.mode == Mode.PERSISTENT:
                self.read_value()  # check for persistent field mismatch
                # switch off heater from previous live or manual intervention
                self.write_target(self.persistent_field)
            else:
                self._last_target = self.persistent_field
        self._state.cycle()

    def checkProperties(self):
        """make sure the target datatype has limits

        :raises ConfigError: when no maximum is configured for the target

        A missing minimum is replaced by the negative maximum.
        """
        target_type = self.parameters['target'].datatype
        upper = target_type.max
        if upper == UNLIMITED.max:
            raise ConfigError('target.max not configured')
        if target_type.min == UNLIMITED.min:
            # not given: assume bipolar symmetric
            target_type.min = -upper
        super().checkProperties()

    def initModule(self):
        """register the callbacks and create the (not threaded) state machine"""
        super().initModule()
        self.registerCallbacks(self)  # for update_switch_heater
        self._state = StateMachine(
            logger=self.log, threaded=False, cleanup=self.cleanup_state)

    def write_target(self, target):
        """check limits, store the target and make sure the state machine runs"""
        self.check_limits(target)
        self.target = target
        machine = self._state
        if not machine.is_active:
            # as long as the state machine is still running,
            # it takes care of changing targets
            machine.start(self.start_field_change)
        self.doPoll()
        return Done

    def write_mode(self, value):
        """store the mode and make sure the state machine runs"""
        self.mode = value
        machine = self._state
        if not machine.is_active:
            machine.start(self.start_field_change)
        self.doPoll()
        return Done

    def cleanup_state(self, state):
        """cleanup function handed over to the state machine

        Reports the last error of the state machine in status and log and
        leaves fast polling. With the switch heater on, the persistent field
        is updated from a fresh reading and, unless in DRIVEN mode, the
        heater is turned off.
        """
        error = state.last_error
        self.status = (Status.ERROR, repr(error))
        self.log.error('in state %s: %r', state.state.__name__, error)
        self.setFastPoll(False)
        if self.switch_heater != 0:
            self.persistent_field = self.read_value()
            if self.mode != Mode.DRIVEN:
                self.log.warning('turn switch heater off')
                self.write_switch_heater(0)

    def stop(self):
        """keep field at current value"""
        # let the state machine do the needed steps to finish
        self.write_target(self.value)

    def _shortcut_possible(self):
        """tell whether ramping can be skipped

        True when the target is the one already being handled and the
        persistent field matches it within tolerance.
        """
        if self.target != self._last_target:
            return False
        return abs(self.target - self.persistent_field) <= self.tolerance

    def start_field_change(self, state):
        """entry state: switch to fast polling and decide where to continue"""
        self.setFastPoll(True, 1.0)
        self.status = (Status.PREPARING, 'changed target field')
        if self._shortcut_possible():  # short cut
            return self.check_switch_off
        return self.start_ramp_to_field

    def start_ramp_to_field(self, state):
        """start ramping current to persistent field

        to be implemented by the subclass

        should return ramp_to_field
        """
        raise NotImplementedError

    def ramp_to_field(self, state):
        """wait until the leads current has reached the persistent field"""
        if self._shortcut_possible():  # short cut
            return self.check_switch_off
        if abs(self.current - self.persistent_field) > self.tolerance:
            if state.init:
                self.status = (Status.PREPARING, 'ramping leads current to field')
            return Retry()
        # current matches: remember when stabilizing started
        state.stabilize_start = time.time()
        return self.stabilize_current

    def stabilize_current(self, state):
        """stay here until wait_stable_leads seconds have elapsed"""
        if state.now - state.stabilize_start < self.wait_stable_leads:
            if state.init:
                self.status = (Status.PREPARING, 'stabilizing leads current')
            return Retry()
        return self.start_switch_on

    def update_switch_heater(self, value):
        """is called whenever switch heater was changed

        keeps track of the time of the first update showing the heater on
        (switch_on_time) or off (switch_off_time); the time belonging to the
        opposite heater state is reset to None
        """
        if value != 0:
            self.switch_off_time = None
            if self.switch_on_time is None:
                self.switch_on_time = time.time()
        else:
            self.switch_on_time = None
            if self.switch_off_time is None:
                self.switch_off_time = time.time()

    def start_switch_on(self, state):
        """turn the switch heater on, if it is not on already"""
        if self.switch_heater == 0:
            self.status = (Status.PREPARING, 'turn switch heater on')
            try:
                self.write_switch_heater(True)
            except Exception as e:
                # a failing write is only logged, the state is retried
                self.log.warning('write_switch_heater %r', e)
                return Retry()
            return self.switch_on
        self.status = (Status.PREPARING, 'wait for heater on')
        return self.switch_on

    def switch_on(self, state):
        """wait until the switch heater has been on for wait_switch_on seconds"""
        if self._shortcut_possible():  # short cut
            return self.check_switch_off
        self.read_switch_heater()
        on_since = self.switch_on_time
        if on_since is None:
            # the heater is read back as off
            if state.now - self.switch_off_time > 10:
                self.log.warning('switch turned off manually?')
                return self.start_switch_on
            return Retry()
        if state.now - on_since < self.wait_switch_on:
            return Retry()
        self._last_target = self.target
        return self.start_ramp_to_target

    def start_ramp_to_target(self, state):
        """start ramping current to target

        to be implemented by the subclass

        should return ramp_to_target
        """
        raise NotImplementedError

    def ramp_to_target(self, state):
        """wait until the field has reached the target

        The persistent field follows the value while in this state.
        """
        if self.target != self._last_target:
            # target was changed
            self._last_target = self.target
            return self.start_ramp_to_target
        self.persistent_field = self.value
        # Remarks: assume there is a ramp limiting feature
        if abs(self.value - self.target) > self.tolerance:
            if state.init:
                self.status = (Status.RAMPING, 'ramping field')
            return Retry()
        # field matches: remember when stabilizing started
        state.stabilize_start = time.time()
        return self.stabilize_field

    def stabilize_field(self, state):
        """stay here until wait_stable_field seconds have elapsed

        A changed target leads back to start_ramp_to_target.
        """
        if self.target != self._last_target:
            # target was changed
            self._last_target = self.target
            return self.start_ramp_to_target
        self.persistent_field = self.value
        if state.now - state.stabilize_start < self.wait_stable_field:
            if state.init:
                self.status = (Status.STABILIZING, 'stabilizing field')
            return Retry()
        return self.check_switch_off

    def check_switch_off(self, state):
        """finish right away in DRIVEN mode, else go on to switch the heater off"""
        if self.mode == Mode.DRIVEN:
            self.status = (Status.PREPARED, 'driven')
            return self.finish_state
        return self.start_switch_off

    def start_switch_off(self, state):
        """turn the switch heater off, if it is on"""
        if self.switch_heater == 1:
            self.status = (Status.FINALIZING, 'turn switch heater off')
            self.write_switch_heater(False)
        else:
            self.status = (Status.FINALIZING, 'wait for heater off')
        return self.switch_off

    def switch_off(self, state):
        """wait until the switch heater has been off for wait_switch_off seconds

        Above persistent_limit the state machine finishes here, leaving the
        leads current at field; otherwise the leads are ramped to zero.
        """
        if self.target != self._last_target or self.mode == Mode.DRIVEN:
            # target or mode has changed -> redo
            self._last_target = None
            return self.start_switch_on
        self.persistent_field = self.value
        self.read_switch_heater()
        off_since = self.switch_off_time
        if off_since is None:
            # the heater is read back as on
            if state.now - self.switch_on_time > 10:
                self.log.warning('switch turned on manually?')
                return self.start_switch_off
            return Retry()
        if state.now - off_since < self.wait_switch_off:
            return Retry()
        if abs(self.value) > self.persistent_limit:
            self.status = (Status.IDLE, 'leads current at field, switch off')
            return self.finish_state
        return self.start_ramp_to_zero

    def start_ramp_to_zero(self, state):
        """start ramping current to zero

        to be implemented by the subclass:
        initiate ramp to zero (with corresponding ramp rate)

        should return ramp_to_zero
        """
        raise NotImplementedError

    def ramp_to_zero(self, state):
        """wait until the leads current is zero within tolerance

        The final status is DISABLED when the mode is DISABLED and the
        persistent field is zero, IDLE otherwise.
        """
        if self.target != self._last_target or self.mode == Mode.DRIVEN:
            # target or mode has changed -> redo
            self._last_target = None
            return self.start_field_change
        if abs(self.current) > self.tolerance:
            if state.init:
                self.status = (Status.FINALIZING, 'ramp leads to zero')
            return Retry()
        disabled = self.mode == Mode.DISABLED and self.persistent_field == 0
        self.status = (
            (Status.DISABLED, 'disabled') if disabled
            else (Status.IDLE, 'persistent mode'))
        return self.finish_state

    def finish_state(self, state):
        """last state: leave fast polling and return None (no next state)"""
        self.setFastPoll(False)
        return None