magfield adapted to new state machine

This commit is contained in:
l_samenv 2022-11-21 14:37:53 +01:00
parent 4405b2b02c
commit 1b2e364f70
4 changed files with 632 additions and 283 deletions

97
secop_psi/dilsc.py Normal file
View File

@ -0,0 +1,97 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""vector field"""
from secop.core import Drivable, Done, BUSY, IDLE, WARN, ERROR
from secop.errors import BadValueError
from secop_psi.vector import Vector
DECREASE = 1
INCREASE = 2
class VectorField(Vector, Drivable):
_state = None
def doPoll(self):
"""periodically called method"""
try:
if self._starting:
# first decrease components
driving = False
for target, component in zip(self.target, self.components):
if target * component.value < 0:
# change sign: drive to zero first
target = 0
if abs(target) < abs(component.target):
if target != component.target:
component.write_target(target)
if component.isDriving():
driving = True
if driving:
return
# now we can go to the final targets
for target, component in zip(self.target, self.components):
component.write_target(target)
self._starting = False
else:
for component in self.components:
if component.isDriving():
return
self.setFastPoll(False)
finally:
super().doPoll()
def merge_status(self):
names = [c.name for c in self.components if c.status[0] >= ERROR]
if names:
return ERROR, 'error in %s' % ', '.join(names)
names = [c.name for c in self.components if c.isDriving()]
if self._state:
# self.log.info('merge %r', [c.status for c in self.components])
if names:
direction = 'down ' if self._state == DECREASE else ''
return BUSY, 'ramping %s%s' % (direction, ', '.join(names))
if self.status[0] == BUSY:
return self.status
return BUSY, 'driving'
if names:
return WARN, 'moving %s directly' % ', '.join(names)
names = [c.name for c in self.components if c.status[0] >= WARN]
if names:
return WARN, 'warnings in %s' % ', '.join(names)
return IDLE, ''
def write_target(self, value):
"""initiate target change"""
# first make sure target is valid
for target, component in zip(self.target, self.components):
# check against limits if individual components
component.check_limits(target)
if sum(v * v for v in value) > 1:
raise BadValueError('norm of vector too high')
self.log.info('decrease')
self.setFastPoll(True)
self.target = value
self._state = DECREASE
self.doPoll()
self.log.info('done write_target %r', value)
return Done

View File

@ -21,10 +21,10 @@
"""oxford instruments mercury IPS power supply"""
import time
from secop.core import Parameter, EnumType, FloatRange, BoolType
from secop.core import Parameter, EnumType, FloatRange, BoolType, IntRange, StringType, Property, BUSY
from secop.lib.enum import Enum
from secop.errors import BadValueError, HardwareError
from secop_psi.magfield import Magfield
from secop_psi.magfield import Magfield, SimpleMagfield, Status
from secop_psi.mercury import MercuryChannel, off_on, Mapped
from secop.lib.statemachine import Retry
@ -34,54 +34,39 @@ hold_rtoz_rtos_clmp = Mapped(HOLD=Action.hold, RTOS=Action.run_to_set,
CURRENT_CHECK_SIZE = 2
class Field(MercuryChannel, Magfield):
class SimpleField(MercuryChannel, SimpleMagfield):
nunits = Property('number of IPS subunits', IntRange(1, 6), default=1)
action = Parameter('action', EnumType(Action), readonly=False)
setpoint = Parameter('field setpoint', FloatRange(unit='T'), default=0)
voltage = Parameter('leads voltage', FloatRange(unit='V'), default=0)
atob = Parameter('field to amp', FloatRange(0, unit='A/T'), default=0)
I1 = Parameter('master current', FloatRange(unit='A'), default=0)
I2 = Parameter('slave 2 current', FloatRange(unit='A'), default=0)
I3 = Parameter('slave 3 current', FloatRange(unit='A'), default=0)
V1 = Parameter('master voltage', FloatRange(unit='V'), default=0)
V2 = Parameter('slave 2 voltage', FloatRange(unit='V'), default=0)
V3 = Parameter('slave 3 voltage', FloatRange(unit='V'), default=0)
forced_persistent_field = Parameter(
'manual indication that persistent field is bad', BoolType(), readonly=False, default=False)
working_ramp = Parameter('effective ramp', FloatRange(0, unit='T/min'), default=0)
channel_type = 'PSU'
_field_mismatch = None
nslaves = 3
slave_currents = None
__init = True
classdict = {}
def doPoll(self):
super().doPoll()
self.read_current()
def __new__(cls, name, logger, cfgdict, srv):
base = cls.__bases__[1]
nunits = cfgdict.get('nunits', 1)
if nunits == 1:
obj = object.__new__(cls)
return obj
classname = cls.__name__ + str(nunits)
newclass = cls.classdict.get(classname)
if not newclass:
# create individual current and voltage parameters dynamically
attrs = {}
for i in range(1, nunits + 1):
attrs['I%d' % i] = Parameter('slave %s current' % i, FloatRange(unit='A'), default=0)
attrs['V%d' % i] = Parameter('slave %s voltage' % i, FloatRange(unit='V'), default=0)
newclass = type(classname, (cls,), attrs)
cls.classdict[classname] = newclass
obj = object.__new__(newclass)
return obj
def read_value(self):
self.current = self.query('PSU:SIG:FLD')
pf = self.query('PSU:SIG:PFLD')
if self.__init:
self.__init = False
self.persistent_field = pf
if self.switch_heater == self.switch_heater.on or self._field_mismatch is None:
self.forced_persistent_field = False
self._field_mismatch = False
return self.current
self._field_mismatch = abs(self.persistent_field - pf) > self.tolerance
return pf
def write_persistent_field(self, value):
if self.forced_persistent_field:
self._field_mismatch = False
return value
raise BadValueError('changing persistent field needs forced_persistent_field=True')
def write_target(self, target):
if self._field_mismatch:
self.forced_persistent_field = True
raise BadValueError('persistent field does not match - set persistent field to guessed value first')
return super().write_target(target)
return self.query('PSU:SIG:FLD')
def read_ramp(self):
return self.query('PSU:SIG:RFST')
@ -95,86 +80,224 @@ class Field(MercuryChannel, Magfield):
def write_action(self, value):
return self.change('PSU:ACTN', value, hold_rtoz_rtos_clmp)
def read_switch_heater(self):
value = self.query('PSU:SIG:SWHT', off_on)
now = time.time()
if value != self.switch_heater:
if now < (self.switch_time[self.switch_heater] or 0) + 10:
# probably switch heater was changed, but IPS reply is not yet updated
return self.switch_heater
return value
def write_switch_heater(self, value):
return self.change('PSU:SIG:SWHT', value, off_on)
def read_atob(self):
return self.query('PSU:ATOB')
def read_voltage(self):
return self.query('PSU:SIG:VOLT')
def read_working_ramp(self):
return self.query('PSU:SIG:RFLD')
def read_setpoint(self):
return self.query('PSU:SIG:FSET')
def set_and_go(self, value):
self.setpoint = self.change('PSU:SIG:FSET', value)
assert self.write_action('hold') == 'hold'
assert self.write_action('run_to_set') == 'run_to_set'
def start_ramp_to_target(self, sm):
# if self.action != 'hold':
# assert self.write_action('hold') == 'hold'
# return Retry
self.set_and_go(sm.target)
sm.try_cnt = 5
return self.ramp_to_target
def ramp_to_target(self, sm):
try:
return super().ramp_to_target(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
self.set_and_go(sm.target)
return Retry
def final_status(self, *args, **kwds):
print('FINAL-hold')
self.write_action('hold')
return super().final_status(*args, **kwds)
def on_restart(self, sm):
print('ON_RESTART-hold', sm.sm)
self.write_action('hold')
return super().on_restart(sm)
class Field(SimpleField, Magfield):
wait_switch_on = Parameter(
'wait time to ensure switch is on', FloatRange(0, unit='s'), readonly=True, default=60)
wait_switch_off = Parameter(
'wait time to ensure switch is off', FloatRange(0, unit='s'), readonly=True, default=60)
forced_persistent_field = Parameter(
'manual indication that persistent field is bad', BoolType(), readonly=False, default=False)
_field_mismatch = None
__init = True
__switch_heater_fix = 0
def doPoll(self):
super().doPoll()
self.read_current()
def startModule(self, start_events):
# on restart, assume switch is changed long time ago, if not, the mercury
# will complain and this will be handled in start_ramp_to_field
self.switch_on_time = 0
self.switch_off_time = 0
self.switch_heater = self.query('PSU:SIG:SWHT', off_on)
super().startModule(start_events)
def read_value(self):
current = self.query('PSU:SIG:FLD')
pf = self.query('PSU:SIG:PFLD')
if self.__init:
self.__init = False
self.persistent_field = pf
if self.switch_heater == self.switch_heater.on or self._field_mismatch is None:
self.forced_persistent_field = False
self._field_mismatch = False
return current
self._field_mismatch = abs(self.persistent_field - pf) > self.tolerance
return pf
def read_current(self):
if self.slave_currents is None:
self.slave_currents = [[] for _ in range(self.nslaves + 1)]
current = self.query('PSU:SIG:CURR')
for i in range(self.nslaves + 1):
if i:
self.slave_currents = [[] for _ in range(self.nunits + 1)]
if self.nunits > 1:
for i in range(1, self.nunits + 1):
curri = self.query('DEV:PSU.M%d:PSU:SIG:CURR' % i)
volti = self.query('DEV:PSU.M%d:PSU:SIG:VOLT' % i)
setattr(self, 'I%d' % i, curri)
setattr(self, 'V%d' % i, volti)
self.slave_currents[i].append(curri)
else:
self.slave_currents[i].append(current)
min_i = min(self.slave_currents[i])
max_i = max(self.slave_currents[i])
min_ = min(self.slave_currents[0]) / self.nslaves
max_ = max(self.slave_currents[0]) / self.nslaves
if len(self.slave_currents[i]) > CURRENT_CHECK_SIZE:
self.slave_currents[i] = self.slave_currents[i][-CURRENT_CHECK_SIZE:]
if i and (min_i -1 > max_ or min_ > max_i + 1):
self.log.warning('individual currents mismatch %r', self.slave_currents)
current = self.query('PSU:SIG:CURR')
self.slave_currents[0].append(current)
min_ = min(self.slave_currents[0]) / self.nunits
max_ = max(self.slave_currents[0]) / self.nunits
# keep one element more for the total current (first and last measurement is a total)
self.slave_currents[0] = self.slave_currents[0][-CURRENT_CHECK_SIZE-1:]
for i in range(1, self.nunits + 1):
min_i = min(self.slave_currents[i])
max_i = max(self.slave_currents[i])
if len(self.slave_currents[i]) > CURRENT_CHECK_SIZE:
self.slave_currents[i] = self.slave_currents[i][-CURRENT_CHECK_SIZE:]
if min_i - 0.1 > max_ or min_ > max_i + 0.1: # use an arbitrary 0.1 A tolerance
self.log.warning('individual currents mismatch %r', self.slave_currents)
else:
current = self.query('PSU:SIG:CURR')
if self.atob:
return current / self.atob
return 0
def set_and_go(self, value):
self.change('PSU:SIG:FSET', value)
assert self.write_action('hold') == 'hold'
assert self.write_action('run_to_set') == 'run_to_set'
def write_persistent_field(self, value):
if self.forced_persistent_field:
self._field_mismatch = False
return value
raise BadValueError('changing persistent field needs forced_persistent_field=True')
def start_ramp_to_field(self, state):
def write_target(self, target):
if self._field_mismatch:
self.forced_persistent_field = True
raise BadValueError('persistent field does not match - set persistent field to guessed value first')
return super().write_target(target)
def read_switch_heater(self):
value = self.query('PSU:SIG:SWHT', off_on)
now = time.time()
if value != self.switch_heater:
if now < self.__switch_heater_fix:
# probably switch heater was changed, but IPS reply is not yet updated
if self.switch_heater:
self.switch_on_time = time.time()
else:
self.switch_off_time = time.time()
return self.switch_heater
return value
def read_wait_switch_on(self):
return self.query('PSU:SWONT') * 0.001
def read_wait_switch_off(self):
return self.query('PSU:SWOFT') * 0.001
def write_switch_heater(self, value):
if value == self.read_switch_heater():
self.log.info('switch heater already %r', value)
# we do not want to restart the timer
return value
self.__switch_heater_fix = time.time() + 10
return self.change('PSU:SIG:SWHT', value, off_on)
def start_ramp_to_field(self, sm):
if abs(self.current - self.persistent_field) <= self.tolerance:
self.log.info('leads %g are already at %g', self.current, self.persistent_field)
return self.ramp_to_field
try:
self.set_and_go(self.persistent_field)
except (HardwareError, AssertionError):
state.switch_undef = self.switch_time[self.switch_heater.on] or state.now
except (HardwareError, AssertionError) as e:
if self.switch_heater:
self.log.warn('switch is already on!')
return self.ramp_to_field
self.log.warn('wait first for switch off current=%g pf=%g', self.current, self.persistent_field)
return Retry
self.status = Status.PREPARING, 'wait for switch off'
sm.after_wait = self.ramp_to_field
return self.wait_for_switch
return self.ramp_to_field
def ramp_to_field(self, state):
if self.action != 'run_to_set':
self.status = Status.PREPARING, 'restart ramp to field'
return self.start_ramp_to_field
return super().ramp_to_field(state)
def wait_for_switch(self, state):
if state.now - state.switch_undef < self.wait_switch_on:
return Retry()
self.set_and_go(self.persistent_field)
return self.ramp_to_field
def start_ramp_to_target(self, state):
self.set_and_go(self.target)
def start_ramp_to_target(self, sm):
sm.try_cnt = 5
try:
self.set_and_go(sm.target)
except (HardwareError, AssertionError) as e:
self.log.warn('switch not yet ready %r', e)
self.status = Status.PREPARING, 'wait for switch on'
sm.after_wait = self.ramp_to_target
return self.wait_for_switch
return self.ramp_to_target
def start_ramp_to_zero(self, state):
assert self.write_action('hold') == 'hold'
assert self.write_action('run_to_zero') == 'run_to_zero'
def ramp_to_field(self, sm):
try:
return super().ramp_to_field(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
self.set_and_go(sm.persistent_field)
return Retry
def wait_for_switch(self, sm):
if not self.delay(10):
return Retry
try:
self.log.warn('try again')
# try again
self.set_and_go(self.persistent_field)
except (HardwareError, AssertionError) as e:
return Retry
return sm.after_wait
def start_ramp_to_zero(self, sm):
try:
assert self.write_action('hold') == 'hold'
assert self.write_action('run_to_zero') == 'run_to_zero'
except (HardwareError, AssertionError) as e:
self.log.warn('switch not yet ready %r', e)
self.status = Status.PREPARING, 'wait for switch off'
sm.after_wait = self.ramp_to_zero
return self.wait_for_switch
return self.ramp_to_zero
def finish_state(self, state):
self.write_action('hold')
super().finish_state(state)
def ramp_to_zero(self, sm):
try:
return super().ramp_to_zero(sm)
except HardwareError:
sm.try_cnt -= 1
if sm.try_cnt < 0:
raise
assert self.write_action('hold') == 'hold'
assert self.write_action('run_to_zero') == 'run_to_zero'
return Retry

View File

@ -20,12 +20,12 @@
"""generic persistent magnet driver"""
import time
from secop.core import Drivable, Parameter, Done
from secop.core import Drivable, Parameter, Done, IDLE, BUSY, ERROR
from secop.datatypes import FloatRange, EnumType, ArrayOf, TupleOf, StatusType
from secop.features import HasLimits
from secop.errors import ConfigError, ProgrammingError
from secop.errors import ConfigError, ProgrammingError, HardwareError
from secop.lib.enum import Enum
from secop.lib.statemachine import Retry, StateMachine
from secop.states import Retry, HasStates, status_code
UNLIMITED = FloatRange()
@ -48,52 +48,23 @@ OFF = 0
ON = 1
class Magfield(HasLimits, Drivable):
class SimpleMagfield(HasStates, HasLimits, Drivable):
value = Parameter('magnetic field', datatype=FloatRange(unit='T'))
status = Parameter(datatype=StatusType(Status))
mode = Parameter(
'persistent mode', EnumType(Mode), readonly=False, default=Mode.PERSISTENT)
ramp = Parameter(
'wanted ramp rate for field', FloatRange(unit='$/min'), readonly=False)
# export only when different from ramp:
workingramp = Parameter(
'effective ramp rate for field', FloatRange(unit='$/min'), export=False)
tolerance = Parameter(
'tolerance', FloatRange(0, unit='$'), readonly=False, default=0.0002)
switch_heater = Parameter('switch heater', EnumType(off=OFF, on=ON),
readonly=False, default=0)
persistent_field = Parameter(
'persistent field', FloatRange(unit='$'), readonly=False)
current = Parameter(
'leads current (in units of field)', FloatRange(unit='$'))
ramp = Parameter(
'ramp rate for field', FloatRange(unit='$/min'), readonly=False)
trained = Parameter(
'trained field (positive)',
TupleOf(FloatRange(-99, 0, unit='$'), FloatRange(0, unit='$')),
readonly=False, default=(0, 0))
# TODO: time_to_target
# profile = Parameter(
# 'ramp limit table', ArrayOf(TupleOf(FloatRange(unit='$'), FloatRange(unit='$/min'))),
# readonly=False)
# profile_training = Parameter(
# 'ramp limit table when in training',
# ArrayOf(TupleOf(FloatRange(unit='$'), FloatRange(unit='$/min'))), readonly=False)
# TODO: the following parameters should be changed into properties after tests
wait_switch_on = Parameter(
'wait time to ensure switch is on', FloatRange(0, unit='s'), readonly=False, default=60)
wait_switch_off = Parameter(
'wait time to ensure switch is off', FloatRange(0, unit='s'), readonly=False, default=60)
wait_stable_leads = Parameter(
'wait time to ensure current is stable', FloatRange(0, unit='s'), readonly=False, default=6)
wait_stable_field = Parameter(
'wait time to ensure field is stable', FloatRange(0, unit='s'), readonly=False, default=30)
persistent_limit = Parameter(
'above this limit, lead currents are not driven to 0',
FloatRange(0, unit='$'), readonly=False, default=99)
'wait time to ensure field is stable', FloatRange(0, unit='s'), readonly=False, default=31)
_state = None
_last_target = None
switch_time = None, None
def doPoll(self):
self.read_value()
self._state.cycle()
def checkProperties(self):
dt = self.parameters['target'].datatype
@ -104,216 +75,285 @@ class Magfield(HasLimits, Drivable):
dt.min = -max_
super().checkProperties()
def initModule(self):
super().initModule()
self.registerCallbacks(self) # for update_switch_heater
self._state = StateMachine(logger=self.log, threaded=False, cleanup=self.cleanup_state)
def startModule(self, start_events):
start_events.queue(self.startupCheck)
super().startModule(start_events)
def startupCheck(self):
if self.read_switch_heater() and self.mode == Mode.PERSISTENT:
self.read_value() # check for persistent field mismatch
# switch off heater from previous live or manual intervention
self.write_mode(self.mode)
self.write_target(self.persistent_field)
else:
self._last_target = self.persistent_field
def write_target(self, target):
self.check_limits(target)
self.target = target
if not self._state.is_active:
# as long as the state machine is still running, it takes care of changing targets
self._state.start(self.start_field_change)
self.doPoll()
return Done
def write_mode(self, value):
self.mode = value
if not self._state.is_active:
self._state.start(self.start_field_change)
self.doPoll()
return Done
def cleanup_state(self, state):
self.status = Status.ERROR, repr(state.last_error)
self.log.error('in state %s: %r', state.state.__name__, state.last_error)
self.setFastPoll(False)
if self.switch_heater != 0:
self.persistent_field = self.read_value()
if self.mode != Mode.DRIVEN:
self.log.warning('turn switch heater off')
self.write_switch_heater(0)
def stop(self):
"""keep field at current value"""
# let the state machine do the needed steps to finish
self.write_target(self.value)
def start_field_change(self, state):
def write_target(self, target):
self.check_limits(target)
self.start_machine(self.start_field_change, target=target)
return target
def init_progress(self, sm, value):
sm.prev_point = sm.now, value
def get_progress(self, sm, value):
"""return the time passed for at least one tolerance step"""
t, v = sm.prev_point
dif = abs(v - value)
tdif = sm.now - t
if dif > self.tolerance:
sm.prev_point = sm.now, value
return tdif
@status_code(BUSY, 'start ramp to target')
def start_field_change(self, sm):
self.setFastPoll(True, 1.0)
self.status = Status.PREPARING, 'changed target field'
if (self.target == self._last_target and
abs(self.target - self.persistent_field) <= self.tolerance): # short cut
return self.start_ramp_to_target
@status_code(BUSY, 'ramping field')
def ramp_to_target(self, sm):
if sm.init:
self.init_progress(sm, self.value)
# Remarks: assume there is a ramp limiting feature
if abs(self.value - sm.target) > self.tolerance:
if self.get_progress(sm, self.value):
return Retry
raise HardwareError('no progress')
sm.stabilize_start = time.time()
return self.stabilize_field
@status_code(BUSY, 'stabilizing field')
def stabilize_field(self, sm):
if sm.now - sm.stabilize_start < self.wait_stable_field:
return Retry
return self.final_status()
def read_workingramp(self):
return self.ramp
class Magfield(SimpleMagfield):
status = Parameter(datatype=StatusType(Status))
mode = Parameter(
'persistent mode', EnumType(Mode), readonly=False, default=Mode.PERSISTENT)
switch_heater = Parameter('switch heater', EnumType(off=OFF, on=ON),
readonly=False, default=0)
persistent_field = Parameter(
'persistent field', FloatRange(unit='$'), readonly=False)
current = Parameter(
'leads current (in units of field)', FloatRange(unit='$'))
# TODO: time_to_target
# profile = Parameter(
# 'ramp limit table', ArrayOf(TupleOf(FloatRange(unit='$'), FloatRange(unit='$/min'))),
# readonly=False)
# profile_training = Parameter(
# 'ramp limit table when in training',
# ArrayOf(TupleOf(FloatRange(unit='$'), FloatRange(unit='$/min'))), readonly=False)
# TODO: the following parameters should be changed into properties after tests
wait_switch_on = Parameter(
'wait time to ensure switch is on', FloatRange(0, unit='s'), readonly=False, default=61)
wait_switch_off = Parameter(
'wait time to ensure switch is off', FloatRange(0, unit='s'), readonly=False, default=61)
wait_stable_leads = Parameter(
'wait time to ensure current is stable', FloatRange(0, unit='s'), readonly=False, default=6)
persistent_limit = Parameter(
'above this limit, lead currents are not driven to 0',
FloatRange(0, unit='$'), readonly=False, default=99)
leads_ramp_tmo = Parameter(
'timeout for leads ramp progress',
FloatRange(0, unit='s'), readonly=False, default=30)
ramp_tmo = Parameter(
'timeout for field ramp progress',
FloatRange(0, unit='s'), readonly=False, default=30)
__init = True
switch_on_time = None
switch_off_time = None
def doPoll(self):
if self.__init:
self.__init = False
if self.read_switch_heater() and self.mode == Mode.PERSISTENT:
self.read_value() # check for persistent field mismatch
# switch off heater from previous live or manual intervention
self.write_target(self.persistent_field)
else:
self._last_target = self.persistent_field
else:
super().doPoll()
def initModule(self):
super().initModule()
self.registerCallbacks(self) # for update_switch_heater
def write_mode(self, value):
self.start_machine(self.start_field_change, cleanup=self.cleanup, target=self.target, mode=value)
return value
def write_target(self, target):
self.check_limits(target)
self.start_machine(self.start_field_change, cleanup=self.cleanup, target=target, mode=self.mode)
return target
def cleanup(self, sm): # sm is short for statemachine
if self.switch_heater != 0:
self.persistent_field = self.read_value()
if sm.mode != Mode.DRIVEN:
self.log.warning('turn switch heater off')
self.write_switch_heater(0)
@status_code('PREPARING')
def start_field_change(self, sm):
self.setFastPoll(True, 1.0)
if sm.target == self.persistent_field or (
sm.target == self._last_target and
abs(sm.target - self.persistent_field) <= self.tolerance): # short cut
return self.check_switch_off
if self.switch_heater:
return self.start_switch_on
return self.start_ramp_to_field
def start_ramp_to_field(self, state):
@status_code('PREPARING')
def start_ramp_to_field(self, sm):
"""start ramping current to persistent field
should return ramp_to_field
initiate ramp to persistent field (with corresponding ramp rate)
the implementation should return ramp_to_field
"""
raise NotImplementedError
def ramp_to_field(self, state):
"""ramping, wait for current at persistent field"""
if (self.target == self._last_target and
abs(self.target - self.persistent_field) <= self.tolerance): # short cut
return self.check_switch_off
if abs(self.current - self.persistent_field) > self.tolerance:
if state.init:
self.status = Status.PREPARING, 'ramping leads current to field'
return Retry()
state.stabilize_start = time.time()
@status_code('PREPARING', 'ramp leads to match field')
def ramp_to_field(self, sm):
if sm.init:
sm.stabilize_start = 0 # in case current is already at field
self.init_progress(sm, self.current)
dif = abs(self.current - self.persistent_field)
if dif > self.tolerance:
tdif = self.get_progress(sm, self.current)
if tdif > self.leads_ramp_tmo:
raise HardwareError('no progress')
sm.stabilize_start = None # force reset
return Retry
if sm.stabilize_start is None:
sm.stabilize_start = sm.now
return self.stabilize_current
def stabilize_current(self, state):
"""wait for stable current at persistent field"""
if state.now - state.stabilize_start < self.wait_stable_leads:
if state.init:
self.status = Status.PREPARING, 'stabilizing leads current'
return Retry()
@status_code('PREPARING')
def stabilize_current(self, sm):
if sm.now - sm.stabilize_start < self.wait_stable_leads:
return Retry
return self.start_switch_on
def update_switch_heater(self, value):
"""is called whenever switch heater was changed"""
switch_time = self.switch_time[value]
if switch_time is None:
switch_time = time.time()
self.switch_time = [None, None]
self.switch_time[value] = switch_time
print('SW', value)
if value == 0:
if self.switch_off_time is None:
self.log.info('restart switch_off_time')
self.switch_off_time = time.time()
self.switch_on_time = None
else:
if self.switch_on_time is None:
self.log.info('restart switch_on_time')
self.switch_on_time = time.time()
self.switch_off_time = None
def start_switch_on(self, state):
"""switch heater on"""
if self.switch_heater == 0:
@status_code('PREPARING')
def start_switch_on(self, sm):
if self.read_switch_heater() == 0:
self.status = Status.PREPARING, 'turn switch heater on'
try:
self.write_switch_heater(True)
except Exception as e:
self.log.warning('write_switch_heater %r', e)
return Retry()
return Retry
else:
self.status = Status.PREPARING, 'wait for heater on'
return self.switch_on
return self.wait_for_switch_on
def switch_on(self, state):
"""wait for switch heater open"""
if (self.target == self._last_target and
abs(self.target - self.persistent_field) <= self.tolerance): # short cut
@status_code('PREPARING')
def wait_for_switch_on(self, sm):
if (sm.target == self._last_target and
abs(sm.target - self.persistent_field) <= self.tolerance): # short cut
return self.check_switch_off
self.read_switch_heater()
if self.switch_time[ON] is None:
self.read_switch_heater() # trigger switch_on/off_time
if self.switch_heater == 0:
self.log.warning('switch turned off manually?')
return self.start_switch_on
if state.now - self.switch_time[ON] < self.wait_switch_on:
return Retry()
self._last_target = self.target
if sm.now - self.switch_on_time < self.wait_switch_on:
if sm.delta(10):
self.log.info('waited for %g sec', sm.now - self.switch_on_time)
return Retry
self._last_target = sm.target
return self.start_ramp_to_target
def start_ramp_to_target(self, state):
"""start ramping current to target
@status_code('RAMPING')
def start_ramp_to_target(self, sm):
"""start ramping current to target field
should return ramp_to_target
initiate ramp to target
the implementation should return ramp_to_target
"""
raise NotImplementedError
def ramp_to_target(self, state):
"""ramp field to target"""
if self.target != self._last_target: # target was changed
self._last_target = self.target
return self.start_ramp_to_target
@status_code('RAMPING')
def ramp_to_target(self, sm):
self.persistent_field = self.value
# Remarks: assume there is a ramp limiting feature
if abs(self.value - self.target) > self.tolerance:
if state.init:
self.status = Status.RAMPING, 'ramping field'
return Retry()
state.stabilize_start = time.time()
dif = abs(self.value - sm.target)
if sm.init:
sm.stabilize_start = 0 # in case current is already at target
self.init_progress(sm, self.value)
if dif > self.tolerance:
sm.stabilize_start = sm.now
tdif = self.get_progress(sm, self.value)
if tdif > self.workingramp / self.tolerance * 60 + self.ramp_tmo:
raise HardwareError('no progress')
sm.stabilize_start = None
return Retry
if sm.stabilize_start is None:
sm.stabilize_start = sm.now
return self.stabilize_field
def stabilize_field(self, state):
"""stabilize field"""
if self.target != self._last_target: # target was changed
self._last_target = self.target
return self.start_ramp_to_target
@status_code('STABILIZING')
def stabilize_field(self, sm):
self.persistent_field = self.value
if state.now - state.stabilize_start < self.wait_stable_field:
if state.init:
self.status = Status.STABILIZING, 'stabilizing field'
return Retry()
if sm.now > sm.stablize_start + self.wait_stable_field:
return Retry
return self.check_switch_off
def check_switch_off(self, state):
if self.mode == Mode.DRIVEN:
self.status = Status.PREPARED, 'driven'
return self.finish_state
def check_switch_off(self, sm):
if sm.mode == Mode.DRIVEN:
return self.final_status(Status.PREPARED, 'driven')
return self.start_switch_off
def start_switch_off(self, state):
"""turn off switch heater"""
@status_code('FINALIZING')
def start_switch_off(self, sm):
if self.switch_heater == 1:
self.status = Status.FINALIZING, 'turn switch heater off'
self.write_switch_heater(False)
else:
self.status = Status.FINALIZING, 'wait for heater off'
return self.switch_off
return self.wait_for_switch_off
def switch_off(self, state):
"""wait for switch heater closed"""
if self.target != self._last_target or self.mode == Mode.DRIVEN:
# target or mode has changed -> redo
self._last_target = None
return self.start_switch_on
@status_code('FINALIZING')
def wait_for_switch_off(self, sm):
self.persistent_field = self.value
self.read_switch_heater()
if self.switch_time[OFF] is None:
if self.switch_off_time is None:
self.log.warning('switch turned on manually?')
return self.start_switch_off
if state.now - self.switch_time[OFF] < self.wait_switch_off:
return Retry()
if sm.now - self.switch_off_time < self.wait_switch_off:
return Retry
if abs(self.value) > self.persistent_limit:
self.status = Status.IDLE, 'leads current at field, switch off'
return self.finish_state
return self.final_status(Status.IDLE, 'leads current at field, switch off')
return self.start_ramp_to_zero
def start_ramp_to_zero(self, state):
"""start ramping current to target
@status_code('FINALIZING')
def start_ramp_to_zero(self, sm):
"""start ramping current to zero
initiate ramp to zero (with corresponding ramp rate)
should return ramp_to_zero
the implementation should return ramp_to_zero
"""
raise NotImplementedError
def ramp_to_zero(self, state):
@status_code('FINALIZING')
def ramp_to_zero(self, sm):
"""ramp field to zero"""
if self.target != self._last_target or self.mode == Mode.DRIVEN:
# target or mode has changed -> redo
self._last_target = None
return self.start_field_change
if sm.init:
self.init_progress(sm, self.current)
if abs(self.current) > self.tolerance:
if state.init:
self.status = Status.FINALIZING, 'ramp leads to zero'
return Retry()
if self.mode == Mode.DISABLED and self.persistent_field == 0:
self.status = Status.DISABLED, 'disabled'
else:
self.status = Status.IDLE, 'persistent mode'
return self.finish_state
def finish_state(self, state):
"""finish"""
self.setFastPoll(False)
return None
if self.get_progress(sm, self.current, self.ramp) > self.leads_ramp_tmo:
raise HardwareError('no progress')
return Retry
if sm.mode == Mode.DISABLED and self.persistent_field == 0:
return self.final_status(Status.DISABLED, 'disabled')
return self.final_status(Status.IDLE, 'persistent mode')

89
secop_psi/vector.py Normal file
View File

@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""generic 3D vector"""
from secop.core import Attached, Drivable, Readable, Parameter, Done
from secop.datatypes import FloatRange, TupleOf, StatusType, Enum
class VectorRd(Readable):
"""generic readable vector"""
value = Parameter(datatype=TupleOf(FloatRange(), FloatRange(), FloatRange()))
x = Attached()
y = Attached()
z = Attached()
pollFuncs = None
components = None
def initModule(self):
super().initModule()
members = []
status_codes = {} # collect all possible status codes
components = []
for name in 'xyz':
component = getattr(self, name)
members.append(component.parameters['value'].datatype.copy())
components.append(component)
for code in component.status[0].enum.members:
status_codes[int(code)] = code.name
self.parameters['value'].datatype = TupleOf(*members)
self.parameters['status'].datatype = StatusType(Enum(
'status', **{k: v for v, k in status_codes.items()}))
self.components = components
def doPoll(self):
for component in self.components:
component.doPoll()
# update
component.pollInfo.last_main = self.pollInfo.last_main
self.value = self.merge_value()
self.status = self.merge_status()
def merge_value(self):
return [c.value for c in self.components]
def merge_status(self):
status = -1, ''
for c in self.components:
if c.status[0] > status[0]:
status = c.status
return status
def read_value(self):
return tuple((c.read_value() for c in self.components))
def read_status(self):
[c.read_status() for c in self.components]
return self.merge_status()
class Vector(Drivable, VectorRd):
"""generic drivable vector"""
target = Parameter(datatype=TupleOf(FloatRange(), FloatRange(), FloatRange()))
def initModule(self):
super().initModule()
members = []
for component in self.components:
members.append(component.parameters['target'].datatype.copy())
self.parameters['target'].datatype = TupleOf(*members)
def write_target(self, value):
return tuple((c.write_target(v) for v, c in zip(value, self.components)))