Files
seweb/secop.py

174 lines
6.7 KiB
Python

import logging
from frappy.client import SecopClient
def convert_par(module, name, par):
    """Build the web-client component description for one parameter.

    :param module: module name, used as prefix of the component name
    :param name: parameter name, also used as the displayed title
    :param par: parameter description dict. Keys read here: ``readonly``
        (optional, treated as True when missing), ``datainfo`` (only read
        for writable parameters) and ``description`` (optional)
    :return: dict with the keys ``type``, ``name`` and ``title``; writable
        parameters additionally get ``command`` (and ``enum_names`` for
        enums), and ``info`` is added when a non-empty description exists

    NOTE(review): the reviewed copy of this file had lost its indentation.
    The enum/bool refinement is assumed to belong to the writable branch,
    so read-only parameters are always rendered as 'rdonly' -- confirm
    against the repository.
    """
    result = dict(type='input', name=f'{module}:{name}', title=name)
    if par.get('readonly', True):
        result['type'] = 'rdonly'
    else:
        result['command'] = f'change {module}:{name}'
        datainfo = par['datainfo']
        kind = datainfo['type']
        if kind == 'enum':
            result['enum_names'] = [
                dict(title=title, value=value)
                for title, value in datainfo['members'].items()
            ]
            result['type'] = 'enum'
        elif kind == 'bool':
            result['type'] = 'checkbox'
    # a missing description is treated like an empty one
    description = par.get('description')
    if description:
        result['info'] = description
    return result
def convert_cmd(module, name, cmd):
    """Build the web-client component description for one command.

    :param module: module name, used as prefix of the component name
    :param name: command name, also used as the displayed title
    :param cmd: command description dict. Keys read here: ``datainfo``
        (with optional ``argument`` and ``result`` entries) and
        ``description`` (optional)
    :return: dict with at least ``type``, ``name``, ``title`` and
        ``command``

    A command with neither argument nor result is returned early as a
    plain 'pushbutton' (without ``info``). Otherwise the type is chosen
    from the argument ('enum', 'checkbox' or 'input'), or 'rdonly' when
    there is only a result.

    NOTE(review): the reviewed copy of this file had lost its indentation.
    ``result['button'] = not argument`` is placed at function level here,
    because inside the no-result branch it could only ever be False.
    Confirm against the repository.
    """
    result = dict(type='pushbutton', name=f'{module}:{name}', title=name)
    result['command'] = f'do {module}:{name}'
    datainfo = cmd['datainfo']
    argument = datainfo.get('argument')
    if datainfo.get('result'):
        result['result'] = True
    elif not argument:  # simple command like stop
        return result
    result['button'] = not argument
    # from here on the initial 'pushbutton' type is always replaced
    if argument:
        kind = argument['type']
        if kind == 'enum':
            result['enum_names'] = [
                dict(title=title, value=value)
                for title, value in argument['members'].items()
            ]
            result['type'] = 'enum'
        elif kind == 'bool':
            result['type'] = 'checkbox'
        else:
            result['type'] = 'input'
    else:
        result['type'] = 'rdonly'
    # a missing description is treated like an empty one
    description = cmd.get('description')
    if description:
        result['info'] = description
    return result
