Files
frappy_sinq/labcommands.py
Markus Zolliker 9f7413a1a5 split lab.py in labcommands.py and timestamp.py
- timestamp.TimeStamp is used as a dummy detector
- labcommands may be used in lab expts. but is somehow outdated
2025-04-29 08:48:13 +02:00

495 lines
15 KiB
Python

# *****************************************************************************
#
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
#
# *****************************************************************************
"""collection of commands and devices for lab experiments
the commands are probably better not used, as the standard scan and sweep
commands are overwritten - this might be confusing for users working also
in neutron instruments
"""
import os
from os.path import join
import inspect
import shutil
import re
from glob import glob
import time
import math
from nicos import session
from nicos.core import Override, Param, Measurable, Moveable, Device, \
status, Readable
from nicos.core.errors import NicosError
from nicos.commands import helparglist, usercommand, parallel_safe
from nicos.commands.device import maw
import nicos.devices.generic.paramdev as paramdev
import nicos.commands.scan
from nicos_sinq.frappy_sinq.timestamp import Timestamp, Det
class FunDevice(Moveable):
"""wrap a device in a function
see usercommand fun
"""
temporary = True
_dev = None
_value = None
def __init__(self, func, dev_or_name=None, unit=None, devname=None, **kwargs):
self._func = func
self._kwargs = kwargs
if isinstance(dev_or_name, Device):
self._dev = dev_or_name
self._devname = dev_or_name.name
if devname:
# or should we raise an error?
name = devname
else:
name = self._devname
else:
name = dev_or_name or devname
self._devname = devname
self._dev = session.devices.get(devname)
if self._dev and unit is None:
unit = self._dev.unit
Moveable.__init__(self, name, unit=unit or '')
def doRead(self, maxage=0):
if self._devname:
if not self._dev:
try:
self._dev = session.devices[self._devname]
if not self.unit:
self.unit = self._dev.unit
except KeyError:
raise NicosError('device %r does not exist' % self._devname)
if self._value is None:
self._value = self._dev.read(maxage)
return self._value
def doStart(self, value):
if self._devname:
oldvalue = self.doRead()
else:
oldvalue = self._value
# append args depending in function signature
argnames, varargs, varkw = inspect.getfullargspec(self._func)[0:3]
args = self._dev, oldvalue
kwds = dict(self._kwargs)
if 'oldvalue' in argnames:
kwds['oldvalue'] = oldvalue
args = args[:1]
if 'dev' in argnames:
kwds['dev'] = self._dev
args = ()
if not varkw:
for k in kwds:
if k not in argnames:
kwds.pop(k)
result = self._func(value, *args[:len(argnames) - 1], **kwds)
self._value = value if result is None else result
def doIsCompleted(self):
return True
def doStop(self):
if self._dev:
self._dev.stop()
@usercommand
@helparglist('dev_or_name[, unit[, devname=..., **kwargs]]')
def fun(dev_or_name=None, unit=None, devname=None, **kwargs):
"""decorator for turning a function into a device for scans
Usage:
A function wrapped around a device (the argument might be a device name or a device)
@fun(dev="mydevice"):
def func(value, dev):
do_something_before_move()
maw(dev, value)
do_something_after_move()
scan(func, ...)
func will inherit the name and unit from the device.
func may have 1-3 positional arguments:
- value: the value to move to
- dev: the device given (in above example mydevice)
- oldvalue: the value before the move
A function turned into a device:
@fun("myname", "K")
def func(value):
result = do_some_thing(value)
return result
if no name is given, the function name is used.
in this case the final result may be returned, if different from value
"""
def decorator(func, name=dev_or_name):
if name is None:
name = func.__name__
return FunDevice(func, name, unit, devname, **kwargs)
return decorator
class Range(tuple):
def __new__(cls, astext, *args):
return tuple.__new__(cls, args)
def __init__(self, astext, *args):
super().__init__()
self.astext = astext
def __repr__(self):
return self.astext
def __add__(self, other):
astext = '%s+%s' % (repr(self), repr(other))
try:
if self[-1] == other[0]:
other = other[1:]
except IndexError:
pass
return Range(astext, *self, *other)
@usercommand
@helparglist('start, step, end')
def lnr(start, step_or_end=None, end=None, n=None):
"""linear range
to be used as argument for the scan command
Alternative form:
>>> lnr(start, end, n=n) # n+1 points
ranges might be added, adjacent equal points are omitted automatically
>>> list(lnr(0,1,4) + lnr(4,2,10) + [15])
> [0,1,2,3,4,6,8,10,15]
"""
if end is None:
astext = 'lnr(%s,%s,n=%s)' % (start, step_or_end, n)
if step_or_end is None:
# single value
return Range('lnr(%s)' % start, start)
end = step_or_end
if (n or 1) == 1:
# two values
return Range('lnr(%s,%s)' % (start, end), start, end)
n = abs(n)
step = (end - start) / n
else:
step = step_or_end
if n is not None or step == 0:
raise ValueError('illegal arguments')
astext = 'lnr(%s,%s,%s)' % (start, step_or_end, end)
n = int(round(abs((end - start) / step)))
if (start < end) == (step < 0):
step = -step
return Range(astext, start, *tuple(start + i * step for i in range(1,n)), end)
@usercommand
@helparglist('start, factor, end')
def lgr(start, factor_or_end, end=None, n=0):
"""logarithmic range
factor is the factor between two points (0.5 and 2 are equivalent)
Alternative form:
>>> lgr(start, end, n=n) # n+1 points
ranges might be added, adjacent equal points are omitted automatically
list(lnr(0,1,4) + lgr(4,2,15) + [30]) == [0,1,2,3,4,8,15,30]
"""
if end is None:
end = factor_or_end
if start <= 0 or end <= 0 or n <= 0:
raise ValueError('illegal arguments')
astext = 'lgr(%s,%s,n=%s)' % (start, end, n)
factor = (end / start) ** (1 / n)
else:
factor = factor_or_end
if start <= 0 or end <= 0 or factor <= 0:
raise ValueError('illegal arguments')
if (start > end) == (factor > 1):
factor = 1 / factor
astext = 'lgr(%s,%.18g,%s)' % (start, factor, end)
n = int(round(abs(math.log2(end / start) / math.log2(factor))))
return Range(astext, start,
*(start * factor ** i for i in range(1, n)),
end)
class WithTimeoutDev(Moveable):
"""Wrapper for better timeout/settling mechanism"""
parameters = {
'dev': Param('dev', type=lambda x=None: x, settable=True, mandatory=True),
'tolerance': Param('tolerance', type=float,
settable=True, default=1, mandatory=False),
'settle': Param('time to settle within tolerance', type=float,
settable=True, default=1, mandatory=False),
'timeout': Param('timeout for less progress than tolerance (defaults to settle)', type=float,
settable=True, default=-1, mandatory=False),
}
temporary = True
_start_time = None
def _getCache(self):
return None
def doStart(self, value):
self._start_time = time.time()
self._settle_start = None
self._settle_done = 0
self._mindif = abs(value - self.dev())
self.dev.doStart(value)
def doStop(self):
self._start_time = None
self.dev.doStop()
def doRead(self, maxage=0):
return self.dev.doRead(maxage)
def doStatus(self, maxage=0):
if not self._start_time:
return status.OK, ''
now = time.time()
dif = abs(self.doRead(maxage) - self.target)
if dif < self.tolerance:
if not self._settle_start:
self.log.info('settling %s' % self.dev)
self._settle_start = now - self._settle_done
if now > self._settle_start + self.settle:
self._start_time = None
return status.OK, ''
else:
if now > self._start_time + (self.timeout or self.settle):
self._start_time = None
self.log.info('timeout waiting for %s' % self.dev)
return status.OK, ''
if self._settle_start:
self._settle_done = now - self._settle_start
self._settle_start = None
if dif < self._mindif:
self._mindif = dif - self.tolerance
self._start_time = now
return status.BUSY, ''
@usercommand
@helparglist('dev, tolerance [,settle [,timeout]]')
@parallel_safe
def WithTimeout(dev, tolerance, settle=300, timeout=0):
if isinstance(dev, WithTimeoutDev):
dev.tolerance = tolerance
dev.settle = settle
dev.timeout = timeout
return dev
return WithTimeoutDev(dev.name, dev=dev, tolerance=tolerance, settle=settle, timeout=timeout,
unit=dev.unit, fmtstr=dev.fmtstr)
@usercommand
@helparglist('dev, target[, tolerance[,settle[, timeout]]]')
def maw_tmo(dev, target, tolerance, settle=300, timeout=0):
"""wait for target reached or no progress within timeout
wait for either target within tolerance for a total of <settle> seconds
or no progress more than tolerance for <timeout> seconds
"""
maw(WithTimeout(dev, tolerance, settle, timeout), target)
class ReadonlyParamDevice(paramdev.ReadonlyParamDevice):
"""turn a parameter into a temporary Readable"""
temporary = True
def __init__(self, devname_param):
devname, pname = devname_param.split('.')
paramdev.ReadonlyParamDevice.__init__(self, devname_param, device=devname, parameter=pname)
def _getCache(self):
return None
class ParamDevice(paramdev.ParamDevice):
"""turn a parameter into a temporary Moveable"""
temporary = True
def __init__(self, devname_param):
devname, pname = devname_param.split('.')
paramdev.ParamDevice.__init__(self, devname_param, device=devname, parameter=pname)
def _getCache(self):
return None
@usercommand
@helparglist('"dev.param, dev.param, ..."')
@parallel_safe
def out(*args):
"""may be used as an argument in a scan for producing output columns for parameters
can not be used in SetEnvironment
For a single parameter:
scan(dev, ...., out('tt.raw'))
For multiple parameters:
scan(dev, ...., *out('tt, tt.raw, mf.ramp'))
"""
result = []
for arg in args:
for devpar in arg.split(','):
devpar = devpar.strip()
if '.' in devpar:
result.append(ReadonlyParamDevice(devpar))
else:
result.append(session.devices[devpar])
if len(result) == 1:
return result[0]
return result
@usercommand
@helparglist('"dev.param"')
@parallel_safe
def param(devpar):
"""turning a parameter into a moveable device
Usage:
scan(param(dev), ....)
"""
return ParamDevice(devpar)
def copy_all(srcdir, dstdir):
"""copy all files from srcdir to dstdir"""
files = glob(join(srcdir, '*'))
for src in files:
shutil.copy2(src, dstdir)
return files
@usercommand
@helparglist('script path | instrument, proposal')
@parallel_safe
def copy_scripts(ins_or_dir, proposal=None):
"""copy scripts from another proposal or any directory"""
if os.path.isdir(ins_or_dir):
dirname = ins_or_dir
else:
data = join(os.environ['NICOS_DATA'], ins_or_dir)
if isinstance(proposal, int):
pat = re.compile('.*/(.*\D|)0*%d$' % proposal)
else:
pat = re.compile('.*/%s$' % re.escape(proposal))
for dirname in reversed(sorted(glob(join(data, '20*', '*')))):
if pat.match(dirname):
break
else:
raise FileNotFoundError('directory for %s/%s not found' % (ins_or_dir, proposal))
copy_all(join(dirname, 'scripts'), join(session.devices['Exp'].scriptpath))
def handle_args(args):
detectors = []
put_timestamp = False
for det in session.experiment.detectors:
if isinstance(det, Timestamp) and det.show:
put_timestamp = True
ret = []
moveable_seen = False
for arg in args:
if moveable_seen and isinstance(arg, Readable):
if put_timestamp:
ret.append(TimestampReadable())
put_timestamp = False
#ret.append(arg)
ret.append(Det(arg))
elif isinstance(arg, str) and '.' in arg:
ret.append(param(arg))
moveable_seen = True
elif isinstance(arg, Measurable):
if arg not in session.experiment.detectors:
ret.append(arg)
else:
if isinstance(arg, Moveable):
moveable_seen = True
ret.append(arg)
return ret
@usercommand
def scan(*args, **kwargs):
nicos.commands.scan.scan(*handle_args(args), **kwargs)
scan.help_arglist = nicos.commands.scan.scan.help_arglist
scan.__doc__ = nicos.commands.scan.scan.__doc__
@usercommand
def sweep(dev, start, end, *args, **kwargs):
nicos.commands.scan.sweep(dev, start, end, *handle_args(args), **kwargs)
sweep.help_arglist = nicos.commands.scan.sweep.help_arglist
sweep.__doc__ = nicos.commands.scan.sweep.__doc__
class TimestampReadable(Readable):
"""timestamp as Readable"""
temporary = True
def __init__(self):
Readable.__init__(self, 'timestamp', unit='s')
def _getCache(self):
"""no cache needed"""
self._cache = None
def doRead(self, maxage=0):
return time.time()
def doStatus(self, maxage=0):
return 100, ''
def valueInfo(self):
return Readable.valueInfo(self)