[WIP] dil5 improvements

Change-Id: I2b439bf5898601e10448511479bc67afa3edb4d3
This commit is contained in:
2025-06-05 10:15:57 +02:00
parent 04f7f6ece5
commit 472ae3f04d
3 changed files with 687 additions and 459 deletions

View File

@@ -17,17 +17,17 @@
#
#
# *****************************************************************************
import sys
from time import monotonic
from ast import literal_eval
import snap7
from frappy.core import Readable, Parameter, FloatRange, HasIO, StringIO, Property, StringType,IDLE, BUSY, WARN, ERROR,Writable, Drivable, BoolType, IntRange, Communicator
from frappy.errors import CommunicationFailedError
from frappy.core import Attached, Command, Readable, Parameter, FloatRange, HasIO, Property, StringType, \
IDLE, BUSY, WARN, ERROR, Writable, Drivable, BoolType, IntRange, Communicator, StatusType
from frappy.errors import CommunicationFailedError, ConfigError
from threading import RLock
import sys
import time
class IO(Communicator):
tcap_client = Property('tcap_client', IntRange())
tsap_server = Property('tcap_server', IntRange())
ip_address = Property('numeric ip address', StringType())
@@ -37,30 +37,28 @@ class IO(Communicator):
def initModule(self):
self._lock = RLock()
super().initModule()
def _init(self):
if not self._plc:
if time.time() < self._last_try + 10:
raise CommunicationFailedError('logo PLC not reachable')
self._plc = snap7.logo.Logo()
prev_stderr = sys.stdout
sys.stderr = open('/dev/null', 'w') # suppress output of snap7
try:
self._plc.connect(self.ip_address, self.tcap_client, self.tsap_server)
if self._plc.get_connected():
return
except Exception:
pass
finally:
sys.stderr = prev_stderr
self._plc = None
self._last_try = time.time()
raise CommunicationFailedError('logo PLC not reachable')
if monotonic() < self._last_try + 10:
raise CommunicationFailedError('logo PLC not reachable')
self._plc = snap7.logo.Logo()
sys.stderr = open('/dev/null', 'w') # suppress output of snap7
try:
self._plc.connect(self.ip_address, self.tcap_client, self.tsap_server)
if self._plc.get_connected():
return
except Exception:
pass
finally:
sys.stderr = sys.stdout
self._plc = None
self._last_try = monotonic()
raise CommunicationFailedError('logo PLC not reachable')
def communicate(self, cmd):
with self._lock:
self._init()
if not self._plc:
self._init()
cmd = cmd.split(maxsplit=1)
if len(cmd) == 2:
self.comLog('> %s %s', cmd[0], cmd[1])
@@ -76,59 +74,203 @@ class IO(Communicator):
self.comLog('? %r', e)
self.log.exception('error in plc read')
self._plc = None
raise
class Snap7Mixin(HasIO):
raise
class LogoMixin(HasIO):
ioclass = IO
def get_vm_value(self, vm_address):
return literal_eval(self.io.communicate(vm_address))
def set_vm_value(self, vm_address, value):
return literal_eval(self.io.communicate(f'{vm_address} {value}'))
return literal_eval(self.io.communicate(f'{vm_address} {round(value)}'))
class DigitalActuator(LogoMixin, Writable):
"""output with or without feedback"""
output_addr = Property('VM address output', datatype=StringType(), default='')
target_addr = Property('VM address target', datatype=StringType(), default='')
feedback_addr = Property('VM address feedback', datatype=StringType(), default='')
target = Parameter('target', datatype=BoolType())
value = Parameter('feedback or output', datatype=BoolType())
_input = 'output'
_value_addr = None
_target_addr = None
_fault = None
def doPoll(self):
self.read_status() # this calls also read_value
def checkProperties(self):
super().checkProperties()
if self.feedback_addr:
self._input = 'feedback'
self._value_addr = self.feedback_addr
else:
self._input = 'output'
self._value_addr = self.output_addr
self._target_addr = self.target_addr or self.output_addr
if self._target_addr and self._value_addr:
self._check_feedback = self.feedback_addr and self.output_addr
return
raise ConfigError('need either output_addr or both feedback_addr and target_addr')
def initialReads(self):
super().initialReads()
self.target = self.value
def set_fault(self, value, statustext):
"""on a fault condition, set target to value
and status to statustext
"""
self.write_target(value)
self._fault = statustext
self.read_status()
def reset_fault(self):
"""reset fault condition"""
self._fault = None
self.read_status()
class Pressure(Snap7Mixin, Readable):
vm_address = Property('VM address', datatype= StringType())
value = Parameter('pressure', datatype = FloatRange(unit = 'mbar'))
#pollinterval = 0.5
def read_value(self):
return self.get_vm_value(self.vm_address)
return self.get_vm_value(self._value_addr)
def write_target(self, target):
self._fault = None
self.set_vm_value(self._target_addr, target)
value = self.read_value()
if value != target and self.feedback_addr:
# retry only if we have a feedback and the feedback did not change yet
for i in range(20):
if self.read_value() == target:
self.log.debug('tried %d times', i)
break
self.set_vm_value(self._target_addr, target)
def read_status(self):
if self._fault:
return ERROR, self._fault
value = self.read_value()
if value != self.target:
return ERROR, 'value and target do not match'
if self._check_feedback:
if value != self.get_vm_value(self._check_feedback):
return ERROR, f'feedback does not match output'
if self.feedback_addr:
return IDLE, 'feedback confirmed'
return IDLE, ''
class DelayedActuator(DigitalActuator, Drivable):
delay_addr = Property('address of delay value', StringType())
_pulse_start = 0
_pulse_end = 0
_fault = None
def read_status(self):
if self._fault:
return ERROR, self._fault
value = self.read_value()
fberror = None
if self._pulse_start:
now = monotonic()
if now < self._pulse_start + 1:
value = 1
elif now < self._pulse_end - 1:
if not value:
self._pulse_start = 0
return WARN, f'{self._input} is not on during pulse - due to interlock?'
if value:
if now < self._pulse_end + 1:
return BUSY, 'pulsing'
self.log.warn('pulse timeout')
self.set_vm_value(self._target_addr, 0)
self._pulse_start = 0
self.set_vm_value(self.delay_addr, 0)
elif self._check_feedback and value != self.get_vm_value(self._check_feedback):
fberror = ERROR, f'feedback does not match output'
if value != self.target:
return ERROR, 'value does not match target'
self.setFastPoll(False)
if fberror:
return fberror
if self.feedback_addr:
return IDLE, 'feedback confirmed'
return IDLE, ''
def write_target(self, value):
self._pulse_start = 0
if not value:
self.set_vm_value(self.delay_addr, 0)
return super().write_target(value)
@Command(argument=FloatRange(0))
def pulse(self, delay):
"""open for delay seconds"""
self.set_vm_value(self.delay_addr, delay)
self.set_vm_value(self._target_addr, 1)
self.set_vm_value(self._target_addr, 0)
self.setFastPoll(True, 0.5)
self.status = BUSY, 'pulsing'
now = monotonic()
self._pulse_start = now
self._pulse_end = now + delay
class Value(LogoMixin, Readable):
addr = Property('VM address', datatype=StringType())
def read_value(self):
return self.get_vm_value(self.addr)
def read_status(self):
return IDLE, ''
class Airpressure(Snap7Mixin, Readable):
vm_address = Property('VM address', datatype= StringType())
value = Parameter('airpressure state', datatype = BoolType())
#pollinterval = 0.5
# TODO: the following classes are too specific, they have to be moved
class Pressure(LogoMixin, Drivable):
vm_address = Property('VM address', datatype=StringType())
value = Parameter('pressure', datatype=FloatRange(unit='mbar'))
# pollinterval = 0.5
def read_value(self):
return self.get_vm_value(self.vm_address)
def read_status(self):
return IDLE, ''
class Airpressure(LogoMixin, Readable):
vm_address = Property('VM address', datatype=StringType())
value = Parameter('airpressure state', datatype=BoolType())
# pollinterval = 0.5
def read_value(self):
if (self.get_vm_value(self.vm_address) > 500):
return 1
else:
return 0
def read_status(self):
return IDLE, ''
class Valve(Snap7Mixin, Drivable):
vm_address_input = Property('VM address input', datatype= StringType())
vm_address_output = Property('VM address output', datatype= StringType())
target = Parameter('Valve target', datatype = BoolType())
value = Parameter('Value state', datatype = BoolType())
def read_status(self):
return IDLE, ''
class Valve(LogoMixin, Drivable):
vm_address_input = Property('VM address input', datatype=StringType())
vm_address_output = Property('VM address output', datatype=StringType())
target = Parameter('Valve target', datatype=BoolType())
value = Parameter('Value state', datatype=BoolType())
_remaining_tries = None
def read_value(self):
return self.get_vm_value(self.vm_address_input)
def write_target(self, target):
self.set_vm_value(self.vm_address_output, target)
self._remaining_tries = 5
@@ -142,127 +284,126 @@ class Valve(Snap7Mixin, Drivable):
if value != self.target:
if self._remaining_tries is None:
self.target = self.read_value()
return IDLE,''
return IDLE, ''
self._remaining_tries -= 1
if self._remaining_tries < 0:
self.setFastPoll(False)
return ERROR, 'too many tries to switch'
self.set_vm_value(self.vm_address_output, self.target)
self.set_vm_value(self.vm_address_output, self.target)
return BUSY, 'switching (try again)'
self.setFastPoll(False)
return IDLE, ''
class FluidMachines(Snap7Mixin, Drivable):
vm_address_output = Property('VM address output', datatype= StringType())
target = Parameter('Valve target', datatype = BoolType())
value = Parameter('Value state', datatype = BoolType())
class FluidMachines(LogoMixin, Drivable):
vm_address_output = Property('VM address output', datatype=StringType())
target = Parameter('Valve target', datatype=BoolType())
value = Parameter('Valve state', datatype=BoolType())
def read_value(self):
return self.get_vm_value(self.vm_address_output)
def write_target(self, target):
return self.set_vm_value(self.vm_address_output, target)
def read_status(self):
return IDLE, ''
class TempSensor(Snap7Mixin, Readable):
vm_address = Property('VM address', datatype= StringType())
value = Parameter('resistance', datatype = FloatRange(unit = 'Ohm'))
def read_value(self):
return self.get_vm_value(self.vm_address)
def read_status(self):
return IDLE, ''
class HeaterParam(Snap7Mixin, Writable):
vm_address = Property('VM address output', datatype= StringType())
target = Parameter('Heater target', datatype = IntRange())
value = Parameter('Heater Param', datatype = IntRange())
class TempSensor(LogoMixin, Readable):
vm_address = Property('VM address', datatype=StringType())
value = Parameter('resistance', datatype=FloatRange(unit='Ohm'))
def read_value(self):
return self.get_vm_value(self.vm_address)
def read_status(self):
return IDLE, ''
class HeaterParam(LogoMixin, Writable):
vm_address = Property('VM address output', datatype=StringType())
target = Parameter('Heater target', datatype=IntRange())
value = Parameter('Heater Param', datatype=IntRange())
def read_value(self):
return self.get_vm_value(self.vm_address)
def write_target(self, target):
return self.set_vm_value(self.vm_address, target)
def read_status(self):
return IDLE, ''
return IDLE, ''
class controlHeater(Snap7Mixin, Writable):
vm_address = Property('VM address on switch', datatype= StringType())
target = Parameter('Heater state', datatype = BoolType())
value = Parameter('Heater state', datatype = BoolType())
class controlHeater(LogoMixin, Writable):
vm_address = Property('VM address on switch', datatype=StringType())
target = Parameter('Heater state', datatype=BoolType())
value = Parameter('Heater state', datatype=BoolType())
def read_value(self):
return self.get_vm_value(self.vm_address_on)
def write_target(self, target):
if (target):
return self.set_vm_value(self.vm_address, True)
else:
return self.set_vm_value(self.vm_address, False)
def read_status(self):
return IDLE, ''
class safetyfeatureState(Snap7Mixin, Readable):
vm_address = Property('VM address state', datatype= StringType())
value = Parameter('safety Feature state', datatype = BoolType())
def read_value(self):
return self.get_vm_value(self.vm_address)
def read_status(self):
return IDLE, ''
return IDLE, ''
class safetyfeatureParam(Snap7Mixin, Writable):
vm_address = Property('VM address output', datatype= StringType())
target = Parameter('safety Feature target', datatype = IntRange())
value = Parameter('safety Feature Param', datatype = IntRange())
class safetyfeatureState(LogoMixin, Readable):
vm_address = Property('VM address state', datatype=StringType())
value = Parameter('safety Feature state', datatype=BoolType())
def read_value(self):
return self.get_vm_value(self.vm_address)
def read_status(self):
return IDLE, ''
class safetyfeatureParam(LogoMixin, Writable):
vm_address = Property('VM address output', datatype=StringType())
target = Parameter('safety Feature target', datatype=IntRange())
value = Parameter('safety Feature Param', datatype=IntRange())
def read_value(self):
return self.get_vm_value(self.vm_address)
def write_target(self, target):
return self.set_vm_value(self.vm_address, target)
def read_status(self):
return IDLE, ''
return IDLE, ''
class comparatorgekoppeltParam(Snap7Mixin, Writable):
vm_address_1 = Property('VM address output', datatype= StringType())
vm_address_2 = Property('VM address output', datatype= StringType())
target = Parameter('safety Feature target', datatype = IntRange())
value = Parameter('safety Feature Param', datatype = IntRange())
class comparatorgekoppeltParam(LogoMixin, Writable):
vm_address_1 = Property('VM address output', datatype=StringType())
vm_address_2 = Property('VM address output', datatype=StringType())
target = Parameter('safety Feature target', datatype=IntRange())
value = Parameter('safety Feature Param', datatype=IntRange())
def read_value(self):
return self.get_vm_value(self.vm_address_1)
def write_target(self, target):
self.set_vm_value(self.vm_address_1, target)
return self.set_vm_value(self.vm_address_2, target)
def read_status(self):
return IDLE, ''
def read_status(self):
return IDLE, ''