414 lines
13 KiB
Python
414 lines
13 KiB
Python
# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
#
#
# *****************************************************************************
|
|
import sys
|
|
from time import monotonic
|
|
from ast import literal_eval
|
|
import snap7
|
|
from frappy.core import Attached, Command, Readable, Parameter, FloatRange, HasIO, Property, StringType, \
|
|
IDLE, BUSY, WARN, ERROR, Writable, Drivable, BoolType, IntRange, Communicator, StatusType
|
|
from frappy.errors import CommunicationFailedError, ConfigError
|
|
from threading import RLock
|
|
|
|
|
|
class IO(Communicator):
    """communicator for a LOGO PLC, accessed through snap7

    The connection is opened lazily on the first call to communicate() and
    is dropped (to be reopened on the next call) whenever a read fails.
    """
    tcap_client = Property('tcap_client', IntRange())
    tsap_server = Property('tcap_server', IntRange())
    ip_address = Property('numeric ip address', StringType())
    _plc = None  # snap7 Logo client, None while not connected
    _last_try = 0  # monotonic() time of the last failed connection attempt

    def initModule(self):
        """create the lock serializing all PLC accesses, then init the base class"""
        self._lock = RLock()
        super().initModule()

    def _init(self):
        """try to connect to the PLC

        On success self._plc holds the connected client.

        :raises CommunicationFailedError: when the connection can not be
            established, or when the last failed attempt was less than
            10 seconds ago (no new attempt is made in this case)
        """
        if monotonic() < self._last_try + 10:
            raise CommunicationFailedError('logo PLC not reachable')
        self._plc = snap7.logo.Logo()
        cause = None
        # remember the real stderr, so that it is restored afterwards
        # (and not replaced by stdout for the rest of the process lifetime)
        saved_stderr = sys.stderr
        with open('/dev/null', 'w') as devnull:  # closed again on exit
            sys.stderr = devnull  # suppress output of snap7
            try:
                self._plc.connect(self.ip_address, self.tcap_client, self.tsap_server)
                if self._plc.get_connected():
                    return
            except Exception as e:
                # any failure is reported as 'not reachable' below,
                # the original error is kept as its cause
                cause = e
            finally:
                sys.stderr = saved_stderr
        self._plc = None
        self._last_try = monotonic()
        raise CommunicationFailedError('logo PLC not reachable') from cause

    def communicate(self, cmd):
        """read or write a PLC address

        :param cmd: either '<address>' (read) or '<address> <value>'
            (write <value>, evaluated as a python literal, then read back)
        :return: the value read from <address>, converted to a string

        Any exception raised while reading is logged and re-raised, and the
        connection is dropped, so that the next call reconnects.
        """
        with self._lock:
            if not self._plc:
                self._init()
            cmd = cmd.split(maxsplit=1)
            if len(cmd) == 2:
                # NOTE(review): a failing write does not drop the connection,
                # only a failing read does - confirm that this is intended
                self.comLog('> %s %s', cmd[0], cmd[1])
                self._plc.write(cmd[0], literal_eval(cmd[1]))
                self.comLog('< OK')
            try:
                self.comLog('> %s', cmd[0])
                reply = self._plc.read(cmd[0])
                self.comLog('< %s', reply)
                return str(reply)
            except Exception as e:
                if self._plc:
                    self.comLog('? %r', e)
                    self.log.exception('error in plc read')
                self._plc = None
                raise
|
|
|
|
|
|
class LogoMixin(HasIO):
    """mixin providing access to PLC addresses through the attached io module"""
    ioclass = IO

    def get_vm_value(self, vm_address):
        """return the value at vm_address

        the reply of the io module is evaluated as a python literal
        """
        reply = self.io.communicate(vm_address)
        return literal_eval(reply)

    def set_vm_value(self, vm_address, value):
        """write value (rounded to an integer) to vm_address

        returns the reply of the io module, evaluated as a python literal
        """
        request = f'{vm_address} {round(value)}'
        reply = self.io.communicate(request)
        return literal_eval(reply)
