# ***************************************************************************** # # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Andrea Plank # Markus Zolliker # # ***************************************************************************** import sys import time from ast import literal_eval from threading import RLock import snap7 from frappy.core import Readable, Parameter, FloatRange, HasIO, StringIO, Property, StringType,IDLE, BUSY, WARN, ERROR,Writable, Drivable, BoolType, IntRange, Communicator from frappy.errors import CommunicationFailedError class IO(Communicator): tcap_client = Property('tcap_client', IntRange()) tsap_server = Property('tcap_server', IntRange()) ip_address = Property('numeric ip address', StringType()) _plc = None _last_try = 0 def initModule(self): self._lock = RLock() super().initModule() def _init(self): if not self._plc: if time.time() < self._last_try + 10: raise CommunicationFailedError('logo PLC not reachable') self._plc = snap7.logo.Logo() prev_stderr = sys.stdout sys.stderr = open('/dev/null', 'w') # suppress output of snap7 try: self._plc.connect(self.ip_address, self.tcap_client, self.tsap_server) if self._plc.get_connected(): return except Exception: pass finally: sys.stderr = prev_stderr self._plc = None self._last_try = time.time() raise CommunicationFailedError('logo PLC not reachable') def communicate(self, cmd): with self._lock: self._init() cmd = cmd.split(maxsplit=1) if len(cmd) == 2: self._plc.write(cmd[0], literal_eval(cmd[1])) try: return self._plc.read(cmd[0]) except Exception as e: if self._plc: self.log.exception('error in plc read') self._plc = None raise class Snap7Mixin(HasIO): ioclass = IO def get_vm_value(self, vm_address): return self.io.communicate(vm_address) def set_vm_value(self, vm_address, value): return self.io.communicate(f'{vm_address} {value}') class Pressure(Snap7Mixin, Readable): vm_address = Property('VM address', datatype= StringType()) value = Parameter('pressure', datatype = FloatRange(unit = 'mbar')) pollinterval = 0.5 def read_value(self): return self.get_vm_value(self.vm_address) def read_status(self): return IDLE, '' class Valve(Snap7Mixin, Drivable): vm_address_input = Property('VM address input', datatype= StringType()) vm_address_output = Property('VM address output', datatype= StringType()) target = Parameter('Valve target', datatype = BoolType()) value = Parameter('Value state', datatype = BoolType()) def read_value(self): return self.get_vm_value(self.vm_address_input) def write_target(self, target): return self.set_vm_value(self.vm_address_output, target) def read_status(self): return IDLE, '' class FluidMachines(Snap7Mixin, Drivable): vm_address_output = Property('VM address output', datatype= StringType()) target = Parameter('Valve target', datatype = BoolType()) value = Parameter('Value state', datatype = BoolType()) def read_value(self): return self.get_vm_value(self.vm_address_output) def write_target(self, target): return self.set_vm_value(self.vm_address_output, target) def read_status(self): return IDLE, '' class TempSensor(Snap7Mixin, Readable): vm_address = Property('VM address', datatype= StringType()) value = Parameter('resistance', datatype = FloatRange(unit = 'Ohm')) def read_value(self): return self.get_vm_value(self.vm_address) def read_status(self): return IDLE, '' class HeaterParam(Snap7Mixin, Writable): vm_address = Property('VM address output', datatype= StringType()) target = Parameter('Heater target', datatype = FloatRange(unit='%')) value = Parameter('Heater Param', datatype = FloatRange(unit='%')) def read_value(self): return self.get_vm_value(self.vm_address) * 0.1 def write_target(self, target): return self.set_vm_value(self.vm_address, round(target * 10)) * 0.1 def read_status(self): return IDLE, ''