# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
#
#
# *****************************************************************************
"""Frappy modules talking to a PLC through snap7's ``Logo`` client.

All modules address the PLC by "VM address" strings which are passed
unchanged to the snap7 client by the :class:`IO` communicator.
"""

import os
import sys
from ast import literal_eval
from threading import RLock
from time import monotonic

import snap7

from frappy.core import (
    Attached, Command, Readable, Parameter, FloatRange, HasIO, Property,
    StringType, IDLE, BUSY, WARN, ERROR, Writable, Drivable, BoolType,
    IntRange, Communicator, StatusType,
)
from frappy.errors import CommunicationFailedError, ConfigError


class IO(Communicator):
    """Communicator wrapping a ``snap7.logo.Logo`` connection.

    The connection is opened lazily on the first call of
    :meth:`communicate` and dropped again whenever an exchange fails.
    After a failed connection attempt, further attempts are refused for
    10 seconds.
    """

    # NOTE(review): 'tcap' vs. 'tsap' spelling is inconsistent, but the
    # property names are part of the configuration interface and are kept
    tcap_client = Property('tcap_client', IntRange())
    tsap_server = Property('tcap_server', IntRange())
    ip_address = Property('numeric ip address', StringType())
    _plc = None  # snap7 client, None while not connected
    _last_try = 0  # monotonic() time of the last failed connection attempt

    def initModule(self):
        """Create the lock serializing all PLC access."""
        self._lock = RLock()
        super().initModule()

    def _init(self):
        """Connect to the PLC.

        On success, the connected client is stored in ``self._plc``.

        :raises CommunicationFailedError: when the connection can not be
            established, or when the last failed attempt is less than
            10 seconds ago
        """
        if monotonic() < self._last_try + 10:
            raise CommunicationFailedError('logo PLC not reachable')
        plc = snap7.logo.Logo()
        failure = None
        saved_stderr = sys.stderr
        with open(os.devnull, 'w') as devnull:
            sys.stderr = devnull  # suppress output of snap7
            try:
                plc.connect(self.ip_address, self.tcap_client, self.tsap_server)
                if plc.get_connected():
                    self._plc = plc
                    return
            except Exception as e:
                # remember the cause, it is chained to the error raised below
                failure = e
            finally:
                # restore the stream which was active before, whatever it was
                sys.stderr = saved_stderr
        self._plc = None
        self._last_try = monotonic()
        raise CommunicationFailedError('logo PLC not reachable') from failure

    def communicate(self, cmd):
        """Read a VM address, optionally writing a value to it before.

        :param cmd: either ``'<address>'`` (read only) or
            ``'<address> <value>'`` (write value, then read back), where
            ``<value>`` is a Python literal
        :return: the value read back, converted to a string

        Any error raised by the snap7 client is logged and re-raised after
        dropping the connection, so that the next call reconnects.
        """
        with self._lock:
            if not self._plc:
                self._init()
            parts = cmd.split(maxsplit=1)
            # evaluate the value before touching the PLC: a malformed value
            # is not a reason to drop a healthy connection
            value = literal_eval(parts[1]) if len(parts) == 2 else None
            try:
                if len(parts) == 2:
                    self.comLog('> %s %s', parts[0], parts[1])
                    self._plc.write(parts[0], value)
                    self.comLog('< OK')
                self.comLog('> %s', parts[0])
                reply = self._plc.read(parts[0])
                self.comLog('< %s', reply)
                return str(reply)
            except Exception as e:
                if self._plc:
                    self.comLog('? %r', e)
                    self.log.exception('error in plc communication')
                self._plc = None
                raise


class LogoMixin(HasIO):
    """Mixin providing typed access to VM addresses through :class:`IO`."""

    ioclass = IO

    def get_vm_value(self, vm_address):
        """Return the value read from ``vm_address``."""
        return literal_eval(self.io.communicate(vm_address))

    def set_vm_value(self, vm_address, value):
        """Write ``value`` (rounded to an integer) and return the read back value."""
        return literal_eval(self.io.communicate(f'{vm_address} {round(value)}'))


