frappy/secop_psi/magfield.py
camea 8e3cdc80e4 improvements in magfield/ips_mercury
- read voltage
- fix a bug with ._init name conflict
2022-08-12 15:10:23 +02:00

300 lines
11 KiB
Python

# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""generic persistent magnet driver"""
import time
from secop.core import Drivable, Parameter, Done
from secop.datatypes import FloatRange, EnumType, ArrayOf, TupleOf, StatusType
from secop.features import HasLimits
from secop.errors import ConfigError, ProgrammingError
from secop.lib.enum import Enum
from secop.lib.statemachine import Retry, StateMachine
# a FloatRange created without arguments: its min / max serve as reference
# values for detecting limits which were not configured
UNLIMITED = FloatRange()

# operation modes of the magnet
Mode = Enum(
    DISABLED=0,
    PERSISTENT=30,
    DRIVEN=50,
)

# status codes: built on Drivable.Status, with additional codes for the
# individual phases of a field change
Status = Enum(
    Drivable.Status,
    PREPARED=150,
    PREPARING=340,
    RAMPING=370,
    STABILIZING=380,
    FINALIZING=390,
)
class Magfield(HasLimits, Drivable):
    """generic driver for a magnet with a persistent switch

    A field change is implemented as a chain of state functions, which are
    run by a StateMachine advanced from doPoll:

        start_field_change -> start_ramp_to_field -> ramp_to_field
        -> stabilize_current -> start_switch_on -> switch_on
        -> start_ramp_to_target -> ramp_to_target -> stabilize_field
        -> check_switch_off -> start_switch_off -> switch_off
        -> start_ramp_to_zero -> ramp_to_zero -> finish_state

    Several of these states jump back to an earlier one when target or mode
    were changed in the meantime.

    Subclasses have to implement start_ramp_to_field, start_ramp_to_target
    and start_ramp_to_zero. An overridden write_switch_heater must call the
    implementation of this class.
    """

    value = Parameter('magnetic field', datatype=FloatRange(unit='T'))
    status = Parameter(datatype=StatusType(Status))
    mode = Parameter(
        'persistent mode', EnumType(Mode), readonly=False,
        default=Mode.PERSISTENT)
    tolerance = Parameter(
        'tolerance', FloatRange(0, unit='$'), readonly=False, default=0.0002)
    switch_heater = Parameter(
        'switch heater', EnumType(off=0, on=1), readonly=False, default=0)
    persistent_field = Parameter(
        'persistent field', FloatRange(unit='$'), readonly=False)
    current = Parameter(
        'leads current (in units of field)', FloatRange(unit='$'))
    ramp = Parameter(
        'ramp rate for field', FloatRange(unit='$/min'), readonly=False)
    trained = Parameter(
        'trained field (positive)',
        TupleOf(FloatRange(-99, 0, unit='$'), FloatRange(0, unit='$')),
        readonly=False, default=(0, 0))

    # TODO: time_to_target
    # profile = Parameter(
    #     'ramp limit table', ArrayOf(TupleOf(FloatRange(unit='$'), FloatRange(unit='$/min'))),
    #     readonly=False)
    # profile_training = Parameter(
    #     'ramp limit table when in training',
    #     ArrayOf(TupleOf(FloatRange(unit='$'), FloatRange(unit='$/min'))), readonly=False)

    # TODO: the following parameters should be changed into properties after tests
    wait_switch_on = Parameter(
        'wait time to ensure switch is on', FloatRange(0, unit='s'),
        readonly=False, default=61)
    wait_switch_off = Parameter(
        'wait time to ensure switch is off', FloatRange(0, unit='s'),
        readonly=False, default=61)
    wait_stable_leads = Parameter(
        'wait time to ensure current is stable', FloatRange(0, unit='s'),
        readonly=False, default=6)
    wait_stable_field = Parameter(
        'wait time to ensure field is stable', FloatRange(0, unit='s'),
        readonly=False, default=31)

    _state = None  # the StateMachine, created in initModule
    __init = True  # True until the first call of doPoll (name mangled on purpose)
    _super_sw_check = False  # set by write_switch_heater, proves the super call
    _last_target = None  # the target the state machine has taken care of
    switch_time = None  # time of the last change of the switch heater

    def _target_is_persistent(self):
        """check whether nothing has to be done to reach the target

        :return: a true value when the target is the one already handled
            (self._last_target) and the persistent field agrees with it
            within tolerance

        this is the 'short cut' condition used by several state functions
        """
        return (self.target == self._last_target and
                abs(self.target - self.persistent_field) <= self.tolerance)

    def doPoll(self):
        """read the field and advance the state machine

        The very first call is used for initialisation instead: when the
        switch heater is found on in persistent mode, a field change to the
        persistent field is started, in order to get the heater switched off.
        """
        # NOTE(review): nesting reconstructed from a listing without
        # indentation - confirm that cycle() belongs to the non-init branch
        if not self.__init:
            self.read_value()
            self._state.cycle()
            return
        self.__init = False
        self.switch_time = time.time()
        if self.read_switch_heater() and self.mode == Mode.PERSISTENT:
            self.read_value()  # check for persistent field mismatch
            # switch off heater from previous life or manual intervention
            self.write_target(self.persistent_field)
        else:
            self._last_target = self.persistent_field

    def checkProperties(self):
        """make sure the limits of target are configured

        :raises ConfigError: when no maximum is configured for target

        a missing minimum is replaced by the negated maximum
        """
        dt = self.parameters['target'].datatype
        max_ = dt.max
        if max_ == UNLIMITED.max:
            raise ConfigError('target.max not configured')
        if dt.min == UNLIMITED.min:  # not given: assume bipolar symmetric
            dt.min = -max_
        super().checkProperties()

    def initModule(self):
        """create the (not threaded) state machine"""
        super().initModule()
        self._state = StateMachine(
            logger=self.log, threaded=False, cleanup=self.cleanup_state)

    def write_target(self, target):
        """set a new target and make sure the state machine is running

        :param target: the new target field, checked with check_limits
        :return: Done
        """
        self.check_limits(target)
        self.target = target
        if not self._state.is_active:
            # as long as the state machine is still running, it takes care of changing targets
            self._state.start(self.start_field_change)
        self.doPoll()
        return Done

    def write_mode(self, value):
        """set the mode and make sure the state machine is running

        :param value: the new mode
        :return: Done
        """
        self.mode = value
        if not self._state.is_active:
            # a running state machine notices the changed mode by itself
            self._state.start(self.start_field_change)
        self.doPoll()
        return Done

    def cleanup_state(self, state):
        """cleanup callback of the state machine (see initModule)

        :param state: the state machine object, giving access to last_error
            and to the state function which was active

        Puts the module into ERROR status, logs the error and stops fast
        polling. If the switch heater is on, the persistent field is updated
        from a fresh reading and, unless in DRIVEN mode, the heater is
        switched off.
        """
        self.status = Status.ERROR, repr(state.last_error)
        self.log.error('in state %s: %r', state.state.__name__, state.last_error)
        self.setFastPoll(False)
        # NOTE(review): nesting reconstructed from a listing without
        # indentation - the mode check is assumed to apply only while the
        # heater is on
        if self.switch_heater != 0:
            self.persistent_field = self.read_value()
            if self.mode != Mode.DRIVEN:
                self.log.warning('turn switch heater off')
                self.write_switch_heater(0)

    def stop(self):
        """keep field at current value"""
        # let the state machine do the needed steps to finish
        self.write_target(self.value)

    def start_field_change(self, state):
        """entry state: decide whether the leads have to be ramped at all"""
        self.setFastPoll(True, 1.0)
        self.status = Status.PREPARING, 'changed target field'
        if self._target_is_persistent():  # short cut
            return self.check_switch_off
        return self.start_ramp_to_field

    def start_ramp_to_field(self, state):
        """start ramping current to persistent field

        should return ramp_to_field
        """
        raise NotImplementedError

    def ramp_to_field(self, state):
        """ramping, wait for current at persistent field"""
        if self._target_is_persistent():  # short cut
            return self.check_switch_off
        if abs(self.current - self.persistent_field) > self.tolerance:
            if state.init:
                self.status = Status.PREPARING, 'ramping leads current to field'
            return Retry()
        state.stabilize_start = time.time()
        return self.stabilize_current

    def stabilize_current(self, state):
        """wait for stable current at persistent field"""
        if state.now - state.stabilize_start < self.wait_stable_leads:
            if state.init:
                self.status = Status.PREPARING, 'stabilizing leads current'
            return Retry()
        return self.start_switch_on

    def write_switch_heater(self, value):
        """implementations must super call this!"""
        # flag checked in start_switch_on in order to detect a missing super call
        self._super_sw_check = True
        if value != self.switch_heater:
            # remember the time of the change for the waits in switch_on / switch_off
            self.switch_time = time.time()
        return value

    def start_switch_on(self, state):
        """switch heater on

        :raises ProgrammingError: when write_switch_heater of a subclass did
            not call the implementation of this class
        """
        if self.switch_heater != 0:
            self.status = Status.PREPARING, 'wait for heater on'
        else:
            self.status = Status.PREPARING, 'turn switch heater on'
        # NOTE(review): nesting reconstructed from a listing without
        # indentation - the write is taken to be unconditional, like the one
        # in start_switch_off; confirm against the repository
        self._super_sw_check = False
        self.write_switch_heater(True)
        if not self._super_sw_check:
            raise ProgrammingError('missing super call in write_switch_heater')
        return self.switch_on

    def switch_on(self, state):
        """wait for switch heater open"""
        if self._target_is_persistent():  # short cut
            return self.check_switch_off
        if state.now - self.switch_time < self.wait_switch_on:
            return Retry()
        self._last_target = self.target
        return self.start_ramp_to_target

    def start_ramp_to_target(self, state):
        """start ramping current to target

        should return ramp_to_target
        """
        raise NotImplementedError

    def ramp_to_target(self, state):
        """ramp field to target"""
        if self.target != self._last_target:  # target was changed
            self._last_target = self.target
            return self.start_ramp_to_target
        # the heater is on: the persistent field follows the field
        self.persistent_field = self.value
        # Remarks: assume there is a ramp limiting feature
        if abs(self.value - self.target) > self.tolerance:
            if state.init:
                self.status = Status.RAMPING, 'ramping field'
            return Retry()
        state.stabilize_start = time.time()
        return self.stabilize_field

    def stabilize_field(self, state):
        """stabilize field"""
        if self.target != self._last_target:  # target was changed
            self._last_target = self.target
            return self.start_ramp_to_target
        self.persistent_field = self.value
        if state.now - state.stabilize_start < self.wait_stable_field:
            if state.init:
                self.status = Status.STABILIZING, 'stabilizing field'
            return Retry()
        return self.check_switch_off

    def check_switch_off(self, state):
        """finish here in DRIVEN mode, else continue with switching off"""
        if self.mode == Mode.DRIVEN:
            self.status = Status.PREPARED, 'driven'
            return self.finish_state
        return self.start_switch_off

    def start_switch_off(self, state):
        """turn off switch heater"""
        if self.switch_heater != 0:
            self.status = Status.FINALIZING, 'turn switch heater off'
        else:
            self.status = Status.FINALIZING, 'wait for heater off'
        self.write_switch_heater(False)
        # no check for super call needed here (would have been detected in start_switch_on)
        return self.switch_off

    def switch_off(self, state):
        """wait for switch heater closed"""
        if self.target != self._last_target or self.mode == Mode.DRIVEN:
            # target or mode has changed -> redo
            self._last_target = None
            return self.start_switch_on
        self.persistent_field = self.value
        if state.now - self.switch_time < self.wait_switch_off:
            return Retry()
        return self.start_ramp_to_zero

    def start_ramp_to_zero(self, state):
        """start ramping current to zero

        initiate ramp to zero (with corresponding ramp rate)
        should return ramp_to_zero
        """
        raise NotImplementedError

    def ramp_to_zero(self, state):
        """wait for the leads current being ramped to zero"""
        if self.target != self._last_target or self.mode == Mode.DRIVEN:
            # target or mode has changed -> redo
            self._last_target = None
            return self.start_field_change
        if abs(self.current) > self.tolerance:
            if state.init:
                self.status = Status.FINALIZING, 'ramp leads to zero'
            return Retry()
        if self.mode == Mode.DISABLED and self.persistent_field == 0:
            self.status = Status.DISABLED, 'disabled'
        else:
            self.status = Status.IDLE, 'persistent mode'
        return self.finish_state

    def finish_state(self, state):
        """finish"""
        self.setFastPoll(False)
        return None