From 8d115b888efe6aadc876013daa0bd8d11f5e573e Mon Sep 17 00:00:00 2001 From: Anik Stark Date: Wed, 11 Mar 2026 09:47:15 +0100 Subject: [PATCH] frappy_psi: add bluefors --- frappy_psi/bluefors.py | 134 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 frappy_psi/bluefors.py diff --git a/frappy_psi/bluefors.py b/frappy_psi/bluefors.py new file mode 100644 index 00000000..d09f111f --- /dev/null +++ b/frappy_psi/bluefors.py @@ -0,0 +1,134 @@ +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Anik Stark +# ***************************************************************************** +"""BlueFors gas handling system""" + +from frappy.core import StringIO, Readable, Writable, HasIO, Parameter, Property, \ + BoolType, FloatRange, IntRange, StringType +from frappy.errors import CommunicationFailedError + + +class IO(StringIO): + end_of_line = '\n' + identification = [('names', '.*')] # ? TODO ln 7 + + def doPoll(self): + reply = self.communicate('status')[5:] # reply expected in the shape of 'v1=1,v2=0,...' + v_list = reply.split(',') + valve_dict = {} + for element in v_list: + name, value = element.split('=') + valve_dict[name] = int(value) + + +class SimpleValve(Readable, HasIO): + + ioClass = IO + + value = Parameter('state of valve (open/close)', datatype=BoolType()) + addr = Property('valve address', datatype=StringType(), default='') # TODO type? + + def initModule(self): + super().initModule() + if not self.addr: + self.setProperty('addr', self.name) + + def read_value(self): + if self.io.valve_dict is None: + self.io.doPoll() + return self.io.valve_dict[self.addr] + + +class Valve(SimpleValve, Writable): + + ioClass = IO + + target = Parameter('target state of valve (open/close)', datatype=BoolType(), readonly=False) + value = Parameter('state of valve (open/close)', datatype=BoolType()) + + def write_target(self, target): + if self.addr not in ['V15', 'V17', 'V18']: + self.communicate('control 1') + self.communicate('remote 1') + cmd = 'on' if target else 'off' + self.communicate(f'{cmd} {self.addr}') + self.communicate('remote 0') + self.communicate('control 0') + return self.read_value() + + +# PRESSURE_MAP = {'pivc': 1, +# 'pstill': 2, +# 'pcond': 3, +# 'phep': 4, +# 'pdump': 5, +# 'paux': 6, +# } + +class Pressure(Readable, HasIO): + + ioClass = IO + + value = Parameter('pressure value', dataType=FloatRange(unit='mbar')) + addr = Property('pressure device', datatype=IntRange(1,6)) + + def read_value(self): + reply, txtval = self.communicate(f'mgstatus {self.addr}').split() + if reply != 'S05:': + raise CommunicationFailedError(f'bad reply: {reply}') + return float(txtval) + + +class Flow(Readable, HasIO): + + ioClass = IO + + value = Parameter('flow value', datatype=FloatRange(unit='')) # TODO: unit = ln/min ? + + def read_value(self): + reply, txtval = self.communicate('fmstatus').split() + if reply != 'S09:': + raise CommunicationFailedError(f'bad reply: {reply}') + return float(txtval) + + +# class Turbo_IO(StringIO): +# end_of_line = '\r' +# identification = [('0010031202=?101', '.*')] # TODO: check reply + + +# class Turbo(Writable, HasIO): +# """Pfeiffer TC400 telegram structure +# command: a2a1a0*0n2n1n0l1l0dnd0c2c1c0 +# a2a1a0 address (e.g. 001) +# *0 action: data request 00, control command 10 +# n2n1n0 Pfeiffer Vacuum parameter numbers +# l1l0 data length of dddd (has variable length) +# dnd0 data in type concerned (has variable lenght, from n to 0) +# c1c0 checksum (sum of ASCII values from a2 to d0, modulo 256) """ + +# ioClass = Turbo_IO + +# value = Parameter('state of turbo (on/off)', datatype=BoolType()) +# target = Parameter('target state (on/off)', datatype=BoolType()) +# dev_addr = Property('device address', datatype=IntRange(), default=001) + +# def write_target(self, target): +# cmd = f'{self.dev_addr}10010' +# ctr_sum = sum(ord(c) for c in cmd) +# self.communicate() \ No newline at end of file