# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
# Module authors:
#   Markus Zolliker
#
# *****************************************************************************
"""collection of commands and devices for lab experiments

the commands are probably better not used, as the standard scan and sweep
commands are overwritten - this might be confusing for users working also
in neutron instruments
"""

import os
from os.path import join
import inspect
import shutil
import re
from glob import glob
import time
import math
from nicos import session
from nicos.core import Override, Param, Measurable, Moveable, Device, \
    status, Readable
from nicos.core.errors import NicosError
from nicos.commands import helparglist, usercommand, parallel_safe
from nicos.commands.device import maw
import nicos.devices.generic.paramdev as paramdev
import nicos.commands.scan
from nicos_sinq.frappy_sinq.timestamp import Timestamp, Det


class FunDevice(Moveable):
    """wrap a device in a function

    see usercommand fun
    """

    temporary = True
    # wrapped device (may be resolved lazily from _devname in doRead)
    _dev = None
    # last value: read from the wrapped device or produced by the function
    _value = None

    def __init__(self, func, dev_or_name=None, unit=None, devname=None,
                 **kwargs):
        """create the device

        :param func: function called on start as func(value, ...)
        :param dev_or_name: a device to be wrapped, or the name of the
            new device
        :param unit: unit of the new device; when None, the unit of the
            wrapped device is used if that device is already known
        :param devname: name of the device to be wrapped (looked up in
            session.devices); if dev_or_name is a device, this is used as
            the name of the new device instead
        :param kwargs: additional keyword arguments handed over to func
        """
        self._func = func
        self._kwargs = kwargs
        if isinstance(dev_or_name, Device):
            self._dev = dev_or_name
            self._devname = dev_or_name.name
            if devname:
                # or should we raise an error?
                name = devname
            else:
                name = self._devname
        else:
            name = dev_or_name or devname
            self._devname = devname
            # might be None, when the device is not (yet) created
            self._dev = session.devices.get(devname)
        if self._dev and unit is None:
            unit = self._dev.unit
        Moveable.__init__(self, name, unit=unit or '')

    def doRead(self, maxage=0):
        """return the current value

        when a device is wrapped and no value is known yet, the value is
        read from the wrapped device

        :raises NicosError: when the wrapped device does not exist
        """
        if self._devname:
            if not self._dev:
                try:
                    self._dev = session.devices[self._devname]
                except KeyError:
                    raise NicosError('device %r does not exist'
                                     % self._devname) from None
                if not self.unit:
                    self.unit = self._dev.unit
            if self._value is None:
                self._value = self._dev.read(maxage)
        return self._value

    def doStart(self, value):
        """call the wrapped function

        The function is called as func(value, ...). The wrapped device and
        the old value are appended as positional arguments as far as the
        function has room for them. If the function has arguments named
        'dev' or 'oldvalue', these are passed by keyword instead.
        The stored value is the result of the function, or value when the
        function returns None.
        """
        oldvalue = self.doRead() if self._devname else self._value
        # append args depending on the function signature
        spec = inspect.getfullargspec(self._func)
        argnames = spec.args
        args = (self._dev, oldvalue)
        kwds = dict(self._kwargs)
        if 'oldvalue' in argnames:
            kwds['oldvalue'] = oldvalue
            args = args[:1]
        if 'dev' in argnames:
            kwds['dev'] = self._dev
            args = ()
        if not spec.varkw:
            # drop keywords the function can not take. Build a new dict:
            # popping while iterating raises RuntimeError
            accepted = set(argnames).union(spec.kwonlyargs)
            kwds = {k: v for k, v in kwds.items() if k in accepted}
        # positional slots left after value and after the arguments bound
        # by keyword (otherwise func(value, oldvalue) would get dev
        # positionally in the slot of oldvalue)
        n_by_keyword = sum(1 for k in kwds if k in argnames)
        n_free = max(0, len(argnames) - 1 - n_by_keyword)
        result = self._func(value, *args[:n_free], **kwds)
        self._value = value if result is None else result

    def doIsCompleted(self):
        """the function call in doStart is blocking: always completed"""
        return True

    def doStop(self):
        """stop the wrapped device, if any"""
        if self._dev:
            self._dev.stop()


