add flamemag / flamedil config

This commit is contained in:
l_samenv 2022-11-30 14:14:47 +01:00
parent ef9d89993c
commit 3ab7eb99ab
3 changed files with 824 additions and 0 deletions

166
cfg/main/flamemag.cfg Normal file
View File

@ -0,0 +1,166 @@
[cio]
class = secop_psi.cryoltd.IO
description = 'IO to cryo ltd software'
uri = tcp://flamedil:3128
[main]
class = secop_psi.cryoltd.Main
description = master module
io= cio
[B]
description = 'magnetic field'
class = secop_psi.cryoltd.MainMagfield
channel = Main
constraint = 80000
target.max = 35000
hw_units = T
# A_to_G is needed for ramp rate
A_to_G = 285.73
ramp.max = 412
overshoot = {'o': 1, 't': 180}
degauss = {'s': 500, 'd': 30, 'f': 5, 't': 120}
tolerance = 5
[Bx]
description = 'magnetic field x component'
class = secop_psi.cryoltd.ComponentField
channel = VMX
check_against = B
target.max = 200
hw_units = A
A_to_G = 4.134
ramp.max = 23
tolerance = 1
[By]
description = 'magnetic field y component'
class = secop_psi.cryoltd.ComponentField
channel = VMY
check_against = B
target.max = 100
hw_units = A
A_to_G = 4.1117
ramp.max = 22.9
tolerance = 1
[Bz]
description = 'magnetic field z component'
class = secop_psi.cryoltd.ComponentField
channel = VMZ
check_against = B
target.max = 100
hw_units = A
A_to_G = 5.74
ramp.max = 33.6
tolerance = 1
[compressorA]
description = 'compressor A'
class = secop_psi.cryoltd.Compressor
channel = A
[compressorB]
description = 'compressor B'
class = secop_psi.cryoltd.Compressor
channel = B
[T_stage1_A]
class = secop_psi.cryoltd.Temperature
channel = 1st Stage A
main = main
[T_stage2_A]
class = secop_psi.cryoltd.Temperature
channel = 2nd Stage A
main = main
[T_stage1_B]
class = secop_psi.cryoltd.Temperature
channel = 1st Stage B
main = main
[T_stage2_B]
class = secop_psi.cryoltd.Temperature
channel = 2nd Stage B
main = main
[T_top_A]
class = secop_psi.cryoltd.Temperature
channel = Inner Magnet A (Top)
main = main
[T_bottom_A]
class = secop_psi.cryoltd.Temperature
channel = Inner Magnet A (Bottom)
main = main
[T_top_B]
class = secop_psi.cryoltd.Temperature
channel = Inner Magnet B (Top)
main = main
[T_bottom_B]
class = secop_psi.cryoltd.Temperature
channel = Inner Magnet B (Bottom)
main = main
[T_Z_shim]
class = secop_psi.cryoltd.Temperature
channel = Z Shim Former
main = main
[T_XY_shim]
class = secop_psi.cryoltd.Temperature
channel = XY Shim Former
main = main
[T_XY_vector]
class = secop_psi.cryoltd.Temperature
channel = XY Vector Former
main = main
[T_radiation_shield]
class = secop_psi.cryoltd.Temperature
channel = Radiation Shield
main = main
[T_persistent_joints]
class = secop_psi.cryoltd.Temperature
channel = Persistent Joints
main = main
[T_outer_A]
class = secop_psi.cryoltd.Temperature
channel = Outer Magnet A
main = main
[T_outer_B]
class = secop_psi.cryoltd.Temperature
channel = Outer Magnet B
main = main
[T_shim_B]
class = secop_psi.cryoltd.Temperature
channel = Z Shim Former B
main = main
[T_bore_shield]
class = secop_psi.cryoltd.Temperature
channel = Bore Radiation Shield
main = main
[T_XYZ_shim]
class = secop_psi.cryoltd.Temperature
channel = XYZ Shim Plate
main = main
[T_Z_shim_switch]
class = secop_psi.cryoltd.Temperature
channel = Z Shim Switch
main = main
[T_main_switch]
class = secop_psi.cryoltd.Temperature
channel = Main Coil Switch
main = main

146
cfg/stick/flamedil.cfg Normal file
View File

