Files
frappycfg/racktools.py

179 lines
6.9 KiB
Python

"""Usage:
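in a config file:

    include('racktools')

This defines LakeShore, LsSensor, LsLoop, LsHeater, Ccu, HeLevel, N2Level
and Flow, which create the modules for the devices of the current rack.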
"""
import os
from frappy.errors import ConfigError
class Rack:
    """Registry entry holding the settings of one rack.

    Creating an instance stores the given keyword arguments under the
    rack name in the class-level dict ``racks``.
    """

    # shared by all instances: <rack name> -> dict of settings
    racks = {}

    def __init__(self, name, **kwds):
        """Register the settings *kwds* under the rack name *name*."""
        registry = self.racks
        registry[name] = kwds
# 'racklist' contains one Rack(...) call per known rack; the keyword
# arguments are the settings looked up by RackConfig, e.g.
# Rack('<rack name>', ls_model=336, ls_uri='...', ccu_uri='...')
include('racklist')
# 'rack' selects the rack to be used. It contains either
# rack = '<current rack>'
# or a mapping with one line <instrument>=<rack> per instrument:
# rack = """
# prep8=cc12
# prep9=...
# """
# the default below is kept when the included file does not assign rack
rack = None
include('rack')
class RackConfig:
    """Create the modules for the devices of one rack.

    The rack settings are a dict with the keys read by the methods below:
    'ls_model' and 'ls_uri' (lakeshore), 'ccu_uri' (ccu), 'hepump_type'
    and 'hepump_uri' (flow).

    Some properties of a module are only known after another method was
    called (e.g. the description of a heater depends on the loop using it).
    set_props() registers such a dependency, fix_props() resolves it, in
    whatever order the two are called.

    NOTE(review): Mod and include are not defined in this file; presumably
    they are provided by the frappy config loader - confirm.
    """

    def __init__(self, config_or_rack=None):
        """Determine the rack settings.

        :param config_or_rack: either a dict with the rack settings, or the
            name of a rack registered with Rack(...), or a multi line text
            with lines of the form <instrument>=<rack name>, or None.

        The instrument is taken from the environment variable 'Instrument'
        or, if this is empty, from the first part of the host name. When
        no rack is given for the instrument, the instrument name itself is
        used as the rack name.
        """
        self._rack_name = None  # stays None for a direct configuration
        if isinstance(config_or_rack, dict):
            # direct configuration
            self.config = config_or_rack
        else:
            instrument = (os.environ.get('Instrument')
                          or os.uname().nodename.split('.')[0])
            self._rack_name = self._find_rack(config_or_rack, instrument)
            # take from rack config. An unknown rack is not an error here,
            # it is reported by _settings() when the settings are needed
            self.config = Rack.racks.get(self._rack_name)
        self.props = {}  # dict (<property>, <method>) of value
        self.mods = {}  # dict (<property>, <method>) of list of <cfg>
        self.ccu_cfg = {}
        self.log = include.log
        self.ls_dev = None
        self.ls_model = None  # set by lakeshore()
        self.dev = None  # the lakeshore device module, set by lakeshore()

    @staticmethod
    def _find_rack(spec, instrument):
        """Return the rack name given by *spec* for *instrument*.

        :param spec: a rack name, lines of <instrument>=<rack name> or None
        :param instrument: the name of the current instrument
        :return: the rack name, *instrument* if spec has no matching entry
        """
        if spec is None or not spec.strip():
            return instrument
        lines = [line.strip() for line in spec.split('\n')]
        if len(lines) == 1 and '=' not in lines[0]:
            # a plain rack name
            return lines[0]
        for line in lines:
            key, sep, value = line.partition('=')
            # compare the whole key: 'prep1' must not match 'prep10=...'
            if sep and key.strip() == instrument:
                return value.strip()
        return instrument

    def _settings(self):
        """Return the rack settings.

        :raises ConfigError: when the rack is not registered
        """
        if self.config is None:
            raise ConfigError(f'no Rack defined with name {self._rack_name!r}')
        return self.config

    def set_props(self, mod, **kwds):
        """Let properties of *mod* be set from values given to fix_props.

        :param mod: the module configuration (used like a dict)
        :param kwds: <property>=<method> pairs. The value used for the
            property is the one given in the call fix_props(<method>, ...)

        Properties already given and not empty are not overridden.
        """
        for prop, method in kwds.items():
            key = (prop, method)
            if key in self.props:
                # set prop in current module
                if not mod.get(prop):  # do not override given and not empty property
                    mod[prop] = self.props[key]
            else:
                # add mod to the list of cfgs to be fixed
                self.mods.setdefault(key, []).append(mod)

    def fix_props(self, method, **kwds):
        """Fix the values of properties depending on *method*.

        :param method: the name under which modules registered in set_props
        :param kwds: <property>=<value> pairs
        :raises ConfigError: when a property was already fixed for *method*
        """
        # check first, so that a duplicate call does not change anything
        for prop in kwds:
            if (prop, method) in self.props:
                raise ConfigError(f'duplicate call to {method}()')
        for prop, value in kwds.items():
            key = (prop, method)
            self.props[key] = value
            # set property in modules to be fixed
            for mod in self.mods.get(key, ()):
                # same rule as in set_props, so that the result does not
                # depend on the order of the calls
                if not mod.get(prop):
                    mod[prop] = value

    def lakeshore(self, io='ls_io', dev='ls', **kwds):
        """Create the io and device modules of the lakeshore.

        :param io: name of the communication module
        :param dev: name of the device module
        :param kwds: accepted, but not used

        Only the first call has an effect.
        """
        if self.ls_dev:
            return
        config = self._settings()
        self.fix_props('lakeshore', io=io, device=dev)
        self.ls_model = config.get('ls_model')
        self.ls_dev = dev
        Mod(io, cls=f'frappy_psi.lakeshore.IO{self.ls_model}',
            description='comm. to lakeshore in cc rack', uri=config.get('ls_uri'))
        self.dev = Mod(dev, cls=f'frappy_psi.lakeshore.Device{self.ls_model}',
                       description='lakeshore in cc rack', io=io, curve_handling=True)

    def sensor(self, name, channel, calcurve, **kwds):
        """Create a lakeshore sensor module.

        :param name: module name
        :param channel: passed on as property 'channel'
        :param calcurve: passed on as property 'calcurve'
        :param kwds: more properties; 'cls' and 'description' override
            the defaults
        """
        self.lakeshore()
        kwds.setdefault('cls', f'frappy_psi.lakeshore.Sensor{self.ls_model}')
        kwds.setdefault('description', f'T sensor {name}')
        mod = Mod(name, channel=channel, calcurve=calcurve,
                  device=self.ls_dev, **kwds)
        # the property fixed in lakeshore() is named 'device'
        self.set_props(mod, io='lakeshore', device='lakeshore')

    def loop(self, name, channel, calcurve, output_module, **kwds):
        """Create a lakeshore control loop module.

        :param name: module name
        :param channel: passed on as property 'channel'
        :param calcurve: passed on as property 'calcurve'
        :param output_module: name of the heater module used by the loop
        :param kwds: more properties; 'cls' and 'description' override
            the defaults
        :raises ConfigError: when output_module is already used by a loop
        """
        self.lakeshore()
        kwds.setdefault('cls', f'frappy_psi.lakeshore.Loop{self.ls_model}')
        kwds.setdefault('description', f'T loop {name}')
        Mod(name, channel=channel, calcurve=calcurve, output_module=output_module,
            device=self.ls_dev, **kwds)
        # provides the description of the heater module, if not given there
        self.fix_props(f'heater({output_module})', description=f'heater for {name}')

    def heater(self, name, output_no, max_heater, resistance, **kwds):
        """Create a lakeshore heater output module.

        :param name: module name
        :param output_no: 1 for the main output, 2 for the secondary output.
            For other values no module is created.
        :param max_heater: passed on as property 'max_heater'
        :param resistance: passed on as property 'resistance'
        :param kwds: more properties; 'cls' overrides the default
        """
        self.lakeshore()
        if output_no == 1:
            kwds.setdefault('cls', f'frappy_psi.lakeshore.MainOutput{self.ls_model}')
        elif output_no == 2:
            kwds.setdefault('cls', f'frappy_psi.lakeshore.SecondaryOutput{self.ls_model}')
        else:
            self.log.warning(f'heater {name!r} not created: output_no must be 1 or 2, not {output_no!r}')
            return
        # an empty description is replaced by the one given by loop()
        kwds.setdefault('description', '')
        mod = Mod(name, max_heater=max_heater, resistance=resistance, **kwds)
        self.set_props(mod, io='lakeshore', device='lakeshore', description=f'heater({name})')

    def ccu(self, name=None, **kwds):
        """Create the communication module to the CCU4, if not yet done.

        :param name: module name, default 'ccu_io'
        :param kwds: more properties of the communication module
        :return: the name of the communication module
        :raises ConfigError: when called again with different arguments

        Also called by he(), n2() and flow() without arguments; then an
        already created module is used, whatever its name is.
        """
        if self.ccu_cfg and not name and not kwds:
            return self.ccu_cfg['name']
        name = name or 'ccu_io'
        cfg = {'name': name, **kwds}
        if self.ccu_cfg:
            if self.ccu_cfg != cfg:
                raise ConfigError('ccu config mismatch')
            return name
        uri = self._settings()['ccu_uri']
        Mod(name, 'frappy_psi.ccu4.IO', 'comm. to CCU4', uri=uri, **kwds)
        # remember the config only after the module was created
        self.ccu_cfg = cfg
        return name

    def he(self, name='He_lev', **kwds):
        """Create the He level module.

        :param name: module name
        :param kwds: more properties of the module
        """
        Mod(name, cls='frappy_psi.ccu4.HeLevel',
            description='the He Level', io=self.ccu(), **kwds)

    def n2(self, name='N2_lev', valve='N2_valve', upper='N2_upper', lower='N2_lower', **kwds):
        """Create the N2 level module and the modules it refers to.

        :param name: name of the level module
        :param valve: name of the fill valve module
        :param upper: name of the upper sensor module
        :param lower: name of the lower sensor module
        :param kwds: more properties of the level module
        """
        ccu_io = self.ccu()
        Mod(name, cls='frappy_psi.ccu4.N2Level',
            description='the N2 Level', io=ccu_io,
            valve=valve, upper=upper, lower=lower, **kwds)
        Mod(valve, cls='frappy_psi.ccu4.N2FillValve',
            description='LN2 fill valve', io=ccu_io)
        Mod(upper, cls='frappy_psi.ccu4.N2TempSensor',
            description='upper LN2 sensor')
        Mod(lower, cls='frappy_psi.ccu4.N2TempSensor',
            description='lower LN2 sensor')

    def flow(self, hepump_io='hepump_io', hepump='hepump', hepump_mot='hepump_mot',
             hepump_valve='hepump_valve', flow_sensor='flow_sensor', pump_pressure='pump_pressure',
             nv='nv', **kwds):
        """creates needle valve and pump access if available

        The needle valve and the pump pressure modules are always created.
        The pump modules and the flow sensor are created only when the rack
        settings contain 'hepump_uri' and flow_sensor is not empty.

        :param hepump_io, hepump, hepump_mot, hepump_valve, flow_sensor,
            pump_pressure, nv: the names of the modules
        :param kwds: more properties of the needle valve module
        """
        ccu_io = self.ccu()
        config = self._settings()
        hepump_type = config.get('hepump_type', 'unknown')
        hepump_uri = config.get('hepump_uri')
        if not hepump_uri:
            # the flow sensor is connected via the pump connection
            flow_sensor = None
        Mod(nv, 'frappy_psi.ccu4.NeedleValveFlow', 'flow from flow sensor or pump pressure',
            flow_sensor=flow_sensor, pressure=pump_pressure, pump_type=hepump_type, io=ccu_io, **kwds)
        Mod(pump_pressure, 'frappy_psi.ccu4.Pressure', 'He pump pressure', io=ccu_io)
        if not flow_sensor:
            self.log.warning('no pump remote control, no flow meter - using flow from pressure alone')
            return
        Mod(hepump_io, 'frappy.io.BytesIO', 'He pump connection', uri=hepump_uri)
        Mod(hepump, 'frappy_psi.hepump.HePump', 'He pump', pump_type=hepump_type,
            valvemotor=hepump_mot, valve=hepump_valve)
        Mod(hepump_mot, 'frappy_psi.hepump.Motor', 'He pump valve motor', io=hepump_io, maxcurrent=2.8)
        Mod(hepump_valve, 'frappy_psi.butterflyvalve.Valve', 'He pump valve', motor=hepump_mot)
        Mod(flow_sensor, 'frappy_psi.sensirion.FlowSensor', 'Flow Sensor', io=hepump_io, nsamples=160)
# the rack configuration used by all the functions below, selected by the
# value of rack
rackcfg = RackConfig(rack)

# names to be used in config files, bound to the methods of rackcfg
# - LakeShore temperature controller
LakeShore, LsSensor = rackcfg.lakeshore, rackcfg.sensor
LsLoop, LsHeater = rackcfg.loop, rackcfg.heater
# - CCU4: levels and flow
Ccu, HeLevel = rackcfg.ccu, rackcfg.he
N2Level, Flow = rackcfg.n2, rackcfg.flow