174 lines
6.7 KiB
Python
174 lines
6.7 KiB
Python
import logging
|
|
from frappy.client import SecopClient
|
|
|
|
|
|
def convert_par(module, name, par):
    """Build the GUI component description for a single parameter.

    :param module: module name, used as prefix of the component name
    :param name: parameter name, also used as the component title
    :param par: parameter description; must contain 'description', and for
        writable parameters also 'datainfo' (with 'members' for enums)
    :return: dict with 'type', 'name', 'title' and, where applicable,
        'command', 'enum_names' and 'info'

    A parameter without an explicit 'readonly' entry is treated as read-only.
    """
    item = {'type': 'input', 'name': module + ":" + name, 'title': name}
    if not par.get('readonly', True):
        # writable: attach the change command and pick a matching widget
        item['command'] = 'change %s:%s' % (module, name)
        datainfo = par['datainfo']
        kind = datainfo['type']
        if kind == 'bool':
            item['type'] = 'checkbox'
        elif kind == 'enum':
            item['enum_names'] = [
                {'title': label, 'value': number}
                for label, number in datainfo['members'].items()
            ]
            item['type'] = 'enum'
    else:
        item['type'] = 'rdonly'
    description = par['description']
    if description:
        item['info'] = description
    return item
|
|
|
|
|
|
def convert_cmd(module, name, cmd):
    """Build the GUI component description for a single command.

    :param module: module name, used as prefix of the component name
    :param name: command name, also used as the component title
    :param cmd: command description; must contain 'datainfo' (optionally
        holding 'argument' and 'result') and 'description'
    :return: dict describing the component

    A command with neither argument nor result is returned as a plain
    'pushbutton' (without 'info').  Otherwise the type is taken from the
    argument ('enum', 'checkbox' or 'input'), or is 'rdonly' when there is
    only a result to show.
    """
    datainfo = cmd['datainfo']
    argument = datainfo.get('argument')
    spec = {
        'type': 'pushbutton',
        'name': module + ":" + name,
        'title': name,
        'command': 'do %s:%s' % (module, name),
    }
    if datainfo.get('result'):
        spec['result'] = True
    elif not argument:  # simple command like stop
        return spec
    spec['button'] = not argument
    # from here on the initial 'pushbutton' type is always replaced
    if not argument:
        spec['type'] = 'rdonly'
    elif argument['type'] == 'enum':
        spec['enum_names'] = [
            {'title': label, 'value': number}
            for label, number in argument['members'].items()
        ]
        spec['type'] = 'enum'
    elif argument['type'] == 'bool':
        spec['type'] = 'checkbox'
    else:
        spec['type'] = 'input'
    description = cmd['description']
    if description:
        spec['info'] = description
    return spec
