[WIP] superfrappy: state as of 2026-05-06

Change-Id: Ifd83a297dd5593db502bb2c0f6ec1c99717d4a4a
This commit is contained in:
2026-05-06 16:02:02 +02:00
parent 8cb0844ac8
commit 331f3e02b1
6 changed files with 452 additions and 1000 deletions
+21 -6
View File
@@ -97,7 +97,7 @@ class SeaConfig:
for dir in self.dirs:
file = dir / json_file
if file.is_file():
return json.load(file.read_text())
return json.loads(file.read_text())
raise FileNotFoundError(f'{json_file} not found')
@@ -129,7 +129,7 @@ class SeaClient(ProxyClient, Module):
raise ConfigError('missing sea port for %s' % instance)
opts['uri'] = {'value': 'tcp://localhost:%s' % port}
self.objects = set()
self.shutdown = False
self._shutdown = threading.Event()
self.path2param = {}
self._write_lock = threading.RLock()
self._connect_thread = None
@@ -200,10 +200,15 @@ class SeaClient(ProxyClient, Module):
raise CommunicationFailedError(f'reply from frappy_config: {result}')
# frappy_async_client switches to the json protocol (better for updates)
self.asynio.writeline(b'frappy_async_client')
self.asynio.writeline(('get_all_param ' + ' '.join(self.objects)).encode())
if self.config == 'device':
self.asynio.writeline(b'get_all_param device')
else:
self.asynio.writeline(('get_all_param ' + ' '.join(self.objects)).encode())
self.log.info('connected to %s', self.uri)
self._connected.set()
mkthread(self._rxthread)
except CommunicationFailedError:
pass
finally:
self._connect_thread = None
@@ -266,14 +271,18 @@ class SeaClient(ProxyClient, Module):
def _rxthread(self):
recheck = None
while not self.shutdown:
while not self._shutdown.is_set():
if recheck and time.time() > recheck:
# try to collect device changes within 1 sec
recheck = None
result = self.request('check_config %s %s' % (self.service, self.config))
if self.service:
result = self.request('check_config %s %s' % (self.service, self.config))
else:
result = '1'
if result == '1':
self.asynio.writeline(('get_all_param ' + ' '.join(self.objects)).encode())
else:
print('SHUTDOWN', result)
self.secNode.srv.shutdown()
try:
reply = self.asynio.readline()
@@ -281,6 +290,7 @@ class SeaClient(ProxyClient, Module):
continue
except ConnectionClosed:
self.close_connections()
print('CLOSED')
break
try:
msg = json.loads(reply)
@@ -331,7 +341,8 @@ class SeaClient(ProxyClient, Module):
if path.startswith('/device'):
if path == '/device/changetime':
recheck = time.time() + 1
elif path.startswith('/device/frappy_%s' % self.service) and value == '':
elif self.service and path.startswith('/device/frappy_%s' % self.service) and value == '':
print('SHUT', self.service, path)
self.secNode.srv.shutdown()
else:
for module, param in mplist:
@@ -343,6 +354,10 @@ class SeaClient(ProxyClient, Module):
# do not update unchanged values within 60 sec
self.updateValue(module, param, value, now, readerror)
def stopPollThread(self):
self._shutdown.set()
super().stopPollThread()
@Command(StringType(), result=StringType())
def communicate(self, command):
"""send a command to SEA"""
+431
View File
@@ -0,0 +1,431 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""Module to handle running SEC nodes for an instrument
Also start and stop frappy servers related to SEA
Handled by linsetools:
For all detected SEC nodes, setup files are written to the
setup directory, and they are disabled ('lowlevel') when shut down.
For SEC nodes started on the instrument computer, wrapper config files
containing the port number are produced in the wrapper directory.
"""
import sys
sys.path.append('/home/l_samenv')
sys.path.append('/sq_sw/linse')
import time
import socket
import json
import os
from select import select
from frappy.core import Readable, Parameter, Property, Command, Attached, IDLE
from frappy.datatypes import ArrayOf, StructOf, StringType, IntRange, TupleOf, BoolType
from frappy.client import SecopClient
import linsetools.frappy
from sehistory.normalizeuri import normalizeuri
porttype = IntRange(0, 0xc000)
secnodetype = StructOf(cfg=StringType(), uri=StringType(), service=StringType())
SECOP_UDP_PORT = 10767
class FrappyControl(linsetools.frappy.FrappyControl):
# TODO; move to linsetools
def service_from_uri(self, uri):
try:
idx = self.frappy_servers.index(uri)
return {0: 'main', 1: 'stick'}.get(idx, 'addons')
except ValueError:
return ''
class Listener:
socket = None
def __init__(self, use_localhost=False):
self.use_localhost = use_localhost # whether 'localhost' or the real hostname is returned on the own machine
def poll(self, log=None):
if self.socket is None:
return None
if not select([self.socket], [], [], 0)[0]:
return None
try:
msg, addr = self.socket.recvfrom(1024)
except socket.error: # pragma: no cover
return None
addr = socket.getnameinfo(addr, socket.NI_NOFQDN)[0]
msg = json.loads(msg.decode('utf-8'))
if log:
log.debug('got msg %r', msg)
kind = msg.pop('SECoP', None)
if kind == 'node':
msg['device'] = msg['equipment_id'].split('.')[0]
uri = f"{addr}:{msg['port']}"
elif kind == 'for_other_node':
uri = msg['uri']
else:
return None
host, _, port = uri.rpartition(':')
host = normalizeuri(host or 'localhost', self.use_localhost)
msg['uri'] = f'{host}:{port}'
return msg
class UdpScan(Listener):
def start(self, log=None):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
# send a general broadcast
try:
sock.sendto(json.dumps(dict(SECoP='discover')).encode('utf-8'),
('255.255.255.255', SECOP_UDP_PORT))
except OSError as e:
if log:
log.info('could not send the broadcast %r:', e)
self.socket = sock
self.deadline = time.time() + 30
def poll(self, log=None):
if self.socket is None:
return None
if time.time() > self.deadline:
try:
self.socket.close()
except Exception:
pass
self.socket = None
return super().poll(log)
class UdpListener(Listener):
def start(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(1)
if os.name == 'nt':
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
else:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.bind(('0.0.0.0', SECOP_UDP_PORT))
self.socket = sock
def send_other_udp(uri, instrument, device=None):
"""inform the feeder about the start of a frappy server"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
msg = {
'SECoP': 'for_other_node',
'uri': uri,
'instrument': instrument,
}
if device:
msg['device'] = device
msg = json.dumps(msg, ensure_ascii=False, separators=(',', ':')).encode('utf-8')
sock.sendto(msg, ('255.255.255.255', SECOP_UDP_PORT))
class SecNode:
log = None
def __init__(self, host_port, cfg, service=''):
self.host_port = host_port
self.host, _, self.port = host_port.partition(':')
self.client = SecopClient(host_port)
self.client.register_callback(None, self.nodeStateChange)
self.nodename = cfg
self.cfg = cfg
self.description = cfg
self.online = False
self.status = 'created'
self.trigger = True
# self.announce_pnp = True # None: do not announce, True: announce new, False: announce removal
self.setup_was_loaded = False
self.service = service
def connect(self, complete_callback=None, log=None):
self.status = 'connecting'
self.log = log
self.complete_callback = complete_callback
if self.log:
self.log.info('spawn connect')
self.client.spawn_connect(self.complete)
def complete(self):
try:
self.online = True
self.status = 'completing'
self.nodename = self.client.nodename
if self.log:
self.log.info('connected to %r', self.nodename)
if self.complete_callback:
if not self.cfg:
if self.nodename == self.client.uri:
self.cfg = self.host_port
else:
self.cfg = self.nodename.replace('.', '_')
try:
self.complete_callback(self)
except Exception as e:
self.log.exception('complete_callback failed')
self.complete_callback = None
desc = self.client.properties.get('description') or self.nodename
self.description = desc.split('\n')[0]
self.status = 'connected'
except Exception as e:
self.log.exception('connect failed')
self.status = f'disconnected {e!r}'
def nodeStateChange(self, online, statetext):
pass
def disconnect(self):
self.online = False
self.status = 'disconnecting'
self.client.disconnect()
self.status = 'disconnected'
class SuperFrappy(Readable):
seascan = Attached()
marcheport = Property('marche port number', porttype, default=8124)
is_main_instrument = Property('this is the main instrument', BoolType(), default=True)
value = Parameter('running servers', ArrayOf(secnodetype), default=())
instance = Parameter('"this" or <instrument>', StringType(), default='this')
# plugplay = Parameter('enable plug and play', BoolType(), readonly=False, default=False)
# nicos_setups = Parameter('active nicos se setups', ArrayOf(StringType()))
_secnodes = None # dict <host_post> of SecNode
_udp_listener = None
_setups = ()
_to_close = ()
_fast_deadline = 0
def initModule(self):
super().initModule()
# fc = FrappyControl(self.instance, 'localhost', self.marcheport)
self._secnodes = {}
self._udp_listener = [UdpScan(True), UdpListener(True)]
self.rescan()
self._to_close = set()
for pname in 'device_name', 'stick_name', 'addons':
self.seascan.addCallback(pname, self.sea_update, pname)
def add_secnode(self, service, cfg, start):
self.log.info('add %r as %r', cfg, service)
# TODO: treat unknown service
for secnode in self._secnodes.values():
if secnode.service == service:
if cfg == secnode.cfg:
return
elif cfg == secnode.cfg:
raise ValueError(f'{cfg} is already used as {secnode.service!r}')
fc = FrappyControl(self.instance, 'localhost', self.marcheport)
uri = fc.add_frappy_service(service, cfg, None, self.log)
if start:
self.log.info('start %r', cfg)
fc.start(cfg)
self._secnodes[uri] = secnode = SecNode(uri, cfg, service)
secnode.connect()
self.read_value()
self._fast_deadline = time.time() + 15
self.setFastPoll(True, 0.25)
def remove_secnode(self, service_or_cfg, remaining=()):
fc = FrappyControl(self.instance, 'localhost', self.marcheport)
count = 0
for uri, secnode in list(self._secnodes.items()):
if secnode.service == service_or_cfg:
cfg = secnode.cfg
elif secnode.cfg == service_or_cfg:
cfg = service_or_cfg
else:
continue
if cfg not in remaining:
count += 1
self.log.info('remove %s at %s', secnode.cfg, uri)
secnode.disconnect()
fc.stop(cfg)
fc.delete_frappy_service(cfg)
self.read_value()
return count
def sea_update(self, pname, value, error):
if error or not value:
self.log.warn('SEA ignore %r %r %r', pname, value, error)
return
self.log.warn('SEA %r %r %r', pname, value, error)
fc = FrappyControl(self.instance, 'localhost', self.marcheport)
if pname == 'device_name':
service = 'main'
configs = [f'{value}.config']
elif pname == 'stick_name':
service = 'stick'
configs = [f'{value}.stick']
else:
service = 'addons'
configs = [f'{v}.config' for v in value.split()]
self.log.warn('SeA %r', configs)
if value:
fc.all_cfg(self.instance, service, details=True)
for seacfg in configs:
cfgs = list(fc.sea2frappy.get(seacfg, ()))
if len(cfgs) == 1:
cfg = cfgs[0]
self.log.warn('SeA cfg=%s', cfg)
try:
self.add_secnode(service, cfg, False) # TODO: must be True, finally
except ValueError as e:
self.log.warn('%s', e)
elif len(cfgs) > 1:
self.log.warn('frappy config for %s is ambiguous: %s', seacfg, ','.join(cfgs))
if service == 'addons':
remaining = {fc.sea2frappy.get(f'{v["cfg"]}.addons') for v in self.value}
else:
return
else:
remaining = ()
self.remove_secnode(service, remaining=remaining)
def doPoll(self):
super().doPoll()
done = False
for listener in self._udp_listener:
while msg := listener.poll(self.log):
done = True
uri = msg['uri']
if uri in self._secnodes:
self.log.info('%r is already known', msg)
else:
cfg = msg.get('device', '')
# if uri.startswith('localhost:') and cfg != 'superfrappy':
if cfg != 'superfrappy':
self.log.warn('%r appeared', msg)
try:
self.connect(uri)
except Exception as e:
self.log.warning('%s: %r', uri, e)
else:
self.log.info('skip %r', msg)
if self._fast_deadline == 0:
self.log.warn('INIT')
try:
for pname in 'device_name', 'stick_name', 'addons':
method = f'read_{pname}'
value = getattr(self.seascan, method)()
self.sea_update(pname, value, None)
self._fast_deadline = time.time() + 15
self.setFastPoll(True, 0.25)
except Exception as e:
self.log.error('doPoll %r', e)
if not done and time.time() > self._fast_deadline:
self.setFastPoll(False)
def read_value(self):
value = []
for secnode in self._secnodes.values():
info = dict(cfg=secnode.cfg, uri=secnode.host_port, service=secnode.service)
value.append(info)
if tuple(value) != self.value:
self.log.info('----')
for info in value:
formatted = '%(cfg)s uri=%(uri)s service=%(service)s' % info
self.log.info('---- %s', formatted)
return value
def read_status(self):
return IDLE, ', '.join((f'{k}: {v.status}' for k, v in self._secnodes.items()))
@Command
def rescan(self):
"""rescan secop servers in subnet"""
for listener in self._udp_listener:
listener.start()
@Command(argument=StringType())
def restart(self, cfg):
"""restart the frappy server
works only when it runs on the local machine
"""
fc = FrappyControl(self.instance, 'localhost', self.marcheport)
fc.restart(cfg)
@Command(argument=StringType())
def connect(self, host_port):
"""connect secnode
:param host_port: box address <host>:<port>
"""
service, _, host_port = host_port.rpartition('=')
host_port = normalizeuri(host_port, True)
secnode = self._secnodes.get(host_port)
if secnode:
self.log.info('already connected %r', host_port)
else:
self.log.info('connect secnode %r', host_port)
fc = FrappyControl(self.instance, 'localhost', self.marcheport)
if not service:
service = fc.service_from_uri(host_port)
if not service:
raise ValueError(f'can not determine service from {host_port}')
secnode = SecNode(host_port, fc.cfg_info.get(host_port, ''), service)
self._secnodes[host_port] = secnode
secnode.connect(log=self.log)
self.read_value()
@Command(argument=StringType())
def main(self, cfg):
"""add or change main cfg"""
self.add_secnode('main', cfg, True)
@Command(argument=StringType())
def stick(self, cfg):
"""add or change stick cfg"""
self.add_secnode('stick', cfg, True)
@Command(argument=StringType())
def add(self, cfg):
"""add an addons cfg"""
self.add_secnode('addons', cfg, True)
@Command(argument=StringType())
def remove(self, service_or_cfg):
"""remove server
- for servers on localhost stop the server
- register the setup file for deletion
"""
count = self.remove_secnode(service_or_cfg)
if count == 0:
raise ValueError(f'no running server found for {service_or_cfg}')
@Command(argument=TupleOf(StringType(), porttype), result=StringType())
def running_cfg(self, host_port):
"""when running, return cfg, else an empty string"""
secnode = self._secnodes.get(host_port)
if secnode and secnode.online:
return secnode.cfg
return ''
-562
View File
@@ -1,562 +0,0 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""module to trigger plug and play mechanism
For all detected secop servers, setup files are written to the
setup directory.
For SEC nodes started on the instrument computer, wrapper config files
containing the port number are produced in the wrapper directory.
Mechanism:
start or detect a SecNode
(on start only) create a wrapper file and tell marche to start
add SecNode to superfrappy._secnodes in 'connecting' mode
action table:
connection == online, setupfile does not exist:
- announce = new, create wrapperfile (if needed) and setup file
nicos == idle, setup not in loaded_setups, announce == new:
- announce = None, plugplay = new
setup in loaded_setups, plugplay == new:
- pluglay = quiet (handled in _handle_setups)
connection != online, setup in loaded_setups:
- announce = cancel
nicos == idle, setup not in loaded_setups, announce == cancel:
- announce = None, plugplay = cancel
setup not in loaded_setups, plugplay == cancel:
- plugplay = quiet (handled in _handle_setups)
setup not in loaded_setups, plugplay == quiet and setupfile exists:
- remove setupfile and wrapper file (if present)
"""
import time
import socket
import re
from ast import literal_eval
from pathlib import Path
from frappy.lib import mkthread, formatExtendedTraceback
from frappy.core import Readable, Parameter, Property, Command, Communicator, HasIO
from frappy.datatypes import ArrayOf, StructOf, StringType, IntRange, TupleOf, BoolType
from frappy.client import SecopClient
from .marche_frappy import FrappyMarche
from .secop_udp import UdpScan, UdpListener
from .normalizeuri import normalizeuri
porttype = IntRange(0, 0xc000)
secnodetype = StructOf(cfg=StringType(), uri=StringType(), status=StringType())
SETUP_TEMPLATE = """description = 'frappy %(cfg)s setup'
group = 'plugplay'
devices = {
%(devname)s:
device('nicos_sinq.frappy_sinq.new.FrappyMarcheNode',
description='%(cfg)s SEC node', unit='', async_only=True,
prefix='se_', auto_create=True,
uri=%(uri)s,
general_stop_whitelist=['om', 'stickrot'],
%(nodeargs)s),
}
%(aliasconfig)s
"""
MEANINGS = {
'temperature': 'Ts',
'temperature_regulation': 'T',
'magneticfield': 'B',
'pressure': 'p',
'rotation_z': 'a3',
'stick_rotation': 'dom',
}
SKIP_ENV = 'rotation_z', 'dom'
class SecNode:
log = None
def __init__(self, host_port, cfg):
self.host_port = host_port
self.host, _, self.port = host_port.partition(':')
self.client = SecopClient(host_port)
self.nodename = cfg
self.cfg = cfg
self.description = cfg
self.online = False
self.status = 'created'
self.trigger = True
self.announce_pnp = True # None: do not announce, True: announce new, False: announce removal
self.setup_was_loaded = False
def connect(self, complete_callback=None, log=None):
self.status = 'connecting'
self.log = log
self.complete_callback = complete_callback
if self.log:
self.log.info('spawn connect')
self.client.spawn_connect(self.complete)
def get_setup(self):
return f'se_{self.cfg}'
def complete(self):
try:
self.online = True
self.status = 'completing'
self.nodename = self.client.nodename
if self.log:
self.log.info('connected to %r', self.nodename)
if self.complete_callback:
if not self.cfg:
if self.nodename == self.client.uri:
self.cfg = self.host_port
else:
self.cfg = self.nodename.replace('.', '_')
try:
self.complete_callback(self)
except Exception as e:
self.log.exception('complete_callback failed')
self.complete_callback = None
desc = self.client.properties.get('description') or self.nodename
self.description = desc.split('\n')[0]
self.status = 'connected'
except Exception as e:
self.log.exception('connect failed')
self.status = f'disconnected {e!r}'
def disconnect(self):
self.online = False
self.status = 'disconnecting'
self.client.disconnect()
self.status = 'disconnected'
def noop(*args):
pass
MSGPAT = re.compile(r'([^=!]*)(?:([=!])(.*))?')
def noop(*args):
pass
def get_lookup_key(key):
split = key.split('/')
return f'/{split[-2]}/{split[-1]}' if len(split) > 2 else '/'.join(split[-2:])
class NicosCache(Communicator):
uri = Property('<host>:<port>>', StringType(), default='localhost')
_error = None
_sock = None
def doPoll(self):
if self._error:
self.log.error('%r', self._error)
self._error = None
@Command(argument=StringType())
def communicate(self, command):
"""send a command, do not wait for any response"""
self._connect()
self.log.info('> %r', command)
self._sock.sendall(command.encode() + b'\n')
def _connect(self, keys=()):
if self._sock is None:
host, _, port = self.uri.partition(':')
self._sock = socket.create_connection((host or 'localhost', int(port or 14869)))
for lookup_key in keys:
msg = f'{lookup_key}*\n{lookup_key}:\n'
self._sock.sendall(msg.encode())
def recvloop(self, handlers, other=noop, exception=noop):
handler_lookup = {}
for key, handler in handlers.items():
pat = re.compile(key.replace('*', '.*'))
lookup_key = get_lookup_key(key)
handler_lookup.setdefault(lookup_key, []).append((pat, handler))
while True:
try:
self._connect(handler_lookup)
except Exception:
self._error = formatExtendedTraceback()
time.sleep(10)
continue
try:
buffer = b''
while True:
raw = self._sock.recv(8192)
if not raw:
break
messages = (buffer + raw).split(b'\n')
buffer = messages.pop()
for msg in messages:
msg = msg.strip().decode()
match = MSGPAT.match(msg)
if not match:
other(msg)
continue
key, op, value = match.groups()
lookup_key = get_lookup_key(key)
for pat, handler in handler_lookup.get(lookup_key, ()):
if pat.match(key):
try:
if value is not None:
value = literal_eval(value)
handler(key, op, value)
except Exception as e:
self._error = formatExtendedTraceback()
exception(e)
break
else:
other(key)
messages.extend(raw.split(b'\n'))
except Exception as e:
self._error = formatExtendedTraceback()
self._sock = None
exception(e)
raise
class SuperFrappy(HasIO, Readable):
ioClass = NicosCache
marcheport = Property('marche port number', porttype, default=8124)
is_main_instrument = Property('this is the main instrument', BoolType(), default=True)
value = Parameter('running servers', ArrayOf(secnodetype), default=())
instance = Parameter('"this" or <instrument>', StringType())
plugplay = Parameter('enable plug and play', BoolType(), readonly=False, default=False)
nicos_setups = Parameter('active nicos se setups', ArrayOf(StringType()))
_marche = None
_secnodes = None # dict <host_post> of SecNode
_udp_listener = None
_nicos_idle_since = None
_setups = ()
_to_close = ()
def initModule(self):
super().initModule()
self._marche = FrappyMarche(self.instance, 'localhost', self.marcheport)
self.setupdir = self._marche.config['setupdir']
self.wrapperdir = self._marche.wrapperdir
self._secnodes = {}
self._udp_listener = [UdpScan(True), UdpListener(True)]
self.rescan()
self._announced_setups = {}
self._current_plugplay = {}
self._to_close = set()
# self.log.info('%r', self.log.handlers[0].setLevel(10))
mkthread(self.io.recvloop, {
'nicos/session/mastersetupexplicit': self._handle_setups,
'nicos/exp/scripts': self._handle_scripts,
'se/*/nicos/setupname': self._handle_plugplay,
})
def _cache_send(self, key, op, value=''):
if op in ('=', '!'):
value = repr(value)
self.io.communicate(key + op + value)
def _handle_scripts(self, key, op, value):
self.log.info('scripts %r', value)
if value:
self._nicos_idle_since = None
self.setFastPoll(False)
else:
if not self._nicos_idle_since:
self._nicos_idle_since = time.time()
self.setFastPoll(True, 0.25)
def _handle_setups(self, key, op, value):
self._setups = set(value)
self.log.info('setups %r', value)
for secnode in self._secnodes.values():
setup = secnode.get_setup()
loaded = setup in self._setups
pnp = self._current_plugplay.get(secnode.nodename)
if loaded:
if pnp is True:
if self.plugplay:
self._send_pnp_message(secnode.nodename, None, True)
secnode.status = 'loaded'
secnode.setup_was_loaded = True
else:
if pnp is False:
if self.plugplay:
self._send_pnp_message(secnode.nodename, None, False)
if secnode.setup_was_loaded and secnode.host == 'localhost' or not secnode.client.online:
secnode.status = 'to_close'
self._to_close.add(secnode.host_port)
def _handle_plugplay(self, key, op, value):
value = None if value is None else (op == '=')
if value is None:
self._current_plugplay.pop(key, None)
else:
self._current_plugplay[key] = value
self.log.info('pnp %r', self._current_plugplay)
def _write_setup_file(self, secnode):
setup_file = Path(self.setupdir) / f'{secnode.get_setup()}.py'
envlist, alias_config, devmap = self.node_setup_info(secnode)
nodeargs = f'device_mapping={devmap!r}' if devmap else ''
aliasconfig = f'alias_config = {alias_config!r}' if alias_config else ''
setup_content = SETUP_TEMPLATE % {
'cfg': secnode.cfg, 'devname': f'"secnode_{secnode.cfg}"',
'uri': repr(secnode.host_port), 'nodeargs': nodeargs, 'aliasconfig': aliasconfig}
setup_file.write_text(setup_content)
def _send_pnp_message(self, nodename, setup, on):
self._current_plugplay[nodename] = None if setup is None else on
self._cache_send(f'se/{nodename}/nicos/setupname', '=' if on else '!', setup)
def _update(self):
while self._to_close:
secnode = self._secnodes.pop(self._to_close.pop(), None)
self._remove_secnode(secnode)
superfluous_setup_files = set(Path(self.setupdir).glob('*.py'))
superfluous_cfg_files = set(Path(self.wrapperdir).glob('*_cfg.py'))
for secnode in self._secnodes.values():
nodename = secnode.nodename.replace('/', '_')
if not secnode.cfg:
continue
setup = secnode.get_setup()
setup_file = Path(self.setupdir) / f'{setup}.py'
superfluous_setup_files.discard(setup_file)
superfluous_cfg_files.discard(Path(self.wrapperdir) / f'{secnode.cfg}_cfg.py')
if setup in self._setups:
if not secnode.online:
if secnode.announce_pnp is False:
if self._nicos_idle_since:
secnode.announce_pnp = None
if self.plugplay:
self._send_pnp_message(nodename, setup, False)
elif secnode.host_port not in self._to_close:
secnode.announce_pnp = False
else:
if secnode.online:
if not setup_file.is_file():
self.log.info('write_setup %r', secnode.cfg)
self._write_setup_file(secnode)
if secnode.announce_pnp is True and self._nicos_idle_since:
secnode.announce_pnp = None
if self.plugplay:
self._send_pnp_message(nodename, setup, True)
self._to_close.discard(secnode.host_port)
for setup in self._setups:
# do not delete active setups
superfluous_setup_files.discard(Path(self.setupdir) / f'{setup}.py')
reload_marche = bool(superfluous_cfg_files)
for file in superfluous_setup_files | superfluous_cfg_files:
self.log.info('remove %s', file)
try:
file.unlink()
except FileNotFoundError:
pass
if reload_marche:
self._marche.reload()
def doPoll(self):
super().doPoll()
for listener in self._udp_listener:
msg = listener.poll(self.log)
if msg:
uri = msg['uri']
if uri in self._secnodes:
self.log.info('%r is already known', msg)
else:
self.log.info('%r appeared', msg)
cfg = msg.get('device', '')
if uri.startswith('localhost:') and cfg != 'superfrappy':
self.connect(uri)
now = time.time()
if now > (self._nicos_idle_since or now) + 0.5:
self.setFastPoll(False)
self._update()
@Command
def rescan(self):
"""rescan secop servers in subnet"""
for listener in self._udp_listener:
listener.start()
def read_value(self):
value = []
for secnode in self._secnodes.values():
info = {'cfg': secnode.cfg, 'uri': secnode.host_port, 'status': secnode.status}
value.append(info)
return value
def read_nicos_setups(self):
return list(self._setups & {s.get_setup() for s in self._secnodes.values()})
@Command()
def ping(self):
"""nicos cache ping"""
self._cache_send('#ping#', '?', '')
@Command(argument=StringType())
def restart(self, cfg):
"""restart the frappy server
works only when it runs on the local machine
"""
self._marche.restart(cfg)
@Command(argument=StringType())
def connect(self, host_port):
"""connect secnode
- add a setup file to the setup dir
:param host_port: box address <host>:<port>
"""
host_port = normalizeuri(host_port, True)
secnode = self._secnodes.get(host_port)
if secnode:
self.log.info('already connected %r', host_port)
else:
self.log.info('connect secnode %r', host_port)
secnode = SecNode(host_port, self._marche.cfg_info.get(host_port, ''))
self._secnodes[host_port] = secnode
secnode.connect(log=self.log)
self.read_value()
@Command(argument=StringType())
def main(self, cfg):
"""add or change main cfg"""
self._add_secnode('main', cfg)
@Command(argument=StringType())
def stick(self, cfg):
"""add or change stick cfg"""
self._add_secnode('stick', cfg)
@Command(argument=StringType())
def add(self, cfg):
"""add an addons cfg"""
self._add_secnode('addons', cfg)
def _add_secnode(self, service, cfg):
"""add and start server on localhost
- add a wrapper cfg file to the wrapper dir
- add a setup file to the setup dir
:param service: 'stick', 'main', '' or a stringified port number
:param cfg: config file or equipment id
"""
self.log.info('add and start %r', cfg)
port = self._marche.get_port(service)
host_port = f'localhost:{port}'
secnode = SecNode(host_port, cfg)
self._secnodes[host_port] = secnode
self._marche.add_frappy_service(service, secnode.cfg, secnode.port, self.log)
self.log.info('start %r at %r', secnode.cfg, host_port)
self._marche.start(secnode.cfg)
secnode.connect(log=self.log)
self.read_value()
def _remove_secnode(self, secnode):
secnode.announce = False
secnode.disconnect()
self.log.info('secnode.host %r', secnode)
if secnode.host == 'localhost':
self._marche.stop(secnode.cfg)
self._update()
self.read_value()
@Command(argument=StringType())
def remove(self, host_port_or_cfg):
"""remove server
- for servers on localhost stop the server
- register the setup file for deletion
"""
by_cfg = [s for s in self._secnodes.values() if s.cfg == host_port_or_cfg]
if by_cfg:
secnode = by_cfg[0]
else:
host_port = normalizeuri(host_port_or_cfg, True)
secnode = self._secnodes.get(host_port)
if secnode:
self._remove_secnode(secnode)
self._update()
self.read_value()
else:
raise ValueError(f'no running server found for {host_port_or_cfg}')
@Command(argument=TupleOf(StringType(), porttype), result=StringType())
def running_cfg(self, host_port):
"""when running, return cfg, else an empty string"""
secnode = self._secnodes.get(host_port)
if secnode and secnode.online:
return secnode.cfg
return ''
def node_setup_info(self, secnode):
"""create aliases and envlist for SECoP devices
depending on their meaning
"""
modules = secnode.client.modules
result = {} # dict <meaning name> of list of (<importance>, <target>)
device_mapping = {}
reserved_names = {v.lower() for v in MEANINGS.values()}
for modname, moddesc in modules.items():
if modname.lower() in reserved_names:
device_mapping[modname] = {'name': f'{modname}_'}
meaning = moddesc['properties'].get('meaning')
if meaning:
meaning_name, importance = meaning
if meaning_name not in MEANINGS:
self.log.warning('%s: meaning %r is unknown', modname, meaning_name)
continue
result.setdefault(meaning_name, []).append((importance, modname))
if meaning_name == 'temperature_regulation':
# add temperature_regulation to temperature list, with very low importance
result.setdefault('temperature', []).append((importance - 100, modname))
elif meaning_name == 'temperature' and moddesc['parameters'].get('target'):
result.setdefault('temperature_regulation', []).append((importance, modname))
envlist = []
alias_config = {}
for meaning_name, info in result.items():
importance, modname = sorted(info)[-1]
target = MEANINGS.get(meaning_name)
alias_config[target] = {modname: importance}
if target == 'a3' and meaning_name == 'rotation_z':
alias_config['om'] = {modname: importance}
if target not in SKIP_ENV:
envlist.append(target)
return envlist, alias_config, device_mapping
-276
View File
@@ -1,276 +0,0 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
import sys
import time
import re
import socket
from pathlib import Path
from configparser import ConfigParser
import logging
MARCHESRC = ['/home/software/marche']
# CFGDIRS = ['/home/linse/config', '/home/l_samenv/linse_config']
def get_logger(previous=[]):
if previous:
return previous[0]
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
previous.append(logger)
return logger
for marchedir in MARCHESRC:
if Path(marchedir).is_dir():
sys.path.append(marchedir)
import marche.jobs as mj
from marche.client import Client
break
STATUS_MAP = { # values are (<busy: bool), <target state:bool>, name)
mj.DEAD: (False, False, 'DEAD'),
mj.NOT_RUNNING: (False, False, 'NOT RUNNING'),
mj.STARTING: (True, True, 'STARTING'),
mj.INITIALIZING: (True, True, 'INITIALIZING'),
mj.RUNNING: (False, True, 'RUNNING'),
mj.WARNING: (False, True, 'WARNING'),
mj.STOPPING: (True, False, 'STOPPING'),
mj.NOT_AVAILABLE: (False, False, 'NOT AVAILABLE'),
}
def wait_status(cl, service):
delay = 0.2
while True:
sts = cl.getServiceStatus(service)
if STATUS_MAP[sts][0]: # busy
if delay > 1.5: # this happens after about 5 sec
return False
time.sleep(delay)
delay *= 1.225
continue
return True
class MarcheControl:
port = 8124
def __init__(self, host, port=None, user=None, instrument=None):
self.host = host
self.instrument = instrument or socket.gethostname().split('.')[0]
self.user = user or instrument # SINQ instruments
if port is not None:
self.port = port
self._client = None
def connect(self):
if self._client is None:
# TODO: may need more generic solution for last arg
print(self.host, self.port, self.user)
self._client = Client(self.host, self.port, self.user, self.instrument.upper() + 'LNS')
# TODO; do we need disconnect?
def get_service(self, instance):
return instance
def start(self, instance):
self.connect()
print(self.get_service(instance))
self._client.startService(self.get_service(instance))
def restart(self, instance):
self.connect()
self._client.restartService(self.get_service(instance))
def stop(self, instance):
self.connect()
self._client.stopService(self.get_service(instance))
def status(self, service):
"""returns a dict <service> of (<busy>, <running (now or soon)>, <state name>)"""
self.connect()
servdict = self._client.getAllServiceInfo().get(service)
if not servdict:
return {}
statedict = servdict['instances']
return {k: STATUS_MAP[v['state']] for k, v in statedict.items()}
def reload(self):
self.connect()
self._client.reloadJobs()
def run(self, action, instance=None, *args):
if action == 'start':
self.start(instance)
elif action == 'restart':
self.restart(instance)
elif action == 'stop':
self.stop(instance)
else:
raise ValueError('unknown args %r', (action, instance) + args)
WRAPPER_CFG = """interface = '{port}'
include({cfg!r})
overrideNode(interface=interface)
"""
WRAPPER_PAT = re.compile(r"interface\s=\s*'(\d*)'\s*\n")
class FrappyMarche(MarcheControl):
def __init__(self, instance, host='localhost', port=None, user=None):
parser = ConfigParser()
parser.optionxform = str
gencfg = '/sq_sw/linse/frappycfg/generalConfig.cfg'
parser.read([gencfg])
try:
section = dict(parser['superfrappy'])
except KeyError:
raise ValueError(f'bad config {gencfg}')
self.instance = instance # 'this' or an instrument on a generic computer
instrument = section.get('instrument', instance)
if instrument == 'this':
instrument = socket.gethostname().split('.')[0]
print(instance, instrument)
self.instrument = instrument # the instrument name
self.config = {k: section[k].replace('<INS>', instrument) for k in section}
self.wrapperdir = self.config.pop('wrapperdir')
self.cfgdirs = self.config.pop('cfgdirs')
self.main_port = int(self.config.pop('main_port'))
if not Path(self.wrapperdir).is_dir():
raise ValueError(f'{self.wrapperdir} does not exist')
self.get_cfg_info() # do we need to update this from time to time?
super().__init__(host, port, user, instrument)
def get_service(self, instance):
return f'frappy.{instance}' if self.instance == 'main' else f'frappy.{self.instrument}-{instance}'
def wrapper_file(self, cfg):
return Path(self.wrapperdir) / f'{cfg}_cfg.py'
def cfg_file(self, cfgdirs, service, cfg):
cfgpy = f'{cfg}_cfg.py'
tries = []
for servicedir in (service, ''):
for cfgdir in cfgdirs.split(':'):
cfgfile = Path(cfgdir) / servicedir / cfgpy
tries.append(cfgfile)
if cfgfile.is_file():
return cfgfile
else:
raise FileNotFoundError(f'can not find {cfgpy} in {tries}')
def get_std_port(self, service):
port = self.main_port
if service == 'main':
return port
port += 1
if service == 'stick':
return port
return port + 1, self.main_port + 10
def get_local_ports(self):
self.get_cfg_info()
run_state = self.status('frappy')
result = {}
for host_port, cfg in self.cfg_info.items():
host, port = host_port.split(':')
if host == 'localhost':
busy, on, _ = run_state.get(cfg, (0,0,0))
if on or busy:
result.setdefault(port, []).append((on, busy, cfg))
return {sorted(v)[-1][-1]: k for k, v in result.items()}
def get_port(self, service):
"""get a port number for service
return a predefined port number for 'main' and 'stick'
or a free port number for 'addons'
"""
if service not in {'main', 'stick', 'addons', 'addon'}:
raise ValueError('illegal service argument')
ports = self.get_std_port(service)
if isinstance(ports, int):
return ports
self.get_cfg_info()
used_ports = self.get_local_ports()
for port in range(*ports):
if port not in used_ports:
return port
raise ValueError('too many frappy servers')
def add_frappy_service(self, service, cfg, port, log=None):
if log is None:
log = get_logger()
log.info('add %r port=%r', cfg, port)
cfgfile = self.cfg_file(self.cfgdirs, service, cfg)
wrapper_content = WRAPPER_CFG.format(cfg=str(cfgfile), port=port)
self.wrapper_file(cfg).write_text(wrapper_content)
self.get_cfg_info()
log.info('wrapper %r %r', self.wrapper_file(cfg), wrapper_content)
self.reload()
log.info('registered %r', cfg)
def get_cfg_info(self):
"""get info from wrapper dir"""
result = {}
for cfgfile in Path(self.wrapperdir).glob('*_cfg.py'):
cfg = cfgfile.stem[:-4]
match = WRAPPER_PAT.match(cfgfile.read_text())
if match:
result[f'localhost:{match.group(1)}'] = cfg
self.cfg_info = result
def delete_frappy_service(self, cfg):
try:
self.wrapper_file(cfg).unlink()
self.reload()
except FileNotFoundError:
pass
def run(self, action, *args):
if action == 'start':
try:
service, cfg = args
except ValueError:
raise ValueError('start needs <service> <cfg>')
port = self.get_port(service)
self.add_frappy_service(service, cfg, port)
self.start(cfg)
elif action == 'restart':
if len(args) != 1 or args[0] in {'main', 'stick', 'addons', 'addon'}:
raise ValueError('restart needs <cfg>')
self.restart(args[0])
elif action == 'stop':
if len(args) != 1 or args[0] in {'main', 'stick', 'addons', 'addon'}:
raise ValueError('stop needs <cfg>')
self.delete_frappy_service(args[0])
self.stop(args[0])
else:
raise ValueError('unknown action %r', action)
-41
View File
@@ -1,41 +0,0 @@
import re
import socket
# sorry for hardwiring this ... there is no CNAME reverse lookup!
# taking the original address as unique name would need a call to
# gethostbyaddr, which might take some time - also not what we want
reverse_alias = {
'pc15139': 'linse-c',
'pc16392': 'linse-a',
}
def normalizeuri(uri, use_localhost=False):
host, sep, port = uri.partition(':')
if host[0].isdigit():
if not port and '.' not in host: # assume this is a port number
host, sep, port = 'localhost', ':', host
else:
try:
socket.setdefaulttimeout(1)
host = socket.gethostbyaddr(host)[0].split('.', 1)[0]
except socket.gaierror:
pass # keep numbered IP
finally:
socket.setdefaulttimeout(None)
else:
host = host.split('.', 1)[0]
hostname = socket.gethostname().split('.')[0]
if use_localhost:
if host in (hostname, reverse_alias.get(hostname, hostname)):
host = 'localhost'
else:
if host == 'localhost':
host = hostname
host = reverse_alias.get(host, host)
# strip appended IP when a host is registered twice (at PSI):
match = re.match(r'([^-]+)-129129\d{6}$', host)
host = match.group(1) if match else host
return f'{host}{sep}{port}'
-115
View File
@@ -1,115 +0,0 @@
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
import os
import time
import socket
import json
from select import select
from .normalizeuri import normalizeuri
SECOP_UDP_PORT = 10767
class Listener:
socket = None
def __init__(self, use_localhost=False):
self.use_localhost = use_localhost # whether 'localhost' or the real hostname is returned on the own machine
def poll(self, log=None):
if self.socket is None:
return None
if not select([self.socket], [], [], 0)[0]:
return None
try:
msg, addr = self.socket.recvfrom(1024)
except socket.error: # pragma: no cover
return None
addr = socket.getnameinfo(addr, socket.NI_NOFQDN)[0]
msg = json.loads(msg.decode('utf-8'))
if log:
log.debug('got msg %r', msg)
kind = msg.pop('SECoP', None)
if kind == 'node':
msg['device'] = msg['equipment_id'].split('.')[0]
uri = f"{addr}:{msg['port']}"
elif kind == 'for_other_node':
uri = msg['uri']
else:
return None
host, _, port = uri.rpartition(':')
host = normalizeuri(host or 'localhost', self.use_localhost)
msg['uri'] = f'{host}:{port}'
return msg
class UdpScan(Listener):
def start(self, log=None):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
# send a general broadcast
try:
sock.sendto(json.dumps(dict(SECoP='discover')).encode('utf-8'),
('255.255.255.255', SECOP_UDP_PORT))
except OSError as e:
if log:
log.info('could not send the broadcast %r:', e)
self.socket = sock
self.deadline = time.time() + 30
def poll(self, log=None):
if self.socket is None:
return None
if time.time() > self.deadline:
try:
self.socket.close()
except Exception:
pass
self.socket = None
return super().poll(log)
class UdpListener(Listener):
def start(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(1)
if os.name == 'nt':
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
else:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.bind(('0.0.0.0', SECOP_UDP_PORT))
self.socket = sock
def send_other_udp(uri, instrument, device=None):
"""inform the feeder about the start of a frappy server"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
msg = {
'SECoP': 'for_other_node',
'uri': uri,
'instrument': instrument,
}
if device:
msg['device'] = device
msg = json.dumps(msg, ensure_ascii=False, separators=(',', ':')).encode('utf-8')
sock.sendto(msg, ('255.255.255.255', SECOP_UDP_PORT))