class DigitalActuator(LogoMixin, Writable):
    """output with or without feedback"""

    output_addr = Property('VM address output', datatype=StringType(), default='')
    target_addr = Property('VM address target', datatype=StringType(), default='')
    feedback_addr = Property('VM address feedback', datatype=StringType(), default='')
    target = Parameter('target', datatype=BoolType())
    value = Parameter('feedback or output', datatype=BoolType())
    _input = 'output'  # name of the source of value, used in messages
    _value_addr = None  # address the value is read from
    _target_addr = None  # address the target is written to
    _check_feedback = None  # address to compare with value, if any
    _fault = None  # status text of a pending fault, if any

    def doPoll(self):
        self.read_status()  # this calls also read_value

    def checkProperties(self):
        """Derive the effective addresses from the configured properties.

        :raises ConfigError: when no address for the value or for the
            target can be determined
        """
        super().checkProperties()
        if self.feedback_addr:
            self._input = 'feedback'
            self._value_addr = self.feedback_addr
        else:
            self._input = 'output'
            self._value_addr = self.output_addr
        self._target_addr = self.target_addr or self.output_addr
        if self._target_addr and self._value_addr:
            # the output is compared with the feedback only if both are given
            self._check_feedback = self.feedback_addr and self.output_addr
            return
        raise ConfigError('need either output_addr or both feedback_addr and target_addr')

    def initialReads(self):
        """Initialize the target with the current value."""
        super().initialReads()
        self.target = self.read_value()

    def set_fault(self, value, statustext):
        """on a fault condition, set target to value and status to statustext"""
        self.write_target(value)
        # must be set after write_target, which clears the fault
        self._fault = statustext
        self.read_status()

    def reset_fault(self):
        """reset fault condition"""
        self._fault = None
        self.read_status()

    def read_value(self):
        """Return the feedback, or the output when there is no feedback."""
        return self.get_vm_value(self._value_addr)

    def write_target(self, target):
        """Clear a pending fault and write the target to the PLC."""
        self._fault = None
        self.set_vm_value(self._target_addr, target)
        value = self.read_value()
        if value != target and self.feedback_addr:
            # retry only if we have a feedback and the feedback did not change yet
            for i in range(20):
                if self.read_value() == target:
                    self.log.debug('tried %d times', i)
                    break
                self.set_vm_value(self._target_addr, target)

    def read_status(self):
        """Return ERROR on a fault or on any mismatch, else IDLE."""
        if self._fault:
            return ERROR, self._fault
        value = self.read_value()
        if value != self.target:
            return ERROR, 'value and target do not match'
        if self._check_feedback:
            if value != self.get_vm_value(self._check_feedback):
                return ERROR, 'feedback does not match output'
        if self.feedback_addr:
            return IDLE, 'feedback confirmed'
        return IDLE, ''


class DelayedActuator(DigitalActuator, Drivable):
    """Digital actuator which can in addition be switched on for a given time."""

    delay_addr = Property('address of delay value', StringType())
    _pulse_start = 0  # monotonic() time of pulse start, 0 when not pulsing
    _pulse_end = 0  # monotonic() time when the pulse is expected to end
    _fault = None

    def read_status(self):
        """Return the status, supervising a running pulse if there is one."""
        if self._fault:
            return ERROR, self._fault
        value = self.read_value()
        fberror = None
        if self._pulse_start:
            now = monotonic()
            if now < self._pulse_start + 1:
                # within the first second the value read is not considered
                value = 1
            elif now < self._pulse_end - 1:
                if not value:
                    self._pulse_start = 0
                    return WARN, f'{self._input} is not on during pulse - due to interlock?'
            if value:
                if now < self._pulse_end + 1:
                    return BUSY, 'pulsing'
                # still on more than 1 s after the expected end: switch off
                self.log.warning('pulse timeout')
                self.set_vm_value(self._target_addr, 0)
            # the pulse is over
            self._pulse_start = 0
            self.set_vm_value(self.delay_addr, 0)
        elif self._check_feedback and value != self.get_vm_value(self._check_feedback):
            # reported only if value matches the target, see below
            fberror = ERROR, 'feedback does not match output'
        if value != self.target:
            return ERROR, 'value does not match target'
        self.setFastPoll(False)
        if fberror:
            return fberror
        if self.feedback_addr:
            return IDLE, 'feedback confirmed'
        return IDLE, ''

    def write_target(self, value):
        """Abort the supervision of a pulse and write the target.

        When switching off, the delay value is set to 0 before.
        """
        self._pulse_start = 0
        if not value:
            self.set_vm_value(self.delay_addr, 0)
        return super().write_target(value)

    @Command(argument=FloatRange(0))
    def pulse(self, delay):
        """open for delay seconds"""
        self.set_vm_value(self.delay_addr, delay)
        # NOTE(review): presumably the PLC program starts the pulse on the
        # 1 -> 0 sequence written to the target address - confirm
        self.set_vm_value(self._target_addr, 1)
        self.set_vm_value(self._target_addr, 0)
        self.setFastPoll(True, 0.5)
        self.status = BUSY, 'pulsing'
        now = monotonic()
        self._pulse_start = now
        self._pulse_end = now + delay


class Value(LogoMixin, Readable):
    """Readable returning the content of a VM address."""

    addr = Property('VM address', datatype=StringType())

    def read_value(self):
        return self.get_vm_value(self.addr)

    def read_status(self):
        return IDLE, ''


class DigitalValue(Value):
    """:class:`Value` with a boolean value."""

    value = Parameter('airpressure state', datatype=BoolType())


# TODO: the following classes are too specific, they have to be moved

class Pressure(LogoMixin, Drivable):
    """Pressure read from a VM address."""

    vm_address = Property('VM address', datatype=StringType())
    value = Parameter('pressure', datatype=FloatRange(unit='mbar'))

    # pollinterval = 0.5

    def read_value(self):
        return self.get_vm_value(self.vm_address)

    def read_status(self):
        return IDLE, ''


