Files
sehistory/secop.py

178 lines
5.7 KiB
Python

import re
import os
import json
import time
import socket
from select import select
from streams import Stream, Base
# matches the line that answers '*IDN?' and identifies the peer
IDN = re.compile('.*ISSE.*,SEC[oO]P,')
# matches the reply to 'describe'; group 1 is the JSON encoded description
DESCRIBING = re.compile(r'describing \S* (.*)$')
# matches update messages; groups are (command, identifier, JSON encoded data)
UPDATE = re.compile(r'(update|error_update) (\S*) (.*)$')
class EnumConvert(dict):
    """Mapping that can be called to translate a key into a float.

    The instance is a plain dict; calling it looks the argument up in the
    dict and returns the stored entry converted with ``float``.
    """

    def __call__(self, value):
        """Return the entry stored under ``value`` as a float.

        Raises KeyError when ``value`` is not a key of the mapping.
        """
        mapped = self[value]
        return float(mapped)
class TagsDict(dict):
    """Dict returning a fixed fallback object for keys that are not stored.

    The fallback is only returned by item access (``d[key]``); it is not
    inserted into the dict.
    """

    def __init__(self, default_value):
        """Create an empty dict and remember ``default_value`` as fallback."""
        super().__init__()
        self.default_value = default_value

    def __missing__(self, key):
        """Return the fallback for any ``key`` that is not stored."""
        return self.default_value
class SecopStream(Stream):
    """Stream that reads the update messages of one node and yields events.

    NOTE(review): ``send``, ``get_lines``, ``notimeout``, ``tags`` and ``uri``
    are not defined in this class; presumably they are provided by
    ``Stream`` - confirm.
    """

    ping_time = 0

    def init(self, device=None, **kwds):
        """Identify the peer, read its description and activate updates.

        :param device: device name to use; when empty, the ``equipment_id``
            entry of the received description is used instead
        :param kwds: accepted but not used by this method
        :raises ValueError: when no identification line or no ``describing``
            reply is received
        """
        self._buffer = []
        self.send('*IDN?')
        resend = True
        messages = []
        for msg in self.get_lines():
            if IDN.match(msg):
                break
            if resend:
                # the first line was not an identification: ask once more
                self.send('*IDN?')
                resend = False
            messages.append(msg)
        else:
            raise ValueError(f'missing identifier, got {messages} instead')

        self.send('describe')
        for msg in self.get_lines():
            match = DESCRIBING.match(msg)
            if match:
                break
        else:
            raise ValueError('missing describing message')
        self.descr = json.loads(match.group(1))

        self.device = device or self.descr['equipment_id']
        suffix = 'psi.ch'
        if self.device.endswith(suffix):
            # strings are immutable, so the suffix has to be dropped by
            # building a new string (slice assignment raises TypeError)
            # NOTE(review): a separator in front of the suffix (e.g. '.')
            # is kept, as only the suffix itself is removed - confirm
            self.device = self.device[:-len(suffix)]
        self.tags['device'] = self.device
        self.modules = self.descr['modules']

        # (module, parameter) -> converter for the parameters to be recorded
        self.convert = {}
        # module -> tags; modules without an own entry share self.tags
        self.tags_dict = TagsDict(self.tags)
        for mod, moddesc in self.modules.items():
            for key in ('_original_id', 'original_id'):
                value = moddesc.get(key)
                if value:
                    # this module gets its own tags with another device name
                    self.tags_dict[mod] = dict(self.tags, device=value)
                    break
            for param, desc in moddesc['accessibles'].items():
                dt = desc['datainfo']
                if dt['type'] in ('double', 'int', 'enum'):
                    self.convert[mod, param] = float
        self.send('activate')

    def ping(self):
        """Send a 'ping' message."""
        self.send('ping')

    def get_tags(self, key):
        """Return the tags for ``key``, a (module, parameter) tuple.

        Only the module name ``key[0]`` is used for the lookup.
        """
        return self.tags_dict[key[0]]

    def event_generator(self):
        """Yield one event per received update of a registered parameter.

        Each event is a tuple ``(kind, value, key, tags, timestamp)``:

        - ``kind`` is ``'value'`` or ``'error'``
        - ``value`` is the converted value, or for errors the first two
          elements of the message data joined with ``': '``
        - ``key`` is ``(module, parameter)``; the parameter defaults to
          ``'value'`` when the identifier contains no ``':'``
        - ``tags`` are the tags stored for the module
        - ``timestamp`` is the ``'t'`` entry of the last data element, or
          the current time when that entry is missing

        Updates of parameters without a converter are skipped. Any exception
        is printed together with ``self.uri`` and raised again.
        """
        try:
            for msg in self.get_lines():
                match = UPDATE.match(msg)
                if match:
                    cmd, ident, data = match.groups()
                    mod, _, param = ident.partition(':')
                    key = (mod, param or 'value')
                    cvt = self.convert.get(key)
                    if cvt:
                        data = json.loads(data)
                        tags = self.tags_dict[key[0]]
                        if cmd == 'error_update':
                            error = ': '.join(data[0:2])
                            print(msg, repr(error))
                            timestamp = data[2].get('t', time.time())
                            yield 'error', error, key, tags, timestamp
                        else:
                            value = cvt(data[0])
                            timestamp = data[1].get('t', time.time())
                            yield 'value', value, key, tags, timestamp
                elif msg == 'active':
                    # from now on, no more waiting
                    self.notimeout()
        except Exception as e:
            print(self.uri, repr(e))
            raise
