Files
gac-x04sa 74c54dff38 Startup
2019-08-16 14:54:56 +02:00

322 lines
11 KiB
Python

###
# Copyright 2008-2011 Diamond Light Source Ltd.
# This file is part of Diffcalc.
#
# Diffcalc is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Diffcalc is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Diffcalc. If not, see <http://www.gnu.org/licenses/>.
###
#try:
# from gda.device import Scannable
#except ImportError:
# from diffcalc.gdasupport.minigda.scannable import Scannable
from diffcalc.gdasupport.minigda.scannable import Scannable
from diffcalc.util import getMessageFromException, allnum, bold
import math
ROOT_NAMESPACE_DICT = {}
class Pos(object):
def __init__(self):
self.__name__ = 'pos'
def __call__(self, *posargs):
if len(posargs) == 0:
keys = dict(ROOT_NAMESPACE_DICT).keys()
keys.sort()
for key in keys:
val = ROOT_NAMESPACE_DICT[key]
if isinstance(val, Scannable):
print self.posReturningReport(val)
else:
print self.posReturningReport(*posargs)
def posReturningReport(self, *posargs):
# report position of this scannable
if len(posargs) == 1:
scannable = posargs[0]
self._assert_scannable(scannable)
return self._generatePositionReport(scannable)
# Move the scannable and report
elif len(posargs) == 2:
scannable = posargs[0]
self._assert_scannable(scannable)
# Move it
scannable.asynchronousMoveTo(posargs[1])
# TODO: minigda assumes all moves complete instantly, so no need
# yet to check the move is complete
return self._generatePositionReport(scannable)
else:
raise ValueError(
"Invlaid arguements: 'pos [ scannable [ value ] ]'")
def _assert_scannable(self, obj):
if not isinstance(obj, Scannable):
raise TypeError(
"The first argument to the pos command must be scannable. "
"Not: " + str(type(obj)))
def _generatePositionReport(self, scannable):
fieldNames = (tuple(scannable.getInputNames()) +
tuple(scannable.getExtraNames()))
# All scannables
result = "%s:" % scannable.getName()
result = result.ljust(10)
try:
pos = scannable.getPosition()
except Exception, e:
return result + "Error: %s" % getMessageFromException(e)
if pos is None:
return result + "---"
# Single field scannable:
if len(fieldNames) == 1:
try:
result += "%s" % scannable.formatPositionFields(pos)[0]
except AttributeError:
result += str(scannable())
# Multi field scannable:
else:
try:
formatted = scannable.formatPositionFields(pos)
for name, formattedValue in zip(fieldNames, formatted):
result += "%s: %s " % (name, formattedValue)
except AttributeError:
result += str(scannable())
return result
class ScanDataHandler:
def __init__(self):
self.scannables = None
def callAtScanStart(self, scannables):
pass
def callWithScanPoint(self, PositionDictIndexedByScannable):
pass
def callAtScanEnd(self):
pass
class ScanDataPrinter(ScanDataHandler):
def __init__(self):
self.first_point_printed = False
self.widths = []
self.scannables = []
def callAtScanStart(self, scannables):
self.first_point_printed = False
self.scannables = scannables
def print_first_point(self, position_dict):
# also sets self.widths
header_strings = []
for scn in self.scannables:
field_names = list(scn.getInputNames()) + list(scn.getExtraNames())
if len(field_names) == 1:
header_strings.append(scn.getName())
else:
for field_name in field_names:
header_strings.append(field_name)
first_row_strings = []
for scn in self.scannables:
pos = position_dict[scn]
first_row_strings.extend(scn.formatPositionFields(pos))
self.widths = []
for header, pos_string in zip(header_strings, first_row_strings):
self.widths.append(max(len(header), len(pos_string)))
header_cells = []
for heading, width in zip(header_strings, self.widths):
header_cells.append(heading.rjust(width))
underline_cells = ['-' * w for w in self.widths]
first_row_cells = []
for pos, width in zip(first_row_strings, self.widths):
first_row_cells.append(pos.rjust(width))
#table_width = sum(self.widths) + len(self.widths * 2) - 2
lines = []
#lines.append('=' * table_width)
lines.append(bold(' '.join(header_cells)))
lines.append(' '.join(underline_cells))
lines.append(' '.join(first_row_cells))
print '\n'.join(lines)
def callWithScanPoint(self, position_dict):
if not self.first_point_printed:
self.print_first_point(position_dict)
self.first_point_printed = True
else:
row_strings = []
for scn in self.scannables:
pos = position_dict[scn]
row_strings.extend(scn.formatPositionFields(pos))
row_cells = []
for pos, width in zip(row_strings, self.widths):
row_cells.append(pos.rjust(width))
print ' '.join(row_cells)
def callAtScanEnd(self):
#table_width = sum(self.widths) + len(self.widths * 2) - 2
#print '=' * table_width
pass
class Scan(object):
class Group:
def __init__(self, scannable):
self.scannable = scannable
self.args = []
def __cmp__(self, other):
return(self.scannable.getLevel() - other.scannable.getLevel())
def __repr__(self):
return "Group(%s, %s)" % (self.scannable.getName(), str(self.args))
def shouldTriggerLoop(self):
return len(self.args) == 3
def __init__(self, scanDataHandlers):
# scanDataHandlers should be list
if type(scanDataHandlers) not in (tuple, list):
scanDataHandlers = (scanDataHandlers,)
self.dataHandlers = scanDataHandlers
def __call__(self, *scanargs):
groups = self._parseScanArgsIntoScannableArgGroups(scanargs)
groups = self._reorderInnerGroupsAccordingToLevel(groups)
# Configure data handlers for a new scan
for handler in self.dataHandlers: handler.callAtScanStart(
[grp.scannable for grp in groups])
# Perform the scan
self._performScan(groups, currentRecursionLevel=0)
# Inform data handlers of scan completion
for handler in self.dataHandlers: handler.callAtScanEnd()
def _parseScanArgsIntoScannableArgGroups(self, scanargs):
"""
-> [ Group(scnA, (a1, a2, a2)), Group((scnB), (b1)), ...
... Group((scnC),()), Group((scnD),(d1))]
"""
result = []
if not isinstance(scanargs[0], Scannable):
raise TypeError("First scan argument must be a scannable")
# Parse out scannables followed by non-scannable args
for arg in scanargs:
if isinstance(arg, Scannable):
result.append(Scan.Group(arg))
else:
result[-1].args.append(arg)
return result
def _reorderInnerGroupsAccordingToLevel(self, groups):
# Find the first group not to trigger a loop
for idx, group in enumerate(groups):
if not group.shouldTriggerLoop():
break
latter = groups[idx:]; latter.sort() # Horrible hack not needed in python 3!
return groups[:idx] + latter
def _performScan(self, groups, currentRecursionLevel):
# groups[currentRecursionLevel:] will start with either:
# a) A loop triggering group
# b) A number (possibly 0) of non-loop triggering groups
unprocessedGroups = groups[currentRecursionLevel:]
# 1) If first remaining group should trigger a loop, perform this loop,
# recursively calling this method on the remaining groups
if len(unprocessedGroups) > 0:
first = unprocessedGroups[0]
# If groups starts with a request to loop:
if first.shouldTriggerLoop():
posList = self._frange(first.args[0], first.args[1], first.args[2])
for pos in posList:
first.scannable.asynchronousMoveTo(pos)
# TODO: Should wait. minigda assumes all moves complete immediately
self._performScan(groups, currentRecursionLevel + 1)
return
# 2) Move all non-loop triggering groups (may be zero)
self._moveNonLoopTriggeringGroups(unprocessedGroups)
# 3) Sample position of all scannables
posDict = self._samplePositionsOfAllScannables(groups)
# 4) Inform the data handlers that this point has been recorded
for handler in self.dataHandlers: handler.callWithScanPoint(posDict)
def _moveNonLoopTriggeringGroups(self, groups):
# TODO: Should wait. minigda assumes all moves complete immediately. groups could be zero lengthed.
for grp in groups:
if len(grp.args) == 0:
pass
elif len(grp.args) == 1:
grp.scannable.asynchronousMoveTo(grp.args[0])
elif len(grp.args) == 2:
raise Exception("Scannables followed by two args not supported by minigda's scan command ")
else:
raise Exception("Scannable: %s args%s" % (grp.scannable, str(grp.args)))
def _samplePositionsOfAllScannables(self, groups):
posDict = {}
for grp in groups:
posDict[grp.scannable] = grp.scannable.getPosition()
return posDict
def _frange(self, limit1, limit2, increment):
"""Range function that accepts floats (and integers).
"""
# limit1 = float(limit1)
# limit2 = float(limit2)
try:
increment = float(increment)
except TypeError:
raise TypeError(
"Only scaler values are supported, not GDA format vectors.")
count = int(math.ceil(((limit2 - limit1) + increment / 100.) / increment))
result = []
for n in range(count):
result.append(limit1 + n * increment)
return result
def sim(scn, hkl):
"""sim hkl scn -- simulates moving scannable (not all)
"""
if not isinstance(hkl, (tuple, list)):
raise TypeError()
if not allnum(hkl):
raise TypeError()
try:
print scn.simulateMoveTo(hkl)
except AttributeError:
raise TypeError(
"The first argument does not support simulated moves")