- check that dil5 still works! Change-Id: Ibe98e64088f2f886888af170a1f38d699927eb58
272 lines
9.2 KiB
Python
272 lines
9.2 KiB
Python
# *****************************************************************************
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify it under
|
|
# the terms of the GNU General Public License as published by the Free Software
|
|
# Foundation; either version 2 of the License, or (at your option) any later
|
|
# version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
|
# details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along with
|
|
# this program; if not, write to the Free Software Foundation, Inc.,
|
|
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
#
|
|
#
|
|
#
|
|
# *****************************************************************************
|
|
import sys
|
|
from time import monotonic
|
|
from ast import literal_eval
|
|
import snap7
|
|
from frappy.core import Attached, Command, Readable, Parameter, FloatRange, HasIO, Property, \
|
|
IDLE, BUSY, WARN, ERROR, Writable, Drivable, Communicator
|
|
from frappy.datatypes import StringType, BoolType, IntRange, OrType, Int32
|
|
from frappy.errors import CommunicationFailedError, ConfigError
|
|
from threading import RLock
|
|
|
|
|
|
class IO(Communicator):
    """Communicator for a LOGO PLC accessed through snap7.

    The connection is opened on demand. After a failed transfer the
    connection is dropped, so that the next request connects again.
    After a failed connection attempt, no new attempt is made for 10 seconds.
    """
    # NOTE(review): 'tcap' looks like a misspelling of 'tsap'; the property
    # name is kept as it is, presumably configurations refer to it - confirm
    tcap_client = Property('tcap_client', IntRange())
    tsap_server = Property('tcap_server', IntRange())
    ip_address = Property('numeric ip address', StringType())
    _plc = None  # snap7 client object while connected, else None
    _last_try = 0  # monotonic() time of the last failed connection attempt

    def initModule(self):
        """create the lock serializing all access to the PLC"""
        self._lock = RLock()
        super().initModule()

    def _init(self):
        """connect to the PLC

        :raises CommunicationFailedError: when the connection can not be
            established or when the last failed attempt is less than
            10 seconds ago
        """
        if monotonic() < self._last_try + 10:
            raise CommunicationFailedError('logo PLC not reachable')
        plc = snap7.logo.Logo()
        saved_stderr = sys.stderr
        with open('/dev/null', 'w') as devnull:
            sys.stderr = devnull  # suppress output of snap7
            try:
                plc.connect(self.ip_address, self.tcap_client, self.tsap_server)
                if plc.get_connected():
                    # publish the client only when it is really connected
                    self._plc = plc
                    return
            except Exception as e:
                # any failure is reported uniformly by the raise below
                self.log.debug('connecting to logo PLC failed: %r', e)
            finally:
                # restore the stream which was active before, not sys.stdout
                sys.stderr = saved_stderr
        self._plc = None
        self._last_try = monotonic()
        raise CommunicationFailedError('logo PLC not reachable')

    def _call_plc(self, name, *args):
        """call method <name> of the snap7 client with the lock held

        Connects first when needed. On any error the connection is dropped,
        so that the next call reconnects, and the error is raised again.
        """
        with self._lock:
            if not self._plc:
                self._init()
            try:
                return getattr(self._plc, name)(*args)
            except Exception as e:
                self.comLog('? %r', e)
                self.log.exception('error in plc %s', name)
                self._plc = None
                raise

    def communicate(self, cmd):
        """handle a command of the form '<address>' or '<address> <value>'

        With a value, the value is written first. In both cases the content
        of the address is read and returned as a string.
        """
        with self._lock:  # keep write and readback together
            if not self._plc:
                self._init()
            cmd = cmd.split(maxsplit=1)
            if len(cmd) == 2:
                self.comLog('> %s %s', cmd[0], cmd[1])
                # evaluate before the transfer: a malformed value must not
                # be treated as a communication error
                value = literal_eval(cmd[1])
                self._call_plc('write', cmd[0], value)
                self.comLog('< OK')
            self.comLog('> %s', cmd[0])
            reply = self._call_plc('read', cmd[0])
            self.comLog('< %s', reply)
            return str(reply)

    @Command(StringType(), result=Int32)
    def read(self, vm_address):
        """read the value at vm_address"""
        return self._call_plc('read', vm_address)

    @Command((StringType(), Int32))
    def write(self, vm_address, value):
        """write value to vm_address"""
        self._call_plc('write', vm_address, value)
