From 0faba6f3811157d10dd3bc1950abffdbcb080361 Mon Sep 17 00:00:00 2001 From: Markus Zolliker Date: Wed, 19 Nov 2025 13:31:06 +0100 Subject: [PATCH] remove ppms and timestamp - this is moved to nicos_sinq/linse_nicos, which is a new gitea repo --- labcommands.py | 494 -------------------------------------------- setups/ppms.py | 26 --- setups/timestamp.py | 9 - timestamp.py | 124 ----------- 4 files changed, 653 deletions(-) delete mode 100644 labcommands.py delete mode 100644 setups/ppms.py delete mode 100644 setups/timestamp.py delete mode 100644 timestamp.py diff --git a/labcommands.py b/labcommands.py deleted file mode 100644 index 796ced6..0000000 --- a/labcommands.py +++ /dev/null @@ -1,494 +0,0 @@ -# ***************************************************************************** -# -# This program is free software; you can redistribute it and/or modify it under -# the terms of the GNU General Public License as published by the Free Software -# Foundation; either version 2 of the License, or (at your option) any later -# version. -# -# This program is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more -# details. -# -# You should have received a copy of the GNU General Public License along with -# this program; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# -# Module authors: -# Markus Zolliker -# -# ***************************************************************************** -"""collection of commands and devices for lab experiments - -the commands are probably better not used, as the standard scan and sweep -commands are overwritten - this might be confusing for users working also -in neutron instruments -""" - -import os -from os.path import join -import inspect -import shutil -import re -from glob import glob -import time -import math - -from nicos import session -from nicos.core import Override, Param, Measurable, Moveable, Device, \ - status, Readable -from nicos.core.errors import NicosError -from nicos.commands import helparglist, usercommand, parallel_safe -from nicos.commands.device import maw -import nicos.devices.generic.paramdev as paramdev -import nicos.commands.scan -from nicos_sinq.frappy_sinq.timestamp import Timestamp, Det - - -class FunDevice(Moveable): - """wrap a device in a function - - see usercommand fun - """ - temporary = True - _dev = None - _value = None - - def __init__(self, func, dev_or_name=None, unit=None, devname=None, **kwargs): - self._func = func - self._kwargs = kwargs - if isinstance(dev_or_name, Device): - self._dev = dev_or_name - self._devname = dev_or_name.name - if devname: - # or should we raise an error? - name = devname - else: - name = self._devname - else: - name = dev_or_name or devname - self._devname = devname - self._dev = session.devices.get(devname) - if self._dev and unit is None: - unit = self._dev.unit - Moveable.__init__(self, name, unit=unit or '') - - def doRead(self, maxage=0): - if self._devname: - if not self._dev: - try: - self._dev = session.devices[self._devname] - if not self.unit: - self.unit = self._dev.unit - except KeyError: - raise NicosError('device %r does not exist' % self._devname) - if self._value is None: - self._value = self._dev.read(maxage) - return self._value - - def doStart(self, value): - if self._devname: - oldvalue = self.doRead() - else: - oldvalue = self._value - # append args depending in function signature - argnames, varargs, varkw = inspect.getfullargspec(self._func)[0:3] - args = self._dev, oldvalue - kwds = dict(self._kwargs) - if 'oldvalue' in argnames: - kwds['oldvalue'] = oldvalue - args = args[:1] - if 'dev' in argnames: - kwds['dev'] = self._dev - args = () - if not varkw: - for k in kwds: - if k not in argnames: - kwds.pop(k) - result = self._func(value, *args[:len(argnames) - 1], **kwds) - self._value = value if result is None else result - - def doIsCompleted(self): - return True - - def doStop(self): - if self._dev: - self._dev.stop() - - -@usercommand -@helparglist('dev_or_name[, unit[, devname=..., **kwargs]]') -def fun(dev_or_name=None, unit=None, devname=None, **kwargs): - """decorator for turning a function into a device for scans - - Usage: - - A function wrapped around a device (the argument might be a device name or a device) - - @fun(dev="mydevice"): - def func(value, dev): - do_something_before_move() - maw(dev, value) - do_something_after_move() - - scan(func, ...) - - func will inherit the name and unit from the device. - func may have 1-3 positional arguments: - - value: the value to move to - - dev: the device given (in above example mydevice) - - oldvalue: the value before the move - - A function turned into a device: - - @fun("myname", "K") - def func(value): - result = do_some_thing(value) - return result - - if no name is given, the function name is used. - in this case the final result may be returned, if different from value - """ - def decorator(func, name=dev_or_name): - if name is None: - name = func.__name__ - return FunDevice(func, name, unit, devname, **kwargs) - - return decorator - - -class Range(tuple): - def __new__(cls, astext, *args): - return tuple.__new__(cls, args) - - def __init__(self, astext, *args): - super().__init__() - self.astext = astext - - def __repr__(self): - return self.astext - - def __add__(self, other): - astext = '%s+%s' % (repr(self), repr(other)) - try: - if self[-1] == other[0]: - other = other[1:] - except IndexError: - pass - return Range(astext, *self, *other) - - -@usercommand -@helparglist('start, step, end') -def lnr(start, step_or_end=None, end=None, n=None): - """linear range - - to be used as argument for the scan command - - Alternative form: - - >>> lnr(start, end, n=n) # n+1 points - - ranges might be added, adjacent equal points are omitted automatically - - >>> list(lnr(0,1,4) + lnr(4,2,10) + [15]) - > [0,1,2,3,4,6,8,10,15] - """ - if end is None: - astext = 'lnr(%s,%s,n=%s)' % (start, step_or_end, n) - if step_or_end is None: - # single value - return Range('lnr(%s)' % start, start) - end = step_or_end - if (n or 1) == 1: - # two values - return Range('lnr(%s,%s)' % (start, end), start, end) - n = abs(n) - step = (end - start) / n - else: - step = step_or_end - if n is not None or step == 0: - raise ValueError('illegal arguments') - astext = 'lnr(%s,%s,%s)' % (start, step_or_end, end) - n = int(round(abs((end - start) / step))) - if (start < end) == (step < 0): - step = -step - return Range(astext, start, *tuple(start + i * step for i in range(1,n)), end) - - -@usercommand -@helparglist('start, factor, end') -def lgr(start, factor_or_end, end=None, n=0): - """logarithmic range - - factor is the factor between two points (0.5 and 2 are equivalent) - - Alternative form: - - >>> lgr(start, end, n=n) # n+1 points - - ranges might be added, adjacent equal points are omitted automatically - - list(lnr(0,1,4) + lgr(4,2,15) + [30]) == [0,1,2,3,4,8,15,30] - """ - if end is None: - end = factor_or_end - if start <= 0 or end <= 0 or n <= 0: - raise ValueError('illegal arguments') - astext = 'lgr(%s,%s,n=%s)' % (start, end, n) - factor = (end / start) ** (1 / n) - else: - factor = factor_or_end - if start <= 0 or end <= 0 or factor <= 0: - raise ValueError('illegal arguments') - if (start > end) == (factor > 1): - factor = 1 / factor - astext = 'lgr(%s,%.18g,%s)' % (start, factor, end) - n = int(round(abs(math.log2(end / start) / math.log2(factor)))) - return Range(astext, start, - *(start * factor ** i for i in range(1, n)), - end) - - -class WithTimeoutDev(Moveable): - """Wrapper for better timeout/settling mechanism""" - - parameters = { - 'dev': Param('dev', type=lambda x=None: x, settable=True, mandatory=True), - 'tolerance': Param('tolerance', type=float, - settable=True, default=1, mandatory=False), - 'settle': Param('time to settle within tolerance', type=float, - settable=True, default=1, mandatory=False), - 'timeout': Param('timeout for less progress than tolerance (defaults to settle)', type=float, - settable=True, default=-1, mandatory=False), - } - - temporary = True - _start_time = None - - def _getCache(self): - return None - - def doStart(self, value): - self._start_time = time.time() - self._settle_start = None - self._settle_done = 0 - self._mindif = abs(value - self.dev()) - self.dev.doStart(value) - - def doStop(self): - self._start_time = None - self.dev.doStop() - - def doRead(self, maxage=0): - return self.dev.doRead(maxage) - - def doStatus(self, maxage=0): - if not self._start_time: - return status.OK, '' - now = time.time() - dif = abs(self.doRead(maxage) - self.target) - if dif < self.tolerance: - if not self._settle_start: - self.log.info('settling %s' % self.dev) - self._settle_start = now - self._settle_done - if now > self._settle_start + self.settle: - self._start_time = None - return status.OK, '' - else: - if now > self._start_time + (self.timeout or self.settle): - self._start_time = None - self.log.info('timeout waiting for %s' % self.dev) - return status.OK, '' - if self._settle_start: - self._settle_done = now - self._settle_start - self._settle_start = None - if dif < self._mindif: - self._mindif = dif - self.tolerance - self._start_time = now - return status.BUSY, '' - - -@usercommand -@helparglist('dev, tolerance [,settle [,timeout]]') -@parallel_safe -def WithTimeout(dev, tolerance, settle=300, timeout=0): - if isinstance(dev, WithTimeoutDev): - dev.tolerance = tolerance - dev.settle = settle - dev.timeout = timeout - return dev - return WithTimeoutDev(dev.name, dev=dev, tolerance=tolerance, settle=settle, timeout=timeout, - unit=dev.unit, fmtstr=dev.fmtstr) - - -@usercommand -@helparglist('dev, target[, tolerance[,settle[, timeout]]]') -def maw_tmo(dev, target, tolerance, settle=300, timeout=0): - """wait for target reached or no progress within timeout - - wait for either target within tolerance for a total of seconds - or no progress more than tolerance for seconds - """ - maw(WithTimeout(dev, tolerance, settle, timeout), target) - - -class ReadonlyParamDevice(paramdev.ReadonlyParamDevice): - """turn a parameter into a temporary Readable""" - temporary = True - - def __init__(self, devname_param): - devname, pname = devname_param.split('.') - paramdev.ReadonlyParamDevice.__init__(self, devname_param, device=devname, parameter=pname) - - def _getCache(self): - return None - - -class ParamDevice(paramdev.ParamDevice): - """turn a parameter into a temporary Moveable""" - temporary = True - - def __init__(self, devname_param): - devname, pname = devname_param.split('.') - paramdev.ParamDevice.__init__(self, devname_param, device=devname, parameter=pname) - - def _getCache(self): - return None - - -@usercommand -@helparglist('"dev.param, dev.param, ..."') -@parallel_safe -def out(*args): - """may be used as an argument in a scan for producing output columns for parameters - - can not be used in SetEnvironment - - For a single parameter: - scan(dev, ...., out('tt.raw')) - - For multiple parameters: - scan(dev, ...., *out('tt, tt.raw, mf.ramp')) - """ - result = [] - for arg in args: - for devpar in arg.split(','): - devpar = devpar.strip() - if '.' in devpar: - result.append(ReadonlyParamDevice(devpar)) - else: - result.append(session.devices[devpar]) - if len(result) == 1: - return result[0] - return result - - -@usercommand -@helparglist('"dev.param"') -@parallel_safe -def param(devpar): - """turning a parameter into a moveable device - - Usage: - - scan(param(dev), ....) - """ - return ParamDevice(devpar) - - -def copy_all(srcdir, dstdir): - """copy all files from srcdir to dstdir""" - files = glob(join(srcdir, '*')) - for src in files: - shutil.copy2(src, dstdir) - return files - - -@usercommand -@helparglist('script path | instrument, proposal') -@parallel_safe -def copy_scripts(ins_or_dir, proposal=None): - """copy scripts from another proposal or any directory""" - if os.path.isdir(ins_or_dir): - dirname = ins_or_dir - else: - data = join(os.environ['NICOS_DATA'], ins_or_dir) - if isinstance(proposal, int): - pat = re.compile('.*/(.*\D|)0*%d$' % proposal) - else: - pat = re.compile('.*/%s$' % re.escape(proposal)) - for dirname in reversed(sorted(glob(join(data, '20*', '*')))): - if pat.match(dirname): - break - else: - raise FileNotFoundError('directory for %s/%s not found' % (ins_or_dir, proposal)) - copy_all(join(dirname, 'scripts'), join(session.devices['Exp'].scriptpath)) - - -def handle_args(args): - detectors = [] - put_timestamp = False - for det in session.experiment.detectors: - if isinstance(det, Timestamp) and det.show: - put_timestamp = True - ret = [] - moveable_seen = False - for arg in args: - if moveable_seen and isinstance(arg, Readable): - if put_timestamp: - ret.append(TimestampReadable()) - put_timestamp = False - #ret.append(arg) - ret.append(Det(arg)) - elif isinstance(arg, str) and '.' in arg: - ret.append(param(arg)) - moveable_seen = True - elif isinstance(arg, Measurable): - if arg not in session.experiment.detectors: - ret.append(arg) - else: - if isinstance(arg, Moveable): - moveable_seen = True - ret.append(arg) - return ret - - -@usercommand -def scan(*args, **kwargs): - nicos.commands.scan.scan(*handle_args(args), **kwargs) - - -scan.help_arglist = nicos.commands.scan.scan.help_arglist -scan.__doc__ = nicos.commands.scan.scan.__doc__ - - -@usercommand -def sweep(dev, start, end, *args, **kwargs): - nicos.commands.scan.sweep(dev, start, end, *handle_args(args), **kwargs) - - -sweep.help_arglist = nicos.commands.scan.sweep.help_arglist -sweep.__doc__ = nicos.commands.scan.sweep.__doc__ - - -class TimestampReadable(Readable): - """timestamp as Readable""" - temporary = True - - def __init__(self): - Readable.__init__(self, 'timestamp', unit='s') - - def _getCache(self): - """no cache needed""" - self._cache = None - - def doRead(self, maxage=0): - return time.time() - - def doStatus(self, maxage=0): - return 100, '' - - def valueInfo(self): - return Readable.valueInfo(self) - diff --git a/setups/ppms.py b/setups/ppms.py deleted file mode 100644 index 5f1cbfa..0000000 --- a/setups/ppms.py +++ /dev/null @@ -1,26 +0,0 @@ -from os import environ -description = 'frappy main setup' -group = 'optional' - -devices = { - 'se_main': - device('nicos_sinq.frappy_sinq.devices.FrappyNode', - uri='pc12694:5000', - description='main SEC node', unit='', - prefix=environ.get('SE_PREFIX', ''), auto_create=True, service='main', - ), - 'timestamp': device('nicos_sinq.frappy_sinq.lab.Timestamp', description='time, a dummy detector'), -} - -startupcode = ''' -printinfo("=======================================================================================") -printinfo("Welcome to the NICOS frappy secnode setup for PPMS!") -printinfo(" ") -printinfo("Usage:") -printinfo(" frappy(stick='') # change sample-stick configuration") -printinfo(" frappy(addons=', ...') # change SE addons") -printinfo(" frappy(stick=None) # remove stick") -printinfo(" frappy(addons=None) # remove addons") -printinfo("=======================================================================================") -SetDetectors(timestamp) -''' diff --git a/setups/timestamp.py b/setups/timestamp.py deleted file mode 100644 index 8a6ddd8..0000000 --- a/setups/timestamp.py +++ /dev/null @@ -1,9 +0,0 @@ -description = 'timestamp dummy detector for offline measurements' -group = 'optional' - -devices = { - 'timestamp': device('nicos_sinq.frappy_sinq.lab.Timestamp', description='time, a dummy detector'), -} - -startupcode = 'SetDetectors(timestamp)' - diff --git a/timestamp.py b/timestamp.py deleted file mode 100644 index 5893166..0000000 --- a/timestamp.py +++ /dev/null @@ -1,124 +0,0 @@ -# ***************************************************************************** -# -# This program is free software; you can redistribute it and/or modify it under -# the terms of the GNU General Public License as published by the Free Software -# Foundation; either version 2 of the License, or (at your option) any later -# version. -# -# This program is distributed in the hope that it will be useful, but WITHOUT -# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS -# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more -# details. -# -# You should have received a copy of the GNU General Public License along with -# this program; if not, write to the Free Software Foundation, Inc., -# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# -# Module authors: -# Markus Zolliker -# -# ***************************************************************************** -"""helper devices for lab experiments""" - -import time - -from nicos.core import Override, Param, Measurable -from nicos.core.params import Value - - -class Timestamp(Measurable): - """to be used as a 'detector' in lab experiments - - just waiting the preset time - """ - parameter_overrides = { - 'unit': Override(default='sec', mandatory=False), - } - parameters = { - 'show': Param('show timestamp in data file', type=bool, - settable=True, default=True, mandatory=False), - } - - _preset = 0 - _time_used = 0 - _start = 0 - - def _getCache(self): - """no cache needed""" - self._cache = None - - def doRead(self, maxage=0): - return time.time() - - def doStatus(self, maxage=0): - return 100, '' - - def doSetPreset(self, t=0, **preset): - self._start = 0 - self._preset = t - self._value = 0 - - def doStart(self): - self._start = time.time() - - def doPause(self): - self._time_used = time.time() - self._start - return True - - def doResume(self): - self._start = time.time() - self._time_used - - def doFinish(self): - pass - - def doStop(self): - pass - - def doIsCompleted(self): - return time.time() > self._start + self._preset - - def valueInfo(self): - if self.show: - return Measurable.valueInfo(self) - return () - - -class Det(Measurable): - """wrap a Readable into a Detector - - just for placing the result in a scan file in the detector part - """ - temporary = True - - def __init__(self, dev): - self._dev = dev - Measurable.__init__(self, str(dev), unit=dev.unit, fmtstr=dev.fmtstr) - - def _getCache(self): - """no cache needed""" - self._cache = None - - def doRead(self, maxage=0): - return self._dev.doRead(maxage) - - def doStatus(self, maxage=0): - return self._dev.doStatus(maxage) - - def doSetPreset(self, t=5, **preset): - self._start = 0 - self._preset = t - - def doStart(self): - self._start = time.time() - - def doFinish(self): - pass - - def doStop(self): - self._start = 0 - - def doIsCompleted(self): - return time.time() > self._start + self._preset - - def valueInfo(self): - return Value(self.name, unit=self.unit, fmtstr=self.fmtstr),