@ -0,0 +1,146 @@
[NODE]
id = triton.psi.ch
description = triton test
[INTERFACE]
uri = tcp://5000
[T_mix]
class = secop_psi.triton.TemperatureLoop
description = mix. chamber temperature
slot = T5
output_module = htr_mix
io = triton
[T_sorb]
class = secop_psi.triton.TemperatureSensor
description = sorb temperature
slot = T1
io = triton
[T_still]
class = secop_psi.triton.TemperatureSensor
description = still temperature
slot = T4
io = triton
[T_hx]
class = secop_psi.triton.TemperatureSensor
description = heat exchanger T
slot = T2
io = triton
[T_jtstage]
class = secop_psi.triton.TemperatureSensor
description = jt stage temperature
slot = T3
io = triton
[htr_mix]
class = secop_psi.triton.HeaterOutputWithRange
description = mix. chamber heater
slot = H1,T5
io = triton
[htr_sorb]
class = secop_psi.triton.HeaterOutput
description = sorb heater
slot = H3
io = triton
[htr_still]
class = secop_psi.triton.HeaterOutput
description = still heater
slot = H2
io = triton
[action]
class = secop_psi.triton.Action
description = higher level scripts
io = triton
slot = DR
[p_dump]
class = secop_psi.mercury.PressureSensor
description = dump pressure
slot = P1
io = triton
[p_cond]
class = secop_psi.mercury.PressureSensor
description = condenser pressure
slot = P2
io = triton
[p_still]
class = secop_psi.mercury.PressureSensor
description = still pressure
slot = P3
io = triton
#[p_fore]
#class = secop_psi.mercury.PressureSensor
#description = pressure on the pump side
#slot = P4
#io = triton
[p_back]
class = secop_psi.mercury.PressureSensor
description = pressure on the back side of the pump
slot = P5
io = triton
[V1]
class = secop_psi.triton.Valve
description = valve V1
slot = V1
io = triton
[V2]
class = secop_psi.triton.Valve
description = valve V2
slot = V2
io = triton
[V4]
class = secop_psi.triton.Valve
description = valve V4
slot = V4
io = triton
[V5]
class = secop_psi.triton.Valve
description = valve V5
slot = V5
io = triton
[V9]
class = secop_psi.triton.Valve
description = valve V9
slot = V9
io = triton
#[turbo]
#class = secop_psi.triton.TurboPump
#description = still turbo pump
#slot = TURB1
#io = triton
[fp]
class = secop_psi.triton.Pump
description = still fore pump
slot = FP
io = triton
[compressor]
class = secop_psi.triton.Pump
description = compressor
slot = COMP
io = triton
[triton]
class = secop_psi.mercury.IO
description = connection to triton software
uri = tcp://flamedil:33576
timeout = 9

512
secop_psi/cryoltd.py Normal file
View File

