### # Copyright 2008-2011 Diamond Light Source Ltd. # This file is part of Diffcalc. # # Diffcalc is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Diffcalc is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Diffcalc. If not, see . ### #try: # from gda.device import Scannable #except ImportError: # from diffcalc.gdasupport.minigda.scannable import Scannable from diffcalc.gdasupport.minigda.scannable import Scannable from diffcalc.util import getMessageFromException, allnum, bold import math ROOT_NAMESPACE_DICT = {} class Pos(object): def __init__(self): self.__name__ = 'pos' def __call__(self, *posargs): if len(posargs) == 0: keys = dict(ROOT_NAMESPACE_DICT).keys() keys.sort() for key in keys: val = ROOT_NAMESPACE_DICT[key] if isinstance(val, Scannable): print self.posReturningReport(val) else: print self.posReturningReport(*posargs) def posReturningReport(self, *posargs): # report position of this scannable if len(posargs) == 1: scannable = posargs[0] self._assert_scannable(scannable) return self._generatePositionReport(scannable) # Move the scannable and report elif len(posargs) == 2: scannable = posargs[0] self._assert_scannable(scannable) # Move it scannable.asynchronousMoveTo(posargs[1]) # TODO: minigda assumes all moves complete instantly, so no need # yet to check the move is complete return self._generatePositionReport(scannable) else: raise ValueError( "Invlaid arguements: 'pos [ scannable [ value ] ]'") def _assert_scannable(self, obj): if not isinstance(obj, Scannable): raise TypeError( "The first argument to the pos command must be scannable. " "Not: " + str(type(obj))) def _generatePositionReport(self, scannable): fieldNames = (tuple(scannable.getInputNames()) + tuple(scannable.getExtraNames())) # All scannables result = "%s:" % scannable.getName() result = result.ljust(10) try: pos = scannable.getPosition() except Exception, e: return result + "Error: %s" % getMessageFromException(e) if pos is None: return result + "---" # Single field scannable: if len(fieldNames) == 1: try: result += "%s" % scannable.formatPositionFields(pos)[0] except AttributeError: result += str(scannable()) # Multi field scannable: else: try: formatted = scannable.formatPositionFields(pos) for name, formattedValue in zip(fieldNames, formatted): result += "%s: %s " % (name, formattedValue) except AttributeError: result += str(scannable()) return result class ScanDataHandler: def __init__(self): self.scannables = None def callAtScanStart(self, scannables): pass def callWithScanPoint(self, PositionDictIndexedByScannable): pass def callAtScanEnd(self): pass class ScanDataPrinter(ScanDataHandler): def __init__(self): self.first_point_printed = False self.widths = [] self.scannables = [] def callAtScanStart(self, scannables): self.first_point_printed = False self.scannables = scannables def print_first_point(self, position_dict): # also sets self.widths header_strings = [] for scn in self.scannables: field_names = list(scn.getInputNames()) + list(scn.getExtraNames()) if len(field_names) == 1: header_strings.append(scn.getName()) else: for field_name in field_names: header_strings.append(field_name) first_row_strings = [] for scn in self.scannables: pos = position_dict[scn] first_row_strings.extend(scn.formatPositionFields(pos)) self.widths = [] for header, pos_string in zip(header_strings, first_row_strings): self.widths.append(max(len(header), len(pos_string))) header_cells = [] for heading, width in zip(header_strings, self.widths): header_cells.append(heading.rjust(width)) underline_cells = ['-' * w for w in self.widths] first_row_cells = [] for pos, width in zip(first_row_strings, self.widths): first_row_cells.append(pos.rjust(width)) #table_width = sum(self.widths) + len(self.widths * 2) - 2 lines = [] #lines.append('=' * table_width) lines.append(bold(' '.join(header_cells))) lines.append(' '.join(underline_cells)) lines.append(' '.join(first_row_cells)) print '\n'.join(lines) def callWithScanPoint(self, position_dict): if not self.first_point_printed: self.print_first_point(position_dict) self.first_point_printed = True else: row_strings = [] for scn in self.scannables: pos = position_dict[scn] row_strings.extend(scn.formatPositionFields(pos)) row_cells = [] for pos, width in zip(row_strings, self.widths): row_cells.append(pos.rjust(width)) print ' '.join(row_cells) def callAtScanEnd(self): #table_width = sum(self.widths) + len(self.widths * 2) - 2 #print '=' * table_width pass class Scan(object): class Group: def __init__(self, scannable): self.scannable = scannable self.args = [] def __cmp__(self, other): return(self.scannable.getLevel() - other.scannable.getLevel()) def __repr__(self): return "Group(%s, %s)" % (self.scannable.getName(), str(self.args)) def shouldTriggerLoop(self): return len(self.args) == 3 def __init__(self, scanDataHandlers): # scanDataHandlers should be list if type(scanDataHandlers) not in (tuple, list): scanDataHandlers = (scanDataHandlers,) self.dataHandlers = scanDataHandlers def __call__(self, *scanargs): groups = self._parseScanArgsIntoScannableArgGroups(scanargs) groups = self._reorderInnerGroupsAccordingToLevel(groups) # Configure data handlers for a new scan for handler in self.dataHandlers: handler.callAtScanStart( [grp.scannable for grp in groups]) # Perform the scan self._performScan(groups, currentRecursionLevel=0) # Inform data handlers of scan completion for handler in self.dataHandlers: handler.callAtScanEnd() def _parseScanArgsIntoScannableArgGroups(self, scanargs): """ -> [ Group(scnA, (a1, a2, a2)), Group((scnB), (b1)), ... ... Group((scnC),()), Group((scnD),(d1))] """ result = [] if not isinstance(scanargs[0], Scannable): raise TypeError("First scan argument must be a scannable") # Parse out scannables followed by non-scannable args for arg in scanargs: if isinstance(arg, Scannable): result.append(Scan.Group(arg)) else: result[-1].args.append(arg) return result def _reorderInnerGroupsAccordingToLevel(self, groups): # Find the first group not to trigger a loop for idx, group in enumerate(groups): if not group.shouldTriggerLoop(): break latter = groups[idx:]; latter.sort() # Horrible hack not needed in python 3! return groups[:idx] + latter def _performScan(self, groups, currentRecursionLevel): # groups[currentRecursionLevel:] will start with either: # a) A loop triggering group # b) A number (possibly 0) of non-loop triggering groups unprocessedGroups = groups[currentRecursionLevel:] # 1) If first remaining group should trigger a loop, perform this loop, # recursively calling this method on the remaining groups if len(unprocessedGroups) > 0: first = unprocessedGroups[0] # If groups starts with a request to loop: if first.shouldTriggerLoop(): posList = self._frange(first.args[0], first.args[1], first.args[2]) for pos in posList: first.scannable.asynchronousMoveTo(pos) # TODO: Should wait. minigda assumes all moves complete immediately self._performScan(groups, currentRecursionLevel + 1) return # 2) Move all non-loop triggering groups (may be zero) self._moveNonLoopTriggeringGroups(unprocessedGroups) # 3) Sample position of all scannables posDict = self._samplePositionsOfAllScannables(groups) # 4) Inform the data handlers that this point has been recorded for handler in self.dataHandlers: handler.callWithScanPoint(posDict) def _moveNonLoopTriggeringGroups(self, groups): # TODO: Should wait. minigda assumes all moves complete immediately. groups could be zero lengthed. for grp in groups: if len(grp.args) == 0: pass elif len(grp.args) == 1: grp.scannable.asynchronousMoveTo(grp.args[0]) elif len(grp.args) == 2: raise Exception("Scannables followed by two args not supported by minigda's scan command ") else: raise Exception("Scannable: %s args%s" % (grp.scannable, str(grp.args))) def _samplePositionsOfAllScannables(self, groups): posDict = {} for grp in groups: posDict[grp.scannable] = grp.scannable.getPosition() return posDict def _frange(self, limit1, limit2, increment): """Range function that accepts floats (and integers). """ # limit1 = float(limit1) # limit2 = float(limit2) try: increment = float(increment) except TypeError: raise TypeError( "Only scaler values are supported, not GDA format vectors.") count = int(math.ceil(((limit2 - limit1) + increment / 100.) / increment)) result = [] for n in range(count): result.append(limit1 + n * increment) return result def sim(scn, hkl): """sim hkl scn -- simulates moving scannable (not all) """ if not isinstance(hkl, (tuple, list)): raise TypeError() if not allnum(hkl): raise TypeError() try: print scn.simulateMoveTo(hkl) except AttributeError: raise TypeError( "The first argument does not support simulated moves")