|
|
|
|
|
|
class DigitalActuator(LogoMixin, Writable):
    """output with or without feedback

    The value is read from feedback_addr when configured, else from
    output_addr. The target is written to target_addr when configured,
    else to output_addr.
    """
    output_addr = Property('VM address output', datatype=StringType(), default='')
    target_addr = Property('VM address target', datatype=StringType(), default='')
    feedback_addr = Property('VM address feedback', datatype=StringType(), default='')
    target = Parameter('target', datatype=BoolType())
    value = Parameter('feedback or output', datatype=BoolType())
    _input = 'output'  # 'feedback' or 'output': where the value is read from
    _value_addr = None  # address used by read_value
    _target_addr = None  # address used by write_target
    _fault = None  # status text of a fault set by set_fault, None if no fault

    def doPoll(self):
        """poll the status only"""
        self.read_status()  # this calls also read_value

    def checkProperties(self):
        """derive the addresses to be used from the configured properties

        raises ConfigError when no address for the value or for the target
        can be determined
        """
        super().checkProperties()
        self._input = 'feedback' if self.feedback_addr else 'output'
        self._value_addr = self.feedback_addr or self.output_addr
        self._target_addr = self.target_addr or self.output_addr
        if not (self._target_addr and self._value_addr):
            raise ConfigError('need either output_addr or both feedback_addr and target_addr')
        # the output address, but only if output and feedback are both given
        self._check_feedback = self.feedback_addr and self.output_addr

    def initialReads(self):
        """initialize the target with the value read"""
        super().initialReads()
        self.target = self.read_value()

    def set_fault(self, value, statustext):
        """write value as target and enter the fault state

        statustext is reported as ERROR status until the fault is reset
        or a new target is written
        """
        self.write_target(value)
        # must be set after write_target, as write_target clears the fault
        self._fault = statustext
        self.read_status()

    def reset_fault(self):
        """leave the fault state and update the status"""
        self._fault = None
        self.read_status()

    def read_value(self):
        """return the value at the feedback address (or the output address)"""
        return self.get_vm_value(self._value_addr)

    def write_target(self, target):
        """clear any fault and write target to the PLC

        with a feedback configured, the write is repeated up to 20 times
        until the value read matches the target
        """
        self._fault = None
        self.set_vm_value(self._target_addr, target)
        if self.read_value() == target or not self.feedback_addr:
            return
        # retry only if we have a feedback and the feedback did not change yet
        for attempt in range(20):
            if self.read_value() == target:
                self.log.debug('tried %d times', attempt)
                break
            self.set_vm_value(self._target_addr, target)

    def read_status(self):
        """return ERROR on a fault or a mismatch, else IDLE"""
        if self._fault:
            return ERROR, self._fault
        value = self.read_value()
        if value != self.target:
            return ERROR, 'value and target do not match'
        if self._check_feedback and value != self.get_vm_value(self._check_feedback):
            return ERROR, 'feedback does not match output'
        return IDLE, ('feedback confirmed' if self.feedback_addr else '')
|
|
|
|
|
|
class DelayedActuator(DigitalActuator, Drivable):
    """digital actuator with an additional pulse command

    pulse(delay) writes the delay to delay_addr and toggles the target
    address, the status is BUSY until the pulse is over
    """
    delay_addr = Property('address of delay value', StringType())
    _pulse_start = 0  # monotonic() time of the pulse start, 0 if no pulse is active
    _pulse_end = 0  # monotonic() time of the expected end of the pulse
    _fault = None

    def read_status(self):
        """return the status, taking into account an active pulse

        :return: ERROR on a fault or a mismatch, WARN when the input goes off
            in the middle of a pulse, BUSY while pulsing, else IDLE
        """
        if self._fault:
            return ERROR, self._fault
        value = self.read_value()
        fberror = None
        if self._pulse_start:
            now = monotonic()
            if now < self._pulse_start + 1:
                # during the first second the value read is not trusted
                value = 1
            elif now < self._pulse_end - 1:
                if not value:
                    self._pulse_start = 0
                    return WARN, f'{self._input} is not on during pulse - due to interlock?'
            if value:
                if now < self._pulse_end + 1:
                    return BUSY, 'pulsing'
                # still on more than one second after the expected end
                self.log.warning('pulse timeout')
                self.set_vm_value(self._target_addr, 0)
            # the pulse is over: forget it and clear the delay on the PLC
            self._pulse_start = 0
            self.set_vm_value(self.delay_addr, 0)
        elif self._check_feedback and value != self.get_vm_value(self._check_feedback):
            # reported only if value and target match (checked first below)
            fberror = ERROR, 'feedback does not match output'
        if value != self.target:
            return ERROR, 'value does not match target'
        self.setFastPoll(False)
        if fberror:
            return fberror
        if self.feedback_addr:
            return IDLE, 'feedback confirmed'
        return IDLE, ''

    def write_target(self, value):
        """cancel the tracking of a pulse and write the target

        when switching off, the delay on the PLC is cleared first
        """
        self._pulse_start = 0
        if not value:
            self.set_vm_value(self.delay_addr, 0)
        return super().write_target(value)

    @Command(argument=FloatRange(0))
    def pulse(self, delay):
        """open for delay seconds"""
        # set_vm_value rounds, so the PLC gets the delay as an integer
        self.set_vm_value(self.delay_addr, delay)
        # write 1 and then 0 to the target address
        # NOTE(review): presumably the PLC starts the pulse on this toggle - confirm
        self.set_vm_value(self._target_addr, 1)
        self.set_vm_value(self._target_addr, 0)
        self.setFastPoll(True, 0.5)
        self.status = BUSY, 'pulsing'
        now = monotonic()
        self._pulse_start = now
        self._pulse_end = now + delay
