173 lines
5.8 KiB
Python
173 lines
5.8 KiB
Python
import re
|
|
import logging
|
|
from ast import literal_eval
|
|
from streams import Stream
|
|
from secop import EnumConvert
|
|
|
|
|
|
OP_TELL = '='
|
|
OP_ASK = '?'
|
|
OP_WILDCARD = '*'
|
|
OP_SUBSCRIBE = ':'
|
|
OP_UNSUBSCRIBE = '|'
|
|
OP_TELLOLD = '!'
|
|
OP_LOCK = '$'
|
|
OP_REWRITE = '~'
|
|
|
|
OP_LOCK_LOCK = '+'
|
|
OP_LOCK_UNLOCK = '-'
|
|
|
|
# put flags between key and op...
|
|
FLAG_NO_STORE = '#'
|
|
|
|
# end/sync special token
|
|
END_MARKER = '###'
|
|
SYNC_MARKER = '#sync#'
|
|
PING = '#ping#'
|
|
|
|
# Time constant
|
|
CYCLETIME = 0.1
|
|
|
|
|
|
opkeys = OP_TELL + OP_ASK + OP_WILDCARD + OP_SUBSCRIBE + OP_UNSUBSCRIBE + \
|
|
OP_TELLOLD + OP_LOCK + OP_REWRITE
|
|
|
|
# regular expression matching a cache protocol message
|
|
msg_pattern = re.compile(r'''
|
|
^ (?:
|
|
\s* (?P<time>\d+\.?\d*)? # timestamp
|
|
\s* (?P<ttlop>[+-]?) # ttl operator
|
|
\s* (?P<ttl>\d+\.?\d*(?:[eE][+-]?\d+)?)? # ttl
|
|
\s* (?P<tsop>@) # timestamp mark
|
|
)?
|
|
\s* (?P<key>[^%(opkeys)s]*?) # key
|
|
\s* (?P<op>[%(opkeys)s]) # operator
|
|
\s* (?P<value>[^\r\n]*?) # value
|
|
\s* \r?$ # skip cr before line end
|
|
''' % dict(opkeys=opkeys), re.X)
|
|
|
|
|
|
class NicosStream(Stream):
|
|
def __init__(self, uri, name=None):
|
|
self.devices = {}
|
|
self.units = {} # global unit dict when overridden in streams
|
|
self.aliases = {}
|
|
self.descr = {}
|
|
self.convert = {}
|
|
self.secnode_uri = {}
|
|
super().__init__(uri, name)
|
|
|
|
def init(self, device=None, **kwds):
|
|
self.tags['device'] = device or 'nicos'
|
|
self.send(f'@nicos/*\n{END_MARKER}{OP_ASK}')
|
|
self._init = True
|
|
|
|
def do_status(self, devname, value, op):
|
|
self.devices[devname] = op == '='
|
|
return value[0]
|
|
|
|
def do_value(self, devname, value, op):
|
|
if op == '=':
|
|
if isinstance(value, (int, float)):
|
|
key = f'{devname}.value'
|
|
if key not in self.convert:
|
|
self.convert[key] = float
|
|
self.devices[devname] = op == '='
|
|
return value
|
|
|
|
def do_unit(self, devname, value, op):
|
|
# for non-secop devices
|
|
self.units[f'{devname}.value'] = value
|
|
self.units[f'{devname}.target'] = value
|
|
return value
|
|
|
|
def do_alias(self, devname, value, op):
|
|
self.convert.pop(f'{devname}.value', None)
|
|
self.aliases[devname] = value
|
|
return value
|
|
|
|
def do_uri(self, devname, value, op):
|
|
self.secnode_uri[devname] = value
|
|
return value
|
|
|
|
def do_setup_info(self, devname, value, op):
|
|
for dev, (cls, descr) in value.items():
|
|
self.descr[dev] = descr
|
|
mod = descr.get('secop_module')
|
|
if mod:
|
|
params_cfg = descr.get('params_cfg', {})
|
|
# add datainfo for special parameters:
|
|
for param in 'value', 'target':
|
|
dt = descr.get(f'{param}_datainfo')
|
|
if dt:
|
|
params_cfg[param] = {'datainfo': dt}
|
|
for param, cfg in params_cfg.items():
|
|
dt = cfg.get('datainfo')
|
|
key = f'{mod}.{param}'
|
|
if dt:
|
|
if dt['type'] == 'enum':
|
|
self.convert[key] = EnumConvert(dt['members'])
|
|
elif dt['type'] in ('double', 'scaled'):
|
|
self.convert[key] = float
|
|
unit = cfg.get('unit')
|
|
if unit is not None:
|
|
self.units[key] = unit
|
|
return value
|
|
|
|
def ping(self):
|
|
self.send(f'{PING}{OP_ASK}')
|
|
|
|
def get_tags(self, key):
|
|
return self.tags
|
|
|
|
def event_generator(self, matchmsg=msg_pattern.match):
|
|
if self.is_offline():
|
|
return
|
|
events = {}
|
|
try:
|
|
for line in self.get_lines():
|
|
match = matchmsg(line)
|
|
if match:
|
|
data = match.groupdict()
|
|
key = data['key']
|
|
op = data['op']
|
|
if key[:1] == '#':
|
|
if key == END_MARKER:
|
|
if self._init:
|
|
self.notimeout()
|
|
self.send(f'@nicos/{OP_SUBSCRIBE}')
|
|
self._init = False
|
|
break
|
|
continue
|
|
value = data['value']
|
|
keysplit = key.split('/')
|
|
param = keysplit[-1]
|
|
try:
|
|
value = literal_eval(value)
|
|
except Exception:
|
|
value = None
|
|
devname = keysplit[1]
|
|
devname = self.aliases.get(devname, devname)
|
|
func = getattr(self, f'do_{param}', None)
|
|
if func:
|
|
value = func(devname, value, op)
|
|
events[devname, param] = op, value, float(data['time'])
|
|
if self._init:
|
|
raise TimeoutError('timeout receiving initial values')
|
|
except Exception as e:
|
|
logging.exception('nicos %r', self.uri)
|
|
return
|
|
for ts, devname, param, op, value in sorted([t, d, p, o, v] for (d, p), (o, v, t) in events.items()):
|
|
descr = self.descr.get(devname)
|
|
mod = descr.get('secop_module', devname) if descr else devname
|
|
key = (mod, param)
|
|
if self.devices.get(devname):
|
|
try:
|
|
value = self.convert[key](value)
|
|
yield 'value', value, key, self.tags, ts
|
|
error = None
|
|
except KeyError: # no conversion function
|
|
continue
|
|
except TypeError:
|
|
yield 'error', 'error', key, self.tags, ts
|