Files
sehistory/secop.py
Markus Zolliker e735ebe5f9 add normalize_uri instead of short_hostname
- use gethostbyaddr only in case of numeric IP
- hard wired reverse CNAME (network alias) for linse-c
2025-05-26 08:55:11 +02:00

233 lines
7.8 KiB
Python

import re
import os
import json
import time
import socket
import logging
from collections import namedtuple
from select import select
from streams import Stream, Base, StreamDead
from normalizeuri import normalizeuri
# reply to '*IDN?': anything containing 'ISSE', later followed by ',SECoP,'
# (or ',SECOP,')
IDN = re.compile(r'.*ISSE.*,SEC[oO]P,')
# reply to 'describe': group 1 is the text after the specifier
DESCRIBING = re.compile(r'describing \S* (.*)$')
# value or error update: groups are (command, identifier, remaining text)
UPDATE = re.compile(r'(update|error_update) (\S*) (.*)$')
class EnumConvert(dict):
    """Mapping that can be called to convert one of its keys to a float."""

    def __call__(self, value):
        """Look up *value* in the mapping and return the entry as float.

        A missing key raises KeyError, exactly like plain item access.
        """
        mapped = self[value]
        return float(mapped)
class TagsDict(dict):
    """Dict returning a fixed fallback object for keys it does not hold."""

    def __init__(self, default_value):
        """Create an empty dict and remember *default_value* as fallback."""
        super().__init__()
        self.default_value = default_value

    def __missing__(self, key):
        """Return the fallback; the missing *key* is not inserted."""
        fallback = self.default_value
        return fallback
# per-parameter record: value converter, result key and tags
ParamInfo = namedtuple('ParamInfo', ('cvt', 'key', 'tags'))
class SecopStream(Stream):
    """Stream that talks SECoP to one node and yields its numeric updates.

    :meth:`init` performs the handshake and registers the parameters of
    interest, :meth:`event_generator` converts incoming ``update`` and
    ``error_update`` messages of these parameters into event tuples.
    """

    # NOTE(review): presumably read by the Stream base class to schedule
    # calls to ping() - confirm against streams.Stream
    ping_time = 0

    def init(self, device=None, **kwds):
        """Identify the node, read its description and activate updates.

        :param device: value for the ``device`` tag; when empty, the
            ``equipment_id`` from the node description is used
        :param kwds: accepted but not used here
        :raises ValueError: when no identification reply or no
            ``describing`` message is received
        """
        self._buffer = []
        self.send('*IDN?')
        resend = True
        messages = []
        for msg in self.get_lines():
            if IDN.match(msg):
                break
            if resend:
                # first line was not an identification: ask once more
                self.send('*IDN?')
                resend = False
            messages.append(msg)
        else:
            raise ValueError(f'missing identifier, got {messages} instead')

        self.send('describe')
        for msg in self.get_lines():
            match = DESCRIBING.match(msg)
            if match:
                break
        else:
            raise ValueError('missing describing message')

        self.descr = json.loads(match.group(1))
        self.device = device or self.descr['equipment_id']
        if self.device.endswith('.psi.ch'):
            # strip the 7 characters of the '.psi.ch' domain suffix
            self.device = self.device[:-7]
        self.tags['device'] = self.device
        self.modules = self.descr['modules']
        self.param_info = {}
        self.tags_dict = TagsDict(self.tags)
        for mod, moddesc in self.modules.items():
            # a module may announce its own device name, which then
            # replaces the device tag of the stream for this module
            original_id = (moddesc.get('_original_id')
                           or moddesc.get('original_id'))
            mod_tags = dict(self.tags, device=original_id) if original_id else None
            parameters = moddesc['accessibles']
            for param, desc in parameters.items():
                dt = desc['datainfo']
                if dt['type'] not in ('double', 'int', 'enum'):
                    continue  # only values convertible to float are recorded
                stripped = param[1:] if param.startswith('_') else param
                # module specific tags take precedence. self.tags is never
                # empty here, so testing it first would always discard
                # mod_tags
                tags = mod_tags or self.tags
                unit = dt.get('unit')
                if unit:
                    tags = dict(tags, unit=unit)
                # drop the leading underscore in the key unless this would
                # collide with another parameter of the same module
                key = mod, (param if stripped in parameters else stripped)
                self.param_info[mod, param] = ParamInfo(float, key, tags)
        self.send('activate')

    def ping(self):
        """Send a ``ping`` request to the node."""
        self.send('ping')

    def get_tags(self, key):
        """Return the tags stored for the module ``key[0]``.

        Falls back to the tags of the stream when nothing is stored for
        that module.
        """
        return self.tags_dict[key[0]]

    def event_generator(self):
        """Yield an event tuple for each update of a registered parameter.

        Yields ``('value', value, key, tags, timestamp)`` for ``update``
        and ``('error', text, key, tags, timestamp)`` for ``error_update``
        messages. The timestamp is the ``t`` qualifier of the message, or
        the current time when it is missing. Messages for parameters not
        registered in ``param_info`` are ignored.

        Any exception ends the generator after being logged.
        """
        try:
            for msg in self.get_lines():
                match = UPDATE.match(msg)
                if match:
                    cmd, ident, data = match.groups()
                    mod, _, param = ident.partition(':')
                    # an identifier without ':' refers to the 'value' parameter
                    pinfo = self.param_info.get((mod, param or 'value'))
                    if not pinfo:
                        continue
                    data = json.loads(data)
                    if cmd == 'error_update':
                        # first two items are joined to the error text,
                        # the third item holds the qualifiers
                        error = ': '.join(data[0:2])
                        timestamp = data[2].get('t', time.time())
                        yield 'error', error, pinfo.key, pinfo.tags, timestamp
                    else:
                        value = pinfo.cvt(data[0])
                        timestamp = data[1].get('t', time.time())
                        yield 'value', value, pinfo.key, pinfo.tags, timestamp
                elif msg == 'active':
                    # from now on, no more waiting
                    self.notimeout()
        except Exception as e:
            # probably other end closed
            logging.info('%r on %s', e, self.uri)