|
|
|
|
|
|
class Value(LogoMixin, Readable):
    """readable value taken from a single PLC address"""
    addr = Property('VM address', datatype=StringType())

    def read_value(self):
        """return the value at addr"""
        address = self.addr
        return self.get_vm_value(address)

    def read_status(self):
        """the status is always IDLE with an empty text"""
        status = IDLE, ''
        return status
|
|
|
|
|
|
class DigitalValue(Value):
    """a Value with a boolean datatype"""
    value = Parameter('airpressure state', datatype=BoolType())
|
|
|
|
|
|
# TODO: the following classes are too specific, they have to be moved
|
|
|
|
class Pressure(LogoMixin, Drivable):
    """pressure in mbar, read from a single PLC address"""
    vm_address = Property('VM address', datatype=StringType())
    value = Parameter('pressure', datatype=FloatRange(unit='mbar'))

    # pollinterval = 0.5

    def read_value(self):
        """return the value at vm_address"""
        address = self.vm_address
        return self.get_vm_value(address)

    def read_status(self):
        """the status is always IDLE with an empty text"""
        status = IDLE, ''
        return status
|
|
|
|
|
|
class Airpressure(LogoMixin, Readable):
    """boolean state derived from the raw value at a PLC address"""
    vm_address = Property('VM address', datatype=StringType())
    value = Parameter('airpressure state', datatype=BoolType())

    # pollinterval = 0.5

    def read_value(self):
        """return 1 if the raw value at vm_address is above 500, else 0"""
        raw = self.get_vm_value(self.vm_address)
        return 1 if raw > 500 else 0

    def read_status(self):
        """the status is always IDLE with an empty text"""
        status = IDLE, ''
        return status
|
|
|
|
|
|
class Valve(LogoMixin, Drivable):
    """valve switched via an output address and read back via an input address

    after a target change, the write is repeated on each status poll until
    the input matches the target or the tries are used up
    """
    vm_address_input = Property('VM address input', datatype=StringType())
    vm_address_output = Property('VM address output', datatype=StringType())

    target = Parameter('Valve target', datatype=BoolType())
    value = Parameter('Value state', datatype=BoolType())
    _remaining_tries = None  # None as long as no target was written

    def read_value(self):
        """return the value at the input address"""
        address = self.vm_address_input
        return self.get_vm_value(address)

    def write_target(self, target):
        """write target to the output address and start fast polling"""
        self.set_vm_value(self.vm_address_output, target)
        self._remaining_tries = 5
        self.status = BUSY, 'switching'
        self.setFastPoll(True, 0.5)

    def read_status(self):
        """compare value and target, write the target again on a mismatch

        returns IDLE when value and target match (or when no target was
        written yet), BUSY while retrying and ERROR when the tries are
        used up
        """
        self.log.debug('read_status')
        value = self.read_value()
        self.log.debug('value %d target %d', value, self.target)
        if value == self.target:
            self.setFastPoll(False)
            return IDLE, ''
        if self._remaining_tries is None:
            # no target written by us: take over the state of the input
            self.target = self.read_value()
            return IDLE, ''
        self._remaining_tries -= 1
        if self._remaining_tries < 0:
            self.setFastPoll(False)
            return ERROR, 'too many tries to switch'
        self.set_vm_value(self.vm_address_output, self.target)
        return BUSY, 'switching (try again)'
