# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
#
#
# *****************************************************************************
"""frappy modules reading and writing VM addresses of a LOGO PLC via snap7."""

import sys
import time
from ast import literal_eval
from threading import RLock

import snap7

from frappy.core import (
    BUSY, ERROR, IDLE, WARN, BoolType, Communicator, Drivable, FloatRange,
    HasIO, IntRange, Parameter, Property, Readable, StringIO, StringType,
    Writable,
)
from frappy.errors import CommunicationFailedError

# minimum time in seconds between two connection attempts to the PLC
_RECONNECT_DELAY = 10


class IO(Communicator):
    """Communicator wrapping a ``snap7.logo.Logo`` client.

    The connection is opened lazily on the first request and dropped again
    whenever a request fails, so that the next request reconnects.
    """

    # NOTE(review): 'tcap_client' and the 'tcap_server' description look like
    # typos of 'tsap_...'; they are kept unchanged because existing
    # configurations may refer to them.
    tcap_client = Property('tcap_client', IntRange())
    tsap_server = Property('tcap_server', IntRange())
    ip_address = Property('numeric ip address', StringType())

    _plc = None  # the connected client, None while not connected
    _last_try = 0  # time.time() of the last failed connection attempt

    def initModule(self):
        """Create the lock serializing all PLC access, then init the base."""
        self._lock = RLock()
        super().initModule()

    def _init(self):
        """Make sure ``self._plc`` is a connected client.

        Does nothing when a client is already present. After a failed attempt
        no new attempt is made for ``_RECONNECT_DELAY`` seconds.

        :raises CommunicationFailedError: when the PLC can not be connected
            or when the last failed attempt is too recent
        """
        if self._plc:
            return
        if time.time() < self._last_try + _RECONNECT_DELAY:
            raise CommunicationFailedError('logo PLC not reachable')

        plc = snap7.logo.Logo()
        connected = False
        cause = None
        # remember the real stderr (not stdout), so it is restored correctly
        prev_stderr = sys.stderr
        try:
            # the with statement closes the null device again after the attempt
            with open('/dev/null', 'w') as devnull:
                sys.stderr = devnull  # suppress output of snap7
                try:
                    plc.connect(self.ip_address, self.tcap_client,
                                self.tsap_server)
                    connected = bool(plc.get_connected())
                finally:
                    sys.stderr = prev_stderr
        except Exception as e:
            # any failure counts as "not reachable"; keep it as the cause
            cause = e

        if connected:
            self._plc = plc
            return
        self._plc = None
        self._last_try = time.time()
        raise CommunicationFailedError('logo PLC not reachable') from cause

    def communicate(self, cmd):
        """Read a VM address, optionally writing a value to it first.

        :param cmd: either ``'<address>'`` (read only) or
            ``'<address> <value>'``, where ``<value>`` is a Python literal
            which is written to the address before reading it back
        :return: the value read from the address
        :raises CommunicationFailedError: when the PLC is not reachable

        Any error raised by the write or the read is logged and re-raised, and
        the connection is dropped so that the next call reconnects.
        """
        with self._lock:
            self._init()
            # parse before touching the PLC: a malformed command must not
            # cost us the connection
            address, *rest = cmd.split(maxsplit=1)
            new_value = literal_eval(rest[0]) if rest else None
            try:
                if rest:
                    self._plc.write(address, new_value)
                return self._plc.read(address)
            except Exception:
                self.log.exception('error in plc communication')
                self._plc = None
                raise


class Snap7Mixin(HasIO):
    """Mixin giving modules access to VM addresses through an ``IO`` module."""

    ioclass = IO

    def get_vm_value(self, vm_address):
        """Return the value read from ``vm_address``."""
        return self.io.communicate(vm_address)

    def set_vm_value(self, vm_address, value):
        """Write ``value`` to ``vm_address`` and return the value read back.

        The value is transmitted as text and evaluated as a Python literal by
        the IO module.
        """
        return self.io.communicate(f'{vm_address} {value}')


class Pressure(Snap7Mixin, Readable):
    """Pressure read from a single VM address."""

    vm_address = Property('VM address', datatype=StringType())
    value = Parameter('pressure', datatype=FloatRange(unit='mbar'))

    def read_value(self):
        """Return the value stored at ``vm_address``."""
        return self.get_vm_value(self.vm_address)

    def read_status(self):
        """Always report IDLE."""
        return IDLE, ''


class Airpressure(Snap7Mixin, Readable):
    """Boolean state derived from the value at a VM address."""

    vm_address = Property('VM address', datatype=StringType())
    value = Parameter('airpressure state', datatype=BoolType())

    def read_value(self):
        """Return True when the value at ``vm_address`` exceeds 500."""
        # NOTE(review): meaning and unit of the threshold 500 are not visible
        # in this file - confirm against the PLC program
        return self.get_vm_value(self.vm_address) > 500

    def read_status(self):
        """Always report IDLE."""
        return IDLE, ''


