# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
# Module authors:
#   Markus Zolliker
# *****************************************************************************
"""flame magnet using cryogenic limited software suite

Remarks: for reading, we use the GetAll command, which is quite fast
(3 ms response time). However, as in many such sloppily programmed software
packages, the timing is badly handled: after changing parameters, the readback
values are not yet up to date. We use the dict block_until for this purpose:
any value changed from the client is fixed for at least 10 seconds.
"""

import re
import time
from secop.core import HasIO, StringIO, Readable, Drivable, Parameter, Command, \
    Module, Property, Attached, Enum, IDLE, BUSY, ERROR
from secop.features import HasLimits
from secop.errors import ConfigError, BadValueError, HardwareError
from secop.datatypes import FloatRange, StringType, EnumType, StructOf, OrType

# floating point value followed with unit
VALUE_UNIT = re.compile(r'([-0-9.E]*\d|inf)([A-Za-z/%]*)$')

# token in GetAll keys and in commands which is replaced by the channel name
# NOTE(review): in the reviewed text the token was an empty string, which makes
# str.replace insert the channel name between all characters. '<CH>' is a
# reconstruction - confirm the token against the upstream file.
CHANNEL_TAG = '<CH>'


def _split_value_unit(text):
    """split a reply item into a float and its unit

    :param text: a number, optionally followed by a unit, e.g. '1.5T'
    :return: tuple (<float value>, <unit string, may be empty>)
    :raises ValueError: when text does not match VALUE_UNIT
    """
    match = VALUE_UNIT.match(text)
    if match is None:
        # without this check the failure would be an AttributeError on None
        raise ValueError('can not convert %r to a number' % text)
    number, unit = match.groups()
    return float(number), unit


def as_float(value):
    """converts string (with unit) to float"""
    return _split_value_unit(value)[0]


BOOL_MAP = {'TRUE': True, 'FALSE': False}


def as_bool(value):
    """convert 'TRUE' / 'FALSE' to a bool (KeyError on anything else)"""
    return BOOL_MAP[value]


def as_string(value):
    """identity conversion"""
    return value


class Mapped:
    """bidirectional lookup table

    Mapped(A=0, B=1) converts 'A' to 0 and 0 to 'A' when called.
    """

    def __init__(self, **kwds):
        self.mapping = kwds
        # add the reverse direction to the same dict
        self.mapping.update({v: k for k, v in kwds.items()})

    def __call__(self, value):
        return self.mapping[value]


class IO(StringIO):
    identification = [('Get:Remote:Remote_Ready', 'Remote_Ready RECEIVED: TRUE')]
    end_of_line = (b'\n', b'\r\n')
    timeout = 15


# Main does the communication for all channels: one GetAll command per poll,
# the items of the reply are distributed to the registered channel modules
class Main(HasIO, Module):
    pollinterval = Parameter('main poll interval', FloatRange(0.001), default=1)
    export = False
    params_map = None  # dict <key in GetAll reply> of (module, attribute name, convert function)
    triggers = None    # set of trigger_update methods of the registered modules
    initpolls = None
    ioClass = IO

    def register_module(self, obj, **params):
        """register a channel

        :param obj: the remote object
        :param **params:
            pname=<key in GetAll reply> or pname=(<key in GetAll reply>, convert function)
            the convert function is as_float by default
            CHANNEL_TAG within the key is replaced by the channel name of obj
        """
        if self.params_map is None:
            self.params_map = {}
            self.triggers = set()
        obj.block_until = {}
        if hasattr(obj, 'trigger_update'):
            self.triggers.add(obj.trigger_update)
        for pname, value in params.items():
            if isinstance(value, str):
                key, cvt = value, as_float
            else:
                key, cvt = value
            # keys without the tag (e.g. temperature channels) are used verbatim
            self.params_map[key.replace(CHANNEL_TAG, obj.channel)] = obj, pname, cvt

    def doPoll(self):
        """read all values with GetAll and distribute them to the registered modules

        :raises HardwareError: when the reply does not contain 'RECEIVED '
        :raises ConfigError: when a registered attribute does not exist on its module
        """
        # format of reply:
        # "... RECEIVED key1:value1;key2:value2;"
        reply = self.communicate('GetAll')
        try:
            payload = reply.split('RECEIVED ', 1)[1]
        except IndexError:
            raise HardwareError('unexpected reply to GetAll: %r' % reply) from None
        # both are None as long as no module has registered
        params_map = self.params_map or {}
        triggers = self.triggers or ()
        missing = None, None, None
        for line in payload.split(';'):
            try:
                key, value = line.split(':', 1)
            except ValueError:  # missing ':'
                # ignore multiline values
                # if needed, we may collect here and treat with a special key
                continue
            obj, pname, cvt = params_map.get(key, missing)
            if obj:
                if not hasattr(obj, pname):
                    raise ConfigError('%s has no attribute %s' % (obj.name, pname))
                # values recently changed by the client are not overwritten
                if time.time() > obj.block_until.get(pname, 0):
                    setattr(obj, pname, cvt(value))
        for trigger in triggers:  # do trigger after updates
            trigger()

    def communicate(self, cmd):
        """send cmd over the io module and return the reply"""
        return self.io.communicate(cmd)


