add cetoni pump for flowsas project

This commit is contained in:
l_samenv 2024-04-03 11:31:38 +02:00
parent 70a31b5cae
commit adfb561308
2 changed files with 168 additions and 0 deletions

52
cfg/flowsas_cfg.py Normal file
View File

@ -0,0 +1,52 @@
Node('flowsas.psi.ch',
'flowsas test motors',
'tcp://5000',
)
#Mod('mot_io',
# 'frappy_psi.phytron.PhytronIO',
# 'io for motor control',
# uri = 'serial:///dev/ttyUSB0',
# )
#Mod('hmot',
# 'frappy_psi.phytron.Motor',
# 'horizontal axis',
# axis = 'X',
# io = 'mot_io',
# encoder_mode= 'NO',
# )
#Mod('vmot',
# 'frappy_psi.phytron.Motor',
# 'vertical axis',
# axis = 'Y',
# io = 'mot_io',
# encoder_mode= 'NO',
# )
Mod('syr_io',
'frappy_psi.cetoni_pump.LabCannBus',
'Module for bus',
deviceconfig = "/home/l_samenv/frappy/cetoniSDK/CETONI_SDK_Raspi_64bit_v20220627/config/dual_pumps",
)
Mod('syr1',
'frappy_psi.cetoni_pump.SyringePump',
'First syringe pump',
io='syr_io',
pump_name = "Nemesys_S_1_Pump",
valve_name = "Nemesys_S_1_Valve",
inner_diameter_set = 10,
piston_stroke_set = 60,
)
Mod('syr2',
'frappy_psi.cetoni_pump.SyringePump',
'Second syringe pump',
io='syr_io',
pump_name = "Nemesys_S_2_Pump",
valve_name = "Nemesys_S_2_Valve",
inner_diameter_set = 1,
piston_stroke_set = 60,
)

116
frappy_psi/cetoni_pump.py Normal file
View File

@ -0,0 +1,116 @@
libpath = '/home/l_samenv/frappy/cetoniSDK/CETONI_SDK_Raspi_64bit_v20220627/python/src/'
import sys
if libpath not in sys.path:
sys.path.append(libpath)
from frappy.core import Drivable, Readable, StringIO, HasIO, FloatRange, IntRange, StringType, BoolType, EnumType, \
Parameter, Property, PersistentParam, Command, IDLE, BUSY, ERROR, Attached
from qmixsdk import qmixbus
from qmixsdk import qmixpump
from qmixsdk import qmixvalve
from qmixsdk.qmixbus import UnitPrefix, TimeUnit
class LabCannBus(Readable):
deviceconfig = Property('config files', StringType(),default="/home/l_samenv/frappy/cetoniSDK/CETONI_SDK_Raspi_64bit_v20220627/config/dual_pumps")
def earlyInit(self):
super().earlyInit()
self.bus = qmixbus.Bus()
self.bus.open(self.deviceconfig, "")
def initModule(self):
super().initModule()
self.bus.start()
def shutdownModule(self):
"""Not so gracefully close the connection"""
self.bus.stop()
self.bus.close()
class SyringePump(Drivable):
io = Attached()
pump_name = Property('name of pump', StringType(),default="Nemesys_S_1_Pump")
valve_name = Property('name of valve', StringType(),default="Nemesys_S_1_Valve")
inner_diameter_set = Property('inner diameter', FloatRange(), default=1)
piston_stroke_set = Property('piston stroke', FloatRange(), default=60)
value = PersistentParam('volume', FloatRange(unit='mL'))
status = PersistentParam()
max_flow_rate = Parameter('max flow rate', FloatRange(0,100000, unit='mL/min',), readonly=True)
max_volume = Parameter('max volume', FloatRange(0,100000, unit='mL',), readonly=True)
target_flow_rate = Parameter('target flow rate', FloatRange(unit='mL/min'), readonly=False)
real_flow_rate = Parameter('actual flow rate', FloatRange(unit='mL/min'), readonly=True)
target = Parameter('target volume', FloatRange(unit='mL'), readonly=False)
no_of_valve_pos = Property('number of valve positions', IntRange(0,10), default=1)
valve_pos = Parameter('valve position', EnumType('valve', CLOSED=0, APP=1, RES=2, OPEN=3), readonly=False)
def initModule(self):
super().initModule()
self.pump = qmixpump.Pump()
self.pump.lookup_by_name(self.pump_name)
self.valve = qmixvalve.Valve()
self.valve.lookup_by_name(self.valve_name)
def initialReads(self):
if self.pump.is_in_fault_state():
self.pump.clear_fault()
if not self.pump.is_enabled():
self.pump.enable(True)
self.pump.set_syringe_param(self.inner_diameter_set, self.piston_stroke_set)
self.pump.set_volume_unit(qmixpump.UnitPrefix.milli, qmixpump.VolumeUnit.litres)
self.pump.set_flow_unit(qmixpump.UnitPrefix.milli, qmixpump.VolumeUnit.litres, qmixpump.TimeUnit.per_minute)
self.max_flow_rate = self.pump.get_flow_rate_max()
self.max_volume = self.pump.get_volume_max()
self.no_of_valve_pos = self.valve.number_of_valve_positions()
self.valve_pos = self.valve.actual_valve_position()
self.target_flow_rate = self.max_flow_rate * 0.5
self.target = self.pump.get_fill_level()
def read_value(self):
return self.pump.get_fill_level()
def write_target(self, target):
self.pump.set_fill_level(target, self.target_flow_rate)
self.status = BUSY, 'Target changed'
return target
def write_target_flow_rate(self, rate):
self.pump.target_flow_rate = rate
return rate
def read_real_flow_rate(self):
return self.pump.get_flow_is()
def read_valve_pos(self):
return self.valve.actual_valve_position()
def write_valve_pos(self, target_pos):
self.valve.switch_valve_to_position(target_pos)
return target_pos
def read_status(self):
fault_state = self.pump.is_in_fault_state()
pumping = self.pump.is_pumping()
if fault_state == True:
return ERROR, 'Pump in fault state'
elif pumping == True:
return BUSY, 'Pumping'
else:
return IDLE, ''
@Command
def stop(self):
self.pump.stop_pumping()
self.target = self.pump.get_fill_level()