From 9440b1fe92f72a3fe69d4ed7b71727ba224db8a8 Mon Sep 17 00:00:00 2001 From: Anik Stark Date: Wed, 18 Feb 2026 11:12:54 +0100 Subject: [PATCH] frappy_psi: add leigenghs, limits and turbo pump not implemented --- frappy_psi/leidenghs.py | 232 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 232 insertions(+) create mode 100644 frappy_psi/leidenghs.py diff --git a/frappy_psi/leidenghs.py b/frappy_psi/leidenghs.py new file mode 100644 index 00000000..822251d9 --- /dev/null +++ b/frappy_psi/leidenghs.py @@ -0,0 +1,232 @@ +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Anik Stark +# ***************************************************************************** +"""dilution refrigerator: Leiden Cryogenics GHS-2T-1T-700 + +! writing limits does not work (reading does) +! communication with turbo pumps is not implemented +""" + +from frappy.core import StringIO, HasIO, Readable, Writable, Parameter, Property, EnumType, BoolType, FloatRange +from frappy.errors import CommunicationFailedError + +P_SCALE = 2**14 / 1e4 + +class IO(StringIO): + end_of_line = '\n' + identification = [('ID?', '0\\tGHS-2T-1T-700-CF.*')] + default_settings = {'baudrate': 9600} + _key_status = None + _pressure = None + _pressure_limits = None + + def initModule(self): + self.modules = [] + super().initModule() + + def doPoll(self): + # read key (valve) states + reply = self.communicate('KEYS?') + status, key_status = reply.split('\t') + self._key_status = key_status.split(',') + if status != '0': + raise CommunicationFailedError(f'bad reply: {reply}') + # read pressure values + reply = self.communicate('ADC?') + status, pressure = reply.split('\t') + if status != '0': + raise CommunicationFailedError(f'bad reply: {reply}') + self._pressure = pressure.split(',') + # read pressure limits + status, reply = self.communicate('SETTINGS?').split('\t') + if status != '0': + raise CommunicationFailedError(f'bad reply: {reply}') + self._pressure_limits = reply.split(',') + # read value for all registred modules + for module in self.modules: + module.update_values() + + def register(self, module): + self.modules.append(module) + + +class Base(HasIO): + + def initModule(self): + self.io.register(self) + super().initModule() + + def change(self, cmd, key): + reply = self.communicate(f'{cmd} {key}') + if not reply.startswith('0'): + raise CommunicationFailedError(f'bad reply: {reply}') + + def get_key(self, key): + return self.io._key_status[key - 1] == '2' + + def get_pressure(self, addr): + return float(self.io._pressure[addr]) / P_SCALE + + def update_values(self): + self.read_value() + + +KEY_MAP = {'mix': 1, # mixture compressor + 'bypass': 2, + '15': 3, + '16': 4, + 'gate18': 6, + '4': 7, + '5': 9, + 'reset': 10, + '9': 12, + '14': 13, + '13': 15, + 'ledtest': 16, + '12': 18, + '10': 19, + '11': 21, + 'A0': 22, + '7': 24, + 'S3': 25, + '6': 27, + '17': 28, + '8': 30, + '1': 31, + 'S2': 33, + '3': 34, + '2': 36, + '0': 37, + 'S1': 39, + 'A9': 40, + 'A8': 42, + 'A2': 43, + 'S5': 45, + 'A10': 46, + 'start': 48, + 'A5': 49, + 'A7': 51, + 'auto': 54, + 'A4': 55, + 'A6': 57, + 'S4': 58, + 'A3': 60, + 'He3': 61, # condense He3 + 'He4': 62, # condense He4 + 'circulation': 63, # normal circulation + 'recovery': 64, + } + + +class Valve(Base, Writable): + + ioClass = IO + + target = Parameter('target state of valve', datatype=BoolType(), readonly=False) + value = Parameter('status of valve (open/close)', datatype=BoolType()) + key = Property('key (button) number', datatype=EnumType(KEY_MAP)) + + def write_target(self, target): + """toggle button state (as if pressed manually on the panel)""" + self.io.doPoll() + state = self.read_value() + if state != target: + self.change('DEVMAN', self.key.value) + self.io.doPoll() + self.read_value() + + def read_value(self): + if self.io._key_status is None: + self.io.doPoll() + return self.get_key(self.key.value) + + +PRESSURE_MAP = {'P1': 0, + 'P2': 1, + 'P3': 2, + 'P4': 3, + 'P5': 4, + 'P6': 5, + 'P7': 6, + 'flow': 7, + } + + +class Pressure(Base, Readable): + + ioClass = IO + + value = Parameter('pressure and flow values', datatype=FloatRange(unit='mbar')) + addr = Property('address of pressure sensor', datatype=EnumType(PRESSURE_MAP)) + + def read_value(self): + value = self.get_pressure(self.addr.value) + return value + + +class PressureLimit(Pressure, Readable): + """P6 and P7 pressure sensors of dilution refrigerator (Leiden Cryogenics GHS-2T-1T-700) + have pressure limit settings""" + + ioClass = IO + + value = Parameter('pressure values for sensors with limit', datatype=FloatRange(unit='mbar')) + offset = Property('address of pressure sensor', datatype=EnumType({'P6': 0, 'P7': 2})) + limit_low = Parameter('low pressure limit', datatype=FloatRange(0, 2000, unit='mbar'), readonly=False) + limit_high = Parameter('high pressure limit', datatype=FloatRange(0, 2000, unit='mbar'), readonly=False) + + def get_pressure_limit(self, lim): + return float(self.io._pressure_limits[lim]) + + def set_limit(self, pos, limit): + limit_list = self.io._pressure_limits + limit_list[pos] = f'{limit:.0f}' + reply = self.communicate(f'SETTINGS {",".join(limit_list)},0') + if reply != '0': + raise CommunicationFailedError(f'bad reply: {reply}') + self.io.doPoll() + + def read_limit_low(self): + return self.get_pressure_limit(self.offset.value) + + def read_limit_high(self): + return self.get_pressure_limit(self.offset.value + 1) + #self._P6_low, self._P6_high, self._P7_low, self._P7_high = reply.split(',') + + def write_limit_low(self, limit): + return self.set_limit(self.offset.value, limit) + + def write_limit_high(self, limit): + return self.set_limit(self.offset.value + 1, limit) + + def update_values(self): + self.read_value() + self.read_limit_low() + self.read_limit_high() + + +class Turbo(Base, Writable): + + ioClass = IO + + target = Parameter('target state of turbo pump', datatype=EnumType(off=0, on=1)) + value = Parameter('state of turbo pump', datatype=EnumType(off=0, on=1)) + addr = Parameter('turbo pump address', datatype=EnumType(turbo1=1, turbo2=2)) + + # def write_target(self, target): + # self.change(f'TURBO ON/OFF {target}', addr)