# mixin for all modules getting their values from Main
class Channel:
    main = Attached(Main, default='main')
    channel = Property('channel name', StringType())
    pollinterval = Parameter(export=False)
    block_until = None  # dict <attribute name> of <time until updates are blocked>

    def sendcmd(self, cmd):
        """send a command, after replacing CHANNEL_TAG by the channel name

        a reply not starting with the command sent is reported, but not
        treated as an error
        """
        cmd = cmd.replace(CHANNEL_TAG, self.channel)
        reply = self.main.communicate(cmd)
        if not reply.startswith(cmd):
            # NOTE(review): consider using the module logger instead of print
            print('MISMATCH', cmd, reply)

    def block(self, pname, value=None):
        """block updates of <pname> from GetAll for 10 seconds

        :param pname: the name of the attribute
        :param value: when not None, the attribute is set to this value
        """
        self.block_until[pname] = time.time() + 10
        if value is not None:
            setattr(self, pname, value)

    def doPoll(self):
        """poll is done by main module"""


class Temperature(Channel, Readable):
    channel = Property('channel name as in reply to GetAll', StringType())
    value = Parameter('T sensor value', FloatRange(0, unit='K'))
    description = ''  # by default take description from channel name

    KEYS = {
        '1st Stage A',
        '2nd Stage A',
        '1st Stage B',
        '2nd Stage B',
        'Inner Magnet A (Top)',
        'Inner Magnet A (Bottom)',
        'Z Shim Former',
        'XY Shim Former',
        'XY Vector Former',
        'Radiation Shield',
        'Persistent Joints',
        'Outer Magnet A',
        'Inner Magnet B (Top)',
        'Inner Magnet B (Bottom)',
        'Z Shim Former B',
        'Outer Magnet B',
        'Bore Radiation Shield',
        'XYZ Shim Plate',
        'Z Shim Switch',
        'Main Coil Switch',
    }

    def initModule(self):
        """check the channel name and register the value at the main module"""
        super().initModule()
        if not self.description:
            self.description = self.channel
        if self.channel not in self.KEYS:
            raise ConfigError('channel (%s) must be one of %r' % (self.channel, self.KEYS))
        # the channel name is the key in the reply of GetAll
        self.main.register_module(self, value=self.channel)


