5 Commits
wip ... andrea

Author SHA1 Message Date
14ee1d66e0 kapillarheizung: state as of 2025-11-26
Comment from Markus:
This needs some redesign. We should not map each
logo parameter to a SECoP module.
I guess that the switch to turn off the heater should be
integrated into the heater modules.

In addition we have to rebase to the wip branch, which would need
a merged version of frappy_psi/logo.py.
2025-11-26 16:20:47 +01:00
6d90f02dd6 in the config file some modules required a frappy.logo.ControlHeater class
this class does not exist.
in the config these modules where now defined using the HeaterParam class
The Server starts, but iti is unclear if it fullfils the intended behaviour
Please fix
2025-03-20 15:25:22 +01:00
a26b7b69fa added description, remove not working cernox 2024-11-18 11:23:22 +01:00
d1d640805b improvements when plc is switched off
- stop continuous logging during reconnect
- include ._init() into lock
2024-06-11 14:42:02 +02:00
92a8dfac5d fixed heater level
- should be percent
- round to integer before writing to logo
2024-06-11 13:47:36 +02:00
2 changed files with 323 additions and 0 deletions

148
cfg/kapillarheizung_cfg.py Normal file
View File

@@ -0,0 +1,148 @@
Node('Gas10kA.psi.ch',
'LOGO for 10kBar Gas pressure stick',
interface='tcp://5001',
)
Mod('io',
'frappy_psi.logo.IO',
'',
ip_address = "192.168.1.1",
tcap_client = 0x3000,
tsap_server = 0x2000
)
Mod('PT10000',
'frappy_psi.logo.TempSensor',
'raw sensor value of T_p10k',
io = 'io',
vm_address ="VW0",
)
Mod('T_pt10k',
'frappy_psi.softcal.Sensor',
'temperature close to sample',
value=Param(unit='K'),
rawsensor='PT10000',
calib='pt10000e',
)
Mod('PT1000_oben',
'frappy_psi.logo.TempSensor',
'raw sensor value of T_top',
io = 'io',
vm_address ="VW2",
)
Mod('T_top',
'frappy_psi.softcal.Sensor',
'capillary temperature at highest position',
value=Param(unit='K'),
rawsensor='PT1000_oben',
calib='pt1000e',
)
Mod('PT1000_mitte',
'frappy_psi.logo.TempSensor',
'raw sensor value of T_mid',
io = 'io',
vm_address ="VW6",
)
Mod('T_mid',
'frappy_psi.softcal.Sensor',
'capillary temperature at mid position',
value=Param(unit='K'),
rawsensor='PT1000_mitte',
calib='pt1000e',
)
Mod('PT1000_unten',
'frappy_psi.logo.TempSensor',
'raw sensor value of T_bot',
io = 'io',
vm_address ="VW4",
)
Mod('T_bot',
'frappy_psi.softcal.Sensor',
'capillary temperature at lower position',
value=Param(unit='K'),
rawsensor='PT1000_unten',
calib='pt1000e',
)
Mod('Cernox',
'frappy_psi.logo.TempSensor',
'sensor',
io = 'io',
vm_address ="VW16",
)
Mod('T_cx',
'frappy_psi.softcal.Sensor',
'?',
value=Param(unit='K'),
rawsensor='Cernox',
calib='X174785',
)
Mod('HeaterLevelHigh',
'frappy_psi.logo.HeaterParam',
'heater param',
io = 'io',
vm_address = "VW10",
)
''' before 12.11.2024
Mod('HeaterState',
'frappy_psi.logo.ControlHeater',
'heater state',
io = 'io',
vm_address_on = "V1246.0",
vm_address_off = "V1246.1",
)
'''
"""
Mod('HeaterState',
'frappy_psi.logo.HeaterParam',
'heater state',
io = 'io',
vm_address = "V0.1",
)
Mod('HeaterState_a',
'frappy_psi.logo.HeaterParam',
'heater state a',
io = 'io',
vm_address = "V0.0",
)
"""
Mod('HeaterState',
'frappy_psi.logo.ControlHeater',
'heater state',
io = 'io',
vm_address = "V0.1",
)
Mod('HeaterState_a',
'frappy_psi.logo.ControlHeater',
'heater state a',
io = 'io',
vm_address = "V0.0",
)
# currently unused:
#Mod('HeaterLevelLow',
# 'frappy_psi.logo.HeaterParam',
# 'heater param',
# io = 'io',
# vm_address = "VW12",
#)