@usercommand
@helparglist('dev_or_name[, unit[, devname=..., **kwargs]]')
def fun(dev_or_name=None, unit=None, devname=None, **kwargs):
    """decorator for turning a function into a device for scans

    Usage:

    A function wrapped around a device (given as a device object, or by
    name with the devname argument)

        @fun(devname="mydevice")
        def func(value, dev):
            do_something_before_move()
            maw(dev, value)
            do_something_after_move()

        scan(func, ...)

    func will inherit the unit from the device. When a device object is
    given as first argument, the name is inherited too.
    func may have 1-3 positional arguments:
    - value: the value to move to
    - dev: the device given (in above example mydevice)
    - oldvalue: the value before the move

    A function turned into a device:

        @fun("myname", "K")
        def func(value):
            result = do_some_thing(value)
            return result

    if no name is given, the function name is used.
    in this case the final result may be returned, if different from value
    """
    def decorator(func, name=dev_or_name):
        if name is None:
            name = func.__name__
        return FunDevice(func, name, unit, devname, **kwargs)

    return decorator


class Range(tuple):
    """tuple of scan points remembering the expression it was created from

    the first constructor argument is the text returned by repr,
    the remaining arguments are the points
    """

    def __new__(cls, astext, *args):
        return tuple.__new__(cls, args)

    def __init__(self, astext, *args):
        super().__init__()
        self.astext = astext

    def __repr__(self):
        return self.astext

    def __add__(self, other):
        """concatenate, omitting the first point of other when it is equal
        to the last point of self"""
        astext = '%s+%s' % (repr(self), repr(other))
        try:
            if self[-1] == other[0]:
                other = other[1:]
        except IndexError:
            # one of the operands is empty: nothing to omit
            pass
        return Range(astext, *self, *other)


@usercommand
@helparglist('start, step, end')
def lnr(start, step_or_end=None, end=None, n=None):
    """linear range

    to be used as argument for the scan command

    Alternative form:

    >>> lnr(start, end, n=n)  # n+1 points

    ranges might be added, adjacent equal points are omitted automatically

    >>> list(lnr(0,1,4) + lnr(4,2,10) + [15])
    > [0,1,2,3,4,6,8,10,15]

    raises ValueError when step is 0 or when n is given together with a step
    """
    if end is None:
        if step_or_end is None:  # single value
            return Range('lnr(%s)' % start, start)
        end = step_or_end
        if (n or 1) == 1:  # two values
            return Range('lnr(%s,%s)' % (start, end), start, end)
        astext = 'lnr(%s,%s,n=%s)' % (start, end, n)
        n = abs(n)
        step = (end - start) / n
    else:
        step = step_or_end
        if n is not None or step == 0:
            raise ValueError('illegal arguments')
        astext = 'lnr(%s,%s,%s)' % (start, step_or_end, end)
        n = int(round(abs((end - start) / step)))
        # the sign of step is irrelevant: adjust it to the direction
        if (start < end) == (step < 0):
            step = -step
    # the end point is given explicitly in order to avoid rounding errors
    return Range(astext, start,
                 *(start + i * step for i in range(1, n)), end)


@usercommand
@helparglist('start, factor, end')
def lgr(start, factor_or_end, end=None, n=0):
    """logarithmic range

    factor is the factor between two points (0.5 and 2 are equivalent)

    Alternative form:

    >>> lgr(start, end, n=n)  # n+1 points

    ranges might be added, adjacent equal points are omitted automatically

    list(lnr(0,1,4) + lgr(4,2,15) + [30]) == [0,1,2,3,4,8,15,30]

    raises ValueError when start, end, factor or n are not positive or when
    factor is 1
    """
    if end is None:
        end = factor_or_end
        if start <= 0 or end <= 0 or n <= 0:
            raise ValueError('illegal arguments')
        astext = 'lgr(%s,%s,n=%s)' % (start, end, n)
        factor = (end / start) ** (1 / n)
    else:
        factor = factor_or_end
        # factor == 1 would lead to a division by log2(1) == 0 below
        if start <= 0 or end <= 0 or factor <= 0 or factor == 1:
            raise ValueError('illegal arguments')
        # 0.5 and 2 are equivalent: adjust factor to the direction
        if (start > end) == (factor > 1):
            factor = 1 / factor
        astext = 'lgr(%s,%.18g,%s)' % (start, factor, end)
        n = int(round(abs(math.log2(end / start) / math.log2(factor))))
    # the end point is given explicitly in order to avoid rounding errors
    return Range(astext, start,
                 *(start * factor ** i for i in range(1, n)), end)


