From 3ede9eb9f4f782593b6f14fac5b75503562c649e Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Thu, 27 Nov 2025 09:00:07 +0100 Subject: [PATCH] frappy_psi.logo: revised version after merging capillary heater - check that dil5 still works! Change-Id: Ibe98e64088f2f886888af170a1f38d699927eb58 --- cfg/dil5_cfg.py | 27 +++--- frappy_psi/logo.py | 216 ++++++++------------------------------------- 2 files changed, 50 insertions(+), 193 deletions(-) diff --git a/cfg/dil5_cfg.py b/cfg/dil5_cfg.py index 3b8065b0..d1e30414 100644 --- a/cfg/dil5_cfg.py +++ b/cfg/dil5_cfg.py @@ -83,38 +83,37 @@ Mod('compressor', ) Mod('p2', - 'frappy_psi.logo.Value', + 'frappy_psi.logo.Pressure', 'pressure after compressor', io = 'logo', addr ="VW0", - value = Param(unit='mbar'), -) + pollinterval=0.5, + ) Mod('p1', - 'frappy_psi.logo.Value', + 'frappy_psi.logo.Pressure', 'dump pressure', io = 'logo', addr ="VW28", - value = Param(unit='mbar'), -) + pollinterval=0.5, + ) Mod('p5', - 'frappy_psi.logo.Value', + 'frappy_psi.logo.Pressure', 'pressure after forepump', io = 'logo', addr ="VW4", - value = Param(unit='mbar'), -) + pollinterval = 0.5, + ) Mod('airpressure', - 'frappy_psi.logo.DigitalValue', + 'frappy_psi.logo.Comparator', 'Airpressure state', io = 'logo', addr ="V1024.7", -) - - - + threshold = 500, + pollinterval = 0.5, + ) Mod('io_pfeiffer', 'frappy_psi.pfeiffer_new.PfeifferProtocol', diff --git a/frappy_psi/logo.py b/frappy_psi/logo.py index a57e3f33..6c2cf046 100644 --- a/frappy_psi/logo.py +++ b/frappy_psi/logo.py @@ -21,8 +21,9 @@ import sys from time import monotonic from ast import literal_eval import snap7 -from frappy.core import Attached, Command, Readable, Parameter, FloatRange, HasIO, Property, StringType, \ - IDLE, BUSY, WARN, ERROR, Writable, Drivable, BoolType, IntRange, Communicator, StatusType +from frappy.core import Attached, Command, Readable, Parameter, FloatRange, HasIO, Property, \ + IDLE, BUSY, WARN, ERROR, Writable, Drivable, Communicator +from frappy.datatypes import StringType, BoolType, IntRange, OrType, Int32 from frappy.errors import CommunicationFailedError, ConfigError from threading import RLock @@ -76,15 +77,24 @@ class IO(Communicator): self._plc = None raise + @Command(StringType(), result=Int32) + def read(self, vm_address): + return self._plc.read(vm_address) + + @Command((StringType(), Int32)) + def write(self, vm_address, value): + self._plc.write(vm_address, value) + class LogoMixin(HasIO): ioclass = IO - def get_vm_value(self, vm_address): - return literal_eval(self.io.communicate(vm_address)) + def get_vm_value(self, vm_address, scale=1): + return self.io.read(vm_address) * scale - def set_vm_value(self, vm_address, value): - return literal_eval(self.io.communicate(f'{vm_address} {round(value)}')) + def set_vm_value(self, vm_address, value, scale=1): + self.io.write(vm_address, round(value / scale)) + return self.io.read(vm_address) * scale class DigitalActuator(LogoMixin, Writable): @@ -219,7 +229,7 @@ class DelayedActuator(DigitalActuator, Drivable): self._pulse_end = now + delay -class Value(LogoMixin, Readable): +class Sensor(LogoMixin, Readable): addr = Property('VM address', datatype=StringType()) def read_value(self): @@ -229,185 +239,33 @@ class Value(LogoMixin, Readable): return IDLE, '' -class DigitalValue(Value): - value = Parameter('airpressure state', datatype=BoolType()) +class AnalogOutput(Sensor, Writable): + output_addr = Property('VM address output', datatype=StringType(), default='') + + def checkProperties(self): + super().checkProperties() + if not self.output_addr: + self.output_addr = self.addr + + def read_value(self): + return self.get_vm_value(self.vm_address, self.scale) + + def write_target(self, target): + return self.set_vm_value(self.vm_address, target, self.scale) -# TODO: the following classes are too specific, they have to be moved - -class Pressure(LogoMixin, Drivable): - vm_address = Property('VM address', datatype=StringType()) +class Pressure(Sensor): value = Parameter('pressure', datatype=FloatRange(unit='mbar')) - # pollinterval = 0.5 - def read_value(self): - return self.get_vm_value(self.vm_address) - - def read_status(self): - return IDLE, '' - - -class Airpressure(LogoMixin, Readable): - vm_address = Property('VM address', datatype=StringType()) - value = Parameter('airpressure state', datatype=BoolType()) - - # pollinterval = 0.5 - - def read_value(self): - if (self.get_vm_value(self.vm_address) > 500): - return 1 - else: - return 0 - - def read_status(self): - return IDLE, '' - - -class Valve(LogoMixin, Drivable): - vm_address_input = Property('VM address input', datatype=StringType()) - vm_address_output = Property('VM address output', datatype=StringType()) - - target = Parameter('Valve target', datatype=BoolType()) - value = Parameter('Value state', datatype=BoolType()) - _remaining_tries = None - - def read_value(self): - return self.get_vm_value(self.vm_address_input) - - def write_target(self, target): - self.set_vm_value(self.vm_address_output, target) - self._remaining_tries = 5 - self.status = BUSY, 'switching' - self.setFastPoll(True, 0.5) - - def read_status(self): - self.log.debug('read_status') - value = self.read_value() - self.log.debug('value %d target %d', value, self.target) - if value != self.target: - if self._remaining_tries is None: - self.target = self.read_value() - return IDLE, '' - self._remaining_tries -= 1 - if self._remaining_tries < 0: - self.setFastPoll(False) - return ERROR, 'too many tries to switch' - self.set_vm_value(self.vm_address_output, self.target) - return BUSY, 'switching (try again)' - self.setFastPoll(False) - return IDLE, '' - - -class FluidMachines(LogoMixin, Drivable): - vm_address_output = Property('VM address output', datatype=StringType()) - - target = Parameter('Valve target', datatype=BoolType()) - value = Parameter('Valve state', datatype=BoolType()) - - def read_value(self): - return self.get_vm_value(self.vm_address_output) - - def write_target(self, target): - return self.set_vm_value(self.vm_address_output, target) - - def read_status(self): - return IDLE, '' - - -class TempSensor(LogoMixin, Readable): - vm_address = Property('VM address', datatype=StringType()) +class Resistor(Sensor): value = Parameter('resistance', datatype=FloatRange(unit='Ohm')) - def read_value(self): - return self.get_vm_value(self.vm_address) - def read_status(self): - return IDLE, '' - - -class HeaterParam(LogoMixin, Writable): - vm_address = Property('VM address output', datatype=StringType()) - - target = Parameter('Heater target', datatype=IntRange()) - - value = Parameter('Heater Param', datatype=IntRange()) +class Comparator(LogoMixin, Readable): + vm_address = Property('VM address', datatype=StringType()) + value = Parameter('airpressure state', datatype=BoolType()) + threshold = Property('threshold for True', FloatRange()) def read_value(self): - return self.get_vm_value(self.vm_address) - - def write_target(self, target): - return self.set_vm_value(self.vm_address, target) - - def read_status(self): - return IDLE, '' - - -class controlHeater(LogoMixin, Writable): - vm_address = Property('VM address on switch', datatype=StringType()) - - target = Parameter('Heater state', datatype=BoolType()) - - value = Parameter('Heater state', datatype=BoolType()) - - def read_value(self): - return self.get_vm_value(self.vm_address_on) - - def write_target(self, target): - if (target): - return self.set_vm_value(self.vm_address, True) - else: - return self.set_vm_value(self.vm_address, False) - - def read_status(self): - return IDLE, '' - - - - -class safetyfeatureState(LogoMixin, Readable): - vm_address = Property('VM address state', datatype=StringType()) - - value = Parameter('safety Feature state', datatype=BoolType()) - - def read_value(self): - return self.get_vm_value(self.vm_address) - - def read_status(self): - return IDLE, '' - - -class safetyfeatureParam(LogoMixin, Writable): - vm_address = Property('VM address output', datatype=StringType()) - - target = Parameter('safety Feature target', datatype=IntRange()) - - value = Parameter('safety Feature Param', datatype=IntRange()) - - def read_value(self): - return self.get_vm_value(self.vm_address) - - def write_target(self, target): - return self.set_vm_value(self.vm_address, target) - - def read_status(self): - return IDLE, '' - - -class comparatorgekoppeltParam(LogoMixin, Writable): - vm_address_1 = Property('VM address output', datatype=StringType()) - vm_address_2 = Property('VM address output', datatype=StringType()) - - target = Parameter('safety Feature target', datatype=IntRange()) - value = Parameter('safety Feature Param', datatype=IntRange()) - - def read_value(self): - return self.get_vm_value(self.vm_address_1) - - def write_target(self, target): - self.set_vm_value(self.vm_address_1, target) - return self.set_vm_value(self.vm_address_2, target) - - def read_status(self): - return IDLE, '' - + return self.get_vm_value(self.vm_address) > self.threshold