|
|
|
|
|
|
class SecopInteractor(SecopClient):
    """SecopClient that exposes modules and their updates as GUI items.

    Descriptions of modules, parameters and commands are converted into
    component dicts (see convert_par / convert_cmd), and parameter updates
    are collected in ``self.updates`` until fetched with get_updates().
    """

    # parameters shown first in get_components, in this order
    prio_par = ["value", "status", "target"]
    # parameters moved to the end of the parameter list in get_components
    hide_par = ["baseclass", "class", "pollinterval"]
    # parameters left out of get_components entirely
    skip_par = ["status2"]

    def __init__(self, uri, node_map):
        """Connect to the node at *uri* and register its modules.

        :param uri: passed on to SecopClient
        :param node_map: dict updated in place with ``module name -> self``
            for every module of this node

        A failure while connecting or registering is logged, not raised, so
        the instance may exist without a working connection.
        """
        super().__init__(uri)
        self.module_updates = set()  # modules whose parameters are all tracked
        self.param_updates = set()   # parameter names tracked on every module
        self.updates = {}            # (module, parameter) -> pending item
        try:
            self.connect()
            node_map.update(dict.fromkeys(self.modules, self))
            self.register_callback(None, updateItem=self.updateItem)
        except Exception as e:
            # best effort: keep the object usable for the caller
            logging.error('SecopInteractor setup for %r failed: %r', uri, e)

    def add_main_components(self, components):
        """Append one overview component per module to *components*.

        Each component is named ``<module>:value``.  If the module has a
        target parameter, its converted description (type, command, ...) is
        merged in and 'targetname' is set; 'statusname' is set when a status
        parameter exists.  The module description, if any, becomes 'info'.
        Also enables update tracking for value, status and target.
        """
        for name, desc in self.modules.items():
            parameters = desc['parameters']
            component = {'type': 'rdonly' if 'value' in parameters else 'none'}
            if 'status' in parameters:
                component['statusname'] = f'{name}:status'
            targetpar = parameters.get('target')
            if targetpar:
                component.update(convert_par(name, 'target', targetpar))
                component['targetname'] = f'{name}:target'
            info = desc['properties'].get('description')
            if info:
                component['info'] = info
            # name and title set by convert_par refer to the target: override
            component['name'] = f'{name}:value'
            component['title'] = name
            components.append(component)
        self.param_updates.update(('value', 'status', 'target'))

    def get_components(self, path):
        """Return the component list for all accessibles of module *path*.

        Order: prio_par parameters, remaining parameters, hide_par
        parameters, then commands.  skip_par parameters are omitted.
        Also enables update tracking for every parameter of the module.

        :raises KeyError: if *path* is not a known module
        """
        module = self.modules[path]
        self.module_updates.add(path)  # TODO: remove others?
        # work on a copy, as handled parameters are popped below
        parameters = dict(module["parameters"])
        for name in self.skip_par:
            parameters.pop(name, None)
        components = [
            convert_par(path, name, parameters.pop(name))
            for name in self.prio_par if name in parameters
        ]
        trailing = [
            convert_par(path, name, parameters.pop(name))
            for name in self.hide_par if name in parameters
        ]
        components.extend(
            convert_par(path, name, par) for name, par in parameters.items())
        components.extend(trailing)
        components.extend(
            convert_cmd(path, name, cmd)
            for name, cmd in module.get("commands", {}).items())
        return components

    def updateItem(self, module, parameter, entry):
        """Store an update for *module*:*parameter* if it is tracked.

        Registered as the 'updateItem' callback in __init__.  *entry* is
        expected to offer ``readerror``, ``formatted()`` and indexing, with
        ``entry[0]`` being the value (presumably a SecopClient cache item).
        Untracked updates are ignored; a newer update replaces a pending
        one for the same parameter.
        """
        if (module not in self.module_updates
                and parameter not in self.param_updates):
            return
        name = f'{module}:{parameter}'
        if entry.readerror:
            item = {'name': name, 'error': str(entry.readerror)}
        elif parameter == 'status':
            # statuscode: 0: DISABLED, 1: IDLE, 2: WARN, 3: BUSY, 4: ERROR
            statuscode, statustext = entry[0]
            formatted = statuscode.name + (f', {statustext}' if statustext else '')
            item = {'name': name, 'value': str(entry),
                    'statuscode': statuscode // 100,
                    'formatted': formatted}
        else:
            item = {'name': name, 'value': str(entry),
                    'formatted': entry.formatted()}
        self.updates[module, parameter] = item

    def update_main(self):
        """Queue the cached value, status and target of every module."""
        cache = self.cache
        for modname in self.modules:
            for param in 'value', 'status', 'target':
                key = modname, param
                if key in cache:
                    self.updateItem(*key, cache[key])

    def update_params(self, path):
        """Queue the cached values of all parameters of module *path*."""
        cache = self.cache
        for param in self.modules[path]['parameters']:
            key = path, param
            if key in cache:
                self.updateItem(*key, cache[key])

    def handle_command(self, command):
        """Handle *command* if it addresses one of our modules.

        Accepted forms::

            change <module>[:<parameter>] <value>
            do <module>:<command> [<argument>]
            <module>[:<parameter>] <value>      (same as 'change')

        A missing parameter name defaults to 'target'.

        :return:
            - ``dict(type='accept-command')`` for an empty command
            - ``None`` when the module is not known to this node
            - ``True`` after a parameter change attempt; the new value, or
              an error item if the change failed, is queued in self.updates
            - a dict with 'name', 'value' and 'formatted' for an executed
              command
        """
        if not command.strip():
            return dict(type='accept-command')
        is_param = True
        if command.startswith('change '):
            command = command[len('change '):]
        elif command.startswith('do '):
            is_param = False
            command = command[len('do '):]
        modpar, _, strvalue = command.partition(' ')
        module, _, parameter = modpar.partition(':')
        if not parameter:
            parameter = 'target'
        if module not in self.modules:
            return None
        logging.info('SENDCOMMAND %r', command)
        name = f'{module}:{parameter}'
        if is_param:
            try:
                entry = self.setParameterFromString(module, parameter, strvalue)
                item = {'name': name, 'value': str(entry),
                        'formatted': entry.formatted()}
            except Exception as e:
                # the parameter itself may be unknown: look up defensively
                pardesc = self.modules[module]['parameters'].get(parameter, {})
                logging.error('%r converting %s to %s',
                              e, strvalue, pardesc.get('datatype'))
                # report the failure in the same shape updateItem uses
                item = {'name': name, 'error': str(e)}
            self.updates[module, parameter] = item
            return True
        # called a command
        result = self.execCommandFromString(module, parameter, strvalue)[0]  # ignore qualifiers
        return {'name': name, 'value': str(result), 'formatted': result}

    def get_updates(self):
        """Return the pending update items as a list and clear the queue."""
        updates, self.updates = self.updates, {}
        return list(updates.values())

    def info(self):
        """Return a placeholder info list."""
        return ["na"]
|