class WithTimeoutDev(Moveable):
    """Wrapper for better timeout/settling mechanism"""

    parameters = {
        'dev': Param('dev', type=lambda x=None: x, settable=True,
                     mandatory=True),
        'tolerance': Param('tolerance', type=float, settable=True,
                           default=1, mandatory=False),
        'settle': Param('time to settle within tolerance', type=float,
                        settable=True, default=1, mandatory=False),
        'timeout': Param('timeout for less progress than tolerance '
                         '(defaults to settle)',
                         type=float, settable=True, default=-1,
                         mandatory=False),
    }

    temporary = True
    # time of start or of the last progress, None when not busy
    _start_time = None

    def _getCache(self):
        return None

    def doStart(self, value):
        """start the wrapped device and initialize the bookkeeping"""
        self._start_time = time.time()
        self._settle_start = None
        self._settle_done = 0
        # smallest difference to the target considered as reached so far
        self._mindif = abs(value - self.dev())
        self.dev.doStart(value)

    def doStop(self):
        self._start_time = None
        self.dev.doStop()

    def doRead(self, maxage=0):
        return self.dev.doRead(maxage)

    def doStatus(self, maxage=0):
        """return BUSY until settled or timed out

        OK is returned when the value was within tolerance for a total of
        'settle' seconds, or when the value is outside tolerance and there
        was no progress by more than tolerance for 'timeout' seconds.
        """
        if not self._start_time:
            return status.OK, ''
        now = time.time()
        dif = abs(self.doRead(maxage) - self.target)
        if dif < self.tolerance:
            if not self._settle_start:
                self.log.info('settling %s' % self.dev)
                # take into account the time already spent within tolerance
                self._settle_start = now - self._settle_done
            if now > self._settle_start + self.settle:
                self._start_time = None
                return status.OK, ''
        else:
            # a non positive timeout (the default is -1) means: use settle
            timeout = self.timeout if self.timeout > 0 else self.settle
            if now > self._start_time + timeout:
                self._start_time = None
                self.log.info('timeout waiting for %s' % self.dev)
                return status.OK, ''
            if self._settle_start:
                # remember the time spent within tolerance
                self._settle_done = now - self._settle_start
                self._settle_start = None
        # NOTE(review): the indentation of this statement was reconstructed
        # - confirm that it belongs to the method level and not to the
        # else branch above
        if dif < self._mindif:
            # progress by more than tolerance: restart the timeout
            self._mindif = dif - self.tolerance
            self._start_time = now
        return status.BUSY, ''


@usercommand
@helparglist('dev, tolerance [,settle [,timeout]]')
@parallel_safe
def WithTimeout(dev, tolerance, settle=300, timeout=0):
    """wrap dev into a device with the settling/timeout mechanism

    if dev is already such a wrapper, its tolerance, settle and timeout
    parameters are modified and dev itself is returned
    """
    if isinstance(dev, WithTimeoutDev):
        dev.tolerance = tolerance
        dev.settle = settle
        dev.timeout = timeout
        return dev
    return WithTimeoutDev(dev.name, dev=dev, tolerance=tolerance,
                          settle=settle, timeout=timeout, unit=dev.unit,
                          fmtstr=dev.fmtstr)


@usercommand
@helparglist('dev, target[, tolerance[,settle[, timeout]]]')
def maw_tmo(dev, target, tolerance, settle=300, timeout=0):
    """wait for target reached or no progress within timeout

    wait for either target within tolerance for a total of 'settle' seconds
    or no progress more than tolerance for 'timeout' seconds
    """
    maw(WithTimeout(dev, tolerance, settle, timeout), target)


class ReadonlyParamDevice(paramdev.ReadonlyParamDevice):
    """turn a parameter into a temporary Readable"""

    temporary = True

    def __init__(self, devname_param):
        """devname_param is of the form 'devname.parametername'"""
        devname, pname = devname_param.split('.')
        paramdev.ReadonlyParamDevice.__init__(
            self, devname_param, device=devname, parameter=pname)

    def _getCache(self):
        return None


class ParamDevice(paramdev.ParamDevice):
    """turn a parameter into a temporary Moveable"""

    temporary = True

    def __init__(self, devname_param):
        """devname_param is of the form 'devname.parametername'"""
        devname, pname = devname_param.split('.')
        paramdev.ParamDevice.__init__(
            self, devname_param, device=devname, parameter=pname)

    def _getCache(self):
        return None


@usercommand
@helparglist('"dev.param, dev.param, ..."')
@parallel_safe
def out(*args):
    """may be used as an argument in a scan for producing output columns
    for parameters

    can not be used in SetEnvironment

    For a single parameter:

        scan(dev, ...., out('tt.raw'))

    For multiple parameters:

        scan(dev, ...., *out('tt, tt.raw, mf.ramp'))

    returns a single device when exactly one item is given, else a list
    """
    result = []
    for arg in args:
        for devpar in arg.split(','):
            devpar = devpar.strip()
            if '.' in devpar:
                result.append(ReadonlyParamDevice(devpar))
            else:
                # no parameter given: the device itself
                result.append(session.devices[devpar])
    if len(result) == 1:
        return result[0]
    return result