class Valve(Snap7Mixin, Drivable):
    """Valve switched through one VM address and read back through another.

    After a target change the state is polled fast and the switch command is
    repeated a limited number of times until the read back state matches.
    """

    vm_address_input = Property('VM address input', datatype=StringType())
    vm_address_output = Property('VM address output', datatype=StringType())

    target = Parameter('Valve target', datatype=BoolType())
    value = Parameter('Value state', datatype=BoolType())

    # polls left before giving up; None until write_target is called
    _remaining_tries = None

    def read_value(self):
        """Return the state read from ``vm_address_input``."""
        return self.get_vm_value(self.vm_address_input)

    def write_target(self, target):
        """Send the switch command and start fast polling of the state."""
        self.set_vm_value(self.vm_address_output, target)
        self._remaining_tries = 5
        self.status = BUSY, 'switching'
        self.setFastPoll(True, 0.001)

    def read_status(self):
        """Compare state and target, repeating the switch command if needed.

        :return: IDLE when the state matches the target (or when the state
            was adopted as target), BUSY while retrying, ERROR when the tries
            are used up
        """
        # debug level: this runs on every poll, also while fast polling
        self.log.debug('read_status')
        value = self.read_value()
        self.log.debug('value %d target %d', value, self.target)
        if value == self.target:
            self.setFastPoll(False)
            return IDLE, ''
        if self._remaining_tries is None:
            # write_target was never called: adopt the state as target,
            # reusing the value already read instead of reading again
            self.target = value
            return IDLE, ''
        self._remaining_tries -= 1
        if self._remaining_tries < 0:
            self.setFastPoll(False)
            return ERROR, 'too many tries to switch'
        self.set_vm_value(self.vm_address_output, self.target)
        return BUSY, 'switching (try again)'


class FluidMachines(Snap7Mixin, Drivable):
    """Boolean output which is written to and read from the same address."""

    vm_address_output = Property('VM address output', datatype=StringType())

    target = Parameter('Valve target', datatype=BoolType())
    value = Parameter('Value state', datatype=BoolType())

    def read_value(self):
        """Return the value stored at ``vm_address_output``."""
        return self.get_vm_value(self.vm_address_output)

    def write_target(self, target):
        """Write ``target`` to ``vm_address_output``, return the read back."""
        return self.set_vm_value(self.vm_address_output, target)

    def read_status(self):
        """Always report IDLE."""
        return IDLE, ''


class TempSensor(Snap7Mixin, Readable):
    """Resistance read from a single VM address."""

    vm_address = Property('VM address', datatype=StringType())
    value = Parameter('resistance', datatype=FloatRange(unit='Ohm'))

    def read_value(self):
        """Return the value stored at ``vm_address``."""
        return self.get_vm_value(self.vm_address)

    def read_status(self):
        """Always report IDLE."""
        return IDLE, ''


class HeaterParam(Snap7Mixin, Writable):
    """Integer heater parameter stored at a single VM address."""

    vm_address = Property('VM address output', datatype=StringType())

    target = Parameter('Heater target', datatype=IntRange())
    value = Parameter('Heater Param', datatype=IntRange())

    def read_value(self):
        """Return the value stored at ``vm_address``."""
        return self.get_vm_value(self.vm_address)

    def write_target(self, target):
        """Write ``target`` to ``vm_address``, return the value read back."""
        return self.set_vm_value(self.vm_address, target)

    def read_status(self):
        """Always report IDLE."""
        return IDLE, ''


class controlHeater(Snap7Mixin, Writable):
    """Boolean heater switch stored at a single VM address."""

    vm_address = Property('VM address on switch', datatype=StringType())

    target = Parameter('Heater state', datatype=BoolType())
    value = Parameter('Heater state', datatype=BoolType())

    def read_value(self):
        """Return the value stored at ``vm_address``."""
        # 'vm_address' is the only address property declared on this class
        return self.get_vm_value(self.vm_address)

    def write_target(self, target):
        """Write True or False to ``vm_address``, return the value read back."""
        return self.set_vm_value(self.vm_address, bool(target))

    def read_status(self):
        """Always report IDLE."""
        return IDLE, ''


class safetyfeatureState(Snap7Mixin, Readable):
    """Boolean safety feature state read from a single VM address."""

    vm_address = Property('VM address state', datatype=StringType())

    value = Parameter('safety Feature state', datatype=BoolType())

    def read_value(self):
        """Return the value stored at ``vm_address``."""
        return self.get_vm_value(self.vm_address)

    def read_status(self):
        """Always report IDLE."""
        return IDLE, ''


class safetyfeatureParam(Snap7Mixin, Writable):
    """Integer safety feature parameter stored at a single VM address."""

    vm_address = Property('VM address output', datatype=StringType())

    target = Parameter('safety Feature target', datatype=IntRange())
    value = Parameter('safety Feature Param', datatype=IntRange())

    def read_value(self):
        """Return the value stored at ``vm_address``."""
        return self.get_vm_value(self.vm_address)

    def write_target(self, target):
        """Write ``target`` to ``vm_address``, return the value read back."""
        return self.set_vm_value(self.vm_address, target)

    def read_status(self):
        """Always report IDLE."""
        return IDLE, ''


class comparatorgekoppeltParam(Snap7Mixin, Writable):
    """Integer parameter written to two VM addresses at once.

    The value is read from the first address only.
    """

    vm_address_1 = Property('VM address output', datatype=StringType())
    vm_address_2 = Property('VM address output', datatype=StringType())

    target = Parameter('safety Feature target', datatype=IntRange())
    value = Parameter('safety Feature Param', datatype=IntRange())

    def read_value(self):
        """Return the value stored at ``vm_address_1``."""
        return self.get_vm_value(self.vm_address_1)

    def write_target(self, target):
        """Write ``target`` to both addresses.

        :return: the value read back from ``vm_address_2``
        """
        self.set_vm_value(self.vm_address_1, target)
        return self.set_vm_value(self.vm_address_2, target)

    def read_status(self):
        """Always report IDLE."""
        return IDLE, ''