import re import os import json import time import socket import logging from collections import namedtuple from select import select from streams import Stream, Base, StreamDead from normalizeuri import normalizeuri IDN = re.compile('.*ISSE.*,SEC[oO]P,') DESCRIBING = re.compile(r'describing \S* (.*)$') UPDATE = re.compile(r'(update|error_update) (\S*) (.*)$') class EnumConvert(dict): def __call__(self, value): return float(self[value]) class TagsDict(dict): def __init__(self, default_value): self.default_value = default_value def __missing__(self, key): return self.default_value ParamInfo = namedtuple('ParamInfo', ['cvt', 'key', 'tags']) class SecopStream(Stream): ping_time = 0 def init(self, device=None, **kwds): self._buffer = [] self.send('*IDN?') resend = True messages = [] for msg in self.get_lines(): if IDN.match(msg): break if resend: self.send('*IDN?') resend = False messages.append(msg) else: raise ValueError(f'missing identifier, got {messages} instead') self.send('describe') for msg in self.get_lines(): match = DESCRIBING.match(msg) if match: break else: raise ValueError('missing describing message') self.descr = json.loads(match.group(1)) self.device = device or self.descr['equipment_id'] if self.device.endswith('.psi.ch'): self.device = self.device[:-7] self.tags['device'] = self.device self.modules = self.descr['modules'] self.param_info = {} self.tags_dict = TagsDict(self.tags) for mod, moddesc in self.modules.items(): mod_tags = None for key in ('_original_id', 'original_id'): value = moddesc.get(key) if value: mod_tags = dict(self.tags, device=value) break parameters = moddesc['accessibles'] for param, desc in parameters.items(): dt = desc['datainfo'] if dt['type'] in ('double', 'int', 'enum'): stripped = param[1:] if param.startswith('_') else param unit = dt.get('unit') tags = self.tags or mod_tags if unit: tags = dict(tags, unit=unit) key = mod, (param if stripped in parameters else stripped) self.param_info[mod, param] = ParamInfo(float, key, tags) self.send('activate') def ping(self): self.send('ping') def get_tags(self, key): return self.tags_dict[key[0]] def event_generator(self): try: for msg in self.get_lines(): match = UPDATE.match(msg) if match: cmd, ident, data = match.groups() mod, _, param = ident.partition(':') pinfo = self.param_info.get((mod, param or 'value')) if pinfo: data = json.loads(data) tags = self.tags_dict[pinfo.key[0]] if cmd == 'error_update': error = ': '.join(data[0:2]) # print(msg, repr(error)) timestamp = data[2].get('t', time.time()) yield 'error', error, pinfo.key, pinfo.tags, timestamp else: value = pinfo.cvt(data[0]) timestamp = data[1].get('t', time.time()) yield 'value', value, pinfo.key, pinfo.tags, timestamp elif msg == 'active': # from now on, no more waiting self.notimeout() except Exception as e: # probably other end closed logging.info('%r on %s', e, self.uri) SECOP_UDP_PORT = 10767 class UdpStream(Base): socket = None def events(self): while select([self.socket], [], [], 0.5)[0]: try: msg, addr = self.socket.recvfrom(1024) except socket.error: # pragma: no cover return None addr = socket.getnameinfo(addr, socket.NI_NOFQDN)[0] msg = json.loads(msg.decode('utf-8')) kind = msg.pop('SECoP', None) if not kind: continue if kind == 'for_other_node': uri = msg.pop('uri') # if 'device' not in msg: # msg['device'] = uri.split('://', 1)[-1].split(':')[0] kwargs = msg elif kind == 'node': uri = normalizeuri(f"{addr}:{msg['port']}") kwargs = {'name': msg['equipment_id']} else: continue yield SecopStream, uri, kwargs class ScanReply(UdpStream): def __init__(self): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) # send a general broadcast try: sock.sendto(json.dumps(dict(SECoP='discover')).encode('utf-8'), ('255.255.255.255', SECOP_UDP_PORT)) except OSError as e: logging.info('could not send the broadcast %r:', e) self.socket = sock self.select_read[sock.fileno()] = self class ScanStream(UdpStream): def __init__(self): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.settimeout(1) if os.name == 'nt': sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) else: sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) sock.bind(('0.0.0.0', SECOP_UDP_PORT)) self.socket = sock self.select_read[sock.fileno()] = self class TrySecopConnect(Base): def __init__(self, uri): self.uri = uri host, port = self.uri.split(':') sock = socket.socket() sock.setblocking(False) self.socket = sock self.fno = sock.fileno() self.select_write[self.fno] = self try: sock.connect((host, int(port))) except BlockingIOError: pass self.idn = b'' def events(self): if self.select_write.pop(self.fno, None): try: self.socket.sendall(b'*IDN?\n') self.idn_sent = True logging.debug('SEND IDN %s', self.uri) self.select_read[self.fno] = self return except Exception as e: logging.info('NO CONN TO %s %r', self.uri, e) else: reply = b'' try: chunk = self.socket.recv(99) if chunk: self.idn += chunk if b'SECoP' in self.idn: logging.info('connected to %s', self.uri) yield SecopStream, self.uri, {'stream': self.uri} if b'\n' not in self.idn: return except Exception: logging.exception('receiving') self.select_read.pop(self.fno) def send_fake_udp(uri, device=None, instrument=None): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) msg = { 'SECoP': 'for_other_node', 'uri': uri, } if device: msg['device'] = device msg['instrument'] = instrument or '0' sock.sendto(json.dumps(msg, ensure_ascii=False, separators=(',', ':')).encode('utf-8'), ('255.255.255.255', SECOP_UDP_PORT))