175
frappy_psi/logo.py Normal file
View File

@@ -0,0 +1,175 @@
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Andrea Plank <andrea.plank@psi.ch>
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
import sys
import time
from ast import literal_eval
from threading import RLock
import snap7
from frappy.core import Readable, Parameter, FloatRange, HasIO, StringIO, Property, StringType,IDLE, BUSY, WARN, ERROR,Writable, Drivable, BoolType, IntRange, Communicator
from frappy.errors import CommunicationFailedError
class IO(Communicator):
tcap_client = Property('tcap_client', IntRange())
tsap_server = Property('tcap_server', IntRange())
ip_address = Property('numeric ip address', StringType())
_plc = None
_last_try = 0
def initModule(self):
self._lock = RLock()
super().initModule()
def _init(self):
if not self._plc:
if time.time() < self._last_try + 10:
raise CommunicationFailedError('logo PLC not reachable')
self._plc = snap7.logo.Logo()
prev_stderr = sys.stdout
sys.stderr = open('/dev/null', 'w') # suppress output of snap7
try:
self._plc.connect(self.ip_address, self.tcap_client, self.tsap_server)
if self._plc.get_connected():
return
except Exception:
pass
finally:
sys.stderr = prev_stderr
self._plc = None
self._last_try = time.time()
raise CommunicationFailedError('logo PLC not reachable')
def communicate(self, cmd):
with self._lock:
self._init()
cmd = cmd.split(maxsplit=1)
if len(cmd) == 2:
self._plc.write(cmd[0], literal_eval(cmd[1]))
try:
return self._plc.read(cmd[0])
except Exception as e:
if self._plc:
self.log.exception('error in plc read')
self._plc = None
raise
class Snap7Mixin(HasIO):
ioclass = IO
def get_vm_value(self, vm_address):
return self.io.communicate(vm_address)
def set_vm_value(self, vm_address, value):
return self.io.communicate(f'{vm_address} {value}')
class Pressure(Snap7Mixin, Readable):
vm_address = Property('VM address', datatype= StringType())
value = Parameter('pressure', datatype = FloatRange(unit = 'mbar'))
pollinterval = 0.5
def read_value(self):
return self.get_vm_value(self.vm_address)
def read_status(self):
return IDLE, ''
class Valve(Snap7Mixin, Drivable):
vm_address_input = Property('VM address input', datatype= StringType())
vm_address_output = Property('VM address output', datatype= StringType())
target = Parameter('Valve target', datatype = BoolType())
value = Parameter('Value state', datatype = BoolType())
def read_value(self):
return self.get_vm_value(self.vm_address_input)
def write_target(self, target):
return self.set_vm_value(self.vm_address_output, target)
def read_status(self):
return IDLE, ''
class FluidMachines(Snap7Mixin, Drivable):
vm_address_output = Property('VM address output', datatype= StringType())
target = Parameter('Valve target', datatype = BoolType())
value = Parameter('Value state', datatype = BoolType())
def read_value(self):
return self.get_vm_value(self.vm_address_output)
def write_target(self, target):
return self.set_vm_value(self.vm_address_output, target)
def read_status(self):
return IDLE, ''
class TempSensor(Snap7Mixin, Readable):
vm_address = Property('VM address', datatype= StringType())
value = Parameter('resistance', datatype = FloatRange(unit = 'Ohm'))
def read_value(self):
return self.get_vm_value(self.vm_address)
def read_status(self):
return IDLE, ''
class HeaterParam(Snap7Mixin, Writable):
vm_address = Property('VM address output', datatype= StringType())
target = Parameter('Heater target', datatype = FloatRange(unit='%'))
value = Parameter('Heater Param', datatype = FloatRange(unit='%'))
def read_value(self):
return self.get_vm_value(self.vm_address) * 0.1
def write_target(self, target):
return self.set_vm_value(self.vm_address, round(target * 10)) * 0.1
def read_status(self):
return IDLE, ''
class ControlHeater(Snap7Mixin, Writable):
vm_address = Property('VM address on switch', datatype= StringType())
target = Parameter('Heater state', datatype = BoolType())
value = Parameter('Heater state', datatype = BoolType())
def read_value(self):
return self.get_vm_value(self.vm_address)
def write_target(self, target):
if (target):
return self.set_vm_value(self.vm_address, True)
else:
return self.set_vm_value(self.vm_address, False)
def read_status(self):
return IDLE, ''