Files
frappy/frappy_psi/leidenghs.py

233 lines
7.2 KiB
Python

# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Anik Stark <anik.stark@psi.ch>
# *****************************************************************************
"""dilution refrigerator: Leiden Cryogenics GHS-2T-1T-700
! writing limits does not work (reading does)
! communication with turbo pumps is not implemented
"""
from frappy.core import StringIO, HasIO, Readable, Writable, Parameter, Property, EnumType, BoolType, FloatRange
from frappy.errors import CommunicationFailedError
# Divisor applied to the raw values of the 'ADC?' reply to obtain the reported
# pressure (see Base.get_pressure).
# NOTE(review): presumably 2**14 raw counts correspond to 1e4 units - confirm
P_SCALE = (1 << 14) / 10_000
class IO(StringIO):
    """Communication with the gas handling system, shared by several modules.

    Each poll reads the key states ('KEYS?'), the pressure readings ('ADC?')
    and the pressure limits ('SETTINGS?') once, caches them as lists of
    strings and then asks every registered module to update its values from
    that cache.
    """
    end_of_line = '\n'
    identification = [('ID?', '0\\tGHS-2T-1T-700-CF.*')]
    default_settings = {'baudrate': 9600}
    # cached payloads of the last successful poll (None: not polled yet)
    _key_status = None
    _pressure = None
    _pressure_limits = None

    def initModule(self):
        self.modules = []
        super().initModule()

    def _query(self, cmd):
        """Send a query and return its comma separated payload as a list.

        A valid reply has the form '<status>\\t<payload>' with status '0'.

        :param cmd: the command to send
        :return: list of strings, the payload split at ','
        :raises CommunicationFailedError: on a non zero status or a reply
            which is not made of exactly two tab separated fields
        """
        reply = self.communicate(cmd)
        fields = reply.split('\t')
        if len(fields) != 2 or fields[0] != '0':
            raise CommunicationFailedError(f'bad reply: {reply}')
        return fields[1].split(',')

    def doPoll(self):
        """Refresh all cached values, then update the registered modules.

        A cache entry is only replaced after its reply has been validated,
        so a bad reply never overwrites the previously cached data.
        """
        # read key (valve) states
        self._key_status = self._query('KEYS?')
        # read pressure values
        self._pressure = self._query('ADC?')
        # read pressure limits
        self._pressure_limits = self._query('SETTINGS?')
        # read value for all registered modules
        for module in self.modules:
            module.update_values()

    def register(self, module):
        """Add a module whose update_values() is called after each poll."""
        self.modules.append(module)
class Base(HasIO):
    """Mixin for modules which take their values from the io's cached replies."""

    def initModule(self):
        # sign up with the io first, so its poll updates this module too
        self.io.register(self)
        super().initModule()

    def change(self, cmd, key):
        """Send '<cmd> <key>' to the device.

        :raises CommunicationFailedError: when the reply does not start with '0'
        """
        answer = self.communicate(f'{cmd} {key}')
        if answer.startswith('0'):
            return
        raise CommunicationFailedError(f'bad reply: {answer}')

    def get_key(self, key):
        """Return True when the cached state of key number <key> is '2'.

        Key numbers start at 1, the cached list is indexed from 0.
        """
        states = self.io._key_status
        return states[key - 1] == '2'

    def get_pressure(self, addr):
        """Return the cached raw reading at index <addr>, divided by P_SCALE."""
        raw = self.io._pressure[addr]
        return float(raw) / P_SCALE

    def update_values(self):
        """Called by the io after each poll: refresh the main value."""
        self.read_value()
# key (button) name -> key number
# The number is sent with the 'DEVMAN' command, and (number - 1) is the index
# of the key's state in the 'KEYS?' reply.
KEY_MAP = {
    'mix': 1,  # mixture compressor
    'bypass': 2,
    '15': 3,
    '16': 4,
    'gate18': 6,
    '4': 7,
    '5': 9,
    'reset': 10,
    '9': 12,
    '14': 13,
    '13': 15,
    'ledtest': 16,
    '12': 18,
    '10': 19,
    '11': 21,
    'A0': 22,
    '7': 24,
    'S3': 25,
    '6': 27,
    '17': 28,
    '8': 30,
    '1': 31,
    'S2': 33,
    '3': 34,
    '2': 36,
    '0': 37,
    'S1': 39,
    'A9': 40,
    'A8': 42,
    'A2': 43,
    'S5': 45,
    'A10': 46,
    'start': 48,
    'A5': 49,
    'A7': 51,
    'auto': 54,
    'A4': 55,
    'A6': 57,
    'S4': 58,
    'A3': 60,
    'He3': 61,  # condense He3
    'He4': 62,  # condense He4
    'circulation': 63,  # normal circulation
    'recovery': 64,
}
class Valve(Base, Writable):
    """A key (button) of the gas handling system panel, exposed as a boolean.

    The value is taken from the key states cached by the io. Writing the
    target sends 'DEVMAN' with the key number when the current state differs
    from the requested one.
    """
    ioClass = IO
    target = Parameter('target state of valve', datatype=BoolType(), readonly=False)
    value = Parameter('status of valve (open/close)', datatype=BoolType())
    key = Property('key (button) number', datatype=EnumType(KEY_MAP))

    def write_target(self, target):
        """toggle button state (as if pressed manually on the panel)"""
        # refresh the cached key states, so the comparison uses current data
        self.io.doPoll()
        state = self.read_value()
        if state != target:
            self.change('DEVMAN', self.key.value)
            # poll again and re-read, so the value reflects the new state
            self.io.doPoll()
            self.read_value()

    def read_value(self):
        """Return the cached state of this key, polling first if needed."""
        # nothing cached before the first poll of the io
        if self.io._key_status is None:
            self.io.doPoll()
        return self.get_key(self.key.value)