class Magfield(HasLimits, Channel, Drivable):
    _status_text = ''
    _ready_text = ''
    _error_text = ''
    _last_error = ''
    _rate_units = ''
    _next_target = None  # a target not yet sent to the hardware
    _last_target = None  # the last target sent to the hardware
    hw_units = Property('hardware units: A or T', EnumType(A=0, T=1))
    A_to_G = Property('A_to_G = "Gauss Value / Amps Value"', FloatRange(0))
    tolerance = Parameter('tolerance', FloatRange(0), readonly=False, default=1)

    def earlyInit(self):
        """make the target range symmetric and set the default ramp

        NOTE(review): the nesting below is reconstructed from a text without
        indentation - confirm that only 'dt.min = - dt.max' is conditional.
        """
        super().earlyInit()
        dt = self.parameters['target'].datatype
        if dt.min < 1e-99:  # make limits symmetric
            dt.min = - dt.max
        min_, max_ = self.target_limits
        self.target_limits = [max(min_, dt.min), max_]
        dt = self.parameters['ramp'].datatype
        if self.ramp == 0:  # unconfigured: take max.
            self.ramp = dt.max

    def to_gauss(self, value):
        """convert a field or current reply (units T or A) to Gauss

        :raises ValueError: when value is not a number with a unit
        :raises HardwareError: on any unit other than T or A
        """
        number, unit = _split_value_unit(value)
        if unit == 'T':
            return number * 10000
        if unit == 'A':
            return number * self.A_to_G
        raise HardwareError('received unknown unit: %s' % unit)

    def to_gauss_min(self, value):
        """convert a ramp rate reply (A/s, T/s, T/min or T/h) to Gauss/min

        :raises ValueError: when value is not a number with a unit
        :raises HardwareError: on any other unit
        """
        number, unit = _split_value_unit(value)
        if unit == 'A/s':
            return number * self.A_to_G * 60.0
        if unit == 'T/s':
            return number * 10000 * 60
        if unit == 'T/min':
            return number * 10000
        if unit == 'T/h':
            return number * 10000 / 60.0
        raise HardwareError('received unknown unit: %s' % unit)

    def initModule(self):
        """register the items of the GetAll reply to be used by this module"""
        super().initModule()
        # NOTE(review): '<CH>' in front of the keys is a reconstruction (the
        # reviewed text had keys starting with '_') - confirm the key format
        self.main.register_module(
            self,
            value=('<CH>_Field', self.to_gauss),
            _status_text=('<CH>_Status', str),
            _ready_text=('<CH>_Ready', str),
            _error_text=('<CH>_Error', self.cvt_error),
            _rate_units=('<CH>_Rate Units', str),
            current=('<CH>_PSU Output', self.to_gauss),
            voltage='<CH>_Voltage',
            working_ramp=('<CH>_Ramp Rate', self.to_gauss_min),
            setpoint=('<CH>_Setpoint', self.to_gauss),
            switch_heater=('<CH>_Heater', self.cvt_switch_heater),
            mode=('<CH>_Persistent Mode', self.cvt_mode),
            approach_mode=('<CH>_Approach', self.cvt_approach_mode),
        )

    def cvt_error(self, text):
        """take over the error text only when it has changed

        an error text cleared by reset_error stays cleared as long as the
        hardware is repeating the same text
        """
        if text != self._last_error:
            self._last_error = text
            return text
        return self._error_text

    def trigger_update(self):
        """update the status and, when the hardware is ready, start a pending sweep

        NOTE(review): the nesting below is reconstructed from a text without
        indentation - confirm against the upstream file.
        """
        # called after treating result of GetAll message
        if self._error_text:
            status = ERROR, '%s while %s' % (self._error_text, self._status_text)
        elif self._ready_text == 'TRUE':
            with self.accessLock:  # must not be in parallel with write_target
                target = self._next_target
                if target is not None:  # target change pending
                    if target == self._last_target and abs(self.value - target) <= self.tolerance:
                        # we are already there
                        self._last_target = None
                        status = IDLE, self._status_text
                    else:
                        if self.hw_units == 'A':
                            self.sendcmd('Set:<CH>:Sweep %gA' % (target / self.A_to_G))
                        else:
                            self.sendcmd('Set:<CH>:Sweep %gT' % (target / 10000))
                        self._next_target = None
                        self._last_target = target
                        status = BUSY, 'changed target'
                else:
                    status = IDLE, self._status_text
        elif self._next_target is None:
            txt = self._status_text
            if txt.startswith('Ramping Magnet'):
                if txt.endswith(': ') or ' 1 seconds' in txt:
                    txt = 'stabilizing'
                else:
                    txt = 'ramping'
            status = BUSY, txt
        else:
            return  # do not change status when aborting
        self.status = status

    value = Parameter('magnetic field in the coil', FloatRange(unit='G'))

    setpoint = Parameter('setpoint', FloatRange(unit='G'), default=0)
    ramp = Parameter('ramp rate', FloatRange(0, unit='$/min'), default=0, readonly=False)

    def write_ramp(self, ramp):
        """send the ramp rate in A/s, after switching the rate units if needed"""
        if self._rate_units != 'A/s':
            self.sendcmd('Set:<CH>:ChangeRateUnits A/s')
        self.sendcmd('Set:<CH>:SetRate %g' % (ramp / self.A_to_G / 60.0))
        return ramp

    def write_target(self, target):
        """check the target and store it as pending

        when the hardware is not ready, a running sweep is aborted unless we
        are already at the given target. the sweep command itself is sent by
        trigger_update as soon as the hardware is ready.

        NOTE(review): the nesting of the last if / else is reconstructed from
        a text without indentation - confirm against the upstream file.
        """
        self.reset_error()
        self.check_limits(target)
        self.write_ramp(self.ramp)
        if self.approach_mode == self.approach_mode.OVERSHOOT:
            o = self.overshoot['o']
            # flip the sign of the overshoot when it is opposite to the sweep direction
            if (target - self.value) * o < 0:
                self.write_overshoot(dict(self.overshoot, o=-o))
        self.block('_error_text', '')
        if self._ready_text == 'FALSE':
            if target != self._last_target or abs(self.value - target) > self.tolerance:
                self.status = BUSY, 'aborting'
                self.sendcmd('Set:<CH>:Abort')
            self._next_target = target
        else:
            self._next_target = target
            self.trigger_update()  # update status
        return target

    working_ramp = Parameter('actual ramp rate', FloatRange(0, unit='$/min'))

    Mode = Enum(
        # DISABLED=0,
        PERSISTENT=30,
        SEMIPERSISTENT=31,
        DRIVEN=50,
    )
    mode = Parameter('persistent mode', EnumType(Mode), readonly=False, default=30)
    mode_map = Mapped(DRIVEN=0, PERSISTENT=1, SEMIPERSISTENT=2)

    def write_mode(self, value):
        """send the code of the persistent mode and block its readback"""
        # NOTE(review): mode_map is keyed by names and codes - verify that the
        # enum member passed in here is accepted as a key
        code = self.mode_map(value)
        self.sendcmd('Set:<CH>:SetPM %d' % code)
        self.block('mode')
        return value

    @staticmethod
    def cvt_mode(text):
        """convert the persistent mode text to the value of a Mode member

        a text containing 'off' gives 30 when it also contains '0', else 31.
        any other text gives 50
        """
        text = text.lower()
        if 'off' in text:
            if '0' in text:
                return 30
            return 31
        return 50

    ApproachMode = Enum(
        DIRECT=0,
        OVERSHOOT=1,
        CYCLING=2,
        DEGAUSSING=3,
    )
    approach_mode = Parameter('approach mode', EnumType(ApproachMode), readonly=False,
                              group='approach_settings', default=0)

    def write_approach_mode(self, value):
        """send the approach mode and block its readback"""
        self.sendcmd('Set:<CH>:SetApproach %d' % value)
        self.block('approach_mode')
        return value

    @classmethod
    def cvt_approach_mode(cls, text):
        """convert the approach mode text to an ApproachMode member"""
        return cls.ApproachMode(text.upper())

    overshoot = Parameter('overshoot [%] and hold time [s]',
                          StructOf(o=FloatRange(-100, 100, unit='%'), t=FloatRange(0, unit='s')),
                          readonly=False, default=dict(o=0, t=0),
                          group='approach_settings')

    def write_overshoot(self, value):
        """send overshoot and hold time"""
        self.sendcmd('Set:<CH>:SetOvershoot %g,%g' % (value['o'], value['t']))
        return value

    cycle = Parameter('start value, damping factor, final value, hold time',
                      StructOf(s=FloatRange(-100, 100, unit='%'),
                               d=FloatRange(0, 100, unit='%'),
                               a=FloatRange(0, 100, unit='G'),
                               t=FloatRange(0, unit='s')),
                      readonly=False, default=dict(s=0, d=0, a=0, t=0),
                      group='approach_settings')

    def write_cycle(self, value):
        """send the cycling settings (the final value is scaled by 1e-4)"""
        self.sendcmd('Set:<CH>:SetCycling %g,%g,%g,%g' %
                     (value['s'], value['d'], value['a'] * 1e-4, value['t']))
        return value

    degauss = Parameter('start value [G], damping factor [%], accuracy [G], hold time [s]',
                        StructOf(s=FloatRange(-10000, 10000, unit='G'),
                                 d=FloatRange(0, 100, unit='%'),
                                 f=FloatRange(0, 10000, unit='G'),
                                 t=FloatRange(0, unit='s')),
                        readonly=False, default=dict(s=0, d=0, f=0, t=0),
                        group='approach_settings')

    def write_degauss(self, value):
        """send the degaussing settings (start value and accuracy are scaled by 1e-4)"""
        self.sendcmd('Set:<CH>:SetDegaussing %g,%g,%g,%g' %
                     (value['s'] * 1e-4, value['d'], value['f'] * 1e-4, value['t']))
        return value

    current = Parameter(
        'leads current (in units of field)', FloatRange(unit='$'))
    voltage = Parameter(
        'voltage over leads', FloatRange(unit='V'))
    switch_heater = Parameter(
        'switch heater', EnumType(OFF=0, ON=1))

    @staticmethod
    def cvt_switch_heater(text):
        """True when the heater text contains 'ON'"""
        return 'ON' in text

    # no docstring added on stop: it might be taken over as the description
    # of the command - TODO confirm
    @Command()
    def stop(self):
        self.sendcmd('Set:<CH>:Abort')

    @Command()
    def reset_quench(self):
        """reset quench condition"""
        self.sendcmd('Set:<CH>:ResetQuench')

    @Command()
    def reset_error(self):
        """reset error"""
        self._error_text = ''


