Files
frappy_sinq/new.py
T
2026-03-12 17:28:56 +01:00

148 lines
6.2 KiB
Python

# *****************************************************************************
# NICOS, the Networked Instrument Control System of the MLZ
# Copyright (c) 2009-2018 by the NICOS contributors (see AUTHORS)
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
"""managing SECoP server and connections
SEC Node with added functionality for starting and stopping frappy servers
connected to a SEC node
"""
import sys
import os
instrument = os.uname().nodename.split('.')[0]
sys.path.insert(0, '/sq_sw/linse/frappy')
sys.path.insert(0, '/sq_sw/linse/frappy_{instrument}')
import threading
import socket
import json
from nicos import config, session
from nicos.core import Override, Param, Moveable, status, POLLER, SIMULATION, DeviceAlias, \
Device, Readable, anytype, listof, MASTER, Attach
from nicos_sinq.frappy_sinq.secop.devices import SecNodeDevice, SecopDevice, DefunctDevice, SecopWritable
from nicos.core.utils import createThread
from nicos.utils.comparestrings import compare
from nicos.devices.secop.devices import get_attaching_devices
from nicos.commands.basic import AddSetup, CreateAllDevices, CreateDevice
from nicos.utils import loggers
def cleanup_defunct():
for devname, setupname in list(session.dynamic_devices.items()):
dev = session.devices.get(devname)
if dev and dev._defunct:
devnames = [d.name for d, _ in get_attaching_devices(dev)]
if devnames:
session.log.warning('can not remove device %r due to dependencies on %s'
% (devname, ', '.join(devnames)))
else:
session.destroyDevice(devname)
session.dynamic_devices.pop(devname, None)
class FrappyMarcheNode(SecNodeDevice):
"""SEC node device, works together with superfrappy"""
parameters = {
'param_category': Param("category of parameters\n\n"
"set to 'general' if all parameters should appear in the datafile header",
type=str, default='', settable=True),
'quiet_init': Param('flag to set loglevel to error while initializing',
type=bool, default=False, settable=True),
# duplicate from SecNodeDevice. needed for the case where the code for
# the SecNodeDevcice is not up to date, but the setup is already new
# does not yet have the i
'general_stop_whitelist': Param('module names to accept general stop',
type=listof(str), prefercache=False,
default=[], userparam=False),
}
_lastcfg = None
# _marche = None
def doInit(self, mode):
# self._marche = MarcheControl()
if mode != SIMULATION and session.sessiontype != POLLER:
pass
# TODO:
# host_port = self.uri.rsplit('://')[-1]
# status = self._marche.status(config.instrument)
# running = self._attached_superfrappy.check_running(host_port)
# if self.frappycfg and running != self.frappycfg:
# self.superfrappy.add_server(host_port)
super().doInit(mode)
def createDevices(self):
super().createDevices()
if self.param_category:
for devname, (_, devcfg) in self.setup_info.items():
params_cfg = devcfg['params_cfg']
dev = session.devices[devname]
for pname, pargs in params_cfg.items():
pinfo = dev.parameters[pname]
if not pinfo.category:
pinfo.category = self.param_category
def makeDynamicDevices(self, setup_info):
patched_loggers = {}
if self.quiet_init:
for devname, (_, devcfg) in setup_info.items():
log = session.getLogger(devname)
if log not in patched_loggers:
result = [loggers.INFO] # default level
patched_loggers[log] = result
log.setLevel(loggers.ERROR)
# avoid level change when the loglevel parameter is treated
# store level instead in result
log.__dict__['setLevel'] = result.append
try:
super().makeDynamicDevices(setup_info)
finally:
for log, result in patched_loggers.items():
log.__dict__.pop('setLevel', None) # re-enable setLevel
log.setLevel(result[-1]) # set to stored or default value
def disable(self):
seaconn = session.devices.get('sea_%s' % self.service)
if seaconn and seaconn._attached_secnode:
seaconn.communicate('frappy_remove %s' % self.service)
self._set_status(*self._status)
def _set_status(self, code, text):
if self.uri == '':
code, text = status.DISABLED, 'disabled'
SecNodeDevice._set_status(self, code, text)
# def restart(self):
# """restart frappy server"""
# host_port = self.uri.rsplit('://')[-1]
# self._marche.restart(config.instrument, self.frappycfg)
def get_info(self):
result = self.doRead() or ''
code, text = self.status()
if not result:
return '<disconnected>'
if code == status.OK or result == '':
return result
if (code, text) == (status.ERROR, 'reconnecting'):
return '%s (frappy not running)' % result
return '%s (%s)' % (result, text)