|
|
|
|
|
|
class LogoMixin(HasIO):
    """Mixin giving access to VM addresses through the IO module."""
    ioclass = IO

    def get_vm_value(self, vm_address, scale=1):
        """return the content of vm_address multiplied by scale"""
        raw = self.io.read(vm_address)
        return raw * scale

    def set_vm_value(self, vm_address, value, scale=1):
        """write value / scale (rounded) to vm_address

        :return: the content read back from vm_address, multiplied by scale
        """
        raw = round(value / scale)
        self.io.write(vm_address, raw)
        readback = self.io.read(vm_address)
        return readback * scale
|
|
|
|
|
|
class DigitalActuator(LogoMixin, Writable):
    """output with or without feedback

    The value is read from feedback_addr when given, else from output_addr.
    The target is written to target_addr when given, else to output_addr.
    """
    output_addr = Property('VM address output', datatype=StringType(), default='')
    target_addr = Property('VM address target', datatype=StringType(), default='')
    feedback_addr = Property('VM address feedback', datatype=StringType(), default='')
    target = Parameter('target', datatype=BoolType())
    value = Parameter('feedback or output', datatype=BoolType())
    _input = 'output'  # name of the value source, used in status texts
    _value_addr = None  # address the value is read from
    _target_addr = None  # address the target is written to
    _check_feedback = None  # output address to compare with the feedback, if any
    _fault = None  # status text of a pending fault or None

    def doPoll(self):
        self.read_status()  # this calls also read_value

    def checkProperties(self):
        """derive the addresses to be used from the configured properties

        :raises ConfigError: when no address for the value or for the
            target can be determined
        """
        super().checkProperties()
        if self.feedback_addr:
            self._input = 'feedback'
            self._value_addr = self.feedback_addr
        else:
            self._input = 'output'
            self._value_addr = self.output_addr
        self._target_addr = self.target_addr or self.output_addr
        if not (self._target_addr and self._value_addr):
            raise ConfigError('need either output_addr or both feedback_addr and target_addr')
        # only with both addresses given, feedback and output can be compared
        self._check_feedback = self.feedback_addr and self.output_addr

    def initialReads(self):
        """initialize the target with the current value"""
        super().initialReads()
        self.target = self.read_value()

    def set_fault(self, value, statustext):
        """on a fault condition, set target to value

        and status to statustext
        """
        # write_target clears the fault, so it has to be set afterwards
        self.write_target(value)
        self._fault = statustext
        self.read_status()

    def reset_fault(self):
        """reset fault condition"""
        self._fault = None
        self.read_status()

    def read_value(self):
        return self.get_vm_value(self._value_addr)

    def write_target(self, target):
        """write the target, clearing a pending fault

        With a feedback configured, the write is repeated up to 20 times
        until the feedback matches the target.
        """
        self._fault = None
        self.set_vm_value(self._target_addr, target)
        value = self.read_value()
        if value != target and self.feedback_addr:
            # retry only if we have a feedback and the feedback did not change yet
            for i in range(20):
                if self.read_value() == target:
                    self.log.debug('tried %d times', i)
                    break
                self.set_vm_value(self._target_addr, target)

    def read_status(self):
        """return ERROR on a fault or a mismatch, else IDLE"""
        if self._fault:
            return ERROR, self._fault
        value = self.read_value()
        if value != self.target:
            return ERROR, 'value and target do not match'
        if self._check_feedback:
            if value != self.get_vm_value(self._check_feedback):
                return ERROR, 'feedback does not match output'
        if self.feedback_addr:
            return IDLE, 'feedback confirmed'
        return IDLE, ''