@ -0,0 +1,512 @@
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""flame magnet using cryogenic limited software suite
Remarks: for reading, we use the GetAll command, which is quite fast
(3 ms response time). However, as in many such sloppy programmed software
the timing is badly handled, as after changing parameters, the readback values
are not yet up to date. We use the array block_until for this purpose: any value
changed from the client is fixed for at least 10 seconds.
"""
import re
import time
from secop.core import HasIO, StringIO, Readable, Drivable, Parameter, Command, \
Module, Property, Attached, Enum, IDLE, BUSY, ERROR
from secop.features import HasLimits
from secop.errors import ConfigError, BadValueError, HardwareError
from secop.datatypes import FloatRange, StringType, EnumType, StructOf, OrType
# floating point value followed with unit
VALUE_UNIT = re.compile(r'([-0-9.E]*\d|inf)([A-Za-z/%]*)$')
def as_float(value):
"""converts string (with unit) to float"""
return float(VALUE_UNIT.match(value).group(1))
BOOL_MAP = {'TRUE': True, 'FALSE': False}
def as_bool(value):
return BOOL_MAP[value]
def as_string(value):
return value
class Mapped:
def __init__(self, **kwds):
self.mapping = kwds
self.mapping.update({v: k for k, v in kwds.items()})
def __call__(self, value):
return self.mapping[value]
class IO(StringIO):
identification = [('Get:Remote:Remote_Ready', 'Remote_Ready RECEIVED: TRUE')]
end_of_line = (b'\n', b'\r\n')
timeout = 15
class Main(HasIO, Module):
pollinterval = Parameter('main poll interval', FloatRange(0.001), default=1)
export = False
params_map = None
triggers = None
initpolls = None
ioClass = IO
def register_module(self, obj, **params):
"""register a channel
:param obj: the remote object
:param **params:
pname=<Key in GetAll> or pname=(<Key in Getall>, convert function)
the convert function is as_float by default
"""
if self.params_map is None:
self.params_map = {}
self.triggers = set()
obj.block_until = {}
if hasattr(obj, 'trigger_update'):
self.triggers.add(obj.trigger_update)
for pname, value in params.items():
if isinstance(value, str):
key, cvt = value, as_float
else:
key, cvt = value
self.params_map[key.replace('<CH>', obj.channel)] = obj, pname, cvt
def doPoll(self):
# format of reply:
# "key1:value1;key2:value2;"
reply = self.communicate('GetAll').split('RECEIVED ', 1)[1].split(';')
missing = None, None, None
for line in reply:
try:
key, value = line.split(':', 1)
except ValueError: # missing ':'
if line:
pass
# ignore multiline values
# if needed, we may collect here and treat with a special key
continue
obj, pname, cvt = self.params_map.get(key, missing)
if obj:
if not hasattr(obj, pname):
raise ConfigError('%s has no attribute %s' % (obj.name, pname))
if time.time() > obj.block_until.get(pname, 0):
setattr(obj, pname, cvt(value))
for trigger in self.triggers: # do trigger after updates
trigger()
def communicate(self, cmd):
return self.io.communicate(cmd)
class Channel:
main = Attached(Main, default='main')
channel = Property('channel name', StringType())
pollinterval = Parameter(export=False)
block_until = None
def sendcmd(self, cmd):
cmd = cmd.replace('<CH>', self.channel)
reply = self.main.communicate(cmd)
if not reply.startswith(cmd):
print('MISMATCH', cmd, reply)
def block(self, pname, value=None):
self.block_until[pname] = time.time() + 10
if value is not None:
setattr(self, pname, value)
def doPoll(self):
"""poll is done by main module"""
class Temperature(Channel, Readable):
channel = Property('channel name as in reply to GetAll', StringType())
value = Parameter('T sensor value', FloatRange(0, unit='K'))
description = '' # by default take description from channel name
KEYS = {
'1st Stage A',
'2nd Stage A',
'1st Stage B',
'2nd Stage B',
'Inner Magnet A (Top)',
'Inner Magnet A (Bottom)',
'Z Shim Former',
'XY Shim Former',
'XY Vector Former',
'Radiation Shield',
'Persistent Joints',
'Outer Magnet A',
'Inner Magnet B (Top)',
'Inner Magnet B (Bottom)',
'Z Shim Former B',
'Outer Magnet B',
'Bore Radiation Shield',
'XYZ Shim Plate',
'Z Shim Switch',
'Main Coil Switch',
}
def initModule(self):
super().initModule()
if not self.description:
self.description = self.channel
if self.channel not in self.KEYS:
raise ConfigError('channel (%s) must be one of %r' % (self.channel, self.KEYS))
self.main.register_module(self, value=self.channel)
class Magfield(HasLimits, Channel, Drivable):
_status_text = ''
_ready_text = ''
_error_text = ''
_last_error = ''
_rate_units = ''
_next_target = None
_last_target = None
hw_units = Property('hardware units: A or T', EnumType(A=0, T=1))
A_to_G = Property('A_to_G = "Gauss Value / Amps Value"', FloatRange(0))
tolerance = Parameter('tolerance', FloatRange(0), readonly=False, default=1)
def earlyInit(self):
super().earlyInit()
dt = self.parameters['target'].datatype
if dt.min < 1e-99: # make limits symmetric
dt.min = - dt.max
min_, max_ = self.target_limits
self.target_limits = [max(min_, dt.min), max_]
dt = self.parameters['ramp'].datatype
if self.ramp == 0: # unconfigured: take max.
self.ramp = dt.max
def to_gauss(self, value):
value, unit = VALUE_UNIT.match(value).groups()
if unit == 'T':
return float(value) * 10000
if unit == 'A':
return float(value) * self.A_to_G
raise HardwareError('received unknown unit: %s' % unit)
def to_gauss_min(self, value):
value, unit = VALUE_UNIT.match(value).groups()
if unit == 'A/s':
return float(value) * self.A_to_G * 60.0
if unit == 'T/s':
return float(value) * 10000 * 60
if unit == 'T/min':
return float(value) * 10000
if unit == 'T/h':
return float(value) * 10000 / 60.0
raise HardwareError('received unknown unit: %s' % unit)
def initModule(self):
super().initModule()
self.main.register_module(
self,
value=('<CH>_Field', self.to_gauss),
_status_text=('<CH>_Status', str),
_ready_text=('<CH>_Ready', str),
_error_text=('<CH>_Error', self.cvt_error),
_rate_units=('<CH>_Rate Units', str),
current=('<CH>_PSU Output', self.to_gauss),
voltage='<CH>_Voltage',
working_ramp=('<CH>_Ramp Rate', self.to_gauss_min),
setpoint=('<CH>_Setpoint', self.to_gauss),
switch_heater=('<CH>_Heater', self.cvt_switch_heater),
mode=('<CH>_Persistent Mode', self.cvt_mode),
approach_mode=('<CH>_Approach', self.cvt_approach_mode),
)
def cvt_error(self, text):
if text != self._last_error:
self._last_error = text
return text
return self._error_text
def trigger_update(self):
# called after treating result of GetAll message
if self._error_text:
status = ERROR, '%s while %s' % (self._error_text, self._status_text)
elif self._ready_text == 'TRUE':
with self.accessLock: # must not be in parallel with write_target
target = self._next_target
if target is not None: # target change pending
if target == self._last_target and abs(self.value - target) <= self.tolerance:
# we are already there
self._last_target = None
status = IDLE, self._status_text
else:
if self.hw_units == 'A':
self.sendcmd('Set:<CH>:Sweep %gA' % (target / self.A_to_G))
else:
self.sendcmd('Set:<CH>:Sweep %gT' % (target / 10000))
self._next_target = None
self._last_target = target
status = BUSY, 'changed target'
else:
status = IDLE, self._status_text
elif self._next_target is None:
txt = self._status_text
if txt.startswith('Ramping Magnet'):
if txt.endswith(': ') or ' 1 seconds' in txt:
txt = 'stabilizing'
else:
txt = 'ramping'
status = BUSY, txt
else:
return # do not change status when aborting
self.status = status
value = Parameter('magnetic field in the coil', FloatRange(unit='G'))
setpoint = Parameter('setpoint', FloatRange(unit='G'), default=0)
ramp = Parameter('ramp rate', FloatRange(0, unit='$/min'), default=0, readonly=False)
def write_ramp(self, ramp):
if self._rate_units != 'A/s':
self.sendcmd('Set:<CH>:ChangeRateUnits A/s')
self.sendcmd('Set:<CH>:SetRate %g' % (ramp / self.A_to_G / 60.0))
return ramp
def write_target(self, target):
self.reset_error()
self.check_limits(target)
self.write_ramp(self.ramp)
if self.approach_mode == self.approach_mode.OVERSHOOT:
o = self.overshoot['o']
if (target - self.value) * o < 0:
self.write_overshoot(dict(self.overshoot, o=-o))
self.block('_error_text', '')
if self._ready_text == 'FALSE':
if target != self._last_target or abs(self.value - target) > self.tolerance:
self.status = BUSY, 'aborting'
self.sendcmd('Set:<CH>:Abort')
self._next_target = target
else:
self._next_target = target
self.trigger_update() # update status
return target
working_ramp = Parameter('actual ramp rate', FloatRange(0, unit='$/min'))
Mode = Enum(
# DISABLED=0,
PERSISTENT=30,
SEMIPERSISTENT=31,
DRIVEN=50,
)
mode = Parameter('persistent mode', EnumType(Mode), readonly=False, default=30)
mode_map = Mapped(DRIVEN=0, PERSISTENT=1, SEMIPERSISTENT=2)
def write_mode(self, value):
code = self.mode_map(value)
self.sendcmd('Set:<CH>:SetPM %d' % code)
self.block('mode')
return value
@staticmethod
def cvt_mode(text):
text = text.lower()
if 'off' in text:
if '0' in text:
return 30
return 31
return 50
ApproachMode = Enum(
DIRECT=0,
OVERSHOOT=1,
CYCLING=2,
DEGAUSSING=3,
)
approach_mode = Parameter('approach mode', EnumType(ApproachMode), readonly=False,
group='approach_settings', default=0)
def write_approach_mode(self, value):
self.sendcmd('Set:<CH>:SetApproach %d' % value)
self.block('approach_mode')
return value
@classmethod
def cvt_approach_mode(cls, text):
return cls.ApproachMode(text.upper())
overshoot = Parameter('overshoot [%] and hold time [s]',
StructOf(o=FloatRange(-100, 100, unit='%'), t=FloatRange(0, unit='s')),
readonly=False, default=dict(o=0, t=0),
group='approach_settings')
def write_overshoot(self, value):
self.sendcmd('Set:<CH>:SetOvershoot %g,%g' % (value['o'], value['t']))
return value
cycle = Parameter('start value, damping factor, final value, hold time',
StructOf(s=FloatRange(-100, 100, unit='%'),
d=FloatRange(0, 100, unit='%'),
a=FloatRange(0, 100, unit='G'),
t=FloatRange(0, unit='s')),
readonly=False, default=dict(s=0, d=0, a=0, t=0),
group='approach_settings')
def write_cycle(self, value):
self.sendcmd('Set:<CH>:SetCycling %g,%g,%g,%g' %
(value['s'], value['d'], value['a'] * 1e-4, value['t']))
return value
degauss = Parameter('start value [G], damping factor [%], accuracy [G], hold time [s]',
StructOf(s=FloatRange(-10000, 10000, unit='G'),
d=FloatRange(0, 100, unit='%'),
f=FloatRange(0, 10000, unit='G'),
t=FloatRange(0, unit='s')),
readonly=False, default=dict(s=0, d=0, f=0, t=0),
group='approach_settings')
def write_degauss(self, value):
self.sendcmd('Set:<CH>:SetDegaussing %g,%g,%g,%g' %
(value['s'] * 1e-4, value['d'], value['f'] * 1e-4, value['t']))
return value
current = Parameter(
'leads current (in units of field)', FloatRange(unit='$'))
voltage = Parameter(
'voltage over leads', FloatRange(unit='V'))
switch_heater = Parameter(
'voltage over leads', EnumType(OFF=0, ON=1))
@staticmethod
def cvt_switch_heater(text):
return 'ON' in text
@Command()
def stop(self):
self.sendcmd('Set:<CH>:Abort')
@Command()
def reset_quench(self):
"""reset quench condition"""
self.sendcmd('Set:<CH>:ResetQuench')
@Command()
def reset_error(self):
"""reset error"""
self._error_text = ''
class MainMagfield(Magfield):
checked_modules = None
def earlyInit(self):
super().earlyInit()
self.checked_modules = []
# TODO: turn into a property
constraint = Parameter('product check', FloatRange(unit='G^2'), default=80000)
def check_combined(self, obj, value, main_target):
sumvalue2 = sum((max(o.value ** 2, value ** 2 if o == obj else 0)
for o in self.checked_modules))
if sumvalue2 * max(self.value ** 2, main_target) > self.constraint ** 2:
raise BadValueError('outside constraint (B * Bxyz > %g G^2' * self.constraint)
def check_limits(self, value):
super().check_limits(value)
self.check_combined(None, 0, value)
class ComponentField(Magfield):
check_against = Attached(MainMagfield)
def initModule(self):
super().initModule()
self.check_against.checked_modules.append(self)
def check_limits(self, value):
super().check_limits(value)
self.check_against.check_combined(self, value, 0)
class Compressor(Channel, Drivable):
def initModule(self):
super().initModule()
self.main.register_module(
self,
value=('Compressor<CH>_Status', self.cvt_value),
_ready_text=('Compressor<CH>_Ready', str),
_error_text=('Compressor<CH>_Error', str),
run_time='Compressor<CH>_RunTime',
)
# TODO: what is Compressor_Error? (without A or B)
value = Parameter('compressor switch', EnumType(OFF=0, ON=1))
run_time = Parameter('run time', FloatRange(0, unit='h'))
_status_text = ''
_ready_text = ''
_error_text = ''
def cvt_value(self, text):
self._status_text = text
value = text == 'Running'
if time.time() > self.block_until.get('target', 0):
self.target = value
return value
def read_value(self):
return self._status_text == 'Running'
def read_status(self):
if self.target != self.value:
return BUSY, 'switching %s' % self.target.name
# TODO: find out possible status texts
if self._ready_text == 'TRUE':
return IDLE, 'ready'
if self._error_text:
return ERROR, self._error_text
return IDLE, self._status_text
target = Parameter('compressor switch', EnumType(OFF=0, ON=1))
def write_target(self, value):
if value:
self.sendcmd('SetCompressor:Start <CH>')
else:
self.sendcmd('SetCompressor:Stop <CH>')
self.block('target')
self.read_status()
return value
@Command()
def reset_error(self):
"""reset error"""
self.sendcmd('Set:Compressor:Reset <CH>')
self._error_text = ''