class MainMagfield(Magfield):
    checked_modules = None  # list of the component fields to be checked against

    def earlyInit(self):
        super().earlyInit()
        self.checked_modules = []

    # TODO: turn into a property
    constraint = Parameter('product check', FloatRange(unit='G^2'), default=80000)

    def check_combined(self, obj, value, main_target):
        """check the product of main field and component fields against the constraint

        :param obj: the component module going to be changed, or None
        :param value: the new target of obj
        :param main_target: the new target of the main field (0 when unchanged)
        :raises BadValueError: when the product exceeds the constraint

        for each field the bigger of the actual value and the new target is
        used. everything is compared squared, so signs do not matter.
        """
        sumvalue2 = sum((max(o.value ** 2, value ** 2 if o == obj else 0)
                         for o in self.checked_modules))
        if sumvalue2 * max(self.value ** 2, main_target ** 2) > self.constraint ** 2:
            raise BadValueError('outside constraint (B * Bxyz > %g G^2)' % self.constraint)

    def check_limits(self, value):
        """check the limits and the combined constraint for a new main target"""
        super().check_limits(value)
        self.check_combined(None, 0, value)


class ComponentField(Magfield):
    check_against = Attached(MainMagfield)

    def initModule(self):
        """register this module at the main field for the combined check"""
        super().initModule()
        self.check_against.checked_modules.append(self)

    def check_limits(self, value):
        """check the limits and the combined constraint for a new target"""
        super().check_limits(value)
        self.check_against.check_combined(self, value, 0)


