Files
frappy/frappy_psi/logo.py
Markus Zolliker d85d80ba36 dil5: working alfa version
Change-Id: Ib6bf2234633cc760fc771a3f5c0beb4cb63a0f6d
2025-06-05 17:48:15 +02:00

414 lines
13 KiB
Python

# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
#
#
# *****************************************************************************
import sys
from time import monotonic
from ast import literal_eval
import snap7
from frappy.core import Attached, Command, Readable, Parameter, FloatRange, HasIO, Property, StringType, \
IDLE, BUSY, WARN, ERROR, Writable, Drivable, BoolType, IntRange, Communicator, StatusType
from frappy.errors import CommunicationFailedError, ConfigError
from threading import RLock
class IO(Communicator):
tcap_client = Property('tcap_client', IntRange())
tsap_server = Property('tcap_server', IntRange())
ip_address = Property('numeric ip address', StringType())
_plc = None
_last_try = 0
def initModule(self):
self._lock = RLock()
super().initModule()
def _init(self):
if monotonic() < self._last_try + 10:
raise CommunicationFailedError('logo PLC not reachable')
self._plc = snap7.logo.Logo()
sys.stderr = open('/dev/null', 'w') # suppress output of snap7
try:
self._plc.connect(self.ip_address, self.tcap_client, self.tsap_server)
if self._plc.get_connected():
return
except Exception:
pass
finally:
sys.stderr = sys.stdout
self._plc = None
self._last_try = monotonic()
raise CommunicationFailedError('logo PLC not reachable')
def communicate(self, cmd):
with self._lock:
if not self._plc:
self._init()
cmd = cmd.split(maxsplit=1)
if len(cmd) == 2:
self.comLog('> %s %s', cmd[0], cmd[1])
self._plc.write(cmd[0], literal_eval(cmd[1]))
self.comLog('< OK')
try:
self.comLog('> %s', cmd[0])
reply = self._plc.read(cmd[0])
self.comLog('< %s', reply)
return str(reply)
except Exception as e:
if self._plc:
self.comLog('? %r', e)
self.log.exception('error in plc read')
self._plc = None
raise
class LogoMixin(HasIO):
ioclass = IO
def get_vm_value(self, vm_address):
return literal_eval(self.io.communicate(vm_address))
def set_vm_value(self, vm_address, value):
return literal_eval(self.io.communicate(f'{vm_address} {round(value)}'))
class DigitalActuator(LogoMixin, Writable):
"""output with or without feedback"""
output_addr = Property('VM address output', datatype=StringType(), default='')
target_addr = Property('VM address target', datatype=StringType(), default='')
feedback_addr = Property('VM address feedback', datatype=StringType(), default='')
target = Parameter('target', datatype=BoolType())
value = Parameter('feedback or output', datatype=BoolType())
_input = 'output'
_value_addr = None
_target_addr = None
_fault = None
def doPoll(self):
self.read_status() # this calls also read_value
def checkProperties(self):
super().checkProperties()
if self.feedback_addr:
self._input = 'feedback'
self._value_addr = self.feedback_addr
else:
self._input = 'output'
self._value_addr = self.output_addr
self._target_addr = self.target_addr or self.output_addr
if self._target_addr and self._value_addr:
self._check_feedback = self.feedback_addr and self.output_addr
return
raise ConfigError('need either output_addr or both feedback_addr and target_addr')
def initialReads(self):
super().initialReads()
self.target = self.read_value()
def set_fault(self, value, statustext):
"""on a fault condition, set target to value
and status to statustext
"""
self.write_target(value)
self._fault = statustext
self.read_status()
def reset_fault(self):
"""reset fault condition"""
self._fault = None
self.read_status()
def read_value(self):
return self.get_vm_value(self._value_addr)
def write_target(self, target):
self._fault = None
self.set_vm_value(self._target_addr, target)
value = self.read_value()
if value != target and self.feedback_addr:
# retry only if we have a feedback and the feedback did not change yet
for i in range(20):
if self.read_value() == target:
self.log.debug('tried %d times', i)
break
self.set_vm_value(self._target_addr, target)
def read_status(self):
if self._fault:
return ERROR, self._fault
value = self.read_value()
if value != self.target:
return ERROR, 'value and target do not match'
if self._check_feedback:
if value != self.get_vm_value(self._check_feedback):
return ERROR, f'feedback does not match output'
if self.feedback_addr:
return IDLE, 'feedback confirmed'
return IDLE, ''
class DelayedActuator(DigitalActuator, Drivable):
delay_addr = Property('address of delay value', StringType())
_pulse_start = 0
_pulse_end = 0
_fault = None
def read_status(self):
if self._fault:
return ERROR, self._fault
value = self.read_value()
fberror = None
if self._pulse_start:
now = monotonic()
if now < self._pulse_start + 1:
value = 1
elif now < self._pulse_end - 1:
if not value:
self._pulse_start = 0
return WARN, f'{self._input} is not on during pulse - due to interlock?'
if value:
if now < self._pulse_end + 1:
return BUSY, 'pulsing'
self.log.warn('pulse timeout')
self.set_vm_value(self._target_addr, 0)
self._pulse_start = 0
self.set_vm_value(self.delay_addr, 0)
elif self._check_feedback and value != self.get_vm_value(self._check_feedback):
fberror = ERROR, f'feedback does not match output'
if value != self.target:
return ERROR, 'value does not match target'
self.setFastPoll(False)
if fberror:
return fberror
if self.feedback_addr:
return IDLE, 'feedback confirmed'
return IDLE, ''
def write_target(self, value):
self._pulse_start = 0
if not value:
self.set_vm_value(self.delay_addr, 0)
return super().write_target(value)
@Command(argument=FloatRange(0))
def pulse(self, delay):
"""open for delay seconds"""
self.set_vm_value(self.delay_addr, delay)
self.set_vm_value(self._target_addr, 1)
self.set_vm_value(self._target_addr, 0)
self.setFastPoll(True, 0.5)
self.status = BUSY, 'pulsing'
now = monotonic()
self._pulse_start = now
self._pulse_end = now + delay
class Value(LogoMixin, Readable):
addr = Property('VM address', datatype=StringType())
def read_value(self):
return self.get_vm_value(self.addr)
def read_status(self):
return IDLE, ''
class DigitalValue(Value):
value = Parameter('airpressure state', datatype=BoolType())
# TODO: the following classes are too specific, they have to be moved
class Pressure(LogoMixin, Drivable):
vm_address = Property('VM address', datatype=StringType())
value = Parameter('pressure', datatype=FloatRange(unit='mbar'))
# pollinterval = 0.5
def read_value(self):
return self.get_vm_value(self.vm_address)
def read_status(self):
return IDLE, ''
class Airpressure(LogoMixin, Readable):
vm_address = Property('VM address', datatype=StringType())
value = Parameter('airpressure state', datatype=BoolType())
# pollinterval = 0.5
def read_value(self):
if (self.get_vm_value(self.vm_address) > 500):
return 1
else:
return 0
def read_status(self):
return IDLE, ''
class Valve(LogoMixin, Drivable):
vm_address_input = Property('VM address input', datatype=StringType())
vm_address_output = Property('VM address output', datatype=StringType())
target = Parameter('Valve target', datatype=BoolType())
value = Parameter('Value state', datatype=BoolType())
_remaining_tries = None
def read_value(self):
return self.get_vm_value(self.vm_address_input)
def write_target(self, target):
self.set_vm_value(self.vm_address_output, target)
self._remaining_tries = 5
self.status = BUSY, 'switching'
self.setFastPoll(True, 0.5)
def read_status(self):
self.log.debug('read_status')
value = self.read_value()
self.log.debug('value %d target %d', value, self.target)
if value != self.target:
if self._remaining_tries is None:
self.target = self.read_value()
return IDLE, ''
self._remaining_tries -= 1
if self._remaining_tries < 0:
self.setFastPoll(False)
return ERROR, 'too many tries to switch'
self.set_vm_value(self.vm_address_output, self.target)
return BUSY, 'switching (try again)'
self.setFastPoll(False)
return IDLE, ''
class FluidMachines(LogoMixin, Drivable):
vm_address_output = Property('VM address output', datatype=StringType())
target = Parameter('Valve target', datatype=BoolType())
value = Parameter('Valve state', datatype=BoolType())
def read_value(self):
return self.get_vm_value(self.vm_address_output)
def write_target(self, target):
return self.set_vm_value(self.vm_address_output, target)
def read_status(self):
return IDLE, ''
class TempSensor(LogoMixin, Readable):
vm_address = Property('VM address', datatype=StringType())
value = Parameter('resistance', datatype=FloatRange(unit='Ohm'))
def read_value(self):
return self.get_vm_value(self.vm_address)
def read_status(self):
return IDLE, ''
class HeaterParam(LogoMixin, Writable):
vm_address = Property('VM address output', datatype=StringType())
target = Parameter('Heater target', datatype=IntRange())
value = Parameter('Heater Param', datatype=IntRange())
def read_value(self):
return self.get_vm_value(self.vm_address)
def write_target(self, target):
return self.set_vm_value(self.vm_address, target)
def read_status(self):
return IDLE, ''
class controlHeater(LogoMixin, Writable):
vm_address = Property('VM address on switch', datatype=StringType())
target = Parameter('Heater state', datatype=BoolType())
value = Parameter('Heater state', datatype=BoolType())
def read_value(self):
return self.get_vm_value(self.vm_address_on)
def write_target(self, target):
if (target):
return self.set_vm_value(self.vm_address, True)
else:
return self.set_vm_value(self.vm_address, False)
def read_status(self):
return IDLE, ''
class safetyfeatureState(LogoMixin, Readable):
vm_address = Property('VM address state', datatype=StringType())
value = Parameter('safety Feature state', datatype=BoolType())
def read_value(self):
return self.get_vm_value(self.vm_address)
def read_status(self):
return IDLE, ''
class safetyfeatureParam(LogoMixin, Writable):
vm_address = Property('VM address output', datatype=StringType())
target = Parameter('safety Feature target', datatype=IntRange())
value = Parameter('safety Feature Param', datatype=IntRange())
def read_value(self):
return self.get_vm_value(self.vm_address)
def write_target(self, target):
return self.set_vm_value(self.vm_address, target)
def read_status(self):
return IDLE, ''
class comparatorgekoppeltParam(LogoMixin, Writable):
vm_address_1 = Property('VM address output', datatype=StringType())
vm_address_2 = Property('VM address output', datatype=StringType())
target = Parameter('safety Feature target', datatype=IntRange())
value = Parameter('safety Feature Param', datatype=IntRange())
def read_value(self):
return self.get_vm_value(self.vm_address_1)
def write_target(self, target):
self.set_vm_value(self.vm_address_1, target)
return self.set_vm_value(self.vm_address_2, target)
def read_status(self):
return IDLE, ''