- use gethostbyaddr only in case of numeric IP - hard wired reverse CNAME (network alias) for linse-c
233 lines
7.8 KiB
Python
233 lines
7.8 KiB
Python
import re
|
|
import os
|
|
import json
|
|
import time
|
|
import socket
|
|
import logging
|
|
from collections import namedtuple
|
|
from select import select
|
|
from streams import Stream, Base, StreamDead
|
|
from normalizeuri import normalizeuri
|
|
|
|
IDN = re.compile('.*ISSE.*,SEC[oO]P,')
|
|
DESCRIBING = re.compile(r'describing \S* (.*)$')
|
|
UPDATE = re.compile(r'(update|error_update) (\S*) (.*)$')
|
|
|
|
|
|
class EnumConvert(dict):
|
|
def __call__(self, value):
|
|
return float(self[value])
|
|
|
|
|
|
class TagsDict(dict):
|
|
def __init__(self, default_value):
|
|
self.default_value = default_value
|
|
|
|
def __missing__(self, key):
|
|
return self.default_value
|
|
|
|
|
|
ParamInfo = namedtuple('ParamInfo', ['cvt', 'key', 'tags'])
|
|
|
|
|
|
class SecopStream(Stream):
|
|
ping_time = 0
|
|
|
|
def init(self, device=None, **kwds):
|
|
self._buffer = []
|
|
self.send('*IDN?')
|
|
resend = True
|
|
messages = []
|
|
for msg in self.get_lines():
|
|
if IDN.match(msg):
|
|
break
|
|
if resend:
|
|
self.send('*IDN?')
|
|
resend = False
|
|
messages.append(msg)
|
|
else:
|
|
raise ValueError(f'missing identifier, got {messages} instead')
|
|
self.send('describe')
|
|
for msg in self.get_lines():
|
|
match = DESCRIBING.match(msg)
|
|
if match:
|
|
break
|
|
else:
|
|
raise ValueError('missing describing message')
|
|
self.descr = json.loads(match.group(1))
|
|
self.device = device or self.descr['equipment_id']
|
|
if self.device.endswith('.psi.ch'):
|
|
self.device = self.device[:-7]
|
|
self.tags['device'] = self.device
|
|
self.modules = self.descr['modules']
|
|
self.param_info = {}
|
|
self.tags_dict = TagsDict(self.tags)
|
|
for mod, moddesc in self.modules.items():
|
|
mod_tags = None
|
|
for key in ('_original_id', 'original_id'):
|
|
value = moddesc.get(key)
|
|
if value:
|
|
mod_tags = dict(self.tags, device=value)
|
|
break
|
|
parameters = moddesc['accessibles']
|
|
for param, desc in parameters.items():
|
|
dt = desc['datainfo']
|
|
if dt['type'] in ('double', 'int', 'enum'):
|
|
stripped = param[1:] if param.startswith('_') else param
|
|
unit = dt.get('unit')
|
|
tags = self.tags or mod_tags
|
|
if unit:
|
|
tags = dict(tags, unit=unit)
|
|
key = mod, (param if stripped in parameters else stripped)
|
|
self.param_info[mod, param] = ParamInfo(float, key, tags)
|
|
self.send('activate')
|
|
|
|
def ping(self):
|
|
self.send('ping')
|
|
|
|
def get_tags(self, key):
|
|
return self.tags_dict[key[0]]
|
|
|
|
def event_generator(self):
|
|
try:
|
|
for msg in self.get_lines():
|
|
match = UPDATE.match(msg)
|
|
if match:
|
|
cmd, ident, data = match.groups()
|
|
mod, _, param = ident.partition(':')
|
|
pinfo = self.param_info.get((mod, param or 'value'))
|
|
if pinfo:
|
|
data = json.loads(data)
|
|
tags = self.tags_dict[pinfo.key[0]]
|
|
if cmd == 'error_update':
|
|
error = ': '.join(data[0:2])
|
|
# print(msg, repr(error))
|
|
timestamp = data[2].get('t', time.time())
|
|
yield 'error', error, pinfo.key, pinfo.tags, timestamp
|
|
else:
|
|
value = pinfo.cvt(data[0])
|
|
timestamp = data[1].get('t', time.time())
|
|
yield 'value', value, pinfo.key, pinfo.tags, timestamp
|
|
elif msg == 'active':
|
|
# from now on, no more waiting
|
|
self.notimeout()
|
|
|
|
except Exception as e:
|
|
# probably other end closed
|
|
logging.info('%r on %s', e, self.uri)
|
|
|
|
|
|
SECOP_UDP_PORT = 10767
|
|
|
|
|
|
class UdpStream(Base):
|
|
socket = None
|
|
|
|
def events(self):
|
|
while select([self.socket], [], [], 0.5)[0]:
|
|
try:
|
|
msg, addr = self.socket.recvfrom(1024)
|
|
except socket.error: # pragma: no cover
|
|
return None
|
|
addr = socket.getnameinfo(addr, socket.NI_NOFQDN)[0]
|
|
msg = json.loads(msg.decode('utf-8'))
|
|
kind = msg.pop('SECoP', None)
|
|
if not kind:
|
|
continue
|
|
if kind == 'for_other_node':
|
|
uri = msg.pop('uri')
|
|
# if 'device' not in msg:
|
|
# msg['device'] = uri.split('://', 1)[-1].split(':')[0]
|
|
kwargs = msg
|
|
elif kind == 'node':
|
|
uri = normalizeuri(f"{addr}:{msg['port']}")
|
|
kwargs = {'name': msg['equipment_id']}
|
|
else:
|
|
continue
|
|
yield SecopStream, uri, kwargs
|
|
|
|
|
|
class ScanReply(UdpStream):
|
|
def __init__(self):
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
|
# send a general broadcast
|
|
try:
|
|
sock.sendto(json.dumps(dict(SECoP='discover')).encode('utf-8'),
|
|
('255.255.255.255', SECOP_UDP_PORT))
|
|
except OSError as e:
|
|
logging.info('could not send the broadcast %r:', e)
|
|
self.socket = sock
|
|
self.select_read[sock.fileno()] = self
|
|
|
|
|
|
class ScanStream(UdpStream):
|
|
def __init__(self):
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
sock.settimeout(1)
|
|
if os.name == 'nt':
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
else:
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
|
sock.bind(('0.0.0.0', SECOP_UDP_PORT))
|
|
self.socket = sock
|
|
self.select_read[sock.fileno()] = self
|
|
|
|
|
|
class TrySecopConnect(Base):
|
|
def __init__(self, uri):
|
|
self.uri = uri
|
|
host, port = self.uri.split(':')
|
|
sock = socket.socket()
|
|
sock.setblocking(False)
|
|
self.socket = sock
|
|
self.fno = sock.fileno()
|
|
self.select_write[self.fno] = self
|
|
try:
|
|
sock.connect((host, int(port)))
|
|
except BlockingIOError:
|
|
pass
|
|
self.idn = b''
|
|
|
|
def events(self):
|
|
if self.select_write.pop(self.fno, None):
|
|
try:
|
|
self.socket.sendall(b'*IDN?\n')
|
|
self.idn_sent = True
|
|
logging.debug('SEND IDN %s', self.uri)
|
|
self.select_read[self.fno] = self
|
|
return
|
|
except Exception as e:
|
|
logging.info('NO CONN TO %s %r', self.uri, e)
|
|
else:
|
|
reply = b''
|
|
try:
|
|
chunk = self.socket.recv(99)
|
|
if chunk:
|
|
self.idn += chunk
|
|
if b'SECoP' in self.idn:
|
|
logging.info('connected to %s', self.uri)
|
|
yield SecopStream, self.uri, {'stream': self.uri}
|
|
if b'\n' not in self.idn:
|
|
return
|
|
except Exception:
|
|
logging.exception('receiving')
|
|
self.select_read.pop(self.fno)
|
|
|
|
|
|
def send_fake_udp(uri, device=None, instrument=None):
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
|
|
msg = {
|
|
'SECoP': 'for_other_node',
|
|
'uri': uri,
|
|
}
|
|
if device:
|
|
msg['device'] = device
|
|
msg['instrument'] = instrument or '0'
|
|
sock.sendto(json.dumps(msg, ensure_ascii=False, separators=(',', ':')).encode('utf-8'),
|
|
('255.255.255.255', SECOP_UDP_PORT))
|
|
|
|
|