# ***************************************************************************** # # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Markus Zolliker # # ***************************************************************************** """drivers for CCU4, the cryostat control unit at SINQ""" import time import math import numpy as np from frappy.lib.enum import Enum from frappy.lib import clamp, formatExtendedTraceback from frappy.lib.interpolation import Interpolation # the most common Frappy classes can be imported from frappy.core from frappy.core import HasIO, Parameter, Command, Readable, Writable, Drivable, \ Property, StringIO, BUSY, IDLE, WARN, ERROR, DISABLED, Attached, nopoll from frappy.datatypes import BoolType, EnumType, FloatRange, StructOf, \ StatusType, IntRange, StringType, TupleOf, ArrayOf from frappy.errors import CommunicationFailedError from frappy.states import HasStates, status_code, Retry M = Enum(idle=0, opening=1, closing=2, opened=3, closed=4, no_motor=5) A = Enum(disabled=0, manual=1, auto=2) class IO(StringIO): """communication with CCU4""" # for completeness: (not needed, as it is the default) end_of_line = '\n' # on connect, we send 'cid' and expect a reply starting with 'CCU4' identification = [('cid', r'cid=CCU4.*')] class Base(HasIO): ioClass = IO def command(self, **kwds): """send a command and get the response :param kwds: = for changing a parameter = for querying a parameter :returns: the (new) values of the parameters """ types = {} cmds = [] for key, value in kwds.items(): if isinstance(value, type): types[key] = value cmds.append(key) elif isinstance(value, str): types[key] = str cmds.append(f'{key}={value}') else: types[key] = float cmds.append(f'{key}={value:g}') reply = self.io.communicate(' '.join(cmds)).split() if len(reply) != len(types): raise CommunicationFailedError('number of reply items does not match') result = [] for (given, typ), res in zip(types.items(), reply): name, txtvalue = res.split('=') if given != name: raise CommunicationFailedError('result keys do not match given keys') result.append(typ(txtvalue)) if len(kwds) == 1: return result[0] return result class HeLevel(Base, Readable): """He Level channel of CCU4""" value = Parameter(unit='%') empty_length = Parameter('warm length when empty', FloatRange(0, 2000, unit='mm'), readonly=False) full_length = Parameter('warm length when full', FloatRange(0, 2000, unit='mm'), readonly=False) sample_rate = Parameter('sample rate', EnumType(slow=0, fast=1), readonly=False) status = Parameter(datatype=StatusType(Readable, 'DISABLED')) # conversion of the code from the CCU4 parameter 'hsf' STATUS_MAP = { 0: (IDLE, 'sensor ok'), 1: (ERROR, 'sensor warm'), 2: (ERROR, 'no sensor'), 3: (ERROR, 'timeout'), 4: (ERROR, 'not yet read'), 5: (DISABLED, 'disabled'), } def read_value(self): return self.command(h=float) def read_status(self): return self.STATUS_MAP[int(self.command(hsf=int))] def read_sample_rate(self): value, self.empty_length, self.full_length = self.command(hf=int, hem=float, hfu=float) return value def write_sample_rate(self, value): return self.command(hf=value) def write_empty_length(self, value): return self.command(hem=value) def write_full_length(self, value): return self.command(hfu=value) class Valve(Base, Writable): value = Parameter('relay state', BoolType()) target = Parameter('relay target', BoolType()) ioClass = IO STATE_MAP = {0: (0, (IDLE, 'off')), 1: (1, (IDLE, 'on')), 2: (0, (ERROR, 'no valve')), 3: (0, (WARN, 'timeout')), # timeout in filling process (takes too long to fill) 4: (0, (WARN, 'timeout1')), # timeout in filling process (takes too long to start) 5: (1, (IDLE, 'boost')), } _open_command = None _close_command = None _query_state = None def write_target(self, target): if target: self.command(**self._open_command) else: self.command(**self._close_command) def read_status(self): state = int(self.command(**self._query_state)) self.value, status = self.STATE_MAP[state] return status class HeFillValve(Valve): _open_command = {'hcd': 1, 'hf': 1} _close_command = {'hcd': 0, 'hf': 0} _query_state = {'hv': int} class N2FillValve(Valve): _open_command = {'nc': 1} _close_command = {'nc': 0} _query_state = {'nv': int} class AuxValve(Valve): channel = Property('valve number', IntRange(1, 12)) def initModule(self): self._open_command = {f'vc{self.channel}': 1} self._close_command = {f'vc{self.channel}': 0} self._query_state = {f'v{self.channel}': int} class N2TempSensor(Readable): value = Parameter('LN2 T sensor', FloatRange(unit='K'), default=0) class N2Level(Base, Readable): valve = Attached(Writable, mandatory=False) lower = Attached(Readable, mandatory=False) upper = Attached(Readable, mandatory=False) value = Parameter('vessel state', EnumType(empty=0, ok=1, full=2)) status = Parameter(datatype=StatusType(Readable, 'DISABLED', 'BUSY')) mode = Parameter('auto mode', EnumType(A), readonly=False, default=A.manual) threshold = Parameter('threshold triggering start/stop filling', FloatRange(unit='K'), readonly=False, default=90) cool_delay = Parameter('max. minutes needed to cool the lower sensor', FloatRange(unit='s'), readonly=False, default=30) fill_timeout = Parameter('max. minutes needed to fill', FloatRange(unit='s'), readonly=False) names = Property('''names of attached modules configure members as empty strings to disable the creation ''', StructOf(valve=StringType(), lower=StringType(), upper=StringType()), default={'valve': '$_valve', 'lower': '$_lower', 'upper': '$_upper'}) # conversion of the code from the CCU4 parameter 'ns' STATUS_MAP = { 0: (IDLE, 'sensor ok'), 1: (ERROR, 'no sensor'), 2: (ERROR, 'short circuit'), 3: (ERROR, 'upside down'), 4: (ERROR, 'sensor warm'), 5: (WARN, 'empty'), } def initialReads(self): self.command(nav=1) # tell CCU4 to activate LN2 sensor readings super().initialReads() def read_status(self): auto, nstate = self.command(na=int, ns=int) if not self.valve or not auto: if self.mode == A.auto: # no valve assigned self.mode = A.manual if self.mode == A.disabled: return DISABLED, '' status = self.STATUS_MAP[nstate] if status[0] // 100 != IDLE // 100: return status if self.mode == A.manual: return IDLE, '' vstatus = self.valve.status if vstatus[0] // 100 == WARN // 100: return ERROR, vstatus[1] if vstatus[0] // 100 != IDLE // 100: return vstatus if self.valve.value: return BUSY, 'filling' return IDLE, 'watching' def read_value(self): # read sensors lower, upper = self.command(nl=float, nu=float) if self.lower: self.lower.value = lower if self.upper: self.upper.value = upper if upper < self.threshold: return 'full' if lower < self.threshold: return 'ok' return 'empty' def write_mode(self, mode): if mode == A.auto: if self.isBusy(): return mode # set to watching self.command(nc=3) else: # set to off self.command(nc=2) return mode def read_threshold(self): value, self.cool_delay, self.fill_timeout = self.command(nth=float, ntc=float, ntm=float) return value def write_threshold(self, value): return self.command(nth=value) def write_cool_delay(self, value): return self.command(ntc=value) def write_fill_timeout(self, value): return self.command(ntm=value) @Command() def fill(self): """start filling""" self.mode = A.auto self.command(nc=1) @Command() def stop(self): """stop filling""" if self.mode == A.auto: # set to watching self.command(nc=3) else: # set to off self.command(nc=0) class N2LevelGuess(N2Level): """guess the current level from hold time""" value = Parameter('estimated level', FloatRange(unit='%'), default=20) fill_time = Parameter('min fill time - for raw level indicator', FloatRange(unit='s'), default=600) hold_time = Parameter('min hold time - for raw level indicator', FloatRange(unit='s'), default=24 * 3600) _full_since = None _empty_since = None _fill_state = '' # may also be 'empty', 'full' or 'unknown' _lower = 0 _upper = 0 def read_status(self): status = super().read_status() if status == (IDLE, ''): return IDLE, self._fill_state return status def read_value(self): # read sensors now = time.time() lower, upper = self.command(nl=float, nu=float) if self.lower: self.lower.value = lower if self.upper: self.upper.value = upper if upper < self.threshold: self._full_since = now if self._empty_since is not None: self.fill_time = now - self._empty_since self._empty_since = None self._fill_state = 'full' return 100 if lower < self.threshold: if self._empty_since is None: if self._full_since is None: self._fill_state = 'unknown' return 20 delay = now - self._full_since value = max(10, 100 * 1 - delay / self.hold_time) if value < 99: self._fill_state = '' return value delay = now - self._empty_since - self.cool_delay value = min(90, 100 * max(0, delay / self.fill_time)) if value >= 10: self._fill_state = '' return value if self._full_since is not None: self.hold_time = now - self._full_since self._full_since = None self.log.info('lower %g upper %g threshold %g', lower, upper, self.threshold) self._empty_since = now self._fill_state = 'empty' return 0 class HasFilter: __value1 = None __value = None __last = None def filter(self, filter_time, value): now = time.time() if self.__value is None: self.__last = now self.__value1 = value self.__value = value weight = (now - self.__last) / filter_time self.__value1 += weight * (value - self.__value) self.__value += weight * (self.__value1 - self.__value) self.__last = now return self.__value class Pressure(HasFilter, Base, Readable): value = Parameter(unit='mbar') mbar_offset = Parameter('offset in mbar', FloatRange(unit='mbar'), default=0.8, readonly=False) filter_time = Parameter('filter time', FloatRange(unit='sec'), readonly=False, default=3) pollinterval = Parameter(default=0.25) def read_value(self): return self.filter(self.filter_time, self.command(f=float)) - self.mbar_offset def Table(miny=None, maxy=None): return ArrayOf(TupleOf(FloatRange(), FloatRange(miny, maxy))) class NeedleValveFlow(HasStates, Base, Drivable): flow_sensor = Attached(Readable, mandatory=False) pressure = Attached(Pressure, mandatory=False) use_pressure = Parameter('flag (use pressure instead of flow meter)', BoolType(), readonly=False, default=False) lnm_per_mbar = Parameter('scale factor', FloatRange(unit='lnm/mbar'), readonly=False, default=0.6) pump_type = Parameter('pump type', EnumType(unknown=0, neodry=1, xds35=2, sv65=3), readonly=False, value=0) value = Parameter(unit='ln/min') target = Parameter(unit='ln/min') motor_state = Parameter('motor_state', EnumType(M), default=0) speed = Parameter('speed moving time / passed time', FloatRange()) tolerance = Parameter('tolerance', Table(0), value=[(2,0.1),(4,0.4)], readonly=False) prop_open = Parameter('proportional term for opening', Table(0), readonly=False, value=[(1,0.05)]) prop_close = Parameter('proportional term for closing', Table(0), readonly=False, value=[(1,0.02)]) deriv = Parameter('min progress time constant', FloatRange(unit='s'), default=30, readonly=False) control_active = Parameter('control active flag', BoolType(), readonly=False, default=1) min_open_pulse = Parameter('minimal open step', FloatRange(0, unit='s'), readonly=False, default=0.02) min_close_pulse = Parameter('minimal close step', FloatRange(0, unit='s'), readonly=False, default=0.0) flow_closed = Parameter('flow when needle valve is closed', FloatRange(unit='ln/min'), readonly=False, default=0.0) # raw_open_step = Parameter('step after direction change', FloatRange(unit='s'), readonly=False, default=0.12) # raw_close_step = Parameter('step after direction change', FloatRange(unit='s'), readonly=False, default=0.04) FLOW_SCALE = {'unknown': 1, 'neodry': 0.55, 'xds35': 0.6, 'sv65': 0.9} pollinterval = Parameter(datatype=FloatRange(1, unit='s'), default=5) _last_dirchange = 0 _ref_time = 0 _ref_dif = 0 _dir = 0 _rawdir = 0 _step = 0 _speed_sum = 0 _last_era = 0 _value = None def doPoll(self): # poll at least every sec, but update value only # every pollinterval and status when changed if not self.pollInfo.fast_flag: self.pollInfo.interval = min(1, self.pollinterval) # reduce internal poll interval self._value = self.get_value() self._last.append(self._value) del self._last[0:-300] self.read_motor_state() era = time.time() // self.pollinterval if era != self._last_era: self.speed = self._speed_sum / self.pollinterval self._speed_sum = 0 self.value = self._value self._last_era = era self.read_status() self.cycle_machine() def get_value(self): p = self.pressure.read_value() * self.lnm_per_mbar f = self.flow_sensor.read_value() return p if self.use_pressure else f def initModule(self): self._last = [] if self.pressure: self.pressure.addCallback('value', self.update_from_pressure) if self.flow_sensor: self.flow_sensor.addCallback('value', self.update_from_flow) super().initModule() def update_from_flow(self, value): if not self.use_pressure: self._value = value def update_from_pressure(self, value): if self.use_pressure: self._value = value * self.lnm_per_mbar # self.cycle_machine() def read_value(self): self._value = self.get_value() return self._value def read_use_pressure(self): if self.pressure: if self.flow_sensor: return self.use_pressure return True return False def write_target(self, value): self.log.debug('change target') self.start_machine(self.change_target, target=value, try_close=True) def write_pump_type(self, value): self.pressure_scale = self.FLOW_SCALE[value.name] def write_prop_open(self, value): self._prop_open = Interpolation(value) return self._prop_open def write_prop_close(self, value): self._prop_close = Interpolation(value) return self._prop_close def write_tolerance(self, value): self._tolerance = Interpolation(value) return self._tolerance @status_code(BUSY) def change_target(self, sm): self.target = sm.target sm.last_progress = sm.now sm.ref_time = 0 sm.ref_dif = 0 sm.last_pulse_time = 0 sm.no_progress_pulse = (0.1, -0.05) self.log.debug('target %s value %s', self.target, self._value) tol = self._tolerance(self.target) if abs(self.target - self._value) < tol: self.log.debug('go to at_target') return self.at_target self.log.debug('go to controlling') return self.controlling def _dif_medians(self): return np.array([self.target - np.median(self._last[-m:]) for m in (1, 5, 12, 30, 60)]) @status_code(BUSY) def controlling(self, sm): tol = self._tolerance(self.target) dif = self._dif_medians() if sm.init: self.log.debug('restart controlling') direction = math.copysign(1, dif[1]) if direction != self._dir: self.log.debug('new dir %g dif=%g', direction, dif[1]) self._dir = direction self._last_dirchange = sm.now sm.ref_dif = abs(dif[1]) sm.ref_time = sm.now difdir = dif * self._dir # negative when overshoot happend # difdif = dif - self._prev_dif # self._prev_dif = dif expected_dif = sm.ref_dif * math.exp((sm.ref_time - sm.now) / self.deriv) if np.all(difdir < tol): if np.all(difdir < -tol): self.log.debug('overshoot %r', dif) return self.controlling if not np.any(difdir < -tol): # within tolerance self.log.debug('at target %r tol %g', dif, tol) return self.at_target if np.all(difdir > expected_dif): # not enough progress if sm.now > sm.last_progress + self.deriv: lim = self.flow_closed + 0.5 if sm.try_close and self._value <= lim - tol and self.target >= lim + tol: sm.try_close = False self.command(mp=-60) sm.after_close = self.open_until_flow_increase self.log.debug('go to closing / open_until_flow_increase') return self.closing if sm.no_progress_pulse: pulse = abs(sm.no_progress_pulse[self._dir < 0]) * self._dir self.log.debug('not enough progress %g %r', pulse, sm.try_close) self.pulse(pulse) sm.last_progress = sm.now if sm.now < sm.last_pulse_time + 2.5: return Retry # TODO: check motor state for closed / opened ? difd = min(difdir[:2]) sm.last_pulse_time = sm.now if self._dir > 0: minstep = self.min_open_pulse prop = self._prop_open(self._value) else: minstep = self.min_close_pulse prop = self._prop_close(self._value) if difd > 0: if prop * tol > minstep: # step outside tol is already minstep step = difd * prop else: if difd > tol: step = (minstep + (difd - tol) * prop) else: step = minstep * difd / tol step *= self._dir self.log.debug('MP %g dif=%g tol=%g', step, difd * self._dir, tol) self.command(mp=step) self._speed_sum += step return Retry # still approaching difmax = max(difdir) if difmax < expected_dif: sm.ref_time = sm.now sm.ref_dif = difmax # self.log.info('new ref %g', sm.ref_dif) sm.last_progress = sm.now return Retry # progressing: no pulse needed @status_code(IDLE) def at_target(self, sm): tol = self._tolerance(self.target) dif = self._dif_medians() if np.all(dif > tol) or np.all(dif < -tol): self.log.debug('unstable %r %g', dif, tol) return self.unstable return Retry @status_code(IDLE, 'unstable') def unstable(self, sm): sm.no_progress_pulse = None return self.controlling(sm) def read_motor_state(self): return self.command(fm=int) @Command def close(self): """close valve fully""" self.command(mp=-60) self.motor_state = self.command(fm=int) self.start_machine(self.closing, after_close=None, fast_poll=0.1) @status_code(BUSY) def closing(self, sm): if sm.init: sm.start_time = sm.now self._speed_sum -= sm.delta() self.read_motor_state() if self.motor_state == M.closing: return Retry if self.motor_state == M.closed: if sm.after_close: return sm.after_close return self.final_status(IDLE, 'closed') if sm.now < sm.start_time + 1: return Retry return self.final_status(IDLE, 'fixed') @Command def open(self): """open valve fully""" self.command(mp=60) self.read_motor_state() self.start_machine(self.opening, threshold=None) @status_code(BUSY) def opening(self, sm): if sm.init: sm.start_time = sm.now self._speed_sum += sm.dleta() self.read_motor_state() if self.motor_state == M.opening: return Retry if self.motor_state == M.opened: return self.final_status(IDLE, 'opened') if sm.now < sm.start_time + 1: return Retry return self.final_status(IDLE, 'fixed') @Command def close_test(self): """close and then try to open until the flow starts to increase save a """ self.command(mp=-60) self.start_machine(self.closing, fast_poll=0.1, after_close=self.open_until_flow_increase, target=0) @status_code(BUSY) def open_until_flow_increase(self, sm): if sm.init: p = self.command(f=float) sm.threshold = 0.5 sm.prev = [p] sm.ref = p sm.cnt = 0 sm.low_flow = 0 self.read_motor_state() if self.motor_state == M.opening: return Retry if self.motor_state == M.opened: return self.final_status(IDLE, 'opened') press, measured = self.command(f=float, mmp=float) sm.prev.append(press) if press > sm.ref + 0.2: sm.cnt += 1 if sm.cnt > 5 or press > sm.ref + 0.5: self.flow_closed = sm.low_flow self.log.debug('flow increased %g', press) if sm.target == 0: sm.target = sm.low_flow + 0.5 return self.change_target self.log.debug('wait count %g', press) return Retry sm.low_flow = self.value sm.cnt = 0 last5 = sm.prev[-5:] median = sorted(last5)[len(last5) // 2] if press > median: # avoid to pulse again after an even small increase self.log.debug('wait %g', press) return Retry sm.ref = min(sm.prev[0], median) if measured: self._speed_sum += measured if measured < 0.1: sm.threshold = round(sm.threshold * 1.1, 2) elif measured > 0.3: sm.threshold = round(sm.threshold * 0.9, 2) self.log.debug('measured %g new threshold %g press %g', measured, sm.threshold, press) else: self._speed_sum += 1 self.log.debug('full pulse') sm.cnt = 0 self.command(mft=sm.ref + sm.threshold, mp=1) return Retry @Command(FloatRange()) def pulse(self, value): """perform a motor pulse""" self.command(mp=value) self._speed_sum += value if value > 0: self.motor_state = M.opening return self.opening self.motor_state = M.closing return self.closing @Command() def autopar(self): """adjust automatically needle valve parameters""" self.close() self.start_machine(self.auto_wait, open_pulse=0.1, close_pulse=0.05, minflow=self.read_value(), last=None) return self.auto_wait def is_stable(self, sm, n, tol=0.01): """wait for a stable flow n: size of buffer tol: a tolerance """ if sm.last is None: sm.last = [] sm.cnt = 0 v = self.read_value() sm.last.append(v) del sm.last[:-n] dif = v - sm.last[0] if dif < -tol: sm.cnt -= 1 elif dif > tol: sm.cnt += 1 else: sm.cnt -= clamp(-1, sm.cnt, 1) if len(sm.last) < n: return False return abs(sm.cnt) < n // 2 def is_unstable(self, sm, n, tol=0.01): """wait for a stable flow return 0, -1 or 1 """ if sm.last is None: sm.last = [] sm.cnt = 0 v = self.read_value() prevmax = max(sm.last) prevmin = min(sm.last) sm.last.append(v) del sm.last[:-n] self.log.debug('unstable %g >? %g prevmax + tol: return 1 if v < prevmin - tol: return -1 return 0 @status_code(BUSY) def auto_wait(self, sm): stable = self.is_stable(sm, 5, 0.01) if self._value < sm.minflow: sm.minflow = self._value if self.read_motor_state() == M.closing or not stable: return Retry return self.auto_open @status_code(BUSY) def auto_open(self, sm): stable = self.is_unstable(sm, 5, 0.1) if stable > 0: sm.start_time = sm.now sm.flow_before = sm.last[-1] self.pulse(sm.open_pulse) return self.auto_close if sm.delta(sm.open_pulse * 2) is not None: self.pulse(sm.open_pulse) return Retry @status_code(BUSY) def auto_open_stable(self, sm): if self.is_stable(sm, 5, 0.01): return Retry return self.auto_close @status_code(BUSY) def auto_close(self, sm): if not self.is_stable(sm, 10, 0.01): return Retry self.log.debug('before %g pulse %g, flowstep %g', sm.flow_before, sm.open_pulse, sm.last[-1] - sm.flow_before) self.close() return self.final_status(IDLE, '')