class Airpressure(LogoMixin, Readable):
    """Boolean state derived from a raw value read from a VM address."""

    vm_address = Property('VM address', datatype=StringType())
    value = Parameter('airpressure state', datatype=BoolType())

    # pollinterval = 0.5

    def read_value(self):
        """Return whether the raw value is above 500."""
        return self.get_vm_value(self.vm_address) > 500

    def read_status(self):
        return IDLE, ''


class Valve(LogoMixin, Drivable):
    """Valve with separate addresses for the state and for switching.

    After changing the target, switching is retried on each poll until the
    state matches the target, giving up after 5 retries.
    """

    vm_address_input = Property('VM address input', datatype=StringType())
    vm_address_output = Property('VM address output', datatype=StringType())

    target = Parameter('Valve target', datatype=BoolType())
    value = Parameter('Value state', datatype=BoolType())
    _remaining_tries = None  # None as long as the target was never written

    def read_value(self):
        return self.get_vm_value(self.vm_address_input)

    def write_target(self, target):
        """Write the target and start polling fast until it is reached."""
        self.set_vm_value(self.vm_address_output, target)
        self._remaining_tries = 5
        self.status = BUSY, 'switching'
        self.setFastPoll(True, 0.5)

    def read_status(self):
        """Return the status, writing the target again while not reached."""
        self.log.debug('read_status')
        value = self.read_value()
        self.log.debug('value %d target %d', value, self.target)
        if value != self.target:
            if self._remaining_tries is None:
                # the target was never written: follow the actual state
                self.target = self.read_value()
                return IDLE, ''
            self._remaining_tries -= 1
            if self._remaining_tries < 0:
                self.setFastPoll(False)
                return ERROR, 'too many tries to switch'
            self.set_vm_value(self.vm_address_output, self.target)
            return BUSY, 'switching (try again)'
        self.setFastPoll(False)
        return IDLE, ''


class FluidMachines(LogoMixin, Drivable):
    """Boolean output, read back from the address it is written to."""

    vm_address_output = Property('VM address output', datatype=StringType())

    target = Parameter('Valve target', datatype=BoolType())
    value = Parameter('Valve state', datatype=BoolType())

    def read_value(self):
        return self.get_vm_value(self.vm_address_output)

    def write_target(self, target):
        return self.set_vm_value(self.vm_address_output, target)

    def read_status(self):
        return IDLE, ''


class TempSensor(LogoMixin, Readable):
    """Sensor resistance read from a VM address."""

    vm_address = Property('VM address', datatype=StringType())
    value = Parameter('resistance', datatype=FloatRange(unit='Ohm'))

    def read_value(self):
        return self.get_vm_value(self.vm_address)

    def read_status(self):
        return IDLE, ''


class HeaterParam(LogoMixin, Writable):
    """Integer parameter, read from and written to the same VM address."""

    vm_address = Property('VM address output', datatype=StringType())

    target = Parameter('Heater target', datatype=IntRange())
    value = Parameter('Heater Param', datatype=IntRange())

    def read_value(self):
        return self.get_vm_value(self.vm_address)

    def write_target(self, target):
        return self.set_vm_value(self.vm_address, target)

    def read_status(self):
        return IDLE, ''


class controlHeater(LogoMixin, Writable):
    """Boolean switch, read from and written to the same VM address."""

    vm_address = Property('VM address on switch', datatype=StringType())

    target = Parameter('Heater state', datatype=BoolType())
    value = Parameter('Heater state', datatype=BoolType())

    def read_value(self):
        # 'vm_address' is the only address property declared on this class
        return self.get_vm_value(self.vm_address)

    def write_target(self, target):
        return self.set_vm_value(self.vm_address, bool(target))

    def read_status(self):
        return IDLE, ''


class safetyfeatureState(LogoMixin, Readable):
    """Boolean state read from a VM address."""

    vm_address = Property('VM address state', datatype=StringType())

    value = Parameter('safety Feature state', datatype=BoolType())

    def read_value(self):
        return self.get_vm_value(self.vm_address)

    def read_status(self):
        return IDLE, ''


class safetyfeatureParam(LogoMixin, Writable):
    """Integer parameter, read from and written to the same VM address."""

    vm_address = Property('VM address output', datatype=StringType())

    target = Parameter('safety Feature target', datatype=IntRange())
    value = Parameter('safety Feature Param', datatype=IntRange())

    def read_value(self):
        return self.get_vm_value(self.vm_address)

    def write_target(self, target):
        return self.set_vm_value(self.vm_address, target)

    def read_status(self):
        return IDLE, ''


class comparatorgekoppeltParam(LogoMixin, Writable):
    """Integer parameter written to two VM addresses, read from the first."""

    vm_address_1 = Property('VM address output', datatype=StringType())
    vm_address_2 = Property('VM address output', datatype=StringType())

    target = Parameter('safety Feature target', datatype=IntRange())
    value = Parameter('safety Feature Param', datatype=IntRange())

    def read_value(self):
        return self.get_vm_value(self.vm_address_1)

    def write_target(self, target):
        self.set_vm_value(self.vm_address_1, target)
        return self.set_vm_value(self.vm_address_2, target)

    def read_status(self):
        return IDLE, ''