|
|
|
|
|
|
class DelayedActuator(DigitalActuator, Drivable):
    """digital actuator which can in addition be switched on for a given time

    The pulse command writes the duration to delay_addr and triggers the
    output. While the pulse is running, the status is BUSY.
    """
    delay_addr = Property('address of delay value', StringType())
    _pulse_start = 0  # monotonic() time when the pulse started, 0 when not pulsing
    _pulse_end = 0  # monotonic() time when the pulse is expected to end

    def read_status(self):
        """return the status, taking a running pulse into account"""
        # NOTE(review): the nesting below was reconstructed from a source
        # with stripped indentation - please verify against the repository
        if self._fault:
            return ERROR, self._fault
        value = self.read_value()
        fberror = None
        if self._pulse_start:
            now = monotonic()
            if now < self._pulse_start + 1:
                # within the first second, the value is assumed to be on
                value = 1
            elif now < self._pulse_end - 1:
                if not value:
                    self._pulse_start = 0
                    return WARN, f'{self._input} is not on during pulse - due to interlock?'
            if value:
                if now < self._pulse_end + 1:
                    return BUSY, 'pulsing'
                # still on one second after the expected end: switch off
                self.log.warning('pulse timeout')
                self.set_vm_value(self._target_addr, 0)
            self._pulse_start = 0
            self.set_vm_value(self.delay_addr, 0)
        elif self._check_feedback and value != self.get_vm_value(self._check_feedback):
            fberror = ERROR, 'feedback does not match output'
        if value != self.target:
            return ERROR, 'value does not match target'
        self.setFastPoll(False)
        if fberror:
            return fberror
        if self.feedback_addr:
            return IDLE, 'feedback confirmed'
        return IDLE, ''

    def write_target(self, value):
        """write the target, ending a running pulse"""
        self._pulse_start = 0
        if not value:
            self.set_vm_value(self.delay_addr, 0)
        return super().write_target(value)

    @Command(argument=FloatRange(0))
    def pulse(self, delay):
        """open for delay seconds"""
        self.set_vm_value(self.delay_addr, delay)
        # the target is set and cleared again immediately
        # presumably the PLC program is triggered by this - TODO confirm
        self.set_vm_value(self._target_addr, 1)
        self.set_vm_value(self._target_addr, 0)
        self.setFastPoll(True, 0.5)
        self.status = BUSY, 'pulsing'
        now = monotonic()
        self._pulse_start = now
        self._pulse_end = now + delay
|
|
|
|
|
|
class Sensor(LogoMixin, Readable):
    """Readable whose value is the content of one VM address."""
    addr = Property('VM address', datatype=StringType())

    def read_value(self):
        """return the content of the configured address"""
        address = self.addr
        return self.get_vm_value(address)

    def read_status(self):
        """always idle with an empty status text"""
        status = (IDLE, '')
        return status
|
|
|
|
|
|
class AnalogOutput(Sensor, Writable):
    """Writable analog value

    The value is read from addr, the target is written to output_addr,
    which defaults to addr. Values are converted with the scale factor.
    """
    output_addr = Property('VM address output', datatype=StringType(), default='')
    # the methods below need a scale factor, which was not declared before
    scale = Property('scale factor', datatype=FloatRange(), default=1)

    def checkProperties(self):
        """use addr for writing when no output_addr is configured"""
        super().checkProperties()
        if not self.output_addr:
            self.output_addr = self.addr

    def read_value(self):
        """return the scaled content of addr"""
        return self.get_vm_value(self.addr, self.scale)

    def write_target(self, target):
        """write the target to output_addr and return the scaled readback"""
        return self.set_vm_value(self.output_addr, target, self.scale)
|
|
|
|
|
|
class Pressure(Sensor):
    """Sensor with its value declared as a pressure in mbar."""
    value = Parameter(
        'pressure',
        datatype=FloatRange(unit='mbar'),
    )
|
|
|
|
|
|
class Resistor(Sensor):
    """Sensor with its value declared as a resistance in Ohm."""
    value = Parameter(
        'resistance',
        datatype=FloatRange(unit='Ohm'),
    )
|
|
|
|
|
|
class Comparator(LogoMixin, Readable):
    """Boolean readable comparing the content of a VM address with a threshold."""
    vm_address = Property('VM address', datatype=StringType())
    value = Parameter('airpressure state', datatype=BoolType())
    threshold = Property('threshold for True', FloatRange())

    def read_value(self):
        """return True when the content of vm_address is above the threshold"""
        level = self.get_vm_value(self.vm_address)
        return level > self.threshold
|