From 7a2e7642620a69313896ddd261473385bf2e8777 Mon Sep 17 00:00:00 2001 From: Alexander Zaft Date: Wed, 28 Jun 2023 09:02:05 +0200 Subject: [PATCH] frappy_mlz: Add Zapf PLC adds a zapf-based PLC connection scanner. Change-Id: Icc0ded7e7a8cc5a83d7527d9b26b37c49e9b8674 Reviewed-on: https://forge.frm2.tum.de/review/c/secop/frappy/+/31471 Tested-by: Jenkins Automated Tests Reviewed-by: Enrico Faulhaber Reviewed-by: Alexander Zaft --- frappy_mlz/plc_zapf.py | 363 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 363 insertions(+) create mode 100644 frappy_mlz/plc_zapf.py diff --git a/frappy_mlz/plc_zapf.py b/frappy_mlz/plc_zapf.py new file mode 100644 index 0000000..4fb45d6 --- /dev/null +++ b/frappy_mlz/plc_zapf.py @@ -0,0 +1,363 @@ +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Alexander Zaft +# +# ***************************************************************************** + +import re + +import zapf +import zapf.spec as zspec +from zapf.io import PlcIO +from zapf.scan import Scanner + +from frappy.core import BUSY, DISABLED, ERROR, FINALIZING, IDLE, \ + INITIALIZING, STARTING, UNKNOWN, WARN, Attached, Command, Communicator, \ + Drivable, Parameter, Property, Readable +from frappy.datatypes import UNLIMITED, ArrayOf, BLOBType, EnumType, \ + FloatRange, IntRange, StatusType, StringType, ValueType +from frappy.dynamic import Pinata +from frappy.errors import CommunicationFailedError, ImpossibleError, \ + IsBusyError, NoSuchParameterError, ReadOnlyError + +# Untested with real hardware, only testplc_2021_09.py + + +def internalize_name(name): + return re.sub(r'[^a-zA-Z0-9_]+', '_', name, re.ASCII) + + +ERROR_MAP = { + # should not happen. but better to have it here anyway + 5: NoSuchParameterError, + # if this occurs, something may have gone wrong with digesting the scanner + # data + 6: ReadOnlyError, + # Most likely from devices you cannot poll when busy. + 7: IsBusyError, +} + + +class ZapfPinata(Pinata): + """The Pinata device for a PLC that can be accessed according to PILS. + + See https://forge.frm2.tum.de/public/doc/plc/master/html/ + + Instantiates the classes with the base mapped class, which will be replaced + by initModule, so modules can also be configured manually in the config + file. + """ + iodev = Property('Connection to PLC', StringType()) + + def scanModules(self): + try: + self._plcio = PlcIO(self.iodev, self.log) + except zapf.CommError as e: + raise CommunicationFailedError('could not connect to plc') from e + scanner = Scanner(self._plcio, self.log) + for devinfo in scanner.scan_devices(): + if zspec.LOWLEVEL in devinfo.info.get('flags'): + self.log.debug('device %d (%s) is lowlevel, skipping', + devinfo.number, devinfo.name) + continue + device = scanner.get_device(devinfo) + if device is None: + self.log.info(f'{devinfo.name} unsupported') + continue + basecls = CLS_MAP.get(device.__class__, None) + if basecls is None: + self.log.info('No mapping found for %s, (class %s)', + devinfo.name, device.__class__.__name__) + continue + mod_cls = basecls.makeModuleClass(device, devinfo) + config = { + 'cls': mod_cls, + 'plcio': device, + 'description': devinfo.info['description'], + 'plc_name': devinfo.name, + '_pinata': self.name, + } + if devinfo.info['basetype'] != 'enum' \ + and not issubclass(basecls, PLCCommunicator): + config['value'] = { + # internal limit here is 2**64, zapf reports 2**128 + 'min': max(devinfo.info['absmin'], -UNLIMITED), + 'max': min(devinfo.info['absmax'], UNLIMITED), + } + if devinfo.info['access'] == 'rw': + config['target'] = { + 'min': config['value']['min'], + 'max': config['value']['max'], + } + name = internalize_name(devinfo.name) + yield (name, config) + self._plcio.start_cache() + + def shutdownModule(self): + """Shutdown the module, _plcio might be invalid after this. Needs to be + recreated by scanModules.""" + self._plcio.stop_cache() + self._plcio.proto.disconnect() + + +STATUS_MAP = { + zspec.DevStatus.RESET: (INITIALIZING, 'resetting'), + zspec.DevStatus.IDLE: (IDLE, 'idle'), + zspec.DevStatus.DISABLED: (DISABLED, 'disabled'), + zspec.DevStatus.WARN: (WARN, 'warning'), + zspec.DevStatus.START: (STARTING, 'starting'), + zspec.DevStatus.BUSY: (BUSY, 'busy'), + zspec.DevStatus.STOP: (FINALIZING, 'stopping'), + zspec.DevStatus.ERROR: (ERROR, 'error (please reset)'), + zspec.DevStatus.DIAGNOSTIC_ERROR: (ERROR, 'hard error (please check plc)'), +} + + +class PLCBase: + status = Parameter(datatype=StatusType(Drivable, 'INITIALIZING', + 'DISABLED', 'STARTING')) + status_code = Parameter('raw internal status code', + IntRange(0, 2**32-1)) + plcio = Property('plc io device', ValueType()) + plc_name = Property('plc io device', StringType(), export=True) + _pinata = Attached(ZapfPinata) # TODO: make this automatic? + + @classmethod + def makeModuleClass(cls, device, devinfo): + # add parameters and commands according to device info + add_members = {} + # set correct enums for value/target + if devinfo.info['basetype'] == 'enum': + rmap = {v: k for k, v in devinfo.info['enum_r'].items()} + read_enum = EnumType(rmap) + add_members['value'] = Parameter(datatype=read_enum) + if hasattr(cls, 'target'): + #wmap = {k:v for k, v in devinfo.info['enum_w'].items()} + #write_enum = EnumType(wmap) + write_enum = EnumType(devinfo.info['enum_w']) + add_members['target'] = Parameter(datatype=write_enum) + + for parameter in device.list_params(): + info = devinfo.info['params'][parameter] + iname = internalize_name(parameter) + readonly = info.get('access', 'ro') != 'rw' + dataty = cls._map_datatype(info) + if dataty is None: + continue + param = Parameter(info['description'], + dataty, + readonly=readonly) + + def read_param(self, parameter=parameter): + code, val = self.plcio.get_param_raw(parameter) + if code > 4: + raise ERROR_MAP[code](f'Error when reading parameter' + f'{parameter}: {code}') + return val + + def write_param(self, value, parameter=parameter): + code, val = self.plcio.set_param_raw(parameter, value) + if code > 4: + raise ERROR_MAP[code](f'Error when setting parameter' + f'{parameter} to {value!r}: {code}') + return val + + # enums can have asymmetric read and write variants. this should be + # checked + if info['basetype'] == 'enum': + allowed = frozenset(info['enum_w'].values()) + #pylint: disable=function-redefined + def write_param(self, value, allowed=allowed, parameter=parameter): + if value not in allowed: + raise ValueError(f'Invalid value for writing' + f' {parameter}: {value!r}') + + code, val = self.plcio.set_param_raw(parameter, value) + if code > 4: + raise ERROR_MAP[code](f'Error when setting parameter' + f'{parameter} to {value!r}: {code}') + return val + + add_members[iname] = param + add_members['read_' + iname] = read_param + if readonly: + continue + add_members['write_' + iname] = write_param + + for command in device.list_funcs(): + info = devinfo.info['funcs'][command] + iname = internalize_name(command) + if info['argument']: + arg = cls._map_datatype(info['argument']) + else: + arg = None + if info['result']: + result = cls._map_datatype(info['result']) + else: + result = None + def exec_command(self, arg=None, command=command): + # TODO: commands return , + return self.plcio.exec_func(command, arg) + decorator = Command(arg, + result = result, + description=info['description'], + ) + + func = decorator(exec_command) + add_members['call_' + iname] = func + if not add_members: + return cls + new_name = '_' + cls.__name__ + '_' \ + + internalize_name("blub") + return type(new_name, (cls,), add_members) + + @classmethod + def _map_datatype(cls, info): + dataty = info['basetype'] + if dataty == 'int': + return IntRange(info['min_value'], info['max_value']) + if dataty == 'float': + return FloatRange(info['min_value'], info['max_value']) + if dataty == 'enum': + mapping = {v: k for k, v in info['enum_r'].items()} + return EnumType(mapping) + return None + + def read_status(self): + state, reason, aux, err_id = self.plcio.read_status() + if state in STATUS_MAP: + status, m = STATUS_MAP[state] + else: + status, m = UNKNOWN, 'unknown state 0x%x' % state + msg = [m] + reason = zapf.spec.ReasonMap[reason] + if reason: + msg.append(reason) + if aux: + msg.append(self.plcio.decode_aux(aux)) + if err_id: + msg.append(self.plcio.decode_errid(err_id)) + return status, ', '.join(msg) + + def read_status_code(self): + state, reason, aux, _ = self.plcio.read_status() + return state << 28 | reason << 24 | aux + + @Command() + def stop(self): + """Stop the operation of this module. + + :raises: + ImpossibleError: if the command is called while the module is + not busy + """ + if not self.plcio.change_status((zapf.DevStatus.BUSY,), + zapf.DevStatus.STOP): + self.log.info('stop was called when device was not busy') + # TODO: off/on? + + @Command() + def reset(self): + """Tries to reset this module. + + :raises: + ImpossibleError: when called while the module is not in an error + state. + """ + if not self.plcio.reset(): + raise ImpossibleError('reset called when the device is not in' + 'an error state!') + + +class PLCValue(PLCBase): + """Base class for all but Communicator""" + def read_value(self): + return self.plcio.read_value_raw() # read_value maps enums on zapf side + + def read_target(self): + return self.plcio.read_target_raw() + + def write_target(self, value): + self.plcio.change_target_raw(value) + + +class PLCReadable(PLCValue, Readable): + """Readable value, scanned from PLC.""" + description = Property('the modules description', + datatype=StringType(isUTF8=True)) + + +class PLCDrivable(PLCValue, Drivable): + """Drivable, scanned from PLC.""" + description = Property('the modules description', + datatype=StringType(isUTF8=True)) + + +class PLCCommunicator(PLCBase, Communicator): + status = Parameter('current status of the module') + + @Command(BLOBType(), result=BLOBType()) + def communicate(self, command): + return self.plcio.communicate(command) + + +class Sensor(PLCReadable): + pass + + +class AnalogOutput(PLCDrivable): + pass + + +class DiscreteInput(PLCReadable): + value = Parameter(datatype=IntRange()) + +class DiscreteOutput(PLCDrivable): + value = Parameter(datatype=IntRange()) + target = Parameter(datatype=IntRange()) + + +class VectorInput(PLCReadable): + value = Parameter(datatype=ArrayOf(FloatRange())) + + +class VectorOutput(PLCDrivable): + value = Parameter(datatype=ArrayOf(FloatRange())) + target = Parameter(datatype=ArrayOf(FloatRange())) + + +CLS_MAP = { + zapf.device.SimpleDiscreteIn: DiscreteInput, + zapf.device.SimpleAnalogIn: Sensor, + zapf.device.Keyword: DiscreteOutput, + zapf.device.RealValue: AnalogOutput, + zapf.device.SimpleDiscreteOut: DiscreteOutput, + zapf.device.SimpleAnalogOut: PLCDrivable, + zapf.device.StatusWord: DiscreteInput, + zapf.device.DiscreteIn: DiscreteInput, + zapf.device.AnalogIn: Sensor, + zapf.device.DiscreteOut: DiscreteOutput, + zapf.device.AnalogOut: PLCDrivable, + zapf.device.FlatIn: Sensor, + zapf.device.FlatOut: AnalogOutput, + zapf.device.ParamIn: Sensor, + zapf.device.ParamOut: AnalogOutput, + zapf.device.VectorIn: VectorInput, + zapf.device.VectorOut: VectorOutput, + zapf.device.MessageIO: PLCCommunicator, +}