class Compressor(Channel, Drivable):
    def initModule(self):
        """register the items of the GetAll reply to be used by this module"""
        super().initModule()
        # NOTE(review): these keys contain no channel tag, so all compressor
        # modules would share the same keys. the tag was probably lost, but its
        # position can not be told from the reviewed text - confirm the keys
        self.main.register_module(
            self,
            value=('Compressor_Status', self.cvt_value),
            _ready_text=('Compressor_Ready', str),
            _error_text=('Compressor_Error', str),
            run_time='Compressor_RunTime',
        )
        # TODO: what is Compressor_Error? (without A or B)

    value = Parameter('compressor switch', EnumType(OFF=0, ON=1))
    run_time = Parameter('run time', FloatRange(0, unit='h'))
    _status_text = ''
    _ready_text = ''
    _error_text = ''

    def cvt_value(self, text):
        """keep the status text and convert it to the switch state

        the target follows the value, except when it has been changed recently
        """
        self._status_text = text
        value = text == 'Running'
        if time.time() > self.block_until.get('target', 0):
            self.target = value
        return value

    def read_value(self):
        """True when the last status text was 'Running'"""
        return self._status_text == 'Running'

    def read_status(self):
        """BUSY while value and target differ, else IDLE or ERROR"""
        if self.target != self.value:
            return BUSY, 'switching %s' % self.target.name
        # TODO: find out possible status texts
        if self._ready_text == 'TRUE':
            return IDLE, 'ready'
        if self._error_text:
            return ERROR, self._error_text
        return IDLE, self._status_text

    target = Parameter('compressor switch', EnumType(OFF=0, ON=1))

    def write_target(self, value):
        """start or stop the compressor and block the readback of the target"""
        # NOTE(review): '<CH>' at the end is a reconstruction (the reviewed
        # text had a trailing blank). 'SetCompressor' has no colon after 'Set',
        # unlike 'Set:Compressor:Reset' in reset_error - confirm both
        if value:
            self.sendcmd('SetCompressor:Start <CH>')
        else:
            self.sendcmd('SetCompressor:Stop <CH>')
        self.block('target')
        self.read_status()
        return value

    @Command()
    def reset_error(self):
        """reset error"""
        self.sendcmd('Set:Compressor:Reset <CH>')
        self._error_text = ''