# frappy/secop_psi/sea.py
# 2020-09-08 13:36:11 +02:00
#
# 497 lines
# 18 KiB
# Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""generic SEA driver
a object or subobject in sea may be assigned to a SECoP module
Examples:
SECoP SEA hipadaba path mod.obj mod.sub par.sub mod.path
-------------------------------------------------------------------------------
tt:maxwait tt /tt/maxwait tt maxwait /tt
tt:ramp tt set/ramp /tt/set/ramp tt set/ramp /tt
t1:raw tt t1/raw /tt/t1/raw tt t1 raw /tt/t1
rx:bla rx bla /some/rx_a/bla rx bla /some/rx_a
"""
import threading
import time
import json
from os.path import join, expanduser
from secop.modules import Module, Parameter, Command, Override, Drivable, Readable, Writable, Property, Attached
from secop.datatypes import StringType, FloatRange, ArrayOf, BoolType, IntRange, EnumType
from secop.lib import mkthread, getGeneralConfig
from secop.lib.asynconn import AsynConn, ConnectionClosed
from secop.metaclass import ModuleMeta, Done
from secop.errors import HardwareError, secop_error, ConfigError
from secop.client import ProxyClient
from secop.protocol.dispatcher import make_update
# template for the head of a generated config file: the node section followed
# by the section of the SeaClient connection module
# (%(samenv)s is filled in by SeaClient.do_describe)
CFG_HEADER = """[NODE]
id = %(samenv)s.psi.ch
description = %(samenv)s over SEA
[seaconn]
class = secop_psi.sea.SeaClient
description = a SEA connection
"""
# template for one module section of a generated config file
# (module name, module class and json descriptor name are filled in by
# SeaClient.do_describe)
CFG_MODULE = """
[%(module)s]
class = secop_psi.sea.%(modcls)s
iodev = seaconn
json_descr = %(descr)s
remote_paths = .
"""
# directory searched by get_sea_port for the SEA tcl startup files
SEA_DIR = expanduser('~/sea')
# first entry of the colon separated 'confdir' item of the general config;
# the json descriptors and generated cfg files live in its 'sea' subdirectory
confdir = getGeneralConfig()['confdir'].split(':', 1)[0]
def get_sea_port(instance, sea_dir=None):
    """Look up the SEA server port configured for *instance*.

    The instance specific file ``sea_<instance>.tcl`` is tried first, then the
    generic ``sea.tcl``. Within a file, the first line consisting of exactly
    three whitespace separated words with ``serverport`` as second word wins.
    Missing files are skipped.

    :param instance: name of the SEA instance
    :param sea_dir: directory containing the tcl files (default: SEA_DIR)
    :return: the port as a string, or None if no file defines it
    """
    if sea_dir is None:
        sea_dir = SEA_DIR
    for filename in ('sea_%s.tcl' % instance, 'sea.tcl'):
        try:
            with open(join(sea_dir, filename)) as f:
                for line in f:
                    words = line.split()
                    if len(words) == 3 and words[1] == 'serverport':
                        return words[2]
        except FileNotFoundError:
            continue  # try the next candidate file
    return None
class SeaClient(ProxyClient, Module):
    """connection to SEA

    Two connections to the SEA server are used:

    - ``self.asyncio``: logged in as ``Spy`` and switched to the json protocol;
      it is read by ``_rxthread``, which converts incoming messages into
      parameter updates
    - ``self.io``: logged in as ``seauser``, opened on demand by ``request``
      and used for synchronous ``fulltransact`` commands
    """

    properties = {
        'json_path': Property('path to SEA json descriptors',
                              datatype=StringType(),
                              default=join(expanduser('~'), 'sea/tcl/json'))
    }
    parameters = {
        'uri':
            Parameter('hostname:portnumber', datatype=StringType(), default='localhost:5000'),
        'timeout':
            Parameter('timeout', datatype=FloatRange(0), default=10),
    }
    commands = {
        'communicate':
            Command('send a command to SEA', argument=StringType(), result=StringType()),
        'describe':
            Command('save objects (and sub-objects) description', result=StringType()),
    }

    def __init__(self, name, log, opts, srv):
        """create the client

        When no 'uri' is configured, the port is taken from the SEA tcl files
        of the instance, whose name is the node name up to its last '_'.

        :raises ConfigError: when no uri is given and no port can be found
        """
        instance = srv.node_cfg['name'].rsplit('_', 1)[0]
        if 'uri' not in opts:
            port = get_sea_port(instance)
            if port is None:
                raise ConfigError('missing sea port for %s' % instance)
            opts['uri'] = 'tcp://localhost:%s' % port
        self.objects = []
        self.shutdown = False
        self.path2param = {}
        self._write_lock = threading.Lock()
        self.io = None
        # created in _connect; defined here so that request() can test it
        # even before the first connection attempt
        self.asyncio = None
        ProxyClient.__init__(self)
        Module.__init__(self, name, log, opts, srv)

    def register_obj(self, module, obj):
        """register a SEA object and the module to be notified about its updates"""
        self.objects.append(obj)
        self.path2param.update(module.path2param)
        self.register_callback(module.name, module.updateEvent)

    def startModule(self, started_callback):
        """connect in a separate thread"""
        mkthread(self._connect, started_callback)

    def _expect(self, conn, expected):
        """read one line from conn and check it

        This is a real check instead of an ``assert``: an assert statement
        (including the readline call within) is removed when python runs
        with -O, which would leave the greeting unread.

        :raises HardwareError: when the line read differs from expected
        """
        reply = conn.readline()
        if reply != expected:
            raise HardwareError('unexpected reply from SEA: %r (expected %r)' % (reply, expected))

    def _connect(self, started_callback):
        """open the connection for updates and start the receiver thread"""
        if '//' not in self.uri:
            self.uri = 'tcp://' + self.uri
        self.asyncio = AsynConn(self.uri)
        self._expect(self.asyncio, b'OK')
        self.asyncio.writeline(b'Spy 007')
        self._expect(self.asyncio, b'Login OK')
        # the json protocol is better for updates
        self.asyncio.writeline(b'protocol set json')
        self.asyncio.writeline(('get_all_param ' + ' '.join(self.objects)).encode())
        mkthread(self._rxthread, started_callback)

    def request(self, command):
        """send a request and wait for reply

        :param command: the SEA command, sent within 'fulltransact'
        :return: the reply lines joined with newlines; on the first line
            everything up to the first '=' is stripped
        :raises TimeoutError: when the transaction is not finished within 10 s
            or the connection gets closed
        """
        with self._write_lock:
            if not self.io or not self.io.connection:
                if not self.asyncio or not self.asyncio.connection:
                    self._connect(None)
                self.io = AsynConn(self.uri)
                self._expect(self.io, b'OK')
                self.io.writeline(b'seauser seaser')
                self._expect(self.io, b'Login OK')
            self.io.flush_recv()
            self.io.writeline(('fulltransact %s' % command).encode())
            result = None  # None: TRANSACTIONSTART not yet seen
            deadline = time.time() + 10
            while time.time() < deadline:
                try:
                    reply = self.io.readline()
                    if reply is None:
                        continue
                except ConnectionClosed:
                    break
                reply = reply.decode()
                if reply.startswith('TRANSACTIONSTART'):
                    result = []
                    continue
                if reply == 'TRANSACTIONFINISHED':
                    if result is None:
                        print('missing TRANSACTIONSTART on: %s' % command)
                        return ''
                    if not result:
                        return ''
                    return '\n'.join(result)
                if result is None:
                    print('swallow: %s' % reply)
                    continue
                if not result:
                    result = [reply.split('=', 1)[-1]]
                else:
                    result.append(reply)
        raise TimeoutError('no response within 10s')

    def _rxthread(self, started_callback):
        """receiver thread: convert messages from SEA into parameter updates

        started_callback (if given) is called once, when the reply to the
        initial 'get_all_param' has finished.
        """
        while not self.shutdown:
            try:
                reply = self.asyncio.readline()
                if reply is None:
                    continue
            except ConnectionClosed:
                break
            try:
                msg = json.loads(reply)
            except Exception as e:
                print(repr(e), reply)
                continue
            if isinstance(msg, str):
                # the only plain strings treated are errors: '_E <path> <message>'
                if not msg.startswith('_E '):
                    continue
                try:
                    _, path, readerror = msg.split(None, 2)
                except ValueError:  # less than 3 items
                    continue
                data = {'%s.geterror' % path: readerror.replace('ERROR: ', '')}
                obj = None
                flag = 'hdbevent'
            else:
                try:
                    obj = msg['object']
                    flag = msg['flag']
                    data = msg['data']
                except (KeyError, TypeError):
                    # a malformed message must not terminate this thread
                    print('SKIP', msg)
                    continue
            if flag == 'finish' and obj == 'get_all_param':
                # first updates have finished
                if started_callback:
                    started_callback()
                    started_callback = None
                continue
            if flag != 'hdbevent':
                if obj != 'protocol':
                    print('SKIP', msg)
                continue
            if data is None:
                continue
            now = time.time()
            for path, value in data.items():
                readerror = None
                if path.endswith('.geterror'):
                    if value:
                        readerror = HardwareError(value)
                    path = path.rsplit('.', 1)[0]
                    value = None
                try:
                    module, param = self.path2param[path]
                except KeyError:
                    # print('UNUSED', msg)
                    continue  # unused parameter
                oldv, oldt, oldr = self.cache.get((module, param), [None, None, None])
                if value is None:
                    value = oldv
                # oldt is None when nothing is cached yet: this counts as changed
                if (value != oldv or str(readerror) != str(oldr)
                        or oldt is None or abs(now - oldt) > 60):
                    # do not update unchanged values within 60 sec
                    self.updateValue(module, param, value, now, readerror)

    def do_communicate(self, command):
        """send a command to SEA and return its reply"""
        reply = self.request(command)
        return reply

    def do_describe(self):
        """save objects (and sub-objects) description

        Writes <samenv>.cfg and one json file per described object into the
        'sea' subdirectory of confdir.

        :return: the names of the descriptors written, one per line
        """
        reply = self.request('describe_all')
        reply = ''.join('' if line.startswith('WARNING') else line for line in reply.split('\n'))
        samenv, reply = json.loads(reply)
        samenv = samenv.replace('/', '_')
        result = []
        with open(join(confdir, 'sea', samenv + '.cfg'), 'w') as cfp:
            cfp.write(CFG_HEADER % dict(samenv=samenv))
            for filename, obj, descr in reply:
                content = json.dumps([obj, descr]).replace('}, {', '},\n{')
                with open(join(confdir, 'sea', filename + '.json'), 'w') as fp:
                    fp.write(content + '\n')
                # objects whose first node has a 'run' command are drivable
                if descr[0].get('cmd', '').startswith('run '):
                    modcls = 'SeaDrivable'
                else:
                    modcls = 'SeaReadable'
                cfp.write(CFG_MODULE % dict(modcls=modcls, module=obj, descr=filename))
                result.append(filename)
        return '\n'.join(result)
# SEA type name -> SECoP datatype, looked up by get_datatype
# 'none' yields no datatype (None); SeaModule.__new__ substitutes a
# StringType parameter with visibility 3 in this case
SEA_TO_SECOPTYPE = {
    'float': FloatRange(),
    'text': StringType(),
    'int': IntRange(),
    'bool': BoolType(),
    'none': None,
    'floatvarar': ArrayOf(FloatRange(), 0, 400), # 400 is the current limit for proper notify events in SEA
}
def get_datatype(paramdesc):
    """Return the SECoP datatype for a SEA parameter description.

    :param paramdesc: dict with a 'type' item (plus an 'enum' item for enums)
    :return: the datatype; None for the SEA type 'none'
    :raises ValueError: when the SEA type is not known
    """
    sea_type = paramdesc['type']
    try:
        # general case: direct lookup (the result may legitimately be None)
        return SEA_TO_SECOPTYPE[sea_type]
    except KeyError:
        pass
    # special cases
    if sea_type == 'enum':
        return EnumType(paramdesc['enum'])
    raise ValueError('unknown SEA type %r' % sea_type)
class SeaModule(Module):
    """base class of modules representing a SEA object (or a part of it)

    The parameters are not declared statically: __new__ reads the json
    descriptor named in the configuration and creates a subclass with the
    parameters and read/write functions found there.
    """

    properties = {
        'iodev': Attached(),
    }
    # pollerClass=None
    path2param = None
    sea_object = None

    def __new__(cls, name, logger, cfgdict, dispatcher):
        """create an instance of a subclass built from the json descriptor

        The following items are removed from cfgdict:

        - json_descr: name of the descriptor file (without '.json') in the
          'sea' subdirectory of confdir
        - remote_paths: whitespace separated list of subpaths to include,
          '.' meaning all except subpaths below a readonly top node with
          kids; empty: take all. The first item designates the main value
        - visibility_level (default 2): nodes with a higher visibility are
          omitted, except when their path ends with 'is_running'
        """
        # a value given in the config file may be a string: make sure the
        # comparison with the (integer) visibility below does not fail
        visibility_level = int(cfgdict.pop('visibility_level', 2))
        json_descr = cfgdict.pop('json_descr')
        remote_paths = cfgdict.pop('remote_paths', '')
        if 'description' not in cfgdict:
            cfgdict['description'] = '%s (remote_paths=%s)' % (json_descr, remote_paths)

        with open(join(confdir, 'sea', json_descr + '.json')) as fp:
            sea_object, descr = json.load(fp)
        remote_paths = remote_paths.split()
        if remote_paths:
            result = []
            for rpath in remote_paths:
                # 'include' is kept from the last top level node seen, so that
                # it applies to the subpaths following it
                include = True
                for paramdesc in descr:
                    path = paramdesc['path']
                    if paramdesc.get('visibility', 1) > visibility_level:
                        if not path.endswith('is_running'):
                            continue
                    sub = path.split('/', 1)
                    if rpath == '.':  # take all except subpaths with readonly node at top
                        if len(sub) == 1:
                            include = paramdesc.get('kids', 0) == 0 or not paramdesc.get('readonly', True)
                        if include or path == '':
                            result.append(paramdesc)
                    elif sub[0] == rpath:
                        result.append(paramdesc)
            descr = result
            main = remote_paths[0]
            if main == '.':
                main = ''
        else:  # take all
            main = ''

        path2param = {}
        parameters = {}
        attributes = dict(sea_object=sea_object, path2param=path2param, parameters=parameters)
        for paramdesc in descr:
            path = paramdesc['path']
            readonly = paramdesc.get('readonly', True)
            dt = get_datatype(paramdesc)
            kwds = dict(description=paramdesc.get('description', path),
                        datatype=dt,
                        visibility=paramdesc.get('visibility', 1),
                        needscfg=False, poll=False, readonly=readonly)
            if kwds['datatype'] is None:
                # SEA type 'none': represented as an expert level text parameter
                kwds.update(visibility=3, default='', datatype=StringType())
            pathlist = path.split('/') if path else []
            if path == main:
                key = 'value'
            else:
                if len(pathlist) > 0:
                    if len(pathlist) == 1:
                        kwds['group'] = 'more'
                    else:
                        kwds['group'] = pathlist[-2]
                # flatten path to parameter name: take the shortest tail of
                # the path not colliding with an accessible of the base class
                for i in reversed(range(len(pathlist))):
                    key = '_'.join(pathlist[i:])
                    if key not in cls.accessibles:
                        break
                if key == 'is_running':
                    kwds['export'] = False
            path2param['/'.join(['', sea_object] + pathlist)] = (name, key)
            if key in cls.accessibles:
                if key == 'target':
                    kwds['readonly'] = False
                pobj = Override(**kwds)
                datatype = kwds.get('datatype', cls.accessibles[key].datatype)
            else:
                pobj = Parameter(**kwds)
                datatype = pobj.datatype
            parameters[key] = pobj
            if not hasattr(cls, 'read_' + key):
                # cmd is bound as default argument, as it differs on each turn
                def rfunc(self, cmd='hval /sics/%s/%s' % (sea_object, path)):
                    """read the value from SEA, converted to float when possible"""
                    print('READ', cmd)
                    reply = self._iodev.request(cmd)
                    print('REPLY', reply)
                    if reply.startswith('ERROR: '):
                        raise HardwareError(reply.split(' ', 1)[1])
                    try:
                        reply = float(reply)
                    except ValueError:
                        pass
                    # an updateEvent will be handled before above returns
                    return reply

                attributes['read_' + key] = rfunc

            if not (readonly or hasattr(cls, 'write_' + key)):
                # pylint wrongly complains 'Cell variable pobj defined in loop'
                # pylint: disable=cell-var-from-loop
                def wfunc(self, value, datatype=datatype, command=paramdesc['cmd']):
                    """send the SEA command changing the value"""
                    # TODO: convert to valid tcl data
                    cmd = "%s %s" % (command, datatype.export_value(value))
                    print('WRITE', cmd)
                    self._iodev.request(cmd)
                    # an updateEvent will be handled before above returns
                    return Done

                attributes['write_' + key] = wfunc

        # create standard parameters like value and status, if not yet there
        for pname, pobj in cls.accessibles.items():
            if pname == 'pollinterval':
                parameters[pname] = Override(export=False)
            elif pname not in parameters and isinstance(pobj, Parameter):
                parameters[pname] = Override(poll=False, needscfg=False)

        classname = '%s_%s' % (cls.__name__, sea_object)
        newcls = ModuleMeta.__new__(ModuleMeta, classname, (cls,), attributes)
        return Module.__new__(newcls)

    def __init__(self, name, logger, cfgdict, dispatcher):
        Module.__init__(self, name, logger, cfgdict, dispatcher)

    def updateEvent(self, module, parameter, value, timestamp, readerror):
        """handle an update from SEA (registered as callback in initModule)

        When a method update_<parameter> exists, it is called with
        (value, timestamp, readerror) and nothing else is done. Otherwise
        the parameter object is updated and the change is broadcast.
        """
        upd = getattr(self, 'update_' + parameter, None)
        if upd:
            upd(value, timestamp, readerror)
            return
        pobj = self.parameters[parameter]
        pobj.timestamp = timestamp
        #if not pobj.readonly and pobj.value != value:
        #    print('UPDATE', module, parameter, value)
        # should be done here: deal with clock differences
        if not readerror:
            try:
                pobj.value = value  # store the value even in case of a validation error
                pobj.value = pobj.datatype(value)
            except Exception as e:
                readerror = secop_error(e)
        pobj.readerror = readerror
        self.DISPATCHER.broadcast_event(make_update(self.name, pobj))

    #def earlyInit(self):
    #    self.path2param = {k % subst: v for k, v in self.path2param.items()}

    def initModule(self):
        """register the SEA object and this module on the connection module"""
        self._iodev.register_obj(self, self.sea_object)
        super().initModule()
class SeaReadable(SeaModule, Readable):
    """readable module on top of a SEA object"""

    def update_status(self, value, timestamp, readerror):
        """convert the SEA status text: empty means idle, anything else is an error

        A read error is shown by its repr as the error text.
        """
        text = repr(readerror) if readerror else value
        if text == '':
            new_status = [self.Status.IDLE, '']
        else:
            new_status = [self.Status.ERROR, text]
        self.status = new_status

    def read_status(self):
        """return the status as last set by update_status"""
        return self.status
class SeaWritable(SeaModule, Writable):
    """writable module on top of a SEA object (nothing added to the base classes)"""
class SeaDrivable(SeaModule, Drivable):
    """drivable module on top of a SEA object

    The status is composed from the SEA status text and the is_running flag.
    """

    _sea_status = ''  # last status text received without read error
    _is_running = 0  # last is_running value received without read error

    #def buildParams(self, cfgdict, name):
    #    # insert here special treatment for status and target
    #    super().buildParams(cfgdict)

    def read_status(self):
        """return the status as last composed by updateStatus"""
        return self.status

    def read_target(self):
        """return the target as currently stored"""
        return self.target

    def write_target(self, value):
        """start driving with the SEA 'run' command"""
        command = 'run %s %s' % (self.sea_object, value)
        self._iodev.request(command)
        #self.status = [self.Status.BUSY, 'driving']
        return value

    def update_status(self, value, timestamp, readerror):
        """remember the SEA status text and recompose the status"""
        if readerror:
            return
        self._sea_status = value
        self.updateStatus()

    def update_is_running(self, value, timestamp, readerror):
        """remember the is_running flag and recompose the status"""
        if readerror:
            return
        self._is_running = value
        self.updateStatus()

    def updateStatus(self):
        """compose the status: a SEA status text wins over the running flag"""
        if self._sea_status:
            composed = [self.Status.ERROR, self._sea_status]
        elif self._is_running:
            composed = [self.Status.BUSY, 'driving']
        else:
            composed = [self.Status.IDLE, '']
        self.status = composed

    def updateTarget(self, module, parameter, value, timestamp, readerror):
        """take over a target value, unless it is None

        NOTE(review): SeaModule.updateEvent only dispatches to methods named
        update_<parameter> taking (value, timestamp, readerror), so this
        method is not called from there - confirm whether it is still needed
        """
        if value is None:
            return
        self.target = value