class SecopInteractor(SecopClient):
    """SECoP client collecting parameter updates for the web client.

    Updates arriving through the ``updateItem`` callback are filtered by
    ``module_updates`` / ``param_updates`` and buffered in ``updates``
    until they are fetched with :meth:`get_updates`.
    """

    # parameters listed first by get_components, in this order
    prio_par = ["value", "status", "target"]
    # parameters listed after all other parameters by get_components
    hide_par = ["baseclass", "class", "pollinterval"]
    # parameters left out by get_components
    skip_par = ["status2"]

    def __init__(self, uri, node_map):
        """Connect to the node at *uri* and register its modules.

        :param uri: passed on to the ``SecopClient`` constructor
        :param node_map: dict updated in place with ``module name -> self``
            for every module of the connected node

        A failure while connecting or registering is logged, not raised.
        """
        super().__init__(uri)
        self.module_updates = set()  # modules whose parameters are all forwarded
        self.param_updates = set()  # parameter names forwarded for every module
        self.updates = {}  # (module, parameter) -> pending update item
        try:
            self.connect()
            node_map.update({k: self for k in self.modules})
            self.register_callback(None, updateItem=self.updateItem)
        except Exception as e:
            # best effort: an unreachable node must not break the caller
            logging.error('can not connect to %r: %r', uri, e)

    def add_main_components(self, components):
        """Append one overview component per module to *components*.

        :param components: list extended in place

        Also enables forwarding of 'value', 'status' and 'target' updates
        for all modules.
        """
        for name, desc in self.modules.items():
            parameters = desc['parameters']
            component = {'type': 'rdonly' if 'value' in parameters else 'none'}
            if 'status' in parameters:
                component['statusname'] = f'{name}:status'
            targetpar = parameters.get('target')
            if targetpar:
                component.update(convert_par(name, 'target', targetpar))
                component['targetname'] = f'{name}:target'
            info = desc['properties'].get('description')
            if info:
                component['info'] = info
            # must come after the convert_par update, which sets name/title
            # for the target parameter
            component['name'] = f'{name}:value'
            component['title'] = name
            components.append(component)
        self.param_updates.update(('value', 'status', 'target'))

    def get_components(self, path):
        """Return the component list for all accessibles of module *path*.

        Order: ``prio_par`` parameters, remaining parameters, ``hide_par``
        parameters, commands. ``skip_par`` parameters are omitted.
        Also enables forwarding of all updates of this module.
        """
        module = self.modules[path]
        self.module_updates.add(path)  # TODO: remove others?
        # work on a copy, as entries are popped while sorting
        parameters = dict(module["parameters"])
        for name in self.skip_par:
            parameters.pop(name, None)
        components = [
            convert_par(path, name, parameters.pop(name))
            for name in self.prio_par if name in parameters
        ]
        hidden = [
            convert_par(path, name, parameters.pop(name))
            for name in self.hide_par if name in parameters
        ]
        components.extend(convert_par(path, name, par) for name, par in parameters.items())
        components.extend(hidden)
        components.extend(
            convert_cmd(path, name, cmd)
            for name, cmd in module.get("commands", {}).items()
        )
        return components

    def updateItem(self, module, parameter, entry):
        """Callback: buffer an update item if it is of interest.

        The update is kept when *module* is in ``module_updates`` or
        *parameter* is in ``param_updates``; a later update for the same
        (module, parameter) replaces a pending one.
        """
        if module not in self.module_updates and parameter not in self.param_updates:
            return
        name = f'{module}:{parameter}'
        if entry.readerror:
            item = {'name': name, 'error': str(entry.readerror)}
        elif parameter == 'status':
            # statuscode: 0: DISABLED, 1: IDLE, 2: WARN, 3: BUSY, 4: ERROR
            statuscode, statustext = entry[0]
            formatted = statuscode.name + (f', {statustext}' if statustext else '')
            item = {'name': name, 'value': str(entry), 'statuscode': statuscode // 100,
                    'formatted': formatted}
        else:
            item = {'name': name, 'value': str(entry), 'formatted': entry.formatted()}
        self.updates[module, parameter] = item

    def _update_from_cache(self, module, params):
        """Feed the cached entries of the given parameters into updateItem."""
        cache = self.cache
        for param in params:
            key = module, param
            if key in cache:
                self.updateItem(module, param, cache[key])

    def update_main(self):
        """Re-issue cached 'value', 'status' and 'target' of all modules."""
        for modname in self.modules:
            self._update_from_cache(modname, ('value', 'status', 'target'))

    def update_params(self, path):
        """Re-issue the cached values of all parameters of module *path*."""
        self._update_from_cache(path, self.modules[path]['parameters'])

    def handle_command(self, command):
        """Handle *command* if its module belongs to this node.

        Accepted forms are ``change <module>[:<parameter>] <value>``,
        ``do <module>[:<command>] <argument>`` and, without prefix, the
        same as ``change``. A missing parameter name means 'target'.

        :return: ``dict(type='accept-command')`` for a blank command,
            None when the module is not known here, True after a parameter
            change was attempted, or a result item dict for a command
        """
        if not command.strip():
            return dict(type='accept-command')
        is_param = True
        if command.startswith('change '):
            command = command[7:]
        elif command.startswith('do '):
            is_param = False
            command = command[3:]
        modpar, _, strvalue = command.partition(' ')
        module, _, parameter = modpar.partition(':')
        if not parameter:
            parameter = 'target'
        if module not in self.modules:
            return None
        logging.info('SENDCOMMAND %r', command)
        name = f'{module}:{parameter}'
        if is_param:
            try:
                entry = self.setParameterFromString(module, parameter, strvalue)
                item = {'name': name, 'value': str(entry), 'formatted': entry.formatted()}
            except Exception as e:
                # the command was ours, but there is no new value to
                # publish; do not look up anything here, as the parameter
                # might not even exist
                logging.error('%r changing %s to %r', e, name, strvalue)
            else:
                self.updates[module, parameter] = item
            return True
        # called a command
        result = self.execCommandFromString(module, parameter, strvalue)[0]  # ignore qualifiers
        # NOTE(review): 'formatted' carries the raw first element, as the
        # original assignment did -- confirm whether a string is expected
        return {'name': name, 'value': str(result), 'formatted': result}

    def get_updates(self):
        """Return the pending update items as a list and clear the buffer."""
        updates, self.updates = self.updates, {}
        return list(updates.values())

    def info(self):
        """Return a placeholder info list."""
        return ["na"]