"""Usage: in a config file: include('racktools') """ import os from frappy.errors import ConfigError class Rack: racks = {} def __init__(self, name, **kwds): self.racks[name] = kwds # contains # Rack('', ls_type=336, ls_io='...') include('racklist') # contains either rack='' # or rack = """ # prep8=cc12 # prep9=... # """ rack = None include('rack') class RackConfig: def __init__(self, config_or_rack=None): if isinstance(config_or_rack, dict): # direct configuration self.config = config_or_rack else: instrument = os.environ.get('Instrument') or os.uname().nodename.split('.')[0] racks = config_or_rack.split('\n') if len(racks) == 1: found_rack = racks[0].strip() else: for line in racks: if line.strip().startswith(instrument): found_rack = line.split('=', 1)[-1].strip() break else: found_rack = instrument # take from rack config self.config = Rack.racks.get(found_rack) self.props = {} # dict (, ) of value self.mods = {} # dict (, ) of list of self.ccu_cfg = {} self.log = include.log self.ls_dev = None def set_props(self, mod, **kwds): for prop, method in kwds.items(): value = self.props.get((prop, method)) if value is None: # add mod to the list of cfgs to be fixed self.mods.setdefault((prop, method), []).append(mod) else: # set prop in current module if not mod.get(prop): # do not override given and not empty property mod[prop] = value def fix_props(self, method, **kwds): for prop, value in kwds.items(): if (prop, method) in self.props: raise ConfigError(f'duplicate call to {method}()') self.props[prop, method] = value # set property in modules to be fixed for mod in self.mods.get((prop, method), ()): mod[prop] = value def lakeshore(self, io='ls_io', dev='ls', **kwds): if self.ls_dev: return self.fix_props('lakeshore', io=io, device=dev) self.ls_model = model = self.config.get('ls_model') self.ls_dev = dev Mod(io, cls=f'frappy_psi.lakeshore.IO{self.ls_model}', description='comm. to lakeshore in cc rack', uri=self.config.get('ls_uri')) self.dev = Mod(dev, cls=f'frappy_psi.lakeshore.Device{self.ls_model}', description='lakeshore in cc rack', io=io, curve_handling=True) def sensor(self, name, channel, calcurve, **kwds): self.lakeshore() kwds.setdefault('cls', f'frappy_psi.lakeshore.Sensor{self.ls_model}') kwds.setdefault('description', f'T sensor {name}') mod = Mod(name, channel=channel, calcurve=calcurve, device=self.ls_dev, **kwds) self.set_props(mod, io='lakeshore', dev='lakeshore') def loop(self, name, channel, calcurve, output_module, **kwds): self.lakeshore() kwds.setdefault('cls', f'frappy_psi.lakeshore.Loop{self.ls_model}') kwds.setdefault('description', f'T loop {name}') Mod(name, channel=channel, calcurve=calcurve, output_module=output_module, device=self.ls_dev, **kwds) self.fix_props(f'heater({output_module})', description=f'heater for {name}') def heater(self, name, output_no, max_heater, resistance, **kwds): self.lakeshore() if output_no == 1: kwds.setdefault('cls', f'frappy_psi.lakeshore.MainOutput{self.ls_model}') elif output_no == 2: kwds.setdefault('cls', f'frappy_psi.lakeshore.SecondaryOutput{self.ls_model}') else: return kwds.setdefault('description', '') mod = Mod(name, max_heater=max_heater, resistance=resistance, **kwds) self.set_props(mod, io='lakeshore', device='lakeshore', description=f'heater({name})') def ccu(self, name=None, **kwds): """internal method, called by the methods below""" name = name or 'ccu_io' cfg = {'name': name, **kwds} if self.ccu_cfg: if self.ccu_cfg != cfg: raise ConfigError('ccu config mismatch') return name self.ccu_cfg = cfg Mod(name, 'frappy_psi.ccu4.IO', 'comm. to CCU4', uri=self.config['ccu_uri'], **kwds) return name def he(self, name='He_lev', **kwds): Mod(name, cls='frappy_psi.ccu4.HeLevel', description='the He Level', io=self.ccu(), **kwds) def n2(self, name='N2_lev', valve='N2_valve', upper='N2_upper', lower='N2_lower', **kwds): ccu_io = self.ccu() Mod(name, cls='frappy_psi.ccu4.N2Level', description='the N2 Level', io=ccu_io, valve=valve, upper=upper, lower=lower) Mod(valve, cls='frappy_psi.ccu4.N2FillValve', description='LN2 fill valve', io=ccu_io) Mod(upper, cls='frappy_psi.ccu4.N2TempSensor', description='upper LN2 sensor') Mod(lower, cls='frappy_psi.ccu4.N2TempSensor', description='lower LN2 sensor') def flow(self, hepump_io='hepump_io', hepump='hepump', hepump_mot='hepump_mot', hepump_valve='hepump_valve', flow_sensor='flow_sensor', pump_pressure='pump_pressure', nv='nv', **kwds): """creates needle valve and pump access if available""" ccu_io = self.ccu() hepump_type = self.config.get('hepump_type', 'unknown') hepump_uri = self.config.get('hepump_uri') if not hepump_uri: flow_sensor = None Mod(nv, 'frappy_psi.ccu4.NeedleValveFlow', 'flow from flow sensor or pump pressure', flow_sensor=flow_sensor, pressure=pump_pressure, pump_type=hepump_type, io=ccu_io, **kwds) Mod(pump_pressure, 'frappy_psi.ccu4.Pressure', 'He pump pressure', io=ccu_io) if not flow_sensor: self.log.warning('no pump remote control, no flow meter - using flow from pressure alone') return Mod(hepump_io, 'frappy.io.BytesIO', 'He pump connection', uri=hepump_uri) Mod(hepump, 'frappy_psi.hepump.HePump', 'He pump', pump_type=hepump_type, valvemotor=hepump_mot, valve=hepump_valve) Mod(hepump_mot, 'frappy_psi.hepump.Motor', 'He pump valve motor', io=hepump_io, maxcurrent=2.8) Mod(hepump_valve, 'frappy_psi.butterflyvalve.Valve', 'He pump valve', motor=hepump_mot) Mod(flow_sensor, 'frappy_psi.sensirion.FlowSensor', 'Flow Sensor', io=hepump_io, nsamples=160) rackcfg = RackConfig(rack) LakeShore = rackcfg.lakeshore LsSensor = rackcfg.sensor LsLoop = rackcfg.loop LsHeater = rackcfg.heater Ccu = rackcfg.ccu HeLevel = rackcfg.he N2Level = rackcfg.n2 Flow = rackcfg.flow