diff --git a/labcommands.py b/labcommands.py new file mode 100644 index 0000000..796ced6 --- /dev/null +++ b/labcommands.py @@ -0,0 +1,494 @@ +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# +# ***************************************************************************** +"""collection of commands and devices for lab experiments + +the commands are probably better not used, as the standard scan and sweep +commands are overwritten - this might be confusing for users working also +in neutron instruments +""" + +import os +from os.path import join +import inspect +import shutil +import re +from glob import glob +import time +import math + +from nicos import session +from nicos.core import Override, Param, Measurable, Moveable, Device, \ + status, Readable +from nicos.core.errors import NicosError +from nicos.commands import helparglist, usercommand, parallel_safe +from nicos.commands.device import maw +import nicos.devices.generic.paramdev as paramdev +import nicos.commands.scan +from nicos_sinq.frappy_sinq.timestamp import Timestamp, Det + + +class FunDevice(Moveable): + """wrap a device in a function + + see usercommand fun + """ + temporary = True + _dev = None + _value = None + + def __init__(self, func, dev_or_name=None, unit=None, devname=None, **kwargs): + self._func = func + self._kwargs = kwargs + if isinstance(dev_or_name, Device): + self._dev = dev_or_name + self._devname = dev_or_name.name + if devname: + # or should we raise an error? + name = devname + else: + name = self._devname + else: + name = dev_or_name or devname + self._devname = devname + self._dev = session.devices.get(devname) + if self._dev and unit is None: + unit = self._dev.unit + Moveable.__init__(self, name, unit=unit or '') + + def doRead(self, maxage=0): + if self._devname: + if not self._dev: + try: + self._dev = session.devices[self._devname] + if not self.unit: + self.unit = self._dev.unit + except KeyError: + raise NicosError('device %r does not exist' % self._devname) + if self._value is None: + self._value = self._dev.read(maxage) + return self._value + + def doStart(self, value): + if self._devname: + oldvalue = self.doRead() + else: + oldvalue = self._value + # append args depending in function signature + argnames, varargs, varkw = inspect.getfullargspec(self._func)[0:3] + args = self._dev, oldvalue + kwds = dict(self._kwargs) + if 'oldvalue' in argnames: + kwds['oldvalue'] = oldvalue + args = args[:1] + if 'dev' in argnames: + kwds['dev'] = self._dev + args = () + if not varkw: + for k in kwds: + if k not in argnames: + kwds.pop(k) + result = self._func(value, *args[:len(argnames) - 1], **kwds) + self._value = value if result is None else result + + def doIsCompleted(self): + return True + + def doStop(self): + if self._dev: + self._dev.stop() + + +@usercommand +@helparglist('dev_or_name[, unit[, devname=..., **kwargs]]') +def fun(dev_or_name=None, unit=None, devname=None, **kwargs): + """decorator for turning a function into a device for scans + + Usage: + + A function wrapped around a device (the argument might be a device name or a device) + + @fun(dev="mydevice"): + def func(value, dev): + do_something_before_move() + maw(dev, value) + do_something_after_move() + + scan(func, ...) + + func will inherit the name and unit from the device. + func may have 1-3 positional arguments: + - value: the value to move to + - dev: the device given (in above example mydevice) + - oldvalue: the value before the move + + A function turned into a device: + + @fun("myname", "K") + def func(value): + result = do_some_thing(value) + return result + + if no name is given, the function name is used. + in this case the final result may be returned, if different from value + """ + def decorator(func, name=dev_or_name): + if name is None: + name = func.__name__ + return FunDevice(func, name, unit, devname, **kwargs) + + return decorator + + +class Range(tuple): + def __new__(cls, astext, *args): + return tuple.__new__(cls, args) + + def __init__(self, astext, *args): + super().__init__() + self.astext = astext + + def __repr__(self): + return self.astext + + def __add__(self, other): + astext = '%s+%s' % (repr(self), repr(other)) + try: + if self[-1] == other[0]: + other = other[1:] + except IndexError: + pass + return Range(astext, *self, *other) + + +@usercommand +@helparglist('start, step, end') +def lnr(start, step_or_end=None, end=None, n=None): + """linear range + + to be used as argument for the scan command + + Alternative form: + + >>> lnr(start, end, n=n) # n+1 points + + ranges might be added, adjacent equal points are omitted automatically + + >>> list(lnr(0,1,4) + lnr(4,2,10) + [15]) + > [0,1,2,3,4,6,8,10,15] + """ + if end is None: + astext = 'lnr(%s,%s,n=%s)' % (start, step_or_end, n) + if step_or_end is None: + # single value + return Range('lnr(%s)' % start, start) + end = step_or_end + if (n or 1) == 1: + # two values + return Range('lnr(%s,%s)' % (start, end), start, end) + n = abs(n) + step = (end - start) / n + else: + step = step_or_end + if n is not None or step == 0: + raise ValueError('illegal arguments') + astext = 'lnr(%s,%s,%s)' % (start, step_or_end, end) + n = int(round(abs((end - start) / step))) + if (start < end) == (step < 0): + step = -step + return Range(astext, start, *tuple(start + i * step for i in range(1,n)), end) + + +@usercommand +@helparglist('start, factor, end') +def lgr(start, factor_or_end, end=None, n=0): + """logarithmic range + + factor is the factor between two points (0.5 and 2 are equivalent) + + Alternative form: + + >>> lgr(start, end, n=n) # n+1 points + + ranges might be added, adjacent equal points are omitted automatically + + list(lnr(0,1,4) + lgr(4,2,15) + [30]) == [0,1,2,3,4,8,15,30] + """ + if end is None: + end = factor_or_end + if start <= 0 or end <= 0 or n <= 0: + raise ValueError('illegal arguments') + astext = 'lgr(%s,%s,n=%s)' % (start, end, n) + factor = (end / start) ** (1 / n) + else: + factor = factor_or_end + if start <= 0 or end <= 0 or factor <= 0: + raise ValueError('illegal arguments') + if (start > end) == (factor > 1): + factor = 1 / factor + astext = 'lgr(%s,%.18g,%s)' % (start, factor, end) + n = int(round(abs(math.log2(end / start) / math.log2(factor)))) + return Range(astext, start, + *(start * factor ** i for i in range(1, n)), + end) + + +class WithTimeoutDev(Moveable): + """Wrapper for better timeout/settling mechanism""" + + parameters = { + 'dev': Param('dev', type=lambda x=None: x, settable=True, mandatory=True), + 'tolerance': Param('tolerance', type=float, + settable=True, default=1, mandatory=False), + 'settle': Param('time to settle within tolerance', type=float, + settable=True, default=1, mandatory=False), + 'timeout': Param('timeout for less progress than tolerance (defaults to settle)', type=float, + settable=True, default=-1, mandatory=False), + } + + temporary = True + _start_time = None + + def _getCache(self): + return None + + def doStart(self, value): + self._start_time = time.time() + self._settle_start = None + self._settle_done = 0 + self._mindif = abs(value - self.dev()) + self.dev.doStart(value) + + def doStop(self): + self._start_time = None + self.dev.doStop() + + def doRead(self, maxage=0): + return self.dev.doRead(maxage) + + def doStatus(self, maxage=0): + if not self._start_time: + return status.OK, '' + now = time.time() + dif = abs(self.doRead(maxage) - self.target) + if dif < self.tolerance: + if not self._settle_start: + self.log.info('settling %s' % self.dev) + self._settle_start = now - self._settle_done + if now > self._settle_start + self.settle: + self._start_time = None + return status.OK, '' + else: + if now > self._start_time + (self.timeout or self.settle): + self._start_time = None + self.log.info('timeout waiting for %s' % self.dev) + return status.OK, '' + if self._settle_start: + self._settle_done = now - self._settle_start + self._settle_start = None + if dif < self._mindif: + self._mindif = dif - self.tolerance + self._start_time = now + return status.BUSY, '' + + +@usercommand +@helparglist('dev, tolerance [,settle [,timeout]]') +@parallel_safe +def WithTimeout(dev, tolerance, settle=300, timeout=0): + if isinstance(dev, WithTimeoutDev): + dev.tolerance = tolerance + dev.settle = settle + dev.timeout = timeout + return dev + return WithTimeoutDev(dev.name, dev=dev, tolerance=tolerance, settle=settle, timeout=timeout, + unit=dev.unit, fmtstr=dev.fmtstr) + + +@usercommand +@helparglist('dev, target[, tolerance[,settle[, timeout]]]') +def maw_tmo(dev, target, tolerance, settle=300, timeout=0): + """wait for target reached or no progress within timeout + + wait for either target within tolerance for a total of seconds + or no progress more than tolerance for seconds + """ + maw(WithTimeout(dev, tolerance, settle, timeout), target) + + +class ReadonlyParamDevice(paramdev.ReadonlyParamDevice): + """turn a parameter into a temporary Readable""" + temporary = True + + def __init__(self, devname_param): + devname, pname = devname_param.split('.') + paramdev.ReadonlyParamDevice.__init__(self, devname_param, device=devname, parameter=pname) + + def _getCache(self): + return None + + +class ParamDevice(paramdev.ParamDevice): + """turn a parameter into a temporary Moveable""" + temporary = True + + def __init__(self, devname_param): + devname, pname = devname_param.split('.') + paramdev.ParamDevice.__init__(self, devname_param, device=devname, parameter=pname) + + def _getCache(self): + return None + + +@usercommand +@helparglist('"dev.param, dev.param, ..."') +@parallel_safe +def out(*args): + """may be used as an argument in a scan for producing output columns for parameters + + can not be used in SetEnvironment + + For a single parameter: + scan(dev, ...., out('tt.raw')) + + For multiple parameters: + scan(dev, ...., *out('tt, tt.raw, mf.ramp')) + """ + result = [] + for arg in args: + for devpar in arg.split(','): + devpar = devpar.strip() + if '.' in devpar: + result.append(ReadonlyParamDevice(devpar)) + else: + result.append(session.devices[devpar]) + if len(result) == 1: + return result[0] + return result + + +@usercommand +@helparglist('"dev.param"') +@parallel_safe +def param(devpar): + """turning a parameter into a moveable device + + Usage: + + scan(param(dev), ....) + """ + return ParamDevice(devpar) + + +def copy_all(srcdir, dstdir): + """copy all files from srcdir to dstdir""" + files = glob(join(srcdir, '*')) + for src in files: + shutil.copy2(src, dstdir) + return files + + +@usercommand +@helparglist('script path | instrument, proposal') +@parallel_safe +def copy_scripts(ins_or_dir, proposal=None): + """copy scripts from another proposal or any directory""" + if os.path.isdir(ins_or_dir): + dirname = ins_or_dir + else: + data = join(os.environ['NICOS_DATA'], ins_or_dir) + if isinstance(proposal, int): + pat = re.compile('.*/(.*\D|)0*%d$' % proposal) + else: + pat = re.compile('.*/%s$' % re.escape(proposal)) + for dirname in reversed(sorted(glob(join(data, '20*', '*')))): + if pat.match(dirname): + break + else: + raise FileNotFoundError('directory for %s/%s not found' % (ins_or_dir, proposal)) + copy_all(join(dirname, 'scripts'), join(session.devices['Exp'].scriptpath)) + + +def handle_args(args): + detectors = [] + put_timestamp = False + for det in session.experiment.detectors: + if isinstance(det, Timestamp) and det.show: + put_timestamp = True + ret = [] + moveable_seen = False + for arg in args: + if moveable_seen and isinstance(arg, Readable): + if put_timestamp: + ret.append(TimestampReadable()) + put_timestamp = False + #ret.append(arg) + ret.append(Det(arg)) + elif isinstance(arg, str) and '.' in arg: + ret.append(param(arg)) + moveable_seen = True + elif isinstance(arg, Measurable): + if arg not in session.experiment.detectors: + ret.append(arg) + else: + if isinstance(arg, Moveable): + moveable_seen = True + ret.append(arg) + return ret + + +@usercommand +def scan(*args, **kwargs): + nicos.commands.scan.scan(*handle_args(args), **kwargs) + + +scan.help_arglist = nicos.commands.scan.scan.help_arglist +scan.__doc__ = nicos.commands.scan.scan.__doc__ + + +@usercommand +def sweep(dev, start, end, *args, **kwargs): + nicos.commands.scan.sweep(dev, start, end, *handle_args(args), **kwargs) + + +sweep.help_arglist = nicos.commands.scan.sweep.help_arglist +sweep.__doc__ = nicos.commands.scan.sweep.__doc__ + + +class TimestampReadable(Readable): + """timestamp as Readable""" + temporary = True + + def __init__(self): + Readable.__init__(self, 'timestamp', unit='s') + + def _getCache(self): + """no cache needed""" + self._cache = None + + def doRead(self, maxage=0): + return time.time() + + def doStatus(self, maxage=0): + return 100, '' + + def valueInfo(self): + return Readable.valueInfo(self) + diff --git a/timestamp.py b/timestamp.py new file mode 100644 index 0000000..5893166 --- /dev/null +++ b/timestamp.py @@ -0,0 +1,124 @@ +# ***************************************************************************** +# +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Markus Zolliker +# +# ***************************************************************************** +"""helper devices for lab experiments""" + +import time + +from nicos.core import Override, Param, Measurable +from nicos.core.params import Value + + +class Timestamp(Measurable): + """to be used as a 'detector' in lab experiments + + just waiting the preset time + """ + parameter_overrides = { + 'unit': Override(default='sec', mandatory=False), + } + parameters = { + 'show': Param('show timestamp in data file', type=bool, + settable=True, default=True, mandatory=False), + } + + _preset = 0 + _time_used = 0 + _start = 0 + + def _getCache(self): + """no cache needed""" + self._cache = None + + def doRead(self, maxage=0): + return time.time() + + def doStatus(self, maxage=0): + return 100, '' + + def doSetPreset(self, t=0, **preset): + self._start = 0 + self._preset = t + self._value = 0 + + def doStart(self): + self._start = time.time() + + def doPause(self): + self._time_used = time.time() - self._start + return True + + def doResume(self): + self._start = time.time() - self._time_used + + def doFinish(self): + pass + + def doStop(self): + pass + + def doIsCompleted(self): + return time.time() > self._start + self._preset + + def valueInfo(self): + if self.show: + return Measurable.valueInfo(self) + return () + + +class Det(Measurable): + """wrap a Readable into a Detector + + just for placing the result in a scan file in the detector part + """ + temporary = True + + def __init__(self, dev): + self._dev = dev + Measurable.__init__(self, str(dev), unit=dev.unit, fmtstr=dev.fmtstr) + + def _getCache(self): + """no cache needed""" + self._cache = None + + def doRead(self, maxage=0): + return self._dev.doRead(maxage) + + def doStatus(self, maxage=0): + return self._dev.doStatus(maxage) + + def doSetPreset(self, t=5, **preset): + self._start = 0 + self._preset = t + + def doStart(self): + self._start = time.time() + + def doFinish(self): + pass + + def doStop(self): + self._start = 0 + + def doIsCompleted(self): + return time.time() > self._start + self._preset + + def valueInfo(self): + return Value(self.name, unit=self.unit, fmtstr=self.fmtstr),