frappy/frappy_psi/cryoltd.py
Markus Zolliker d55ee42612 remove more coding cookies
mainly from frappy_psi

Change-Id: I192811459aebe97f3076888cd31a308a51e6aa49
2024-01-29 16:00:44 +01:00

606 lines
20 KiB
Python

# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""flame magnet using cryogenic limited software suite
Remarks: for reading, we use the GetAll command, which is quite fast
(3 ms response time). However, as in many such sloppy programmed software
the timing is badly handled, as after changing parameters, the readback values
are not yet up to date. We use the array block_until for this purpose: any value
changed from the client is fixed for at least 10 seconds.
"""
import re
import time
from math import copysign
from frappy.core import HasIO, StringIO, Readable, Drivable, Parameter, Command, \
Module, Property, Attached, Enum, IDLE, BUSY, ERROR
from frappy.errors import ConfigError, BadValueError, HardwareError
from frappy.datatypes import FloatRange, StringType, EnumType, StructOf
from frappy.states import HasStates, status_code, Retry
import frappy_psi.magfield as magfield
# floating point value followed with unit
VALUE_UNIT = re.compile(r'([-0-9.E]*\d|inf)([A-Za-z/%]*)$')
def as_float(value):
"""converts string (with unit) to float"""
return float(VALUE_UNIT.match(value).group(1))
BOOL_MAP = {'TRUE': True, 'FALSE': False}
def as_bool(value):
return BOOL_MAP[value]
def as_string(value):
return value
class Mapped:
def __init__(self, **kwds):
self.mapping = kwds
self.mapping.update({v: k for k, v in kwds.items()})
def __call__(self, value):
return self.mapping[value]
class IO(StringIO):
identification = [('Get:Remote:Remote_Ready', 'Remote_Ready RECEIVED: TRUE')]
end_of_line = (b'\n', b'\r\n')
timeout = 15
class Main(HasIO, Module):
pollinterval = Parameter('main poll interval', FloatRange(0.001), default=1)
export = False
params_map = None
triggers = None
initpolls = None
ioClass = IO
def register_module(self, obj, **params):
"""register a channel
:param obj: the remote object
:param **params:
pname=<Key in GetAll> or pname=(<Key in Getall>, convert function)
the convert function is as_float by default
"""
if self.params_map is None:
self.params_map = {}
self.triggers = set()
obj.block_until = {}
if hasattr(obj, 'trigger_update'):
self.triggers.add(obj.trigger_update)
for pname, value in params.items():
if isinstance(value, str):
key, cvt = value, as_float
else:
key, cvt = value
self.params_map[key.replace('<CH>', obj.channel)] = obj, pname, cvt
def doPoll(self):
# format of reply:
# "key1:value1;key2:value2;"
reply = self.communicate('GetAll').split('RECEIVED ', 1)[1].split(';')
missing = None, None, None
for line in reply:
try:
key, value = line.split(':', 1)
except ValueError: # missing ':'
if line:
pass
# ignore multiline values
# if needed, we may collect here and treat with a special key
continue
obj, pname, cvt = self.params_map.get(key, missing)
if obj:
if not hasattr(obj, pname):
raise ConfigError('%s has no attribute %s' % (obj.name, pname))
if time.time() > obj.block_until.get(pname, 0):
setattr(obj, pname, cvt(value))
for trigger in self.triggers: # do trigger after updates
trigger()
def communicate(self, cmd):
return self.io.communicate(cmd)
class Channel:
main = Attached(Main)
channel = Property('channel name', StringType())
pollinterval = Parameter(export=False)
block_until = None
def sendcmd(self, cmd):
cmd = cmd.replace('<CH>', self.channel)
reply = self.main.communicate(cmd)
if not reply.startswith(cmd):
self.log.warn('reply %r does not match command %r', reply, cmd)
def block(self, pname, value=None, delay=10):
self.block_until[pname] = time.time() + delay
if value is not None:
setattr(self, pname, value)
def doPoll(self):
"""poll is done by main module"""
class Temperature(Channel, Readable):
channel = Property('channel name as in reply to GetAll', StringType())
value = Parameter('T sensor value', FloatRange(0, unit='K'))
description = '' # by default take description from channel name
KEYS = {
'1st Stage A',
'2nd Stage A',
'1st Stage B',
'2nd Stage B',
'Inner Magnet A (Top)',
'Inner Magnet A (Bottom)',
'Z Shim Former',
'XY Shim Former',
'XY Vector Former',
'Radiation Shield',
'Persistent Joints',
'Outer Magnet A',
'Inner Magnet B (Top)',
'Inner Magnet B (Bottom)',
'Z Shim Former B',
'Outer Magnet B',
'Bore Radiation Shield',
'XYZ Shim Plate',
'Z Shim Switch',
'Main Coil Switch',
}
def initModule(self):
super().initModule()
if not self.description:
self.description = self.channel
if self.channel not in self.KEYS:
raise ConfigError('channel (%s) must be one of %r' % (self.channel, self.KEYS))
self.main.register_module(self, value=self.channel)
CsMode = Enum(
PERSISTENT=1,
SEMIPERSISTENT=2,
DRIVEN=0,
)
PersistencyMode = Enum(
DISABLED=0,
PERSISTENT=30,
SEMIPERSISTENT=31,
DRIVEN=50,
)
class BaseMagfield(HasStates, Channel):
_status_text = ''
_ready_text = ''
_error_text = ''
_last_error = ''
_rate_units = ''
hw_units = Property('hardware units: A or T', EnumType(A=0, T=1))
A_to_G = Property('A_to_G = "Gauss Value / Amps Value"', FloatRange(0))
tolerance = Parameter('tolerance', FloatRange(0), readonly=False, default=1)
def earlyInit(self):
super().earlyInit()
dt = self.parameters['target'].datatype
if dt.min < 1e-99: # make limits symmetric
dt.min = - dt.max
# min_, max_ = self.target_limits
# self.target_limits = [max(min_, dt.min), min(max_, dt.max)]
dt = self.parameters['ramp'].datatype
if self.ramp == 0: # unconfigured: take max.
self.ramp = dt.max
if not isinstance(self, magfield.Magfield):
# add unneeded attributes, as they appear in GetAll
self.switch_heater = None
self.current = None
def doPoll(self):
super().doPoll() # does not call cycle_machine
self.cycle_machine()
def to_gauss(self, value):
value, unit = VALUE_UNIT.match(value).groups()
if unit == 'T':
return float(value) * 10000
if unit == 'A':
return float(value) * self.A_to_G
raise HardwareError('received unknown unit: %s' % unit)
def to_gauss_min(self, value):
value, unit = VALUE_UNIT.match(value).groups()
if unit == 'A/s':
return float(value) * self.A_to_G * 60.0
if unit == 'T/s':
return float(value) * 10000 * 60
if unit == 'T/min':
return float(value) * 10000
if unit == 'T/h':
return float(value) * 10000 / 60.0
raise HardwareError('received unknown unit: %s' % unit)
def initModule(self):
super().initModule()
self.main.register_module(
self,
value=('<CH>_Field', self.to_gauss),
_status_text=('<CH>_Status', str),
_ready_text=('<CH>_Ready', str),
_error_text=('<CH>_Error', self.cvt_error),
_rate_units=('<CH>_Rate Units', str),
current=('<CH>_PSU Output', self.to_gauss),
voltage='<CH>_Voltage',
workingramp=('<CH>_Ramp Rate', self.to_gauss_min),
setpoint=('<CH>_Setpoint', self.to_gauss),
switch_heater=('<CH>_Heater', self.cvt_switch_heater),
cs_mode=('<CH>_Persistent Mode', self.cvt_cs_mode),
# no readback: approach_mode=('<CH>_Approach', self.cvt_approach_mode),
)
def cvt_error(self, text):
if text != self._last_error:
self._last_error = text
self.log.error(text)
return text
return self._error_text
def trigger_update(self):
# called after treating result of GetAll message
if self._error_text:
if self.status[0] == ERROR:
errtxt = self._error_text
else:
errtxt = '%s while %s' % (self._error_text, self.status[1])
# self.stop_machine((ERROR, errtxt))
value = Parameter('magnetic field in the coil', FloatRange(unit='G'))
setpoint = Parameter('setpoint', FloatRange(unit='G'), default=0)
ramp = Parameter()
target = Parameter()
def write_ramp(self, ramp):
if self._rate_units != 'A/s':
self.sendcmd('Set:<CH>:ChangeRateUnits A/s')
self.sendcmd('Set:<CH>:SetRate %g' % (ramp / self.A_to_G / 60.0))
return ramp
def write_target(self, target):
self.reset_error()
super().write_target(target)
return target
def start_sweep(self, target):
if self.approach_mode == self.approach_mode.OVERSHOOT:
sign = copysign(1, target - self.value) * copysign(1, target)
oval = copysign(self.overshoot['o'], sign)
tval = self.overshoot['t']
self.sendcmd('Set:<CH>:SetOvershoot %g,%g' % (oval, tval))
self.write_ramp(self.ramp)
if self.hw_units == 'A':
self.sendcmd('Set:<CH>:Sweep %gA' % (target / self.A_to_G))
else:
self.sendcmd('Set:<CH>:Sweep %gT' % (target / 10000))
self.block('_ready_text', 'FALSE')
def start_field_change(self, sm):
if self._ready_text == 'FALSE':
self.sendcmd('Set:<CH>:Abort')
return self.wait_ready
return super().start_field_change
def wait_ready(self, sm):
if self._ready_text == 'FALSE':
return Retry
return super().start_field_change
def start_ramp_to_target(self, sm):
self.start_sweep(sm.target)
return self.ramp_to_target # -> stabilize_field
def stabilize_field(self, sm):
if self._ready_text == 'FALSE':
# wait for overshoot/degauss/cycle
sm.stabilize_start = sm.now
return Retry
if sm.stabilize_start is None:
sm.stabilize_start = sm.now
if sm.now - sm.stabilize_start < self.wait_stable_field:
return Retry
return self.end_stablilize
def end_stablilize(self, sm):
return self.final_status()
cs_mode = Parameter('persistent mode', EnumType(CsMode), readonly=False, default=0)
@staticmethod
def cvt_cs_mode(text):
text = text.lower()
if 'off' in text:
if '0' in text:
return CsMode.PERSISTENT
return CsMode.SEMIPERSISTENT
return CsMode.DRIVEN
def write_cs_mode(self, value):
self.sendcmd('Set:<CH>:SetPM %d' % int(value))
self.block('cs_mode')
return value
ApproachMode = Enum(
DIRECT=0,
OVERSHOOT=1,
CYCLING=2,
DEGAUSSING=3,
)
approach_mode = Parameter('approach mode', EnumType(ApproachMode), readonly=False,
group='approach_settings', default=0)
def write_approach_mode(self, value):
self.sendcmd('Set:<CH>:SetApproach %d' % value)
self.block('approach_mode')
return value
# no readback:
# @classmethod
# def cvt_approach_mode(cls, text):
# return cls.ApproachMode(text.upper())
overshoot = Parameter('overshoot [%] and hold time [s]',
StructOf(o=FloatRange(0, 100, unit='%'), t=FloatRange(0, unit='s')),
readonly=False, default=dict(o=0, t=0),
group='approach_settings')
#def write_overshoot(self, value):
# self.sendcmd('Set:<CH>:SetOvershoot %g,%g' % (value['o'], value['t']))
# return value
cycle = Parameter('start value, damping factor, final value, hold time',
StructOf(s=FloatRange(-100, 100, unit='%'),
d=FloatRange(0, 100, unit='%'),
a=FloatRange(0, 100, unit='G'),
t=FloatRange(0, unit='s')),
readonly=False, default=dict(s=0, d=0, a=0, t=0),
group='approach_settings')
def write_cycle(self, value):
self.sendcmd('Set:<CH>:SetCycling %g,%g,%g,%g' %
(value['s'], value['d'], value['a'] * 1e-4, value['t']))
return value
degauss = Parameter('start value [G], damping factor [%], accuracy [G], hold time [s]',
StructOf(s=FloatRange(-10000, 10000, unit='G'),
d=FloatRange(0, 100, unit='%'),
f=FloatRange(0, 10000, unit='G'),
t=FloatRange(0, unit='s')),
readonly=False, default=dict(s=0, d=0, f=0, t=0),
group='approach_settings')
def write_degauss(self, value):
self.sendcmd('Set:<CH>:SetDegaussing %g,%g,%g,%g' %
(value['s'] * 1e-4, value['d'], value['f'] * 1e-4, value['t']))
return value
voltage = Parameter(
'voltage over leads', FloatRange(unit='V'))
@staticmethod
def cvt_switch_heater(text):
return 'ON' in text
@Command()
def stop(self):
self.sendcmd('Set:<CH>:Abort')
@Command()
def reset_quench(self):
"""reset quench condition"""
self.sendcmd('Set:<CH>:ResetQuench')
@Command()
def reset_error(self):
"""reset error"""
self._error_text = ''
class MainField(BaseMagfield, magfield.Magfield):
checked_modules = None
def earlyInit(self):
super().earlyInit()
self.checked_modules = []
def check_limits(self, value):
super().check_limits(value)
self.check_combined(None, 0, value)
# TODO: turn into a property
constraint = Parameter('product check', FloatRange(unit='G^2'), default=80000)
def check_combined(self, obj, value, main_target):
sumvalue2 = sum(((value ** 2 if o == obj else o.value ** 2)
for o in self.checked_modules))
if sumvalue2 * max(self.value ** 2, main_target ** 2) > self.constraint ** 2:
raise BadValueError('outside constraint (B * Bxyz > %g G^2' % self.constraint)
def check_limits(self, value):
super().check_limits(value)
self.check_combined(None, 0, value)
mode = Parameter(datatype=EnumType(PersistencyMode))
def write_mode(self, mode):
self.reset_error()
super().write_mode(mode) # updates mode
return mode
@status_code('PREPARING')
def start_ramp_to_field(self, sm):
self.sendcmd('Set:<CH>:SetApproach 0') # direct mode
self.start_sweep(self.value)
return self.ramp_to_field # -> stabilize_current -> start_switch_on
@status_code('PREPARING')
def start_switch_on(self, sm):
self.write_cs_mode(CsMode.DRIVEN)
self.sendcmd('Set:<CH>:SetApproach 0') # direct mode
# self.block('switch_heater', 1, 60)
self.start_sweep(self.value)
self.block('_ready_text', 'FALSE')
# self.switch_on_time = sm.now # not needed? don ein update_switch_heater
return self.wait_for_switch_on # -> start_ramp_to_target -> ramp_to_target -> stabilize_field
wait_switch_on = Parameter('minimum time to wait for opening switch')
@status_code('PREPARING')
def wait_for_switch_on(self, sm):
if self.switch_on_time is None:
self.switch_on_time = sm.now
if sm.now - self.switch_on_time < self.wait_switch_on:
return Retry
if self._ready_text == 'FALSE':
return Retry
self._last_target = sm.target
self.sendcmd('Set:<CH>:SetApproach %d' % self.approach_mode)
return self.start_ramp_to_target
def end_stablilize(self, sm):
return self.start_switch_off
@status_code('FINALIZING')
def start_switch_off(self, sm):
if self.mode == PersistencyMode.DRIVEN:
return self.final_status(IDLE, 'driven')
if self.mode == PersistencyMode.SEMIPERSISTENT:
self.write_cs_mode(CsMode.SEMIPERSISTENT)
else: # PERSISTENT or DISABLED
self.write_cs_mode(CsMode.PERSISTENT)
self.sendcmd('Set:<CH>:SetApproach 0') # direct mode
self.start_sweep(sm.target)
self.block('_ready_text', 'FALSE')
self.block('switch_heater', 1)
self.switch_off_time = sm.now
return self.wait_for_switch_off # -> start_ramp_to_zero
wait_switch_off = Parameter('minimum time to wait for closing switch')
@status_code('PREPARING')
def wait_for_switch_off(self, sm):
if self.switch_off_time is None:
self.switch_off_time = sm.now
if sm.now - self.switch_off_time < self.wait_switch_off:
return Retry
if self.switch_heater:
return Retry
self._last_target = sm.target
if self.mode == PersistencyMode.SEMIPERSISTENT:
return self.final_status(IDLE, 'semipersistent')
return self.ramp_to_zero
@status_code('FINALIZING')
def ramp_to_zero(self, sm):
if self._ready_text == 'FALSE':
return Retry
if self.mode == PersistencyMode.DISABLED:
return self.final_status(PersistencyMode.DISABLED, 'disabled')
return self.final_status(IDLE, 'persistent')
class ComponentField(BaseMagfield, magfield.SimpleMagfield):
check_against = Attached(MainField)
# status = Parameter(datatype=EnumType(Drivable.Status, RAMPING=370, STABILIZING=380, FINALIZING=390))
def initModule(self):
super().initModule()
self.check_against.checked_modules.append(self)
def check_limits(self, value):
super().check_limits(value)
self.check_against.check_combined(self, value, 0)
class Compressor(Channel, Drivable):
def initModule(self):
super().initModule()
self.main.register_module(
self,
value=('Compressor<CH>_Status', self.cvt_value),
_ready_text=('Compressor<CH>_Ready', str),
_error_text=('Compressor<CH>_Error', str),
run_time='Compressor<CH>_RunTime',
)
# TODO: what is Compressor_Error? (without A or B)
value = Parameter('compressor switch', EnumType(OFF=0, ON=1))
run_time = Parameter('run time', FloatRange(0, unit='h'))
_status_text = ''
_ready_text = ''
_error_text = ''
def cvt_value(self, text):
self._status_text = text
value = text == 'Running'
if time.time() > self.block_until.get('target', 0):
self.target = value
return value
def read_value(self):
return self._status_text == 'Running'
def read_status(self):
if self.target != self.value:
return BUSY, 'switching %s' % self.target.name
# TODO: find out possible status texts
if self._ready_text == 'TRUE':
return IDLE, 'ready'
if self._error_text:
return ERROR, self._error_text
return IDLE, self._status_text
target = Parameter('compressor switch', EnumType(OFF=0, ON=1))
def write_target(self, value):
if value:
self.sendcmd('Set:Compressor:Start <CH>')
else:
self.sendcmd('Set:Compressor:Stop <CH>')
self.block('target')
self.read_status()
return value
@Command()
def reset_error(self):
"""reset error"""
self.sendcmd('Set:Compressor:Reset <CH>')
self._error_text = ''