# UDP port used in this file for sending broadcasts and for listening to them
SECOP_UDP_PORT = 10767
class UdpStream(Base):
    """Common part of the UDP based streams: decoding of received datagrams.

    Subclasses are expected to set ``self.socket`` to a UDP socket.
    """

    socket = None

    def events(self):
        """Yield ``(SecopStream, uri, kwargs)`` for each usable datagram.

        Datagrams are read as long as one is available within 0.5 seconds.
        Each datagram must be a JSON object with a ``SECoP`` entry:

        - ``'for_other_node'``: ``uri`` is taken from the message, the
          remaining entries of the message are used as ``kwargs``
        - ``'node'``: ``uri`` is built from the sender address and the
          ``port`` entry, ``kwargs`` contains ``equipment_id`` as ``name``

        Everything else is skipped. Datagrams which can not be decoded or
        lack a needed entry are reported and skipped as well, so that a
        single bad datagram does not abort the generator.
        """
        while select([self.socket], [], [], 0.5)[0]:
            try:
                raw, addr = self.socket.recvfrom(1024)
            except socket.error:  # pragma: no cover
                return None
            try:
                # UnicodeDecodeError and json.JSONDecodeError are both
                # subclasses of ValueError
                msg = json.loads(raw.decode('utf-8'))
            except ValueError as e:
                print('ignoring malformed UDP message from', addr, repr(e))
                continue
            if not isinstance(msg, dict):
                # valid JSON, but not an object: nothing to look up
                continue
            kind = msg.pop('SECoP', None)
            if not kind:
                continue
            try:
                if kind == 'for_other_node':
                    uri = msg.pop('uri')
                    # if 'device' not in msg:
                    #     msg['device'] = uri.split('://', 1)[-1].split(':')[0]
                    kwargs = msg
                elif kind == 'node':
                    uri = f"{addr[0]}:{msg['port']}"
                    kwargs = {'name': msg['equipment_id']}
                else:
                    continue
            except KeyError as e:
                print('ignoring incomplete UDP message from', addr, repr(e))
                continue
            yield SecopStream, uri, kwargs
class ScanReply(UdpStream):
    """UDP stream which sends a 'discover' broadcast when it is created.

    NOTE(review): ``select_dict`` is not defined in this file; presumably it
    is provided by ``Base`` - confirm.
    """

    def __init__(self):
        """Create the socket, send the broadcast and register the stream.

        A failing broadcast is only reported, the stream is set up anyway.
        """
        udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        udp.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

        # send a general broadcast
        request = json.dumps({'SECoP': 'discover'}).encode('utf-8')
        target = ('255.255.255.255', SECOP_UDP_PORT)
        try:
            udp.sendto(request, target)
        except OSError as exc:
            print('could not send the broadcast:', exc)

        self.socket = udp
        # register under the file descriptor of the socket
        self.select_dict[udp.fileno()] = self
class ScanStream(UdpStream):
    """UDP stream bound to SECOP_UDP_PORT on all interfaces.

    NOTE(review): ``select_dict`` is not defined in this file; presumably it
    is provided by ``Base`` - confirm.
    """

    def __init__(self):
        """Create, configure and bind the socket, then register the stream."""
        listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        listener.settimeout(1)
        # the option allowing a shared port depends on the platform; only the
        # selected attribute is looked up
        reuse_option = socket.SO_REUSEADDR if os.name == 'nt' else socket.SO_REUSEPORT
        listener.setsockopt(socket.SOL_SOCKET, reuse_option, 1)
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        listener.bind(('0.0.0.0', SECOP_UDP_PORT))

        self.socket = listener
        # register under the file descriptor of the socket
        self.select_dict[listener.fileno()] = self
def send_fake_udp(uri, device=None, instrument=None):
    """Broadcast a 'for_other_node' UDP message for ``uri``.

    :param uri: value of the ``uri`` entry of the message
    :param device: when not empty, added as ``device`` entry
    :param instrument: value of the ``instrument`` entry; ``'0'`` is used
        when it is empty
    """
    msg = {
        'SECoP': 'for_other_node',
        'uri': uri,
    }
    if device:
        msg['device'] = device
    # NOTE(review): the indentation of the source was lost; 'instrument' is
    # assumed to be set independently of 'device' - confirm
    msg['instrument'] = instrument or '0'
    payload = json.dumps(msg, ensure_ascii=False, separators=(',', ':')).encode('utf-8')
    # the context manager closes the socket, also when sending fails
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.sendto(payload, ('255.255.255.255', SECOP_UDP_PORT))