# sensor name -> index of its reading in the comma separated 'ADC?' reply
PRESSURE_MAP = {
    name: index
    for index, name in enumerate(
        ('P1', 'P2', 'P3', 'P4', 'P5', 'P6', 'P7', 'flow')
    )
}
class Pressure(Base, Readable):
    """One reading (pressure or flow) taken from the io's cached 'ADC?' reply."""
    ioClass = IO
    value = Parameter('pressure and flow values', datatype=FloatRange(unit='mbar'))
    addr = Property('address of pressure sensor', datatype=EnumType(PRESSURE_MAP))

    def read_value(self):
        """Return the scaled reading of the sensor selected by addr.

        The io is polled first when nothing is cached yet (same guard as in
        Valve.read_value), as this method may run before the first poll of
        the io and would otherwise fail indexing None.
        """
        if self.io._pressure is None:
            self.io.doPoll()
        return self.get_pressure(self.addr.value)
class PressureLimit(Pressure, Readable):
    """P6 and P7 pressure sensors of dilution refrigerator (Leiden Cryogenics GHS-2T-1T-700)
    have pressure limit settings"""
    ioClass = IO
    value = Parameter('pressure values for sensors with limit', datatype=FloatRange(unit='mbar'))
    offset = Property('address of pressure sensor', datatype=EnumType({'P6': 0, 'P7': 2}))
    limit_low = Parameter('low pressure limit', datatype=FloatRange(0, 2000, unit='mbar'), readonly=False)
    limit_high = Parameter('high pressure limit', datatype=FloatRange(0, 2000, unit='mbar'), readonly=False)

    def _cached_limits(self):
        """Return the limits cached by the io, polling first if none are cached."""
        if self.io._pressure_limits is None:
            self.io.doPoll()
        return self.io._pressure_limits

    def get_pressure_limit(self, lim):
        """Return the cached limit at index <lim> as a float."""
        return float(self._cached_limits()[lim])

    def set_limit(self, pos, limit):
        """Write all limits back to the device with entry <pos> replaced.

        :param pos: index of the limit within the cached 'SETTINGS?' values
        :param limit: new limit, sent rounded to an integer
        :raises CommunicationFailedError: when the reply is not '0'
        """
        # work on a copy: the cache shared with the io must stay untouched
        # in case the device rejects the new settings
        limit_list = list(self._cached_limits())
        limit_list[pos] = f'{limit:.0f}'
        reply = self.communicate(f'SETTINGS {",".join(limit_list)},0')
        if reply != '0':
            raise CommunicationFailedError(f'bad reply: {reply}')
        # read back, which also updates all registered modules
        self.io.doPoll()

    def read_limit_low(self):
        return self.get_pressure_limit(self.offset.value)

    def read_limit_high(self):
        # the high limit directly follows the low limit of the same sensor
        return self.get_pressure_limit(self.offset.value + 1)

    def write_limit_low(self, limit):
        return self.set_limit(self.offset.value, limit)

    def write_limit_high(self, limit):
        return self.set_limit(self.offset.value + 1, limit)

    def update_values(self):
        """Called by the io after each poll: refresh value and both limits."""
        self.read_value()
        self.read_limit_low()
        self.read_limit_high()
class Turbo(Base, Writable):
    """Turbo pump declaration; no read or write method is implemented here."""
    ioClass = IO
    target = Parameter(
        'target state of turbo pump',
        datatype=EnumType(off=0, on=1),
    )
    value = Parameter(
        'state of turbo pump',
        datatype=EnumType(off=0, on=1),
    )
    addr = Parameter(
        'turbo pump address',
        datatype=EnumType(turbo1=1, turbo2=2),
    )
    # TODO: implement write_target, unfinished draft:
    #     def write_target(self, target):
    #         self.change(f'TURBO ON/OFF {target}', addr)