3 Commits

Author SHA1 Message Date
1bb869b43e capillary heater: heater is now a writable
The value should show the actual heater power, but
we do not know yet the address. Currently the value
is just equal to the maxheater parameter.
2025-11-27 17:55:52 +01:00
3ede9eb9f4 frappy_psi.logo: revised version after merging capillary heater
- check that dil5 still works!

Change-Id: Ibe98e64088f2f886888af170a1f38d699927eb58
2025-11-27 09:32:37 +01:00
f57400feb9 frappy_psi.picontrol: stop when switching to manual mode
Change-Id: I3ffb9a109fb3b04fbca06f5a72acbfbd19525aae
2025-11-27 07:49:29 +01:00
5 changed files with 206 additions and 195 deletions

View File

@@ -83,38 +83,37 @@ Mod('compressor',
) )
Mod('p2', Mod('p2',
'frappy_psi.logo.Value', 'frappy_psi.logo.Pressure',
'pressure after compressor', 'pressure after compressor',
io = 'logo', io = 'logo',
addr ="VW0", addr ="VW0",
value = Param(unit='mbar'), pollinterval=0.5,
) )
Mod('p1', Mod('p1',
'frappy_psi.logo.Value', 'frappy_psi.logo.Pressure',
'dump pressure', 'dump pressure',
io = 'logo', io = 'logo',
addr ="VW28", addr ="VW28",
value = Param(unit='mbar'), pollinterval=0.5,
) )
Mod('p5', Mod('p5',
'frappy_psi.logo.Value', 'frappy_psi.logo.Pressure',
'pressure after forepump', 'pressure after forepump',
io = 'logo', io = 'logo',
addr ="VW4", addr ="VW4",
value = Param(unit='mbar'), pollinterval = 0.5,
) )
Mod('airpressure', Mod('airpressure',
'frappy_psi.logo.DigitalValue', 'frappy_psi.logo.Comparator',
'Airpressure state', 'Airpressure state',
io = 'logo', io = 'logo',
addr ="V1024.7", addr ="V1024.7",
) threshold = 500,
pollinterval = 0.5,
)
Mod('io_pfeiffer', Mod('io_pfeiffer',
'frappy_psi.pfeiffer_new.PfeifferProtocol', 'frappy_psi.pfeiffer_new.PfeifferProtocol',

94
cfg/gas10ka_cfg.py Normal file
View File

@@ -0,0 +1,94 @@
Node('gas10ka.psi.ch',
'10kBar Gas pressure stick',
interface='tcp://5010',
)
Mod('io',
'frappy_psi.logo.IO',
'',
ip_address = "192.168.1.1",
tcap_client = 0x3000,
tsap_server = 0x2000
)
Mod('R_pt10k',
'frappy_psi.logo.Resistor',
'raw sensor value of T_p10k',
io = 'io',
addr = "VW0",
)
Mod('T_pt10k',
'frappy_psi.softcal.Sensor',
'temperature close to sample',
value=Param(unit='K'),
rawsensor='R_pt10k',
calcurve='pt10000e',
)
Mod('R_top',
'frappy_psi.logo.Resistor',
'raw sensor value of T_top',
io = 'io',
addr = "VW2",
)
Mod('T_top',
'frappy_psi.softcal.Sensor',
'capillary temperature at highest position',
value=Param(unit='K'),
rawsensor='R_top',
calcurve='pt1000e',
)
Mod('R_mid',
'frappy_psi.logo.Resistor',
'raw sensor value of T_mid',
io = 'io',
addr = "VW6",
)
Mod('T_mid',
'frappy_psi.softcal.Sensor',
'capillary temperature at mid position',
value=Param(unit='K'),
rawsensor='R_mid',
calcurve='pt1000e',
)
Mod('R_bot',
'frappy_psi.logo.Resistor',
'raw sensor value of T_bot',
io = 'io',
addr = "VW4",
)
Mod('T_bot',
'frappy_psi.softcal.Sensor',
'capillary temperature at lower position',
value=Param(unit='K'),
rawsensor='R_bot',
calcurve='pt1000e',
)
Mod('R_sam_cx',
'frappy_psi.logo.Resistor',
'sensor',
io = 'io',
addr = "VW16",
)
Mod('T_sam_cx',
'frappy_psi.softcal.Sensor',
'?',
value=Param(unit='K'),
rawsensor='R_sam_cx',
calcurve='X174785',
)
Mod('heater',
'frappy_psi.capillary_heater.Heater',
'the capillary heater',
io = 'io',
)

View File

@@ -0,0 +1,53 @@
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
from frappy.core import Writable, BoolType, FloatRange, HasIO, Property, StringType, Parameter
from frappy_psi.logo import Sensor
class Heater(Sensor):
addr = 'VW10' # heater readback address
scale = 0.1
switch_addr = Property('address for switch', StringType(), default='V0.1')
enable_addr = Property('address for enable', StringType(), default='V0.0')
maxheater_addr = Property('address for target', StringType(), default='VW10')
value = Parameter(unit='%')
switch = Parameter('heater is enabled', BoolType())
enable = Parameter('heater enabled', BoolType(), readonly=False)
maxheater = Parameter('max. heater power', FloatRange(unit='%'), readonly=False)
def read_switch(self):
return self.get_vm_value(self.switch_addr)
def read_enable(self):
return self.get_vm_value(self.enable_addr)
def write_enable(self, value):
self.set_vm_value(self.enable_addr, value)
return self.read_enable()
def read_maxheater(self):
return self.get_vm_value(self.maxheater_addr, self.scale)
def write_maxheater(self, value):
self.io.set_vm_value(self.maxheater_addr, value, self.scale)
return self.read_maxheater()

View File

@@ -21,8 +21,9 @@ import sys
from time import monotonic from time import monotonic
from ast import literal_eval from ast import literal_eval
import snap7 import snap7
from frappy.core import Attached, Command, Readable, Parameter, FloatRange, HasIO, Property, StringType, \ from frappy.core import Attached, Command, Readable, Parameter, FloatRange, HasIO, Property, \
IDLE, BUSY, WARN, ERROR, Writable, Drivable, BoolType, IntRange, Communicator, StatusType IDLE, BUSY, WARN, ERROR, Writable, Drivable, Communicator
from frappy.datatypes import StringType, BoolType, IntRange, NoneOr, Int32
from frappy.errors import CommunicationFailedError, ConfigError from frappy.errors import CommunicationFailedError, ConfigError
from threading import RLock from threading import RLock
@@ -80,11 +81,16 @@ class IO(Communicator):
class LogoMixin(HasIO): class LogoMixin(HasIO):
ioclass = IO ioclass = IO
def get_vm_value(self, vm_address): def get_vm_value(self, addr, scale=None):
return literal_eval(self.io.communicate(vm_address)) if scale is None:
return int(self.io.communicate(addr))
return float(self.io.communicate(addr)) * scale
def set_vm_value(self, vm_address, value): def set_vm_value(self, addr, value, scale=None):
return literal_eval(self.io.communicate(f'{vm_address} {round(value)}')) if scale is None:
return int(self.io.communicate(f'{addr} {value}'))
reply = self.io.communicate(f'{addr} {round(value / scale)}')
return int(reply) * scale
class DigitalActuator(LogoMixin, Writable): class DigitalActuator(LogoMixin, Writable):
@@ -219,195 +225,47 @@ class DelayedActuator(DigitalActuator, Drivable):
self._pulse_end = now + delay self._pulse_end = now + delay
class Value(LogoMixin, Readable): class Sensor(LogoMixin, Readable):
addr = Property('VM address', datatype=StringType()) addr = Property('VM address', datatype=StringType())
scale = Property('scale to multiply with raw integer value',
NoneOr(FloatRange()), default=None)
def read_value(self): def read_value(self):
return self.get_vm_value(self.addr) return self.get_vm_value(self.addr, self.scale)
def read_status(self): def read_status(self):
return IDLE, '' return IDLE, ''
class DigitalValue(Value): class AnalogOutput(Sensor, Writable):
value = Parameter('airpressure state', datatype=BoolType()) output_addr = Property('VM address output', datatype=StringType(), default='')
def checkProperties(self):
super().checkProperties()
if not self.output_addr:
self.output_addr = self.addr
def read_value(self):
return self.get_vm_value(self.addr, self.scale)
def write_target(self, target):
return self.set_vm_value(self.output_addr, target, self.scale)
# TODO: the following classes are too specific, they have to be moved class Pressure(Sensor):
class Pressure(LogoMixin, Drivable):
vm_address = Property('VM address', datatype=StringType())
value = Parameter('pressure', datatype=FloatRange(unit='mbar')) value = Parameter('pressure', datatype=FloatRange(unit='mbar'))
# pollinterval = 0.5
def read_value(self): class Resistor(Sensor):
return self.get_vm_value(self.vm_address)
def read_status(self):
return IDLE, ''
class Airpressure(LogoMixin, Readable):
vm_address = Property('VM address', datatype=StringType())
value = Parameter('airpressure state', datatype=BoolType())
# pollinterval = 0.5
def read_value(self):
if (self.get_vm_value(self.vm_address) > 500):
return 1
else:
return 0
def read_status(self):
return IDLE, ''
class Valve(LogoMixin, Drivable):
vm_address_input = Property('VM address input', datatype=StringType())
vm_address_output = Property('VM address output', datatype=StringType())
target = Parameter('Valve target', datatype=BoolType())
value = Parameter('Value state', datatype=BoolType())
_remaining_tries = None
def read_value(self):
return self.get_vm_value(self.vm_address_input)
def write_target(self, target):
self.set_vm_value(self.vm_address_output, target)
self._remaining_tries = 5
self.status = BUSY, 'switching'
self.setFastPoll(True, 0.5)
def read_status(self):
self.log.debug('read_status')
value = self.read_value()
self.log.debug('value %d target %d', value, self.target)
if value != self.target:
if self._remaining_tries is None:
self.target = self.read_value()
return IDLE, ''
self._remaining_tries -= 1
if self._remaining_tries < 0:
self.setFastPoll(False)
return ERROR, 'too many tries to switch'
self.set_vm_value(self.vm_address_output, self.target)
return BUSY, 'switching (try again)'
self.setFastPoll(False)
return IDLE, ''
class FluidMachines(LogoMixin, Drivable):
vm_address_output = Property('VM address output', datatype=StringType())
target = Parameter('Valve target', datatype=BoolType())
value = Parameter('Valve state', datatype=BoolType())
def read_value(self):
return self.get_vm_value(self.vm_address_output)
def write_target(self, target):
return self.set_vm_value(self.vm_address_output, target)
def read_status(self):
return IDLE, ''
class TempSensor(LogoMixin, Readable):
vm_address = Property('VM address', datatype=StringType())
value = Parameter('resistance', datatype=FloatRange(unit='Ohm')) value = Parameter('resistance', datatype=FloatRange(unit='Ohm'))
def read_value(self):
return self.get_vm_value(self.vm_address)
def read_status(self): class Comparator(LogoMixin, Readable):
return IDLE, '' addr = Property('VM address', datatype=StringType())
scale = Property('scale to multiply with raw integer value',
NoneOr(FloatRange()), default=None)
class HeaterParam(LogoMixin, Writable): value = Parameter('airpressure state', datatype=BoolType())
vm_address = Property('VM address output', datatype=StringType()) threshold = Property('threshold for True', FloatRange())
target = Parameter('Heater target', datatype=IntRange())
value = Parameter('Heater Param', datatype=IntRange())
def read_value(self): def read_value(self):
return self.get_vm_value(self.vm_address) return self.get_vm_value(self.addr, self.scale) > self.threshold
def write_target(self, target):
return self.set_vm_value(self.vm_address, target)
def read_status(self):
return IDLE, ''
class controlHeater(LogoMixin, Writable):
vm_address = Property('VM address on switch', datatype=StringType())
target = Parameter('Heater state', datatype=BoolType())
value = Parameter('Heater state', datatype=BoolType())
def read_value(self):
return self.get_vm_value(self.vm_address_on)
def write_target(self, target):
if (target):
return self.set_vm_value(self.vm_address, True)
else:
return self.set_vm_value(self.vm_address, False)
def read_status(self):
return IDLE, ''
class safetyfeatureState(LogoMixin, Readable):
vm_address = Property('VM address state', datatype=StringType())
value = Parameter('safety Feature state', datatype=BoolType())
def read_value(self):
return self.get_vm_value(self.vm_address)
def read_status(self):
return IDLE, ''
class safetyfeatureParam(LogoMixin, Writable):
vm_address = Property('VM address output', datatype=StringType())
target = Parameter('safety Feature target', datatype=IntRange())
value = Parameter('safety Feature Param', datatype=IntRange())
def read_value(self):
return self.get_vm_value(self.vm_address)
def write_target(self, target):
return self.set_vm_value(self.vm_address, target)
def read_status(self):
return IDLE, ''
class comparatorgekoppeltParam(LogoMixin, Writable):
vm_address_1 = Property('VM address output', datatype=StringType())
vm_address_2 = Property('VM address output', datatype=StringType())
target = Parameter('safety Feature target', datatype=IntRange())
value = Parameter('safety Feature Param', datatype=IntRange())
def read_value(self):
return self.get_vm_value(self.vm_address_1)
def write_target(self, target):
self.set_vm_value(self.vm_address_1, target)
return self.set_vm_value(self.vm_address_2, target)
def read_status(self):
return IDLE, ''

View File

@@ -61,7 +61,7 @@ example cfg:
import time import time
import math import math
import numpy as np import numpy as np
from frappy.core import Readable, Writable, Parameter, Attached, IDLE, Property from frappy.core import Readable, Writable, Parameter, Attached, IDLE, WARN, Property
from frappy.lib import clamp, merge_status from frappy.lib import clamp, merge_status
from frappy.datatypes import LimitsType, EnumType, FloatRange from frappy.datatypes import LimitsType, EnumType, FloatRange
from frappy.errors import SECoPError from frappy.errors import SECoPError
@@ -296,6 +296,13 @@ class PI(HasConvergence, PImixin):
super().write_target(target) super().write_target(target)
self.convergence_start() self.convergence_start()
def deactivate_control(self, source=None):
super().deactivate_control(source)
if self.isBusy():
self.convergence_state.stop_status = (
WARN, f'switched to manual mode by {source or self.name}')
self.convergence_state.start(self.convergence_interrupt)
# unchecked! # unchecked!
class PI2(PI): class PI2(PI):