frappy_psi: add bluefors

This commit is contained in:
2026-03-11 09:47:15 +01:00
parent 3661079cba
commit 8d115b888e
+134
View File
@@ -0,0 +1,134 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Anik Stark <anik.stark@psi.ch>
# *****************************************************************************
"""BlueFors gas handling system"""
from frappy.core import StringIO, Readable, Writable, HasIO, Parameter, Property, \
BoolType, FloatRange, IntRange, StringType
from frappy.errors import CommunicationFailedError
class IO(StringIO):
end_of_line = '\n'
identification = [('names', '.*')] # ? TODO ln 7
def doPoll(self):
reply = self.communicate('status')[5:] # reply expected in the shape of 'v1=1,v2=0,...'
v_list = reply.split(',')
valve_dict = {}
for element in v_list:
name, value = element.split('=')
valve_dict[name] = int(value)
class SimpleValve(Readable, HasIO):
ioClass = IO
value = Parameter('state of valve (open/close)', datatype=BoolType())
addr = Property('valve address', datatype=StringType(), default='') # TODO type?
def initModule(self):
super().initModule()
if not self.addr:
self.setProperty('addr', self.name)
def read_value(self):
if self.io.valve_dict is None:
self.io.doPoll()
return self.io.valve_dict[self.addr]
class Valve(SimpleValve, Writable):
ioClass = IO
target = Parameter('target state of valve (open/close)', datatype=BoolType(), readonly=False)
value = Parameter('state of valve (open/close)', datatype=BoolType())
def write_target(self, target):
if self.addr not in ['V15', 'V17', 'V18']:
self.communicate('control 1')
self.communicate('remote 1')
cmd = 'on' if target else 'off'
self.communicate(f'{cmd} {self.addr}')
self.communicate('remote 0')
self.communicate('control 0')
return self.read_value()
# PRESSURE_MAP = {'pivc': 1,
# 'pstill': 2,
# 'pcond': 3,
# 'phep': 4,
# 'pdump': 5,
# 'paux': 6,
# }
class Pressure(Readable, HasIO):
ioClass = IO
value = Parameter('pressure value', dataType=FloatRange(unit='mbar'))
addr = Property('pressure device', datatype=IntRange(1,6))
def read_value(self):
reply, txtval = self.communicate(f'mgstatus {self.addr}').split()
if reply != 'S05:':
raise CommunicationFailedError(f'bad reply: {reply}')
return float(txtval)
class Flow(Readable, HasIO):
ioClass = IO
value = Parameter('flow value', datatype=FloatRange(unit='')) # TODO: unit = ln/min ?
def read_value(self):
reply, txtval = self.communicate('fmstatus').split()
if reply != 'S09:':
raise CommunicationFailedError(f'bad reply: {reply}')
return float(txtval)
# class Turbo_IO(StringIO):
# end_of_line = '\r'
# identification = [('0010031202=?101', '.*')] # TODO: check reply
# class Turbo(Writable, HasIO):
# """Pfeiffer TC400 telegram structure
# command: a2a1a0*0n2n1n0l1l0dnd0c2c1c0
# a2a1a0 address (e.g. 001)
# *0 action: data request 00, control command 10
# n2n1n0 Pfeiffer Vacuum parameter numbers
# l1l0 data length of dddd (has variable length)
# dnd0 data in type concerned (has variable lenght, from n to 0)
# c1c0 checksum (sum of ASCII values from a2 to d0, modulo 256) """
# ioClass = Turbo_IO
# value = Parameter('state of turbo (on/off)', datatype=BoolType())
# target = Parameter('target state (on/off)', datatype=BoolType())
# dev_addr = Property('device address', datatype=IntRange(), default=001)
# def write_target(self, target):
# cmd = f'{self.dev_addr}10010'
# ctr_sum = sum(ord(c) for c in cmd)
# self.communicate()