frappy/secop_psi/cryoltd.py
2022-11-30 14:14:47 +01:00

513 lines
18 KiB
Python

# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""flame magnet using cryogenic limited software suite
Remarks: for reading, we use the GetAll command, which is quite fast
(3 ms response time). However, as in many such sloppy programmed software
the timing is badly handled, as after changing parameters, the readback values
are not yet up to date. We use the array block_until for this purpose: any value
changed from the client is fixed for at least 10 seconds.
"""
import re
import time
from secop.core import HasIO, StringIO, Readable, Drivable, Parameter, Command, \
Module, Property, Attached, Enum, IDLE, BUSY, ERROR
from secop.features import HasLimits
from secop.errors import ConfigError, BadValueError, HardwareError
from secop.datatypes import FloatRange, StringType, EnumType, StructOf, OrType
# floating point value followed with unit
VALUE_UNIT = re.compile(r'([-0-9.E]*\d|inf)([A-Za-z/%]*)$')
def as_float(value):
"""converts string (with unit) to float"""
return float(VALUE_UNIT.match(value).group(1))
BOOL_MAP = {'TRUE': True, 'FALSE': False}
def as_bool(value):
return BOOL_MAP[value]
def as_string(value):
return value
class Mapped:
def __init__(self, **kwds):
self.mapping = kwds
self.mapping.update({v: k for k, v in kwds.items()})
def __call__(self, value):
return self.mapping[value]
class IO(StringIO):
identification = [('Get:Remote:Remote_Ready', 'Remote_Ready RECEIVED: TRUE')]
end_of_line = (b'\n', b'\r\n')
timeout = 15
class Main(HasIO, Module):
pollinterval = Parameter('main poll interval', FloatRange(0.001), default=1)
export = False
params_map = None
triggers = None
initpolls = None
ioClass = IO
def register_module(self, obj, **params):
"""register a channel
:param obj: the remote object
:param **params:
pname=<Key in GetAll> or pname=(<Key in Getall>, convert function)
the convert function is as_float by default
"""
if self.params_map is None:
self.params_map = {}
self.triggers = set()
obj.block_until = {}
if hasattr(obj, 'trigger_update'):
self.triggers.add(obj.trigger_update)
for pname, value in params.items():
if isinstance(value, str):
key, cvt = value, as_float
else:
key, cvt = value
self.params_map[key.replace('<CH>', obj.channel)] = obj, pname, cvt
def doPoll(self):
# format of reply:
# "key1:value1;key2:value2;"
reply = self.communicate('GetAll').split('RECEIVED ', 1)[1].split(';')
missing = None, None, None
for line in reply:
try:
key, value = line.split(':', 1)
except ValueError: # missing ':'
if line:
pass
# ignore multiline values
# if needed, we may collect here and treat with a special key
continue
obj, pname, cvt = self.params_map.get(key, missing)
if obj:
if not hasattr(obj, pname):
raise ConfigError('%s has no attribute %s' % (obj.name, pname))
if time.time() > obj.block_until.get(pname, 0):
setattr(obj, pname, cvt(value))
for trigger in self.triggers: # do trigger after updates
trigger()
def communicate(self, cmd):
return self.io.communicate(cmd)
class Channel:
main = Attached(Main, default='main')
channel = Property('channel name', StringType())
pollinterval = Parameter(export=False)
block_until = None
def sendcmd(self, cmd):
cmd = cmd.replace('<CH>', self.channel)
reply = self.main.communicate(cmd)
if not reply.startswith(cmd):
print('MISMATCH', cmd, reply)
def block(self, pname, value=None):
self.block_until[pname] = time.time() + 10
if value is not None:
setattr(self, pname, value)
def doPoll(self):
"""poll is done by main module"""
class Temperature(Channel, Readable):
channel = Property('channel name as in reply to GetAll', StringType())
value = Parameter('T sensor value', FloatRange(0, unit='K'))
description = '' # by default take description from channel name
KEYS = {
'1st Stage A',
'2nd Stage A',
'1st Stage B',
'2nd Stage B',
'Inner Magnet A (Top)',
'Inner Magnet A (Bottom)',
'Z Shim Former',
'XY Shim Former',
'XY Vector Former',
'Radiation Shield',
'Persistent Joints',
'Outer Magnet A',
'Inner Magnet B (Top)',
'Inner Magnet B (Bottom)',
'Z Shim Former B',
'Outer Magnet B',
'Bore Radiation Shield',
'XYZ Shim Plate',
'Z Shim Switch',
'Main Coil Switch',
}
def initModule(self):
super().initModule()
if not self.description:
self.description = self.channel
if self.channel not in self.KEYS:
raise ConfigError('channel (%s) must be one of %r' % (self.channel, self.KEYS))
self.main.register_module(self, value=self.channel)
class Magfield(HasLimits, Channel, Drivable):
_status_text = ''
_ready_text = ''
_error_text = ''
_last_error = ''
_rate_units = ''
_next_target = None
_last_target = None
hw_units = Property('hardware units: A or T', EnumType(A=0, T=1))
A_to_G = Property('A_to_G = "Gauss Value / Amps Value"', FloatRange(0))
tolerance = Parameter('tolerance', FloatRange(0), readonly=False, default=1)
def earlyInit(self):
super().earlyInit()
dt = self.parameters['target'].datatype
if dt.min < 1e-99: # make limits symmetric
dt.min = - dt.max
min_, max_ = self.target_limits
self.target_limits = [max(min_, dt.min), max_]
dt = self.parameters['ramp'].datatype
if self.ramp == 0: # unconfigured: take max.
self.ramp = dt.max
def to_gauss(self, value):
value, unit = VALUE_UNIT.match(value).groups()
if unit == 'T':
return float(value) * 10000
if unit == 'A':
return float(value) * self.A_to_G
raise HardwareError('received unknown unit: %s' % unit)
def to_gauss_min(self, value):
value, unit = VALUE_UNIT.match(value).groups()
if unit == 'A/s':
return float(value) * self.A_to_G * 60.0
if unit == 'T/s':
return float(value) * 10000 * 60
if unit == 'T/min':
return float(value) * 10000
if unit == 'T/h':
return float(value) * 10000 / 60.0
raise HardwareError('received unknown unit: %s' % unit)
def initModule(self):
super().initModule()
self.main.register_module(
self,
value=('<CH>_Field', self.to_gauss),
_status_text=('<CH>_Status', str),
_ready_text=('<CH>_Ready', str),
_error_text=('<CH>_Error', self.cvt_error),
_rate_units=('<CH>_Rate Units', str),
current=('<CH>_PSU Output', self.to_gauss),
voltage='<CH>_Voltage',
working_ramp=('<CH>_Ramp Rate', self.to_gauss_min),
setpoint=('<CH>_Setpoint', self.to_gauss),
switch_heater=('<CH>_Heater', self.cvt_switch_heater),
mode=('<CH>_Persistent Mode', self.cvt_mode),
approach_mode=('<CH>_Approach', self.cvt_approach_mode),
)
def cvt_error(self, text):
if text != self._last_error:
self._last_error = text
return text
return self._error_text
def trigger_update(self):
# called after treating result of GetAll message
if self._error_text:
status = ERROR, '%s while %s' % (self._error_text, self._status_text)
elif self._ready_text == 'TRUE':
with self.accessLock: # must not be in parallel with write_target
target = self._next_target
if target is not None: # target change pending
if target == self._last_target and abs(self.value - target) <= self.tolerance:
# we are already there
self._last_target = None
status = IDLE, self._status_text
else:
if self.hw_units == 'A':
self.sendcmd('Set:<CH>:Sweep %gA' % (target / self.A_to_G))
else:
self.sendcmd('Set:<CH>:Sweep %gT' % (target / 10000))
self._next_target = None
self._last_target = target
status = BUSY, 'changed target'
else:
status = IDLE, self._status_text
elif self._next_target is None:
txt = self._status_text
if txt.startswith('Ramping Magnet'):
if txt.endswith(': ') or ' 1 seconds' in txt:
txt = 'stabilizing'
else:
txt = 'ramping'
status = BUSY, txt
else:
return # do not change status when aborting
self.status = status
value = Parameter('magnetic field in the coil', FloatRange(unit='G'))
setpoint = Parameter('setpoint', FloatRange(unit='G'), default=0)
ramp = Parameter('ramp rate', FloatRange(0, unit='$/min'), default=0, readonly=False)
def write_ramp(self, ramp):
if self._rate_units != 'A/s':
self.sendcmd('Set:<CH>:ChangeRateUnits A/s')
self.sendcmd('Set:<CH>:SetRate %g' % (ramp / self.A_to_G / 60.0))
return ramp
def write_target(self, target):
self.reset_error()
self.check_limits(target)
self.write_ramp(self.ramp)
if self.approach_mode == self.approach_mode.OVERSHOOT:
o = self.overshoot['o']
if (target - self.value) * o < 0:
self.write_overshoot(dict(self.overshoot, o=-o))
self.block('_error_text', '')
if self._ready_text == 'FALSE':
if target != self._last_target or abs(self.value - target) > self.tolerance:
self.status = BUSY, 'aborting'
self.sendcmd('Set:<CH>:Abort')
self._next_target = target
else:
self._next_target = target
self.trigger_update() # update status
return target
working_ramp = Parameter('actual ramp rate', FloatRange(0, unit='$/min'))
Mode = Enum(
# DISABLED=0,
PERSISTENT=30,
SEMIPERSISTENT=31,
DRIVEN=50,
)
mode = Parameter('persistent mode', EnumType(Mode), readonly=False, default=30)
mode_map = Mapped(DRIVEN=0, PERSISTENT=1, SEMIPERSISTENT=2)
def write_mode(self, value):
code = self.mode_map(value)
self.sendcmd('Set:<CH>:SetPM %d' % code)
self.block('mode')
return value
@staticmethod
def cvt_mode(text):
text = text.lower()
if 'off' in text:
if '0' in text:
return 30
return 31
return 50
ApproachMode = Enum(
DIRECT=0,
OVERSHOOT=1,
CYCLING=2,
DEGAUSSING=3,
)
approach_mode = Parameter('approach mode', EnumType(ApproachMode), readonly=False,
group='approach_settings', default=0)
def write_approach_mode(self, value):
self.sendcmd('Set:<CH>:SetApproach %d' % value)
self.block('approach_mode')
return value
@classmethod
def cvt_approach_mode(cls, text):
return cls.ApproachMode(text.upper())
overshoot = Parameter('overshoot [%] and hold time [s]',
StructOf(o=FloatRange(-100, 100, unit='%'), t=FloatRange(0, unit='s')),
readonly=False, default=dict(o=0, t=0),
group='approach_settings')
def write_overshoot(self, value):
self.sendcmd('Set:<CH>:SetOvershoot %g,%g' % (value['o'], value['t']))
return value
cycle = Parameter('start value, damping factor, final value, hold time',
StructOf(s=FloatRange(-100, 100, unit='%'),
d=FloatRange(0, 100, unit='%'),
a=FloatRange(0, 100, unit='G'),
t=FloatRange(0, unit='s')),
readonly=False, default=dict(s=0, d=0, a=0, t=0),
group='approach_settings')
def write_cycle(self, value):
self.sendcmd('Set:<CH>:SetCycling %g,%g,%g,%g' %
(value['s'], value['d'], value['a'] * 1e-4, value['t']))
return value
degauss = Parameter('start value [G], damping factor [%], accuracy [G], hold time [s]',
StructOf(s=FloatRange(-10000, 10000, unit='G'),
d=FloatRange(0, 100, unit='%'),
f=FloatRange(0, 10000, unit='G'),
t=FloatRange(0, unit='s')),
readonly=False, default=dict(s=0, d=0, f=0, t=0),
group='approach_settings')
def write_degauss(self, value):
self.sendcmd('Set:<CH>:SetDegaussing %g,%g,%g,%g' %
(value['s'] * 1e-4, value['d'], value['f'] * 1e-4, value['t']))
return value
current = Parameter(
'leads current (in units of field)', FloatRange(unit='$'))
voltage = Parameter(
'voltage over leads', FloatRange(unit='V'))
switch_heater = Parameter(
'voltage over leads', EnumType(OFF=0, ON=1))
@staticmethod
def cvt_switch_heater(text):
return 'ON' in text
@Command()
def stop(self):
self.sendcmd('Set:<CH>:Abort')
@Command()
def reset_quench(self):
"""reset quench condition"""
self.sendcmd('Set:<CH>:ResetQuench')
@Command()
def reset_error(self):
"""reset error"""
self._error_text = ''
class MainMagfield(Magfield):
checked_modules = None
def earlyInit(self):
super().earlyInit()
self.checked_modules = []
# TODO: turn into a property
constraint = Parameter('product check', FloatRange(unit='G^2'), default=80000)
def check_combined(self, obj, value, main_target):
sumvalue2 = sum((max(o.value ** 2, value ** 2 if o == obj else 0)
for o in self.checked_modules))
if sumvalue2 * max(self.value ** 2, main_target) > self.constraint ** 2:
raise BadValueError('outside constraint (B * Bxyz > %g G^2' * self.constraint)
def check_limits(self, value):
super().check_limits(value)
self.check_combined(None, 0, value)
class ComponentField(Magfield):
check_against = Attached(MainMagfield)
def initModule(self):
super().initModule()
self.check_against.checked_modules.append(self)
def check_limits(self, value):
super().check_limits(value)
self.check_against.check_combined(self, value, 0)
class Compressor(Channel, Drivable):
def initModule(self):
super().initModule()
self.main.register_module(
self,
value=('Compressor<CH>_Status', self.cvt_value),
_ready_text=('Compressor<CH>_Ready', str),
_error_text=('Compressor<CH>_Error', str),
run_time='Compressor<CH>_RunTime',
)
# TODO: what is Compressor_Error? (without A or B)
value = Parameter('compressor switch', EnumType(OFF=0, ON=1))
run_time = Parameter('run time', FloatRange(0, unit='h'))
_status_text = ''
_ready_text = ''
_error_text = ''
def cvt_value(self, text):
self._status_text = text
value = text == 'Running'
if time.time() > self.block_until.get('target', 0):
self.target = value
return value
def read_value(self):
return self._status_text == 'Running'
def read_status(self):
if self.target != self.value:
return BUSY, 'switching %s' % self.target.name
# TODO: find out possible status texts
if self._ready_text == 'TRUE':
return IDLE, 'ready'
if self._error_text:
return ERROR, self._error_text
return IDLE, self._status_text
target = Parameter('compressor switch', EnumType(OFF=0, ON=1))
def write_target(self, value):
if value:
self.sendcmd('SetCompressor:Start <CH>')
else:
self.sendcmd('SetCompressor:Stop <CH>')
self.block('target')
self.read_status()
return value
@Command()
def reset_error(self):
"""reset error"""
self.sendcmd('Set:Compressor:Reset <CH>')
self._error_text = ''