263 lines
8.3 KiB
Python
263 lines
8.3 KiB
Python
# *****************************************************************************
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify it under
|
|
# the terms of the GNU General Public License as published by the Free Software
|
|
# Foundation; either version 2 of the License, or (at your option) any later
|
|
# version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
|
# details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# this program; if not, write to the Free Software Foundation, Inc.,
|
|
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
#
|
|
#
|
|
#
|
|
# *****************************************************************************
|
|
from ast import literal_eval
|
|
import snap7
|
|
from frappy.core import Readable, Parameter, FloatRange, HasIO, StringIO, Property, StringType,IDLE, BUSY, WARN, ERROR,Writable, Drivable, BoolType, IntRange, Communicator
|
|
from frappy.errors import CommunicationFailedError
|
|
from threading import RLock
|
|
import sys
|
|
import time
|
|
|
|
class IO(Communicator):
|
|
|
|
|
|
tcap_client = Property('tcap_client', IntRange())
|
|
tsap_server = Property('tcap_server', IntRange())
|
|
ip_address = Property('numeric ip address', StringType())
|
|
_plc = None
|
|
_last_try = 0
|
|
|
|
def initModule(self):
|
|
self._lock = RLock()
|
|
super().initModule()
|
|
def _init(self):
|
|
if not self._plc:
|
|
if time.time() < self._last_try + 10:
|
|
raise CommunicationFailedError('logo PLC not reachable')
|
|
self._plc = snap7.logo.Logo()
|
|
prev_stderr = sys.stdout
|
|
sys.stderr = open('/dev/null', 'w') # suppress output of snap7
|
|
try:
|
|
self._plc.connect(self.ip_address, self.tcap_client, self.tsap_server)
|
|
if self._plc.get_connected():
|
|
return
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
sys.stderr = prev_stderr
|
|
self._plc = None
|
|
self._last_try = time.time()
|
|
raise CommunicationFailedError('logo PLC not reachable')
|
|
|
|
|
|
|
|
def communicate(self, cmd):
|
|
with self._lock:
|
|
self._init()
|
|
cmd = cmd.split(maxsplit=1)
|
|
if len(cmd) == 2:
|
|
self._plc.write(cmd[0], literal_eval(cmd[1]))
|
|
try:
|
|
return self._plc.read(cmd[0])
|
|
except Exception as e:
|
|
if self._plc:
|
|
self.log.exception('error in plc read')
|
|
self._plc = None
|
|
raise
|
|
|
|
|
|
|
|
class Snap7Mixin(HasIO):
|
|
ioclass = IO
|
|
|
|
def get_vm_value(self, vm_address):
|
|
return self.io.communicate(vm_address)
|
|
|
|
|
|
def set_vm_value(self, vm_address, value):
|
|
return self.io.communicate(f'{vm_address} {value}')
|
|
|
|
class Pressure(Snap7Mixin, Readable):
|
|
vm_address = Property('VM address', datatype= StringType())
|
|
value = Parameter('pressure', datatype = FloatRange(unit = 'mbar'))
|
|
|
|
#pollinterval = 0.5
|
|
|
|
def read_value(self):
|
|
return self.get_vm_value(self.vm_address)
|
|
|
|
def read_status(self):
|
|
return IDLE, ''
|
|
|
|
|
|
class Airpressure(Snap7Mixin, Readable):
|
|
vm_address = Property('VM address', datatype= StringType())
|
|
value = Parameter('airpressure state', datatype = BoolType())
|
|
|
|
#pollinterval = 0.5
|
|
|
|
def read_value(self):
|
|
if (self.get_vm_value(self.vm_address) > 500):
|
|
return 1
|
|
else:
|
|
return 0
|
|
|
|
def read_status(self):
|
|
return IDLE, ''
|
|
|
|
class Valve(Snap7Mixin, Drivable):
|
|
vm_address_input = Property('VM address input', datatype= StringType())
|
|
vm_address_output = Property('VM address output', datatype= StringType())
|
|
|
|
target = Parameter('Valve target', datatype = BoolType())
|
|
value = Parameter('Value state', datatype = BoolType())
|
|
_remaining_tries = None
|
|
|
|
def read_value(self):
|
|
return self.get_vm_value(self.vm_address_input)
|
|
|
|
def write_target(self, target):
|
|
self.set_vm_value(self.vm_address_output, target)
|
|
self._remaining_tries = 5
|
|
self.status = BUSY, 'switching'
|
|
self.setFastPoll(True, 0.001)
|
|
|
|
def read_status(self):
|
|
self.log.info('read_status')
|
|
value = self.read_value()
|
|
self.log.info('value %d target %d', value, self.target)
|
|
if value != self.target:
|
|
if self._remaining_tries is None:
|
|
self.target = self.read_value()
|
|
return IDLE,''
|
|
self._remaining_tries -= 1
|
|
if self._remaining_tries < 0:
|
|
self.setFastPoll(False)
|
|
return ERROR, 'too many tries to switch'
|
|
self.set_vm_value(self.vm_address_output, self.target)
|
|
return BUSY, 'switching (try again)'
|
|
self.setFastPoll(False)
|
|
return IDLE, ''
|
|
|
|
class FluidMachines(Snap7Mixin, Drivable):
|
|
vm_address_output = Property('VM address output', datatype= StringType())
|
|
|
|
target = Parameter('Valve target', datatype = BoolType())
|
|
value = Parameter('Value state', datatype = BoolType())
|
|
|
|
def read_value(self):
|
|
return self.get_vm_value(self.vm_address_output)
|
|
|
|
def write_target(self, target):
|
|
return self.set_vm_value(self.vm_address_output, target)
|
|
|
|
def read_status(self):
|
|
return IDLE, ''
|
|
|
|
class TempSensor(Snap7Mixin, Readable):
|
|
vm_address = Property('VM address', datatype= StringType())
|
|
value = Parameter('resistance', datatype = FloatRange(unit = 'Ohm'))
|
|
|
|
|
|
def read_value(self):
|
|
return self.get_vm_value(self.vm_address)
|
|
|
|
def read_status(self):
|
|
return IDLE, ''
|
|
|
|
class HeaterParam(Snap7Mixin, Writable):
|
|
vm_address = Property('VM address output', datatype= StringType())
|
|
|
|
target = Parameter('Heater target', datatype = IntRange())
|
|
|
|
value = Parameter('Heater Param', datatype = IntRange())
|
|
|
|
def read_value(self):
|
|
return self.get_vm_value(self.vm_address)
|
|
|
|
def write_target(self, target):
|
|
return self.set_vm_value(self.vm_address, target)
|
|
|
|
def read_status(self):
|
|
return IDLE, ''
|
|
|
|
|
|
class controlHeater(Snap7Mixin, Writable):
|
|
|
|
vm_address = Property('VM address on switch', datatype= StringType())
|
|
|
|
target = Parameter('Heater state', datatype = BoolType())
|
|
|
|
value = Parameter('Heater state', datatype = BoolType())
|
|
|
|
def read_value(self):
|
|
return self.get_vm_value(self.vm_address_on)
|
|
|
|
def write_target(self, target):
|
|
if (target):
|
|
return self.set_vm_value(self.vm_address, True)
|
|
else:
|
|
return self.set_vm_value(self.vm_address, False)
|
|
|
|
def read_status(self):
|
|
return IDLE, ''
|
|
|
|
|
|
class safetyfeatureState(Snap7Mixin, Readable):
|
|
|
|
vm_address = Property('VM address state', datatype= StringType())
|
|
|
|
value = Parameter('safety Feature state', datatype = BoolType())
|
|
|
|
def read_value(self):
|
|
return self.get_vm_value(self.vm_address)
|
|
|
|
def read_status(self):
|
|
return IDLE, ''
|
|
|
|
|
|
class safetyfeatureParam(Snap7Mixin, Writable):
|
|
vm_address = Property('VM address output', datatype= StringType())
|
|
|
|
target = Parameter('safety Feature target', datatype = IntRange())
|
|
|
|
value = Parameter('safety Feature Param', datatype = IntRange())
|
|
|
|
def read_value(self):
|
|
return self.get_vm_value(self.vm_address)
|
|
|
|
def write_target(self, target):
|
|
return self.set_vm_value(self.vm_address, target)
|
|
|
|
def read_status(self):
|
|
return IDLE, ''
|
|
|
|
|
|
class comparatorgekoppeltParam(Snap7Mixin, Writable):
|
|
vm_address_1 = Property('VM address output', datatype= StringType())
|
|
vm_address_2 = Property('VM address output', datatype= StringType())
|
|
|
|
target = Parameter('safety Feature target', datatype = IntRange())
|
|
value = Parameter('safety Feature Param', datatype = IntRange())
|
|
|
|
def read_value(self):
|
|
return self.get_vm_value(self.vm_address_1)
|
|
|
|
def write_target(self, target):
|
|
self.set_vm_value(self.vm_address_1, target)
|
|
return self.set_vm_value(self.vm_address_2, target)
|
|
|
|
def read_status(self):
|
|
return IDLE, ''
|
|
|
|
|
|
|
|
|