# UDP port used in this module for sending and receiving SECoP discovery messages
SECOP_UDP_PORT = 10767
class UdpStream(Base):
    """Common part of the streams receiving SECoP messages over UDP."""

    # the UDP socket to read from, to be set by the subclass
    socket = None

    def events(self):
        """Yield ``(SecopStream, uri, kwargs)`` for the pending datagrams.

        Reads datagrams until none arrives within 0.5 s. Two kinds of
        messages, selected by their ``SECoP`` item, are handled:

        - ``for_other_node``: the uri is taken from the message, all
          remaining items are passed on as kwargs
        - ``node``: the uri is built from the host name of the sender and
          the ``port`` item, ``equipment_id`` is passed on as ``name``

        Datagrams which can not be decoded, are not a JSON object or lack
        a needed item are skipped, as anybody may send to this port.
        """
        while select([self.socket], [], [], 0.5)[0]:
            try:
                raw, sender = self.socket.recvfrom(1024)
            except socket.error:  # pragma: no cover
                return
            try:
                # UnicodeDecodeError and JSONDecodeError both are ValueErrors
                msg = json.loads(raw.decode('utf-8'))
            except ValueError:
                logging.debug('ignore undecodable UDP message from %r', sender)
                continue
            if not isinstance(msg, dict):
                continue
            kind = msg.pop('SECoP', None)
            if not kind:
                continue
            if kind == 'for_other_node':
                uri = msg.pop('uri', None)
                if not uri:
                    logging.debug('ignore UDP message without uri from %r', sender)
                    continue
                kwargs = msg
            elif kind == 'node':
                try:
                    port = msg['port']
                    kwargs = {'name': msg['equipment_id']}
                except KeyError as e:
                    logging.debug('ignore UDP message from %r, missing %s', sender, e)
                    continue
                # the name lookup is needed for this kind of message only
                host = socket.getnameinfo(sender, socket.NI_NOFQDN)[0]
                uri = normalizeuri(f"{host}:{port}")
            else:
                continue
            yield SecopStream, uri, kwargs