|
|
|
|
|
|
class FluidMachines(LogoMixin, Drivable):
    """boolean output, value and target use the same PLC address"""
    vm_address_output = Property('VM address output', datatype=StringType())

    target = Parameter('Valve target', datatype=BoolType())
    value = Parameter('Valve state', datatype=BoolType())

    def read_value(self):
        """return the value at the output address"""
        address = self.vm_address_output
        return self.get_vm_value(address)

    def write_target(self, target):
        """write target to the output address and return the value read back"""
        readback = self.set_vm_value(self.vm_address_output, target)
        return readback

    def read_status(self):
        """the status is always IDLE with an empty text"""
        status = IDLE, ''
        return status
|
|
|
|
|
|
class TempSensor(LogoMixin, Readable):
    """sensor resistance in Ohm, read from a single PLC address"""
    vm_address = Property('VM address', datatype=StringType())
    value = Parameter('resistance', datatype=FloatRange(unit='Ohm'))

    def read_value(self):
        """return the value at vm_address"""
        address = self.vm_address
        return self.get_vm_value(address)

    def read_status(self):
        """the status is always IDLE with an empty text"""
        status = IDLE, ''
        return status
|
|
|
|
|
|
class HeaterParam(LogoMixin, Writable):
    """integer parameter, value and target use the same PLC address"""
    vm_address = Property('VM address output', datatype=StringType())

    target = Parameter('Heater target', datatype=IntRange())

    value = Parameter('Heater Param', datatype=IntRange())

    def read_value(self):
        """return the value at vm_address"""
        address = self.vm_address
        return self.get_vm_value(address)

    def write_target(self, target):
        """write target to vm_address and return the value read back"""
        readback = self.set_vm_value(self.vm_address, target)
        return readback

    def read_status(self):
        """the status is always IDLE with an empty text"""
        status = IDLE, ''
        return status
|
|
|
|
|
|
class controlHeater(LogoMixin, Writable):
    """boolean heater switch, value and target use the same PLC address"""
    vm_address = Property('VM address on switch', datatype=StringType())

    target = Parameter('Heater state', datatype=BoolType())

    value = Parameter('Heater state', datatype=BoolType())

    def read_value(self):
        """return the value at vm_address"""
        # vm_address is the only address property declared on this class
        return self.get_vm_value(self.vm_address)

    def write_target(self, target):
        """write True or False, depending on the truth value of target

        returns the value read back from vm_address
        """
        return self.set_vm_value(self.vm_address, bool(target))

    def read_status(self):
        """the status is always IDLE with an empty text"""
        return IDLE, ''
|
|
|
|
|
|
|
|
|
|
class safetyfeatureState(LogoMixin, Readable):
    """boolean state, read from a single PLC address"""
    vm_address = Property('VM address state', datatype=StringType())

    value = Parameter('safety Feature state', datatype=BoolType())

    def read_value(self):
        """return the value at vm_address"""
        address = self.vm_address
        return self.get_vm_value(address)

    def read_status(self):
        """the status is always IDLE with an empty text"""
        status = IDLE, ''
        return status
|
|
|
|
|
|
class safetyfeatureParam(LogoMixin, Writable):
    """integer parameter, value and target use the same PLC address"""
    vm_address = Property('VM address output', datatype=StringType())

    target = Parameter('safety Feature target', datatype=IntRange())

    value = Parameter('safety Feature Param', datatype=IntRange())

    def read_value(self):
        """return the value at vm_address"""
        address = self.vm_address
        return self.get_vm_value(address)

    def write_target(self, target):
        """write target to vm_address and return the value read back"""
        readback = self.set_vm_value(self.vm_address, target)
        return readback

    def read_status(self):
        """the status is always IDLE with an empty text"""
        status = IDLE, ''
        return status
|
|
|
|
|
|
class comparatorgekoppeltParam(LogoMixin, Writable):
    """integer parameter written to two PLC addresses at once

    the value is read back from the first address only
    """
    vm_address_1 = Property('VM address output', datatype=StringType())
    vm_address_2 = Property('VM address output', datatype=StringType())

    target = Parameter('safety Feature target', datatype=IntRange())
    value = Parameter('safety Feature Param', datatype=IntRange())

    def read_value(self):
        """return the value at vm_address_1"""
        address = self.vm_address_1
        return self.get_vm_value(address)

    def write_target(self, target):
        """write target to both addresses

        returns the value read back from vm_address_2
        """
        self.set_vm_value(self.vm_address_1, target)
        readback = self.set_vm_value(self.vm_address_2, target)
        return readback

    def read_status(self):
        """the status is always IDLE with an empty text"""
        status = IDLE, ''
        return status
|
|
|