179 lines
6.9 KiB
Python
179 lines
6.9 KiB
Python
"""Usage:
|
|
|
|
in a config file:
|
|
|
|
include('racktools')
|
|
|
|
|
|
"""
|
|
import os
|
|
from frappy.errors import ConfigError
|
|
|
|
|
|
class Rack:
|
|
racks = {}
|
|
|
|
def __init__(self, name, **kwds):
|
|
self.racks[name] = kwds
|
|
|
|
|
|
# contains
|
|
# Rack('<rack name>', ls_type=336, ls_io='...')
|
|
|
|
include('racklist')
|
|
|
|
# contains either rack='<current rack>'
|
|
# or rack = """
|
|
# prep8=cc12
|
|
# prep9=...
|
|
# """
|
|
rack = None
|
|
include('rack')
|
|
|
|
|
|
class RackConfig:
|
|
def __init__(self, config_or_rack=None):
|
|
if isinstance(config_or_rack, dict):
|
|
# direct configuration
|
|
self.config = config_or_rack
|
|
else:
|
|
instrument = os.environ.get('Instrument') or os.uname().nodename.split('.')[0]
|
|
racks = config_or_rack.split('\n')
|
|
if len(racks) == 1:
|
|
found_rack = racks[0].strip()
|
|
else:
|
|
for line in racks:
|
|
if line.strip().startswith(instrument):
|
|
found_rack = line.split('=', 1)[-1].strip()
|
|
break
|
|
else:
|
|
found_rack = instrument
|
|
# take from rack config
|
|
self.config = Rack.racks.get(found_rack)
|
|
self.props = {} # dict (<property>, <method>) of value
|
|
self.mods = {} # dict (<property>, <method>) of list of <cfg>
|
|
self.ccu_cfg = {}
|
|
self.log = include.log
|
|
self.ls_dev = None
|
|
|
|
def set_props(self, mod, **kwds):
|
|
for prop, method in kwds.items():
|
|
value = self.props.get((prop, method))
|
|
if value is None:
|
|
# add mod to the list of cfgs to be fixed
|
|
self.mods.setdefault((prop, method), []).append(mod)
|
|
else:
|
|
# set prop in current module
|
|
if not mod.get(prop): # do not override given and not empty property
|
|
mod[prop] = value
|
|
|
|
def fix_props(self, method, **kwds):
|
|
for prop, value in kwds.items():
|
|
if (prop, method) in self.props:
|
|
raise ConfigError(f'duplicate call to {method}()')
|
|
self.props[prop, method] = value
|
|
# set property in modules to be fixed
|
|
for mod in self.mods.get((prop, method), ()):
|
|
mod[prop] = value
|
|
|
|
def lakeshore(self, io='ls_io', dev='ls', **kwds):
|
|
if self.ls_dev:
|
|
return
|
|
self.fix_props('lakeshore', io=io, device=dev)
|
|
self.ls_model = model = self.config.get('ls_model')
|
|
self.ls_dev = dev
|
|
Mod(io, cls=f'frappy_psi.lakeshore.IO{self.ls_model}',
|
|
description='comm. to lakeshore in cc rack', uri=self.config.get('ls_uri'))
|
|
self.dev = Mod(dev, cls=f'frappy_psi.lakeshore.Device{self.ls_model}',
|
|
description='lakeshore in cc rack', io=io, curve_handling=True)
|
|
|
|
def sensor(self, name, channel, calcurve, **kwds):
|
|
self.lakeshore()
|
|
kwds.setdefault('cls', f'frappy_psi.lakeshore.Sensor{self.ls_model}')
|
|
kwds.setdefault('description', f'T sensor {name}')
|
|
mod = Mod(name, channel=channel, calcurve=calcurve,
|
|
device=self.ls_dev, **kwds)
|
|
self.set_props(mod, io='lakeshore', dev='lakeshore')
|
|
|
|
def loop(self, name, channel, calcurve, output_module, **kwds):
|
|
self.lakeshore()
|
|
kwds.setdefault('cls', f'frappy_psi.lakeshore.Loop{self.ls_model}')
|
|
kwds.setdefault('description', f'T loop {name}')
|
|
Mod(name, channel=channel, calcurve=calcurve, output_module=output_module,
|
|
device=self.ls_dev, **kwds)
|
|
self.fix_props(f'heater({output_module})', description=f'heater for {name}')
|
|
|
|
def heater(self, name, output_no, max_heater, resistance, **kwds):
|
|
self.lakeshore()
|
|
if output_no == 1:
|
|
kwds.setdefault('cls', f'frappy_psi.lakeshore.MainOutput{self.ls_model}')
|
|
elif output_no == 2:
|
|
kwds.setdefault('cls', f'frappy_psi.lakeshore.SecondaryOutput{self.ls_model}')
|
|
else:
|
|
return
|
|
kwds.setdefault('description', '')
|
|
mod = Mod(name, max_heater=max_heater, resistance=resistance, **kwds)
|
|
self.set_props(mod, io='lakeshore', device='lakeshore', description=f'heater({name})')
|
|
|
|
def ccu(self, name=None, **kwds):
|
|
"""internal method, called by the methods below"""
|
|
name = name or 'ccu_io'
|
|
cfg = {'name': name, **kwds}
|
|
if self.ccu_cfg:
|
|
if self.ccu_cfg != cfg:
|
|
raise ConfigError('ccu config mismatch')
|
|
return name
|
|
self.ccu_cfg = cfg
|
|
Mod(name, 'frappy_psi.ccu4.IO', 'comm. to CCU4', uri=self.config['ccu_uri'], **kwds)
|
|
return name
|
|
|
|
def he(self, name='He_lev', **kwds):
|
|
Mod(name, cls='frappy_psi.ccu4.HeLevel',
|
|
description='the He Level', io=self.ccu(), **kwds)
|
|
|
|
def n2(self, name='N2_lev', valve='N2_valve', upper='N2_upper', lower='N2_lower', **kwds):
|
|
ccu_io = self.ccu()
|
|
Mod(name, cls='frappy_psi.ccu4.N2Level',
|
|
description='the N2 Level', io=ccu_io,
|
|
valve=valve, upper=upper, lower=lower)
|
|
Mod(valve, cls='frappy_psi.ccu4.N2FillValve',
|
|
description='LN2 fill valve', io=ccu_io)
|
|
Mod(upper, cls='frappy_psi.ccu4.N2TempSensor',
|
|
description='upper LN2 sensor')
|
|
Mod(lower, cls='frappy_psi.ccu4.N2TempSensor',
|
|
description='lower LN2 sensor')
|
|
|
|
def flow(self, hepump_io='hepump_io', hepump='hepump', hepump_mot='hepump_mot',
|
|
hepump_valve='hepump_valve', flow_sensor='flow_sensor', pump_pressure='pump_pressure',
|
|
nv='nv', **kwds):
|
|
"""creates needle valve and pump access if available"""
|
|
ccu_io = self.ccu()
|
|
hepump_type = self.config.get('hepump_type', 'unknown')
|
|
hepump_uri = self.config.get('hepump_uri')
|
|
if not hepump_uri:
|
|
flow_sensor = None
|
|
Mod(nv, 'frappy_psi.ccu4.NeedleValveFlow', 'flow from flow sensor or pump pressure',
|
|
flow_sensor=flow_sensor, pressure=pump_pressure, pump_type=hepump_type, io=ccu_io, **kwds)
|
|
Mod(pump_pressure, 'frappy_psi.ccu4.Pressure', 'He pump pressure', io=ccu_io)
|
|
if not flow_sensor:
|
|
self.log.warning('no pump remote control, no flow meter - using flow from pressure alone')
|
|
return
|
|
Mod(hepump_io, 'frappy.io.BytesIO', 'He pump connection', uri=hepump_uri)
|
|
Mod(hepump, 'frappy_psi.hepump.HePump', 'He pump', pump_type=hepump_type,
|
|
valvemotor=hepump_mot, valve=hepump_valve)
|
|
Mod(hepump_mot, 'frappy_psi.hepump.Motor', 'He pump valve motor', io=hepump_io, maxcurrent=2.8)
|
|
Mod(hepump_valve, 'frappy_psi.butterflyvalve.Valve', 'He pump valve', motor=hepump_mot)
|
|
Mod(flow_sensor, 'frappy_psi.sensirion.FlowSensor', 'Flow Sensor', io=hepump_io, nsamples=160)
|
|
|
|
|
|
rackcfg = RackConfig(rack)
|
|
|
|
LakeShore = rackcfg.lakeshore
|
|
LsSensor = rackcfg.sensor
|
|
LsLoop = rackcfg.loop
|
|
LsHeater = rackcfg.heater
|
|
Ccu = rackcfg.ccu
|
|
HeLevel = rackcfg.he
|
|
N2Level = rackcfg.n2
|
|
Flow = rackcfg.flow
|