class ScanReply(UdpStream):
    """UDP stream sending one discover broadcast and reading from its socket."""

    def __init__(self):
        """Create the socket, broadcast the request and register for reading.

        A failing broadcast is logged only; the socket is registered
        in any case.
        """
        udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        udp.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        payload = json.dumps({'SECoP': 'discover'}).encode('utf-8')
        target = ('255.255.255.255', SECOP_UDP_PORT)
        # send a general broadcast
        try:
            udp.sendto(payload, target)
        except OSError as exc:
            logging.info('could not send the broadcast %r:', exc)
        self.socket = udp
        self.select_read[udp.fileno()] = self
class ScanStream(UdpStream):
    """UDP stream bound to the SECoP UDP port on all interfaces."""

    def __init__(self):
        """Create and bind the socket, then register it for reading."""
        udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        udp.settimeout(1)
        # the option allowing a shared port depends on the platform
        reuse = socket.SO_REUSEADDR if os.name == 'nt' else socket.SO_REUSEPORT
        udp.setsockopt(socket.SOL_SOCKET, reuse, 1)
        udp.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        udp.bind(('0.0.0.0', SECOP_UDP_PORT))
        self.socket = udp
        self.select_read[udp.fileno()] = self
class TrySecopConnect(Base):
    """Probe whether a SECoP node answers on a given ``host:port``.

    The connection is opened without blocking. :meth:`events` sends
    ``*IDN?`` once the socket is writable and yields a SecopStream
    request as soon as the reply contains ``SECoP``.
    """

    def __init__(self, uri):
        """Start a non-blocking connection attempt to *uri*.

        :param uri: address in the form ``<host>:<port>``
        :raises ValueError: when *uri* is not of this form
        :raises OSError: when the connection attempt fails immediately
        """
        self.uri = uri
        host, port = self.uri.split(':')
        # convert before registering: a bad port must not leave a
        # registered socket behind
        address = host, int(port)
        sock = socket.socket()
        sock.setblocking(False)
        self.socket = sock
        self.fno = sock.fileno()
        self.select_write[self.fno] = self
        try:
            sock.connect(address)
        except BlockingIOError:
            # expected: the connection is established in the background
            pass
        except OSError:
            # do not leave a dead socket registered for writing
            self.select_write.pop(self.fno, None)
            sock.close()
            raise
        self.idn = b''

    def events(self):
        """Advance the probe; yield ``(SecopStream, uri, kwargs)`` on success.

        When still registered for writing, the identification request is
        sent and the object is registered for reading instead. Otherwise
        the reply is collected until it contains ``SECoP`` (success) or a
        newline (not a SECoP node). In all cases except a still incomplete
        reply the object is removed from the readers.
        """
        if self.select_write.pop(self.fno, None):
            try:
                self.socket.sendall(b'*IDN?\n')
                self.idn_sent = True
                logging.debug('SEND IDN %s', self.uri)
                self.select_read[self.fno] = self
                return
            except Exception as e:
                logging.info('NO CONN TO %s %r', self.uri, e)
        else:
            try:
                chunk = self.socket.recv(99)
                if chunk:
                    self.idn += chunk
                    if b'SECoP' in self.idn:
                        logging.info('connected to %s', self.uri)
                        # falls through to the unregistering below, so a
                        # later chunk can not trigger a second yield
                        yield SecopStream, self.uri, {'stream': self.uri}
                    elif b'\n' not in self.idn:
                        # reply line not complete yet: wait for more data
                        return
            except Exception:
                logging.exception('receiving')
        # tolerate the case where reading was never registered
        self.select_read.pop(self.fno, None)
def send_fake_udp(uri, device=None, instrument=None):
    """Broadcast a ``for_other_node`` message announcing the node at *uri*.

    :param uri: uri of the announced node
    :param device: when given, sent as ``device`` item
    :param instrument: sent as ``instrument`` item, ``'0'`` when empty
    """
    msg = {
        'SECoP': 'for_other_node',
        'uri': uri,
    }
    if device:
        msg['device'] = device
    msg['instrument'] = instrument or '0'
    payload = json.dumps(msg, ensure_ascii=False, separators=(',', ':')).encode('utf-8')
    # the context manager closes the socket, also when sending fails
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.sendto(payload, ('255.255.255.255', SECOP_UDP_PORT))