From 331f3e02b1c67adf9bd72d1219ad4f1bbd403e4c Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Wed, 6 May 2026 16:02:02 +0200 Subject: [PATCH] [WIP] superfrappy: state as of 2026-05-06 Change-Id: Ifd83a297dd5593db502bb2c0f6ec1c99717d4a4a --- frappy_psi/sea.py | 27 +- frappy_psi/superfrappy.py | 431 ++++++++++++++++++ frappy_psi/superfrappy/__init__.py | 562 ------------------------ frappy_psi/superfrappy/marche_frappy.py | 276 ------------ frappy_psi/superfrappy/normalizeuri.py | 41 -- frappy_psi/superfrappy/secop_udp.py | 115 ----- 6 files changed, 452 insertions(+), 1000 deletions(-) create mode 100644 frappy_psi/superfrappy.py delete mode 100644 frappy_psi/superfrappy/__init__.py delete mode 100644 frappy_psi/superfrappy/marche_frappy.py delete mode 100644 frappy_psi/superfrappy/normalizeuri.py delete mode 100644 frappy_psi/superfrappy/secop_udp.py diff --git a/frappy_psi/sea.py b/frappy_psi/sea.py index 7bcaae41..9bddd8df 100644 --- a/frappy_psi/sea.py +++ b/frappy_psi/sea.py @@ -97,7 +97,7 @@ class SeaConfig: for dir in self.dirs: file = dir / json_file if file.is_file(): - return json.load(file.read_text()) + return json.loads(file.read_text()) raise FileNotFoundError(f'{json_file} not found') @@ -129,7 +129,7 @@ class SeaClient(ProxyClient, Module): raise ConfigError('missing sea port for %s' % instance) opts['uri'] = {'value': 'tcp://localhost:%s' % port} self.objects = set() - self.shutdown = False + self._shutdown = threading.Event() self.path2param = {} self._write_lock = threading.RLock() self._connect_thread = None @@ -200,10 +200,15 @@ class SeaClient(ProxyClient, Module): raise CommunicationFailedError(f'reply from frappy_config: {result}') # frappy_async_client switches to the json protocol (better for updates) self.asynio.writeline(b'frappy_async_client') - self.asynio.writeline(('get_all_param ' + ' '.join(self.objects)).encode()) + if self.config == 'device': + self.asynio.writeline(b'get_all_param device') + else: + self.asynio.writeline(('get_all_param ' + ' '.join(self.objects)).encode()) self.log.info('connected to %s', self.uri) self._connected.set() mkthread(self._rxthread) + except CommunicationFailedError: + pass finally: self._connect_thread = None @@ -266,14 +271,18 @@ class SeaClient(ProxyClient, Module): def _rxthread(self): recheck = None - while not self.shutdown: + while not self._shutdown.is_set(): if recheck and time.time() > recheck: # try to collect device changes within 1 sec recheck = None - result = self.request('check_config %s %s' % (self.service, self.config)) + if self.service: + result = self.request('check_config %s %s' % (self.service, self.config)) + else: + result = '1' if result == '1': self.asynio.writeline(('get_all_param ' + ' '.join(self.objects)).encode()) else: + print('SHUTDOWN', result) self.secNode.srv.shutdown() try: reply = self.asynio.readline() @@ -281,6 +290,7 @@ class SeaClient(ProxyClient, Module): continue except ConnectionClosed: self.close_connections() + print('CLOSED') break try: msg = json.loads(reply) @@ -331,7 +341,8 @@ class SeaClient(ProxyClient, Module): if path.startswith('/device'): if path == '/device/changetime': recheck = time.time() + 1 - elif path.startswith('/device/frappy_%s' % self.service) and value == '': + elif self.service and path.startswith('/device/frappy_%s' % self.service) and value == '': + print('SHUT', self.service, path) self.secNode.srv.shutdown() else: for module, param in mplist: @@ -343,6 +354,10 @@ class SeaClient(ProxyClient, Module): # do not update unchanged values within 60 sec self.updateValue(module, param, value, now, readerror) + def stopPollThread(self): + self._shutdown.set() + super().stopPollThread() + @Command(StringType(), result=StringType()) def communicate(self, command): """send a command to SEA""" diff --git a/frappy_psi/superfrappy.py b/frappy_psi/superfrappy.py new file mode 100644 index 00000000..9eb6b9f9 --- /dev/null +++ b/frappy_psi/superfrappy.py @@ -0,0 +1,431 @@ +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# ***************************************************************************** +"""Module to handle running SEC nodes for an instrument + +Also start and stop frappy servers related to SEA + +Handled by linsetools: + + For all detected SEC nodes, setup files are written to the + setup directory, and they are disabled ('lowlevel') when shut down. + + For SEC nodes started on the instrument computer, wrapper config files + containing the port number are produced in the wrapper directory. +""" +import sys +sys.path.append('/home/l_samenv') +sys.path.append('/sq_sw/linse') +import time +import socket +import json +import os +from select import select +from frappy.core import Readable, Parameter, Property, Command, Attached, IDLE +from frappy.datatypes import ArrayOf, StructOf, StringType, IntRange, TupleOf, BoolType +from frappy.client import SecopClient +import linsetools.frappy +from sehistory.normalizeuri import normalizeuri + + +porttype = IntRange(0, 0xc000) +secnodetype = StructOf(cfg=StringType(), uri=StringType(), service=StringType()) + + +SECOP_UDP_PORT = 10767 + + +class FrappyControl(linsetools.frappy.FrappyControl): + # TODO; move to linsetools + def service_from_uri(self, uri): + try: + idx = self.frappy_servers.index(uri) + return {0: 'main', 1: 'stick'}.get(idx, 'addons') + except ValueError: + return '' + + +class Listener: + socket = None + + def __init__(self, use_localhost=False): + self.use_localhost = use_localhost # whether 'localhost' or the real hostname is returned on the own machine + + def poll(self, log=None): + if self.socket is None: + return None + if not select([self.socket], [], [], 0)[0]: + return None + try: + msg, addr = self.socket.recvfrom(1024) + except socket.error: # pragma: no cover + return None + addr = socket.getnameinfo(addr, socket.NI_NOFQDN)[0] + msg = json.loads(msg.decode('utf-8')) + if log: + log.debug('got msg %r', msg) + kind = msg.pop('SECoP', None) + if kind == 'node': + msg['device'] = msg['equipment_id'].split('.')[0] + uri = f"{addr}:{msg['port']}" + elif kind == 'for_other_node': + uri = msg['uri'] + else: + return None + host, _, port = uri.rpartition(':') + host = normalizeuri(host or 'localhost', self.use_localhost) + msg['uri'] = f'{host}:{port}' + return msg + + +class UdpScan(Listener): + def start(self, log=None): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + # send a general broadcast + try: + sock.sendto(json.dumps(dict(SECoP='discover')).encode('utf-8'), + ('255.255.255.255', SECOP_UDP_PORT)) + except OSError as e: + if log: + log.info('could not send the broadcast %r:', e) + self.socket = sock + self.deadline = time.time() + 30 + + def poll(self, log=None): + if self.socket is None: + return None + if time.time() > self.deadline: + try: + self.socket.close() + except Exception: + pass + self.socket = None + return super().poll(log) + + +class UdpListener(Listener): + def start(self): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.settimeout(1) + if os.name == 'nt': + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + else: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + sock.bind(('0.0.0.0', SECOP_UDP_PORT)) + self.socket = sock + + +def send_other_udp(uri, instrument, device=None): + """inform the feeder about the start of a frappy server""" + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + msg = { + 'SECoP': 'for_other_node', + 'uri': uri, + 'instrument': instrument, + } + if device: + msg['device'] = device + msg = json.dumps(msg, ensure_ascii=False, separators=(',', ':')).encode('utf-8') + sock.sendto(msg, ('255.255.255.255', SECOP_UDP_PORT)) + + +class SecNode: + log = None + + def __init__(self, host_port, cfg, service=''): + self.host_port = host_port + self.host, _, self.port = host_port.partition(':') + self.client = SecopClient(host_port) + self.client.register_callback(None, self.nodeStateChange) + self.nodename = cfg + self.cfg = cfg + self.description = cfg + self.online = False + self.status = 'created' + self.trigger = True + # self.announce_pnp = True # None: do not announce, True: announce new, False: announce removal + self.setup_was_loaded = False + self.service = service + + def connect(self, complete_callback=None, log=None): + self.status = 'connecting' + self.log = log + self.complete_callback = complete_callback + if self.log: + self.log.info('spawn connect') + self.client.spawn_connect(self.complete) + + def complete(self): + try: + self.online = True + self.status = 'completing' + self.nodename = self.client.nodename + if self.log: + self.log.info('connected to %r', self.nodename) + if self.complete_callback: + if not self.cfg: + if self.nodename == self.client.uri: + self.cfg = self.host_port + else: + self.cfg = self.nodename.replace('.', '_') + try: + self.complete_callback(self) + except Exception as e: + self.log.exception('complete_callback failed') + self.complete_callback = None + desc = self.client.properties.get('description') or self.nodename + self.description = desc.split('\n')[0] + self.status = 'connected' + except Exception as e: + self.log.exception('connect failed') + self.status = f'disconnected {e!r}' + + def nodeStateChange(self, online, statetext): + pass + + def disconnect(self): + self.online = False + self.status = 'disconnecting' + self.client.disconnect() + self.status = 'disconnected' + + +class SuperFrappy(Readable): + seascan = Attached() + marcheport = Property('marche port number', porttype, default=8124) + is_main_instrument = Property('this is the main instrument', BoolType(), default=True) + value = Parameter('running servers', ArrayOf(secnodetype), default=()) + instance = Parameter('"this" or ', StringType(), default='this') + # plugplay = Parameter('enable plug and play', BoolType(), readonly=False, default=False) + # nicos_setups = Parameter('active nicos se setups', ArrayOf(StringType())) + _secnodes = None # dict of SecNode + _udp_listener = None + _setups = () + _to_close = () + _fast_deadline = 0 + + def initModule(self): + super().initModule() + # fc = FrappyControl(self.instance, 'localhost', self.marcheport) + self._secnodes = {} + self._udp_listener = [UdpScan(True), UdpListener(True)] + self.rescan() + self._to_close = set() + for pname in 'device_name', 'stick_name', 'addons': + self.seascan.addCallback(pname, self.sea_update, pname) + + def add_secnode(self, service, cfg, start): + self.log.info('add %r as %r', cfg, service) + # TODO: treat unknown service + for secnode in self._secnodes.values(): + if secnode.service == service: + if cfg == secnode.cfg: + return + elif cfg == secnode.cfg: + raise ValueError(f'{cfg} is already used as {secnode.service!r}') + fc = FrappyControl(self.instance, 'localhost', self.marcheport) + uri = fc.add_frappy_service(service, cfg, None, self.log) + if start: + self.log.info('start %r', cfg) + fc.start(cfg) + self._secnodes[uri] = secnode = SecNode(uri, cfg, service) + secnode.connect() + self.read_value() + self._fast_deadline = time.time() + 15 + self.setFastPoll(True, 0.25) + + def remove_secnode(self, service_or_cfg, remaining=()): + fc = FrappyControl(self.instance, 'localhost', self.marcheport) + count = 0 + for uri, secnode in list(self._secnodes.items()): + if secnode.service == service_or_cfg: + cfg = secnode.cfg + elif secnode.cfg == service_or_cfg: + cfg = service_or_cfg + else: + continue + if cfg not in remaining: + count += 1 + self.log.info('remove %s at %s', secnode.cfg, uri) + secnode.disconnect() + fc.stop(cfg) + fc.delete_frappy_service(cfg) + self.read_value() + return count + + def sea_update(self, pname, value, error): + if error or not value: + self.log.warn('SEA ignore %r %r %r', pname, value, error) + return + self.log.warn('SEA %r %r %r', pname, value, error) + fc = FrappyControl(self.instance, 'localhost', self.marcheport) + if pname == 'device_name': + service = 'main' + configs = [f'{value}.config'] + elif pname == 'stick_name': + service = 'stick' + configs = [f'{value}.stick'] + else: + service = 'addons' + configs = [f'{v}.config' for v in value.split()] + self.log.warn('SeA %r', configs) + if value: + fc.all_cfg(self.instance, service, details=True) + for seacfg in configs: + cfgs = list(fc.sea2frappy.get(seacfg, ())) + if len(cfgs) == 1: + cfg = cfgs[0] + self.log.warn('SeA cfg=%s', cfg) + try: + self.add_secnode(service, cfg, False) # TODO: must be True, finally + except ValueError as e: + self.log.warn('%s', e) + elif len(cfgs) > 1: + self.log.warn('frappy config for %s is ambiguous: %s', seacfg, ','.join(cfgs)) + if service == 'addons': + remaining = {fc.sea2frappy.get(f'{v["cfg"]}.addons') for v in self.value} + else: + return + else: + remaining = () + self.remove_secnode(service, remaining=remaining) + + def doPoll(self): + super().doPoll() + done = False + for listener in self._udp_listener: + while msg := listener.poll(self.log): + done = True + uri = msg['uri'] + if uri in self._secnodes: + self.log.info('%r is already known', msg) + else: + cfg = msg.get('device', '') + # if uri.startswith('localhost:') and cfg != 'superfrappy': + if cfg != 'superfrappy': + self.log.warn('%r appeared', msg) + try: + self.connect(uri) + except Exception as e: + self.log.warning('%s: %r', uri, e) + else: + self.log.info('skip %r', msg) + + if self._fast_deadline == 0: + self.log.warn('INIT') + try: + for pname in 'device_name', 'stick_name', 'addons': + method = f'read_{pname}' + value = getattr(self.seascan, method)() + self.sea_update(pname, value, None) + self._fast_deadline = time.time() + 15 + self.setFastPoll(True, 0.25) + except Exception as e: + self.log.error('doPoll %r', e) + if not done and time.time() > self._fast_deadline: + self.setFastPoll(False) + + def read_value(self): + value = [] + for secnode in self._secnodes.values(): + info = dict(cfg=secnode.cfg, uri=secnode.host_port, service=secnode.service) + value.append(info) + if tuple(value) != self.value: + self.log.info('----') + for info in value: + formatted = '%(cfg)s uri=%(uri)s service=%(service)s' % info + self.log.info('---- %s', formatted) + return value + + def read_status(self): + return IDLE, ', '.join((f'{k}: {v.status}' for k, v in self._secnodes.items())) + + @Command + def rescan(self): + """rescan secop servers in subnet""" + for listener in self._udp_listener: + listener.start() + + @Command(argument=StringType()) + def restart(self, cfg): + """restart the frappy server + + works only when it runs on the local machine + """ + fc = FrappyControl(self.instance, 'localhost', self.marcheport) + fc.restart(cfg) + + @Command(argument=StringType()) + def connect(self, host_port): + """connect secnode + + :param host_port: box address : + """ + service, _, host_port = host_port.rpartition('=') + host_port = normalizeuri(host_port, True) + secnode = self._secnodes.get(host_port) + if secnode: + self.log.info('already connected %r', host_port) + else: + self.log.info('connect secnode %r', host_port) + fc = FrappyControl(self.instance, 'localhost', self.marcheport) + if not service: + service = fc.service_from_uri(host_port) + if not service: + raise ValueError(f'can not determine service from {host_port}') + secnode = SecNode(host_port, fc.cfg_info.get(host_port, ''), service) + self._secnodes[host_port] = secnode + secnode.connect(log=self.log) + self.read_value() + + @Command(argument=StringType()) + def main(self, cfg): + """add or change main cfg""" + self.add_secnode('main', cfg, True) + + @Command(argument=StringType()) + def stick(self, cfg): + """add or change stick cfg""" + self.add_secnode('stick', cfg, True) + + @Command(argument=StringType()) + def add(self, cfg): + """add an addons cfg""" + self.add_secnode('addons', cfg, True) + + @Command(argument=StringType()) + def remove(self, service_or_cfg): + """remove server + + - for servers on localhost stop the server + - register the setup file for deletion + """ + count = self.remove_secnode(service_or_cfg) + if count == 0: + raise ValueError(f'no running server found for {service_or_cfg}') + + @Command(argument=TupleOf(StringType(), porttype), result=StringType()) + def running_cfg(self, host_port): + """when running, return cfg, else an empty string""" + secnode = self._secnodes.get(host_port) + if secnode and secnode.online: + return secnode.cfg + return '' diff --git a/frappy_psi/superfrappy/__init__.py b/frappy_psi/superfrappy/__init__.py deleted file mode 100644 index 7197c937..00000000 --- a/frappy_psi/superfrappy/__init__.py +++ /dev/null @@ -1,562 +0,0 @@ -# ***************************************************************************** -# This program is free software; you can redistribute it and/or modify it under -# the terms of the GNU General Public License as published by the Free Software -# Foundation; either version 2 of the License, or (at your option) any later -# version. -# -# This program is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more -# details. -# -# You should have received a copy of the GNU General Public License along with -# this program; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# -# Module authors: -# Markus Zolliker -# ***************************************************************************** -"""module to trigger plug and play mechanism - -For all detected secop servers, setup files are written to the -setup directory. - -For SEC nodes started on the instrument computer, wrapper config files -containing the port number are produced in the wrapper directory. - -Mechanism: - -start or detect a SecNode - - (on start only) create a wrapper file and tell marche to start - add SecNode to superfrappy._secnodes in 'connecting' mode - - -action table: - -connection == online, setupfile does not exist: - - announce = new, create wrapperfile (if needed) and setup file - -nicos == idle, setup not in loaded_setups, announce == new: - - announce = None, plugplay = new - -setup in loaded_setups, plugplay == new: - - pluglay = quiet (handled in _handle_setups) - -connection != online, setup in loaded_setups: - - announce = cancel - -nicos == idle, setup not in loaded_setups, announce == cancel: - - announce = None, plugplay = cancel - -setup not in loaded_setups, plugplay == cancel: - - plugplay = quiet (handled in _handle_setups) - -setup not in loaded_setups, plugplay == quiet and setupfile exists: - - remove setupfile and wrapper file (if present) - -""" -import time -import socket -import re -from ast import literal_eval -from pathlib import Path -from frappy.lib import mkthread, formatExtendedTraceback -from frappy.core import Readable, Parameter, Property, Command, Communicator, HasIO -from frappy.datatypes import ArrayOf, StructOf, StringType, IntRange, TupleOf, BoolType -from frappy.client import SecopClient -from .marche_frappy import FrappyMarche -from .secop_udp import UdpScan, UdpListener -from .normalizeuri import normalizeuri - - -porttype = IntRange(0, 0xc000) -secnodetype = StructOf(cfg=StringType(), uri=StringType(), status=StringType()) - - -SETUP_TEMPLATE = """description = 'frappy %(cfg)s setup' -group = 'plugplay' - -devices = { - %(devname)s: - device('nicos_sinq.frappy_sinq.new.FrappyMarcheNode', - description='%(cfg)s SEC node', unit='', async_only=True, - prefix='se_', auto_create=True, - uri=%(uri)s, - general_stop_whitelist=['om', 'stickrot'], - %(nodeargs)s), -} -%(aliasconfig)s -""" - -MEANINGS = { - 'temperature': 'Ts', - 'temperature_regulation': 'T', - 'magneticfield': 'B', - 'pressure': 'p', - 'rotation_z': 'a3', - 'stick_rotation': 'dom', -} -SKIP_ENV = 'rotation_z', 'dom' - - -class SecNode: - log = None - - def __init__(self, host_port, cfg): - self.host_port = host_port - self.host, _, self.port = host_port.partition(':') - self.client = SecopClient(host_port) - self.nodename = cfg - self.cfg = cfg - self.description = cfg - self.online = False - self.status = 'created' - self.trigger = True - self.announce_pnp = True # None: do not announce, True: announce new, False: announce removal - self.setup_was_loaded = False - - def connect(self, complete_callback=None, log=None): - self.status = 'connecting' - self.log = log - self.complete_callback = complete_callback - if self.log: - self.log.info('spawn connect') - self.client.spawn_connect(self.complete) - - def get_setup(self): - return f'se_{self.cfg}' - - def complete(self): - try: - self.online = True - self.status = 'completing' - self.nodename = self.client.nodename - if self.log: - self.log.info('connected to %r', self.nodename) - if self.complete_callback: - if not self.cfg: - if self.nodename == self.client.uri: - self.cfg = self.host_port - else: - self.cfg = self.nodename.replace('.', '_') - try: - self.complete_callback(self) - except Exception as e: - self.log.exception('complete_callback failed') - self.complete_callback = None - desc = self.client.properties.get('description') or self.nodename - self.description = desc.split('\n')[0] - self.status = 'connected' - except Exception as e: - self.log.exception('connect failed') - self.status = f'disconnected {e!r}' - - def disconnect(self): - self.online = False - self.status = 'disconnecting' - self.client.disconnect() - self.status = 'disconnected' - - -def noop(*args): - pass - - -MSGPAT = re.compile(r'([^=!]*)(?:([=!])(.*))?') - - -def noop(*args): - pass - - -def get_lookup_key(key): - split = key.split('/') - return f'/{split[-2]}/{split[-1]}' if len(split) > 2 else '/'.join(split[-2:]) - - -class NicosCache(Communicator): - uri = Property(':>', StringType(), default='localhost') - _error = None - _sock = None - - def doPoll(self): - if self._error: - self.log.error('%r', self._error) - self._error = None - - @Command(argument=StringType()) - def communicate(self, command): - """send a command, do not wait for any response""" - self._connect() - self.log.info('> %r', command) - self._sock.sendall(command.encode() + b'\n') - - def _connect(self, keys=()): - if self._sock is None: - host, _, port = self.uri.partition(':') - self._sock = socket.create_connection((host or 'localhost', int(port or 14869))) - for lookup_key in keys: - msg = f'{lookup_key}*\n{lookup_key}:\n' - self._sock.sendall(msg.encode()) - - def recvloop(self, handlers, other=noop, exception=noop): - handler_lookup = {} - for key, handler in handlers.items(): - pat = re.compile(key.replace('*', '.*')) - lookup_key = get_lookup_key(key) - handler_lookup.setdefault(lookup_key, []).append((pat, handler)) - while True: - try: - self._connect(handler_lookup) - except Exception: - self._error = formatExtendedTraceback() - time.sleep(10) - continue - try: - buffer = b'' - while True: - raw = self._sock.recv(8192) - if not raw: - break - messages = (buffer + raw).split(b'\n') - buffer = messages.pop() - for msg in messages: - msg = msg.strip().decode() - match = MSGPAT.match(msg) - if not match: - other(msg) - continue - key, op, value = match.groups() - lookup_key = get_lookup_key(key) - for pat, handler in handler_lookup.get(lookup_key, ()): - if pat.match(key): - try: - if value is not None: - value = literal_eval(value) - handler(key, op, value) - except Exception as e: - self._error = formatExtendedTraceback() - exception(e) - break - else: - other(key) - messages.extend(raw.split(b'\n')) - except Exception as e: - self._error = formatExtendedTraceback() - self._sock = None - exception(e) - raise - - -class SuperFrappy(HasIO, Readable): - ioClass = NicosCache - marcheport = Property('marche port number', porttype, default=8124) - is_main_instrument = Property('this is the main instrument', BoolType(), default=True) - value = Parameter('running servers', ArrayOf(secnodetype), default=()) - instance = Parameter('"this" or ', StringType()) - plugplay = Parameter('enable plug and play', BoolType(), readonly=False, default=False) - nicos_setups = Parameter('active nicos se setups', ArrayOf(StringType())) - _marche = None - _secnodes = None # dict of SecNode - _udp_listener = None - _nicos_idle_since = None - _setups = () - _to_close = () - - def initModule(self): - super().initModule() - self._marche = FrappyMarche(self.instance, 'localhost', self.marcheport) - self.setupdir = self._marche.config['setupdir'] - self.wrapperdir = self._marche.wrapperdir - self._secnodes = {} - self._udp_listener = [UdpScan(True), UdpListener(True)] - self.rescan() - self._announced_setups = {} - self._current_plugplay = {} - self._to_close = set() - # self.log.info('%r', self.log.handlers[0].setLevel(10)) - mkthread(self.io.recvloop, { - 'nicos/session/mastersetupexplicit': self._handle_setups, - 'nicos/exp/scripts': self._handle_scripts, - 'se/*/nicos/setupname': self._handle_plugplay, - }) - - def _cache_send(self, key, op, value=''): - if op in ('=', '!'): - value = repr(value) - self.io.communicate(key + op + value) - - def _handle_scripts(self, key, op, value): - self.log.info('scripts %r', value) - if value: - self._nicos_idle_since = None - self.setFastPoll(False) - else: - if not self._nicos_idle_since: - self._nicos_idle_since = time.time() - self.setFastPoll(True, 0.25) - - def _handle_setups(self, key, op, value): - self._setups = set(value) - self.log.info('setups %r', value) - for secnode in self._secnodes.values(): - setup = secnode.get_setup() - loaded = setup in self._setups - pnp = self._current_plugplay.get(secnode.nodename) - if loaded: - if pnp is True: - if self.plugplay: - self._send_pnp_message(secnode.nodename, None, True) - secnode.status = 'loaded' - secnode.setup_was_loaded = True - else: - if pnp is False: - if self.plugplay: - self._send_pnp_message(secnode.nodename, None, False) - if secnode.setup_was_loaded and secnode.host == 'localhost' or not secnode.client.online: - secnode.status = 'to_close' - self._to_close.add(secnode.host_port) - - def _handle_plugplay(self, key, op, value): - value = None if value is None else (op == '=') - if value is None: - self._current_plugplay.pop(key, None) - else: - self._current_plugplay[key] = value - self.log.info('pnp %r', self._current_plugplay) - - def _write_setup_file(self, secnode): - setup_file = Path(self.setupdir) / f'{secnode.get_setup()}.py' - envlist, alias_config, devmap = self.node_setup_info(secnode) - nodeargs = f'device_mapping={devmap!r}' if devmap else '' - aliasconfig = f'alias_config = {alias_config!r}' if alias_config else '' - setup_content = SETUP_TEMPLATE % { - 'cfg': secnode.cfg, 'devname': f'"secnode_{secnode.cfg}"', - 'uri': repr(secnode.host_port), 'nodeargs': nodeargs, 'aliasconfig': aliasconfig} - setup_file.write_text(setup_content) - - def _send_pnp_message(self, nodename, setup, on): - self._current_plugplay[nodename] = None if setup is None else on - self._cache_send(f'se/{nodename}/nicos/setupname', '=' if on else '!', setup) - - def _update(self): - while self._to_close: - secnode = self._secnodes.pop(self._to_close.pop(), None) - self._remove_secnode(secnode) - superfluous_setup_files = set(Path(self.setupdir).glob('*.py')) - superfluous_cfg_files = set(Path(self.wrapperdir).glob('*_cfg.py')) - for secnode in self._secnodes.values(): - nodename = secnode.nodename.replace('/', '_') - if not secnode.cfg: - continue - setup = secnode.get_setup() - setup_file = Path(self.setupdir) / f'{setup}.py' - superfluous_setup_files.discard(setup_file) - superfluous_cfg_files.discard(Path(self.wrapperdir) / f'{secnode.cfg}_cfg.py') - if setup in self._setups: - if not secnode.online: - if secnode.announce_pnp is False: - if self._nicos_idle_since: - secnode.announce_pnp = None - if self.plugplay: - self._send_pnp_message(nodename, setup, False) - elif secnode.host_port not in self._to_close: - secnode.announce_pnp = False - else: - if secnode.online: - if not setup_file.is_file(): - self.log.info('write_setup %r', secnode.cfg) - self._write_setup_file(secnode) - if secnode.announce_pnp is True and self._nicos_idle_since: - secnode.announce_pnp = None - if self.plugplay: - self._send_pnp_message(nodename, setup, True) - self._to_close.discard(secnode.host_port) - for setup in self._setups: - # do not delete active setups - superfluous_setup_files.discard(Path(self.setupdir) / f'{setup}.py') - reload_marche = bool(superfluous_cfg_files) - for file in superfluous_setup_files | superfluous_cfg_files: - self.log.info('remove %s', file) - try: - file.unlink() - except FileNotFoundError: - pass - if reload_marche: - self._marche.reload() - - def doPoll(self): - super().doPoll() - for listener in self._udp_listener: - msg = listener.poll(self.log) - if msg: - uri = msg['uri'] - if uri in self._secnodes: - self.log.info('%r is already known', msg) - else: - self.log.info('%r appeared', msg) - cfg = msg.get('device', '') - if uri.startswith('localhost:') and cfg != 'superfrappy': - self.connect(uri) - now = time.time() - if now > (self._nicos_idle_since or now) + 0.5: - self.setFastPoll(False) - self._update() - - @Command - def rescan(self): - """rescan secop servers in subnet""" - for listener in self._udp_listener: - listener.start() - - def read_value(self): - value = [] - for secnode in self._secnodes.values(): - info = {'cfg': secnode.cfg, 'uri': secnode.host_port, 'status': secnode.status} - value.append(info) - return value - - def read_nicos_setups(self): - return list(self._setups & {s.get_setup() for s in self._secnodes.values()}) - - @Command() - def ping(self): - """nicos cache ping""" - self._cache_send('#ping#', '?', '') - - @Command(argument=StringType()) - def restart(self, cfg): - """restart the frappy server - - works only when it runs on the local machine - """ - self._marche.restart(cfg) - - @Command(argument=StringType()) - def connect(self, host_port): - """connect secnode - - - add a setup file to the setup dir - - :param host_port: box address : - """ - host_port = normalizeuri(host_port, True) - secnode = self._secnodes.get(host_port) - if secnode: - self.log.info('already connected %r', host_port) - else: - self.log.info('connect secnode %r', host_port) - secnode = SecNode(host_port, self._marche.cfg_info.get(host_port, '')) - self._secnodes[host_port] = secnode - secnode.connect(log=self.log) - self.read_value() - - @Command(argument=StringType()) - def main(self, cfg): - """add or change main cfg""" - self._add_secnode('main', cfg) - - @Command(argument=StringType()) - def stick(self, cfg): - """add or change stick cfg""" - self._add_secnode('stick', cfg) - - @Command(argument=StringType()) - def add(self, cfg): - """add an addons cfg""" - self._add_secnode('addons', cfg) - - def _add_secnode(self, service, cfg): - """add and start server on localhost - - - add a wrapper cfg file to the wrapper dir - - add a setup file to the setup dir - - :param service: 'stick', 'main', '' or a stringified port number - :param cfg: config file or equipment id - """ - self.log.info('add and start %r', cfg) - port = self._marche.get_port(service) - host_port = f'localhost:{port}' - secnode = SecNode(host_port, cfg) - self._secnodes[host_port] = secnode - self._marche.add_frappy_service(service, secnode.cfg, secnode.port, self.log) - self.log.info('start %r at %r', secnode.cfg, host_port) - self._marche.start(secnode.cfg) - secnode.connect(log=self.log) - self.read_value() - - def _remove_secnode(self, secnode): - secnode.announce = False - secnode.disconnect() - self.log.info('secnode.host %r', secnode) - if secnode.host == 'localhost': - self._marche.stop(secnode.cfg) - self._update() - self.read_value() - - @Command(argument=StringType()) - def remove(self, host_port_or_cfg): - """remove server - - - for servers on localhost stop the server - - register the setup file for deletion - """ - by_cfg = [s for s in self._secnodes.values() if s.cfg == host_port_or_cfg] - if by_cfg: - secnode = by_cfg[0] - else: - host_port = normalizeuri(host_port_or_cfg, True) - secnode = self._secnodes.get(host_port) - if secnode: - self._remove_secnode(secnode) - self._update() - self.read_value() - else: - raise ValueError(f'no running server found for {host_port_or_cfg}') - - @Command(argument=TupleOf(StringType(), porttype), result=StringType()) - def running_cfg(self, host_port): - """when running, return cfg, else an empty string""" - secnode = self._secnodes.get(host_port) - if secnode and secnode.online: - return secnode.cfg - return '' - - def node_setup_info(self, secnode): - """create aliases and envlist for SECoP devices - - depending on their meaning - """ - modules = secnode.client.modules - result = {} # dict of list of (, ) - device_mapping = {} - reserved_names = {v.lower() for v in MEANINGS.values()} - for modname, moddesc in modules.items(): - if modname.lower() in reserved_names: - device_mapping[modname] = {'name': f'{modname}_'} - meaning = moddesc['properties'].get('meaning') - if meaning: - meaning_name, importance = meaning - if meaning_name not in MEANINGS: - self.log.warning('%s: meaning %r is unknown', modname, meaning_name) - continue - result.setdefault(meaning_name, []).append((importance, modname)) - if meaning_name == 'temperature_regulation': - # add temperature_regulation to temperature list, with very low importance - result.setdefault('temperature', []).append((importance - 100, modname)) - elif meaning_name == 'temperature' and moddesc['parameters'].get('target'): - result.setdefault('temperature_regulation', []).append((importance, modname)) - envlist = [] - alias_config = {} - for meaning_name, info in result.items(): - importance, modname = sorted(info)[-1] - target = MEANINGS.get(meaning_name) - alias_config[target] = {modname: importance} - if target == 'a3' and meaning_name == 'rotation_z': - alias_config['om'] = {modname: importance} - if target not in SKIP_ENV: - envlist.append(target) - return envlist, alias_config, device_mapping - diff --git a/frappy_psi/superfrappy/marche_frappy.py b/frappy_psi/superfrappy/marche_frappy.py deleted file mode 100644 index 35becd3f..00000000 --- a/frappy_psi/superfrappy/marche_frappy.py +++ /dev/null @@ -1,276 +0,0 @@ -# ***************************************************************************** -# This program is free software; you can redistribute it and/or modify it under -# the terms of the GNU General Public License as published by the Free Software -# Foundation; either version 2 of the License, or (at your option) any later -# version. -# -# This program is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more -# details. -# -# You should have received a copy of the GNU General Public License along with -# this program; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# -# Module authors: -# Markus Zolliker -# ***************************************************************************** - -import sys -import time -import re -import socket -from pathlib import Path -from configparser import ConfigParser -import logging - - -MARCHESRC = ['/home/software/marche'] -# CFGDIRS = ['/home/linse/config', '/home/l_samenv/linse_config'] - - -def get_logger(previous=[]): - if previous: - return previous[0] - logger = logging.getLogger() - logger.setLevel(logging.INFO) - logger.addHandler(logging.StreamHandler(sys.stdout)) - previous.append(logger) - return logger - - -for marchedir in MARCHESRC: - if Path(marchedir).is_dir(): - sys.path.append(marchedir) - import marche.jobs as mj - from marche.client import Client - break - - -STATUS_MAP = { # values are (, name) - mj.DEAD: (False, False, 'DEAD'), - mj.NOT_RUNNING: (False, False, 'NOT RUNNING'), - mj.STARTING: (True, True, 'STARTING'), - mj.INITIALIZING: (True, True, 'INITIALIZING'), - mj.RUNNING: (False, True, 'RUNNING'), - mj.WARNING: (False, True, 'WARNING'), - mj.STOPPING: (True, False, 'STOPPING'), - mj.NOT_AVAILABLE: (False, False, 'NOT AVAILABLE'), -} - - -def wait_status(cl, service): - delay = 0.2 - while True: - sts = cl.getServiceStatus(service) - if STATUS_MAP[sts][0]: # busy - if delay > 1.5: # this happens after about 5 sec - return False - time.sleep(delay) - delay *= 1.225 - continue - return True - - -class MarcheControl: - port = 8124 - - def __init__(self, host, port=None, user=None, instrument=None): - self.host = host - self.instrument = instrument or socket.gethostname().split('.')[0] - self.user = user or instrument # SINQ instruments - if port is not None: - self.port = port - self._client = None - - def connect(self): - if self._client is None: - # TODO: may need more generic solution for last arg - print(self.host, self.port, self.user) - self._client = Client(self.host, self.port, self.user, self.instrument.upper() + 'LNS') - - # TODO; do we need disconnect? - - def get_service(self, instance): - return instance - - def start(self, instance): - self.connect() - print(self.get_service(instance)) - self._client.startService(self.get_service(instance)) - - def restart(self, instance): - self.connect() - self._client.restartService(self.get_service(instance)) - - def stop(self, instance): - self.connect() - self._client.stopService(self.get_service(instance)) - - def status(self, service): - """returns a dict of (, , )""" - self.connect() - servdict = self._client.getAllServiceInfo().get(service) - if not servdict: - return {} - statedict = servdict['instances'] - return {k: STATUS_MAP[v['state']] for k, v in statedict.items()} - - def reload(self): - self.connect() - self._client.reloadJobs() - - def run(self, action, instance=None, *args): - if action == 'start': - self.start(instance) - elif action == 'restart': - self.restart(instance) - elif action == 'stop': - self.stop(instance) - else: - raise ValueError('unknown args %r', (action, instance) + args) - - -WRAPPER_CFG = """interface = '{port}' -include({cfg!r}) -overrideNode(interface=interface) -""" -WRAPPER_PAT = re.compile(r"interface\s=\s*'(\d*)'\s*\n") - - -class FrappyMarche(MarcheControl): - - def __init__(self, instance, host='localhost', port=None, user=None): - parser = ConfigParser() - parser.optionxform = str - gencfg = '/sq_sw/linse/frappycfg/generalConfig.cfg' - parser.read([gencfg]) - try: - section = dict(parser['superfrappy']) - except KeyError: - raise ValueError(f'bad config {gencfg}') - self.instance = instance # 'this' or an instrument on a generic computer - instrument = section.get('instrument', instance) - if instrument == 'this': - instrument = socket.gethostname().split('.')[0] - print(instance, instrument) - self.instrument = instrument # the instrument name - - self.config = {k: section[k].replace('', instrument) for k in section} - - self.wrapperdir = self.config.pop('wrapperdir') - self.cfgdirs = self.config.pop('cfgdirs') - self.main_port = int(self.config.pop('main_port')) - - if not Path(self.wrapperdir).is_dir(): - raise ValueError(f'{self.wrapperdir} does not exist') - self.get_cfg_info() # do we need to update this from time to time? - super().__init__(host, port, user, instrument) - - def get_service(self, instance): - return f'frappy.{instance}' if self.instance == 'main' else f'frappy.{self.instrument}-{instance}' - - def wrapper_file(self, cfg): - return Path(self.wrapperdir) / f'{cfg}_cfg.py' - - def cfg_file(self, cfgdirs, service, cfg): - cfgpy = f'{cfg}_cfg.py' - tries = [] - for servicedir in (service, ''): - for cfgdir in cfgdirs.split(':'): - cfgfile = Path(cfgdir) / servicedir / cfgpy - tries.append(cfgfile) - if cfgfile.is_file(): - return cfgfile - else: - raise FileNotFoundError(f'can not find {cfgpy} in {tries}') - - def get_std_port(self, service): - port = self.main_port - if service == 'main': - return port - port += 1 - if service == 'stick': - return port - return port + 1, self.main_port + 10 - - def get_local_ports(self): - self.get_cfg_info() - run_state = self.status('frappy') - result = {} - for host_port, cfg in self.cfg_info.items(): - host, port = host_port.split(':') - if host == 'localhost': - busy, on, _ = run_state.get(cfg, (0,0,0)) - if on or busy: - result.setdefault(port, []).append((on, busy, cfg)) - return {sorted(v)[-1][-1]: k for k, v in result.items()} - - def get_port(self, service): - """get a port number for service - - return a predefined port number for 'main' and 'stick' - or a free port number for 'addons' - """ - if service not in {'main', 'stick', 'addons', 'addon'}: - raise ValueError('illegal service argument') - ports = self.get_std_port(service) - if isinstance(ports, int): - return ports - self.get_cfg_info() - used_ports = self.get_local_ports() - for port in range(*ports): - if port not in used_ports: - return port - raise ValueError('too many frappy servers') - - def add_frappy_service(self, service, cfg, port, log=None): - if log is None: - log = get_logger() - log.info('add %r port=%r', cfg, port) - cfgfile = self.cfg_file(self.cfgdirs, service, cfg) - wrapper_content = WRAPPER_CFG.format(cfg=str(cfgfile), port=port) - self.wrapper_file(cfg).write_text(wrapper_content) - self.get_cfg_info() - log.info('wrapper %r %r', self.wrapper_file(cfg), wrapper_content) - self.reload() - log.info('registered %r', cfg) - - def get_cfg_info(self): - """get info from wrapper dir""" - result = {} - for cfgfile in Path(self.wrapperdir).glob('*_cfg.py'): - cfg = cfgfile.stem[:-4] - match = WRAPPER_PAT.match(cfgfile.read_text()) - if match: - result[f'localhost:{match.group(1)}'] = cfg - self.cfg_info = result - - def delete_frappy_service(self, cfg): - try: - self.wrapper_file(cfg).unlink() - self.reload() - except FileNotFoundError: - pass - - def run(self, action, *args): - if action == 'start': - try: - service, cfg = args - except ValueError: - raise ValueError('start needs ') - port = self.get_port(service) - self.add_frappy_service(service, cfg, port) - self.start(cfg) - elif action == 'restart': - if len(args) != 1 or args[0] in {'main', 'stick', 'addons', 'addon'}: - raise ValueError('restart needs ') - self.restart(args[0]) - elif action == 'stop': - if len(args) != 1 or args[0] in {'main', 'stick', 'addons', 'addon'}: - raise ValueError('stop needs ') - self.delete_frappy_service(args[0]) - self.stop(args[0]) - else: - raise ValueError('unknown action %r', action) diff --git a/frappy_psi/superfrappy/normalizeuri.py b/frappy_psi/superfrappy/normalizeuri.py deleted file mode 100644 index ff06284b..00000000 --- a/frappy_psi/superfrappy/normalizeuri.py +++ /dev/null @@ -1,41 +0,0 @@ -import re -import socket - -# sorry for hardwiring this ... there is no CNAME reverse lookup! -# taking the original address as unique name would need a call to -# gethostbyaddr, which might take some time - also not what we want -reverse_alias = { - 'pc15139': 'linse-c', - 'pc16392': 'linse-a', -} - - -def normalizeuri(uri, use_localhost=False): - host, sep, port = uri.partition(':') - if host[0].isdigit(): - if not port and '.' not in host: # assume this is a port number - host, sep, port = 'localhost', ':', host - else: - try: - socket.setdefaulttimeout(1) - host = socket.gethostbyaddr(host)[0].split('.', 1)[0] - except socket.gaierror: - pass # keep numbered IP - finally: - socket.setdefaulttimeout(None) - else: - host = host.split('.', 1)[0] - hostname = socket.gethostname().split('.')[0] - if use_localhost: - if host in (hostname, reverse_alias.get(hostname, hostname)): - host = 'localhost' - else: - if host == 'localhost': - host = hostname - host = reverse_alias.get(host, host) - # strip appended IP when a host is registered twice (at PSI): - match = re.match(r'([^-]+)-129129\d{6}$', host) - host = match.group(1) if match else host - return f'{host}{sep}{port}' - - diff --git a/frappy_psi/superfrappy/secop_udp.py b/frappy_psi/superfrappy/secop_udp.py deleted file mode 100644 index d37c3de2..00000000 --- a/frappy_psi/superfrappy/secop_udp.py +++ /dev/null @@ -1,115 +0,0 @@ -# ***************************************************************************** -# This program is free software; you can redistribute it and/or modify it under -# the terms of the GNU General Public License as published by the Free Software -# Foundation; either version 2 of the License, or (at your option) any later -# version. -# -# This program is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more -# details. -# -# You should have received a copy of the GNU General Public License along with -# this program; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# -# Module authors: -# Markus Zolliker -# ***************************************************************************** - -import os -import time -import socket -import json -from select import select -from .normalizeuri import normalizeuri - - -SECOP_UDP_PORT = 10767 - - -class Listener: - socket = None - - def __init__(self, use_localhost=False): - self.use_localhost = use_localhost # whether 'localhost' or the real hostname is returned on the own machine - - def poll(self, log=None): - if self.socket is None: - return None - if not select([self.socket], [], [], 0)[0]: - return None - try: - msg, addr = self.socket.recvfrom(1024) - except socket.error: # pragma: no cover - return None - addr = socket.getnameinfo(addr, socket.NI_NOFQDN)[0] - msg = json.loads(msg.decode('utf-8')) - if log: - log.debug('got msg %r', msg) - kind = msg.pop('SECoP', None) - if kind == 'node': - msg['device'] = msg['equipment_id'].split('.')[0] - uri = f"{addr}:{msg['port']}" - elif kind == 'for_other_node': - uri = msg['uri'] - else: - return None - host, _, port = uri.rpartition(':') - host = normalizeuri(host or 'localhost', self.use_localhost) - msg['uri'] = f'{host}:{port}' - return msg - - -class UdpScan(Listener): - def start(self, log=None): - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - # send a general broadcast - try: - sock.sendto(json.dumps(dict(SECoP='discover')).encode('utf-8'), - ('255.255.255.255', SECOP_UDP_PORT)) - except OSError as e: - if log: - log.info('could not send the broadcast %r:', e) - self.socket = sock - self.deadline = time.time() + 30 - - def poll(self, log=None): - if self.socket is None: - return None - if time.time() > self.deadline: - try: - self.socket.close() - except Exception: - pass - self.socket = None - return super().poll(log) - - -class UdpListener(Listener): - def start(self): - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.settimeout(1) - if os.name == 'nt': - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - else: - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - sock.bind(('0.0.0.0', SECOP_UDP_PORT)) - self.socket = sock - - -def send_other_udp(uri, instrument, device=None): - """inform the feeder about the start of a frappy server""" - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - msg = { - 'SECoP': 'for_other_node', - 'uri': uri, - 'instrument': instrument, - } - if device: - msg['device'] = device - msg = json.dumps(msg, ensure_ascii=False, separators=(',', ':')).encode('utf-8') - sock.sendto(msg, ('255.255.255.255', SECOP_UDP_PORT))