@usercommand
@helparglist('"dev.param"')
@parallel_safe
def param(devpar):
    """turning a parameter into a moveable device

    Usage:

        scan(param('dev.param'), ....)
    """
    return ParamDevice(devpar)


def copy_all(srcdir, dstdir):
    """copy all files from srcdir to dstdir

    sub-directories are skipped, the list of copied files is returned
    """
    files = [src for src in glob(join(srcdir, '*')) if os.path.isfile(src)]
    for src in files:
        shutil.copy2(src, dstdir)
    return files


@usercommand
@helparglist('script path | instrument, proposal')
@parallel_safe
def copy_scripts(ins_or_dir, proposal=None):
    """copy scripts from another proposal or any directory

    ins_or_dir is either an existing directory containing a 'scripts'
    sub-directory, or an instrument name. In the latter case the newest
    matching proposal directory below $NICOS_DATA/instrument/20*/ is used.
    proposal might be a number (leading zeros and a prefix are ignored)
    or the exact name of the proposal directory.

    raises ValueError when a proposal is needed but not given and
    FileNotFoundError when no matching directory is found
    """
    if os.path.isdir(ins_or_dir):
        dirname = ins_or_dir
    else:
        if proposal is None:
            raise ValueError('%r is not a directory and no proposal is given'
                             % ins_or_dir)
        data = join(os.environ['NICOS_DATA'], ins_or_dir)
        if isinstance(proposal, int):
            # the number might be zero padded and preceded by a prefix
            # not ending with a digit
            pat = re.compile(r'.*/(.*\D|)0*%d$' % proposal)
        else:
            pat = re.compile('.*/%s$' % re.escape(proposal))
        # newest first
        for dirname in sorted(glob(join(data, '20*', '*')), reverse=True):
            if pat.match(dirname):
                break
        else:
            raise FileNotFoundError('directory for %s/%s not found'
                                    % (ins_or_dir, proposal))
    copy_all(join(dirname, 'scripts'), session.devices['Exp'].scriptpath)


def handle_args(args):
    """preprocess the arguments of scan and sweep

    - a string containing a dot is converted with param()
    - a Readable given after a Moveable (or a converted parameter string)
      is wrapped in Det. If the experiment has a Timestamp detector with
      show set, a TimestampReadable is inserted before the first of them
    - other Measurables are kept only if they are not already among the
      detectors of the experiment
    - anything else is passed on unchanged
    """
    detectors = session.experiment.detectors
    put_timestamp = any(isinstance(det, Timestamp) and det.show
                        for det in detectors)
    ret = []
    moveable_seen = False
    for arg in args:
        if moveable_seen and isinstance(arg, Readable):
            if put_timestamp:
                # only once, before the first additional column
                ret.append(TimestampReadable())
                put_timestamp = False
            ret.append(Det(arg))
        elif isinstance(arg, str) and '.' in arg:
            ret.append(param(arg))
            moveable_seen = True
        elif isinstance(arg, Measurable):
            if arg not in detectors:
                ret.append(arg)
        else:
            if isinstance(arg, Moveable):
                moveable_seen = True
            ret.append(arg)
    return ret


@usercommand
def scan(*args, **kwargs):
    # docstring and help_arglist are taken over from the standard command
    nicos.commands.scan.scan(*handle_args(args), **kwargs)


scan.help_arglist = nicos.commands.scan.scan.help_arglist
scan.__doc__ = nicos.commands.scan.scan.__doc__


@usercommand
def sweep(dev, start, end, *args, **kwargs):
    # docstring and help_arglist are taken over from the standard command
    nicos.commands.scan.sweep(dev, start, end, *handle_args(args), **kwargs)


sweep.help_arglist = nicos.commands.scan.sweep.help_arglist
sweep.__doc__ = nicos.commands.scan.sweep.__doc__


class TimestampReadable(Readable):
    """timestamp as Readable"""

    temporary = True

    def __init__(self):
        Readable.__init__(self, 'timestamp', unit='s')

    def _getCache(self):
        """no cache needed"""
        self._cache = None

    def doRead(self, maxage=0):
        """return the current time as given by time.time()"""
        return time.time()

    def doStatus(self, maxage=0):
        # NOTE(review): 100 is a bare status code, not one of the names
        # from nicos.core.status - confirm that this is intended
        return 100, ''

    def valueInfo(self):
        return Readable.valueInfo(self)