Files
sehistory/nicoscache.py

173 lines
5.8 KiB
Python

import re
import logging
from ast import literal_eval
from streams import Stream
from secop import EnumConvert
OP_TELL = '='
OP_ASK = '?'
OP_WILDCARD = '*'
OP_SUBSCRIBE = ':'
OP_UNSUBSCRIBE = '|'
OP_TELLOLD = '!'
OP_LOCK = '$'
OP_REWRITE = '~'
OP_LOCK_LOCK = '+'
OP_LOCK_UNLOCK = '-'
# put flags between key and op...
FLAG_NO_STORE = '#'
# end/sync special token
END_MARKER = '###'
SYNC_MARKER = '#sync#'
PING = '#ping#'
# Time constant
CYCLETIME = 0.1
opkeys = OP_TELL + OP_ASK + OP_WILDCARD + OP_SUBSCRIBE + OP_UNSUBSCRIBE + \
OP_TELLOLD + OP_LOCK + OP_REWRITE
# regular expression matching a cache protocol message
msg_pattern = re.compile(r'''
^ (?:
\s* (?P<time>\d+\.?\d*)? # timestamp
\s* (?P<ttlop>[+-]?) # ttl operator
\s* (?P<ttl>\d+\.?\d*(?:[eE][+-]?\d+)?)? # ttl
\s* (?P<tsop>@) # timestamp mark
)?
\s* (?P<key>[^%(opkeys)s]*?) # key
\s* (?P<op>[%(opkeys)s]) # operator
\s* (?P<value>[^\r\n]*?) # value
\s* \r?$ # skip cr before line end
''' % dict(opkeys=opkeys), re.X)
class NicosStream(Stream):
def __init__(self, uri, name=None):
self.devices = {}
self.units = {} # global unit dict when overridden in streams
self.aliases = {}
self.descr = {}
self.convert = {}
self.secnode_uri = {}
super().__init__(uri, name)
def init(self, device=None, **kwds):
self.tags['device'] = device or 'nicos'
self.send(f'@nicos/*\n{END_MARKER}{OP_ASK}')
self._init = True
def do_status(self, devname, value, op):
self.devices[devname] = op == '='
return value[0]
def do_value(self, devname, value, op):
if op == '=':
if isinstance(value, (int, float)):
key = f'{devname}.value'
if key not in self.convert:
self.convert[key] = float
self.devices[devname] = op == '='
return value
def do_unit(self, devname, value, op):
# for non-secop devices
self.units[f'{devname}.value'] = value
self.units[f'{devname}.target'] = value
return value
def do_alias(self, devname, value, op):
self.convert.pop(f'{devname}.value', None)
self.aliases[devname] = value
return value
def do_uri(self, devname, value, op):
self.secnode_uri[devname] = value
return value
def do_setup_info(self, devname, value, op):
for dev, (cls, descr) in value.items():
self.descr[dev] = descr
mod = descr.get('secop_module')
if mod:
params_cfg = descr.get('params_cfg', {})
# add datainfo for special parameters:
for param in 'value', 'target':
dt = descr.get(f'{param}_datainfo')
if dt:
params_cfg[param] = {'datainfo': dt}
for param, cfg in params_cfg.items():
dt = cfg.get('datainfo')
key = f'{mod}.{param}'
if dt:
if dt['type'] == 'enum':
self.convert[key] = EnumConvert(dt['members'])
elif dt['type'] in ('double', 'scaled'):
self.convert[key] = float
unit = cfg.get('unit')
if unit is not None:
self.units[key] = unit
return value
def ping(self):
self.send(f'{PING}{OP_ASK}')
def get_tags(self, key):
return self.tags
def event_generator(self, matchmsg=msg_pattern.match):
if self.is_offline():
return
events = {}
try:
for line in self.get_lines():
match = matchmsg(line)
if match:
data = match.groupdict()
key = data['key']
op = data['op']
if key[:1] == '#':
if key == END_MARKER:
if self._init:
self.notimeout()
self.send(f'@nicos/{OP_SUBSCRIBE}')
self._init = False
break
continue
value = data['value']
keysplit = key.split('/')
param = keysplit[-1]
try:
value = literal_eval(value)
except Exception:
value = None
devname = keysplit[1]
devname = self.aliases.get(devname, devname)
func = getattr(self, f'do_{param}', None)
if func:
value = func(devname, value, op)
events[devname, param] = op, value, float(data['time'])
if self._init:
raise TimeoutError('timeout receiving initial values')
except Exception as e:
logging.exception('nicos %r', self.uri)
return
for ts, devname, param, op, value in sorted([t, d, p, o, v] for (d, p), (o, v, t) in events.items()):
descr = self.descr.get(devname)
mod = descr.get('secop_module', devname) if descr else devname
key = (mod, param)
if self.devices.get(devname):
try:
value = self.convert[key](value)
yield 'value', value, key, self.tags, ts
error = None
except KeyError: # no conversion function
continue
except TypeError:
yield 'error', 'error', key, self.tags, ts