This commit is contained in:
2019-03-20 13:52:00 +01:00
parent 3084fe0510
commit 5db0f78aee
910 changed files with 191152 additions and 322 deletions

View File

@@ -0,0 +1,322 @@
###
# Copyright 2008-2011 Diamond Light Source Ltd.
# This file is part of Diffcalc.
#
# Diffcalc is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Diffcalc is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Diffcalc. If not, see <http://www.gnu.org/licenses/>.
###
#try:
# from gda.device import Scannable
#except ImportError:
# from diffcalc.gdasupport.minigda.scannable import Scannable
from diffcalc.gdasupport.minigda.scannable import Scannable
from diffcalc.util import getMessageFromException, allnum, bold
import math
ROOT_NAMESPACE_DICT = {}
class Pos(object):
def __init__(self):
self.__name__ = 'pos'
def __call__(self, *posargs):
if len(posargs) == 0:
keys = dict(ROOT_NAMESPACE_DICT).keys()
keys.sort()
for key in keys:
val = ROOT_NAMESPACE_DICT[key]
if isinstance(val, Scannable):
print self.posReturningReport(val)
else:
print self.posReturningReport(*posargs)
def posReturningReport(self, *posargs):
# report position of this scannable
if len(posargs) == 1:
scannable = posargs[0]
self._assert_scannable(scannable)
return self._generatePositionReport(scannable)
# Move the scannable and report
elif len(posargs) == 2:
scannable = posargs[0]
self._assert_scannable(scannable)
# Move it
scannable.asynchronousMoveTo(posargs[1])
# TODO: minigda assumes all moves complete instantly, so no need
# yet to check the move is complete
return self._generatePositionReport(scannable)
else:
raise ValueError(
"Invlaid arguements: 'pos [ scannable [ value ] ]'")
def _assert_scannable(self, obj):
if not isinstance(obj, Scannable):
raise TypeError(
"The first argument to the pos command must be scannable. "
"Not: " + str(type(obj)))
def _generatePositionReport(self, scannable):
fieldNames = (tuple(scannable.getInputNames()) +
tuple(scannable.getExtraNames()))
# All scannables
result = "%s:" % scannable.getName()
result = result.ljust(10)
try:
pos = scannable.getPosition()
except Exception, e:
return result + "Error: %s" % getMessageFromException(e)
if pos is None:
return result + "---"
# Single field scannable:
if len(fieldNames) == 1:
try:
result += "%s" % scannable.formatPositionFields(pos)[0]
except AttributeError:
result += str(scannable())
# Multi field scannable:
else:
try:
formatted = scannable.formatPositionFields(pos)
for name, formattedValue in zip(fieldNames, formatted):
result += "%s: %s " % (name, formattedValue)
except AttributeError:
result += str(scannable())
return result
class ScanDataHandler:
def __init__(self):
self.scannables = None
def callAtScanStart(self, scannables):
pass
def callWithScanPoint(self, PositionDictIndexedByScannable):
pass
def callAtScanEnd(self):
pass
class ScanDataPrinter(ScanDataHandler):
def __init__(self):
self.first_point_printed = False
self.widths = []
self.scannables = []
def callAtScanStart(self, scannables):
self.first_point_printed = False
self.scannables = scannables
def print_first_point(self, position_dict):
# also sets self.widths
header_strings = []
for scn in self.scannables:
field_names = list(scn.getInputNames()) + list(scn.getExtraNames())
if len(field_names) == 1:
header_strings.append(scn.getName())
else:
for field_name in field_names:
header_strings.append(field_name)
first_row_strings = []
for scn in self.scannables:
pos = position_dict[scn]
first_row_strings.extend(scn.formatPositionFields(pos))
self.widths = []
for header, pos_string in zip(header_strings, first_row_strings):
self.widths.append(max(len(header), len(pos_string)))
header_cells = []
for heading, width in zip(header_strings, self.widths):
header_cells.append(heading.rjust(width))
underline_cells = ['-' * w for w in self.widths]
first_row_cells = []
for pos, width in zip(first_row_strings, self.widths):
first_row_cells.append(pos.rjust(width))
#table_width = sum(self.widths) + len(self.widths * 2) - 2
lines = []
#lines.append('=' * table_width)
lines.append(bold(' '.join(header_cells)))
lines.append(' '.join(underline_cells))
lines.append(' '.join(first_row_cells))
print '\n'.join(lines)
def callWithScanPoint(self, position_dict):
if not self.first_point_printed:
self.print_first_point(position_dict)
self.first_point_printed = True
else:
row_strings = []
for scn in self.scannables:
pos = position_dict[scn]
row_strings.extend(scn.formatPositionFields(pos))
row_cells = []
for pos, width in zip(row_strings, self.widths):
row_cells.append(pos.rjust(width))
print ' '.join(row_cells)
def callAtScanEnd(self):
#table_width = sum(self.widths) + len(self.widths * 2) - 2
#print '=' * table_width
pass
class Scan(object):
class Group:
def __init__(self, scannable):
self.scannable = scannable
self.args = []
def __cmp__(self, other):
return(self.scannable.getLevel() - other.scannable.getLevel())
def __repr__(self):
return "Group(%s, %s)" % (self.scannable.getName(), str(self.args))
def shouldTriggerLoop(self):
return len(self.args) == 3
def __init__(self, scanDataHandlers):
# scanDataHandlers should be list
if type(scanDataHandlers) not in (tuple, list):
scanDataHandlers = (scanDataHandlers,)
self.dataHandlers = scanDataHandlers
def __call__(self, *scanargs):
groups = self._parseScanArgsIntoScannableArgGroups(scanargs)
groups = self._reorderInnerGroupsAccordingToLevel(groups)
# Configure data handlers for a new scan
for handler in self.dataHandlers: handler.callAtScanStart(
[grp.scannable for grp in groups])
# Perform the scan
self._performScan(groups, currentRecursionLevel=0)
# Inform data handlers of scan completion
for handler in self.dataHandlers: handler.callAtScanEnd()
def _parseScanArgsIntoScannableArgGroups(self, scanargs):
"""
-> [ Group(scnA, (a1, a2, a2)), Group((scnB), (b1)), ...
... Group((scnC),()), Group((scnD),(d1))]
"""
result = []
if not isinstance(scanargs[0], Scannable):
raise TypeError("First scan argument must be a scannable")
# Parse out scannables followed by non-scannable args
for arg in scanargs:
if isinstance(arg, Scannable):
result.append(Scan.Group(arg))
else:
result[-1].args.append(arg)
return result
def _reorderInnerGroupsAccordingToLevel(self, groups):
# Find the first group not to trigger a loop
for idx, group in enumerate(groups):
if not group.shouldTriggerLoop():
break
latter = groups[idx:]; latter.sort() # Horrible hack not needed in python 3!
return groups[:idx] + latter
def _performScan(self, groups, currentRecursionLevel):
# groups[currentRecursionLevel:] will start with either:
# a) A loop triggering group
# b) A number (possibly 0) of non-loop triggering groups
unprocessedGroups = groups[currentRecursionLevel:]
# 1) If first remaining group should trigger a loop, perform this loop,
# recursively calling this method on the remaining groups
if len(unprocessedGroups) > 0:
first = unprocessedGroups[0]
# If groups starts with a request to loop:
if first.shouldTriggerLoop():
posList = self._frange(first.args[0], first.args[1], first.args[2])
for pos in posList:
first.scannable.asynchronousMoveTo(pos)
# TODO: Should wait. minigda assumes all moves complete immediately
self._performScan(groups, currentRecursionLevel + 1)
return
# 2) Move all non-loop triggering groups (may be zero)
self._moveNonLoopTriggeringGroups(unprocessedGroups)
# 3) Sample position of all scannables
posDict = self._samplePositionsOfAllScannables(groups)
# 4) Inform the data handlers that this point has been recorded
for handler in self.dataHandlers: handler.callWithScanPoint(posDict)
def _moveNonLoopTriggeringGroups(self, groups):
# TODO: Should wait. minigda assumes all moves complete immediately. groups could be zero lengthed.
for grp in groups:
if len(grp.args) == 0:
pass
elif len(grp.args) == 1:
grp.scannable.asynchronousMoveTo(grp.args[0])
elif len(grp.args) == 2:
raise Exception("Scannables followed by two args not supported by minigda's scan command ")
else:
raise Exception("Scannable: %s args%s" % (grp.scannable, str(grp.args)))
def _samplePositionsOfAllScannables(self, groups):
posDict = {}
for grp in groups:
posDict[grp.scannable] = grp.scannable.getPosition()
return posDict
def _frange(self, limit1, limit2, increment):
"""Range function that accepts floats (and integers).
"""
# limit1 = float(limit1)
# limit2 = float(limit2)
try:
increment = float(increment)
except TypeError:
raise TypeError(
"Only scaler values are supported, not GDA format vectors.")
count = int(math.ceil(((limit2 - limit1) + increment / 100.) / increment))
result = []
for n in range(count):
result.append(limit1 + n * increment)
return result
def sim(scn, hkl):
"""sim hkl scn -- simulates moving scannable (not all)
"""
if not isinstance(hkl, (tuple, list)):
raise TypeError()
if not allnum(hkl):
raise TypeError()
try:
print scn.simulateMoveTo(hkl)
except AttributeError:
raise TypeError(
"The first argument does not support simulated moves")

View File

@@ -0,0 +1,511 @@
###
# Copyright 2008-2011 Diamond Light Source Ltd.
# This file is part of Diffcalc.
#
# Diffcalc is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Diffcalc is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Diffcalc. If not, see <http://www.gnu.org/licenses/>.
###
import time
try:
from gda.device.scannable import ScannableBase
except ImportError:
class Scannable(object):
pass
class ScannableBase(Scannable):
"""Implemtation of a subset of OpenGDA's Scannable interface
"""
level = 5
inputNames = []
extraNames = []
outputFormat = []
def isBusy(self):
raise NotImplementedError()
def rawGetPosition(self):
raise NotImplementedError()
def rawAsynchronousMoveTo(self, newpos):
raise NotImplementedError()
def waitWhileBusy(self):
while self.isBusy():
time.sleep(.1)
def getPosition(self):
return self.rawGetPosition()
def asynchronousMoveTo(self, newpos):
self.rawAsynchronousMoveTo(newpos)
def atScanStart(self):
pass
def atScanEnd(self):
pass
def atCommandFailure(self):
pass
###
def __repr__(self):
pos = self.getPosition()
formattedValues = self.formatPositionFields(pos)
if len(tuple(self.getInputNames()) + tuple(self.getExtraNames())) > 1:
result = self.getName() + ': '
else:
result = ''
names = tuple(self.getInputNames()) + tuple(self.getExtraNames())
for name, val in zip(names, formattedValues):
result += ' ' + name + ': ' + val
return result
###
def formatPositionFields(self, pos):
"""Returns position as array of formatted strings"""
# Make sure pos is a tuple or list
if type(pos) not in (tuple, list):
pos = tuple([pos])
# Sanity check
if len(pos) != len(self.getOutputFormat()):
raise Exception(
"In scannable '%s':number of position fields differs from "
"number format strings specified" % self.getName())
result = []
for field, format in zip(pos, self.getOutputFormat()):
if field is None:
result.append('???')
else:
s = (format % field)
## if width!=None:
## s = s.ljust(width)
result.append(s)
return result
def getName(self):
return self.name
def setName(self, value):
self.name = value
def getLevel(self):
return self.level
def setLevel(self, value):
self.level = value
def getInputNames(self):
return self.inputNames
def setInputNames(self, value):
self.inputNames = value
def getExtraNames(self):
return self.extraNames
def setExtraNames(self, value):
self.extraNames = value
def getOutputFormat(self):
return self.outputFormat
def setOutputFormat(self, value):
if type(value) not in (tuple, list):
raise TypeError(
"%s.setOutputFormat() expects tuple or list; not %s" %
(self.getName(), str(type(value))))
self.outputFormat = value
def __call__(self, newpos=None):
if newpos is None:
return self.getPosition()
self.asynchronousMoveTo(newpos)
class ScannableAdapter(Scannable):
'''Wrap up a Scannable and give it a new name and optionally an offset
(added to the delegate when reading up and subtracting when setting down
'''
def __init__(self, delegate_scn, name, offset=0):
assert len(delegate_scn.getInputNames()) == 1
assert len(delegate_scn.getExtraNames()) == 0
self.delegate_scn = delegate_scn
self.name = name
self.offset = offset
def __getattr__(self, name):
return getattr(self.delegate_scn, name)
def getName(self):
return self.name
def getInputNames(self):
return [self.name]
def getPosition(self):
return self.delegate_scn.getPosition() + self.offset
def asynchronousMoveTo(self, newpos):
self.delegate_scn.asynchronousMoveTo(newpos - self.offset)
def __repr__(self):
pos = self.getPosition()
formatted_values = self.delegate_scn.formatPositionFields(pos)
return self.name + ': ' + formatted_values[0] + ' ' + self.get_hint()
def get_hint(self):
if self.offset:
offset_hint = ' + ' if self.offset >= 0 else ' - '
offset_hint += str(self.offset)
else:
offset_hint = ''
return '(%s%s)' % (self.delegate_scn.name, offset_hint)
def __call__(self, newpos=None):
if newpos is None:
return self.getPosition()
self.asynchronousMoveTo(newpos)
class SingleFieldDummyScannable(ScannableBase):
def __init__(self, name, initial_position=0.):
self.name = name
self.inputNames = [name]
self.outputFormat = ['% 6.4f']
self.level = 3
self._current_position = float(initial_position)
def isBusy(self):
return False
def waitWhileBusy(self):
return
def asynchronousMoveTo(self, new_position):
self._current_position = float(new_position)
def getPosition(self):
return self._current_position
class DummyPD(SingleFieldDummyScannable):
"""For compatability with the gda's dummy_pd module"""
pass
class MultiInputExtraFieldsDummyScannable(ScannableBase):
'''Multi input Dummy PD Class supporting input and extra fields'''
def __init__(self, name, inputNames, extraNames):
self.setName(name)
self.setInputNames(inputNames)
self.setExtraNames(extraNames)
self.setOutputFormat(['%6.4f'] * (len(inputNames) + len(extraNames)))
self.setLevel(3)
self.currentposition = [0.0] * len(inputNames)
def isBusy(self):
return 0
def asynchronousMoveTo(self, new_position):
if type(new_position) == type(1) or type(new_position) == type(1.0):
new_position = [new_position]
msg = "Wrong new_position size"
assert len(new_position) == len(self.currentposition), msg
for i in range(len(new_position)):
if new_position[i] != None:
self.currentposition[i] = float(new_position[i])
def getPosition(self):
extraValues = range(100, 100 + (len(self.getExtraNames())))
return self.currentposition + map(float, extraValues)
class ZeroInputExtraFieldsDummyScannable(ScannableBase):
'''Zero input/extra field dummy pd
'''
def __init__(self, name):
self.setName(name)
self.setInputNames([])
self.setOutputFormat([])
def isBusy(self):
return 0
def asynchronousMoveTo(self, new_position):
pass
def getPosition(self):
pass
class ScannableGroup(ScannableBase):
"""wraps up motors. Simulates motors if non given."""
def __init__(self, name, motorList):
self.setName(name)
# Set input format
motorNames = []
for scn in motorList:
motorNames.append(scn.getName())
self.setInputNames(motorNames)
# Set output format
format = []
for motor in motorList:
format.append(motor.getOutputFormat()[0])
self.setOutputFormat(format)
self.__motors = motorList
def asynchronousMoveTo(self, position):
# if input has any Nones, then replace these with the current positions
if None in position:
position = list(position)
current = self.getPosition()
for idx, val in enumerate(position):
if val is None:
position[idx] = current[idx]
for scn, pos in zip(self.__motors, position):
scn.asynchronousMoveTo(pos)
def getPosition(self):
return [scn.getPosition() for scn in self.__motors]
def isBusy(self):
for scn in self.__motors:
if scn.isBusy():
return True
return False
def configure(self):
pass
class ScannableMotionWithScannableFieldsBase(ScannableBase):
'''
This extended version of ScannableMotionBase contains a
completeInstantiation() method which adds a dictionary of
MotionScannableParts to an instance. Each part allows one of the
instances fields to be interacted with like it itself is a scannable.
Fields are dynamically added to the instance linking to these parts
allowing dotted access from Jython. They may also be accessed using
Jython container access methods (via the __getitem__() method). To acess
them from Jave use the getComponent(name) method.
When moving a part (via either a pos or scan command), the part calls
the parent to perform the actual task. The parts asynchronousMoveto
command will call the parent with a list of None values except for the
field it represents which will be passed the desired position value.
The asynchronousMoveTo method in class that inherats from this base
class then must handle these Nones. In some cases the method may
actually be able to move the underlying system assoiciated with one
field individually from others. If this is not possible the best
behaviour may be to simply not support this beahviour and exception or
alternatively to substitute the None values with actual current position
of parent's scannables associated fields.
ScannableMotionBaseWithMemory() inherats from this calss and provides a
solution useful for some scenarious: it keeps track of the last position
moved to, and replaces the Nones in an asynchronousMoveTo request with
these values. There are a number of dangers associated with this which
are addressed in that class's documentation, but it provides a way to
move one axis within a group of non-orthogonal axis while keeping the
others still.
'''
childrenDict = {}
numInputFields = None
numExtraFields = None
def completeInstantiation(self):
'''This method should be called at the end of all user defined
consructors'''
# self.validate()
self.numInputFields = len(self.getInputNames())
self.numExtraFields = len(self.getExtraNames())
self.addScannableParts()
self.autoCompletePartialMoveToTargets = False
self.positionAtScanStart = None
def setAutoCompletePartialMoveToTargets(self, b):
self.autoCompletePartialMoveToTargets = b
def atScanStart(self):
self.positionAtScanStart = self.getPosition()
def atCommandFailure(self):
self.positionAtScanStart = None
def atScanEnd(self):
self.positionAtScanStart = None
###
def __repr__(self):
pos = self.getPosition()
formattedValues = self.formatPositionFields(pos)
if len(tuple(self.getInputNames()) + tuple(self.getExtraNames())) > 1:
result = self.getName() + ': '
else:
result = ''
names = tuple(self.getInputNames()) + tuple(self.getExtraNames())
for name, val in zip(names, formattedValues):
result += ' ' + name + ': ' + val
return result
###
def formatPositionFields(self, pos):
"""Returns position as array of formatted strings"""
# Make sure pos is a tuple or list
if type(pos) not in (tuple, list):
pos = tuple([pos])
# Sanity check
if len(pos) != len(self.getOutputFormat()):
raise Exception(
"In scannable '%s':number of position fields differs from "
"number format strings specified" % self.getName())
result = []
for field, format in zip(pos, self.getOutputFormat()):
if field is None:
result.append('???')
else:
s = (format % field)
## if width!=None:
## s = s.ljust(width)
result.append(s)
return result
###
def addScannableParts(self):
'''
Creates an array of MotionScannableParts each of which allows access to
the scannable's fields. See this class's documentation for more info.
'''
self.childrenDict = {}
# Add parts to access the input fields
for index in range(len(self.getInputNames())):
scannableName = self.getInputNames()[index]
self.childrenDict[scannableName] = self.MotionScannablePart(
scannableName, index, self, isInputField=1)
# Add parts to access the extra fields
for index in range(len(self.getExtraNames())):
scannableName = self.getExtraNames()[index]
self.childrenDict[scannableName] = self.MotionScannablePart(
scannableName, index + len(self.getInputNames()),
self, isInputField=0)
def asynchronousMoveTo(self, newpos):
if self.autoCompletePartialMoveToTargets:
newpos = self.completePosition(newpos)
ScannableBase.asynchronousMoveTo(self, newpos)
def completePosition(self, position):
'''
If position contains any null or None values, these are replaced with
the corresponding fields from the scannables current position and then
returned.'''
# Just return position if it does not need padding
if None not in position:
return position
if self.positionAtScanStart is not None:
basePosition = self.positionAtScanStart
else:
basePosition = self.getPosition()[:self.numInputFields]
for i in range(self.numInputFields):
if position[i] is None:
position[i] = basePosition[i]
return position
def __getattr__(self, name):
try:
return self.childrenDict[name]
except:
raise AttributeError("No child named:" + name)
def __getitem__(self, key):
'''Provides container like access from Jython'''
return self.childrenDict[key]
def getPart(self, name):
'''Returns the a compnent scannable'''
return self.childrenDict[name]
class MotionScannablePart(ScannableBase):
'''
A scannable to be placed in the parent's childrenDict that allows
access to the parent's individual fields.'''
def __init__(self, scannableName, index, parentScannable,
isInputField):
self.setName(scannableName)
if isInputField:
self.setInputNames([scannableName])
else:
self.setExtraNames([scannableName])
self.index = index
self.parentScannable = parentScannable
self.setOutputFormat(
[self.parentScannable.getOutputFormat()[index]])
def isBusy(self):
return self.parentScannable.isBusy()
def asynchronousMoveTo(self, new_position):
if self.parentScannable.isBusy():
raise Exception(
self.parentScannable.getName() + "." + self.getName() +
" cannot be moved because " +
self.parentScannable.getName() + " is already moving")
toMoveTo = [None] * len(self.parentScannable.getInputNames())
toMoveTo[self.index] = new_position
self.parentScannable.asynchronousMoveTo(toMoveTo)
def moveTo(self, new_position):
self.asynchronousMoveTo(new_position)
self.waitWhileBusy()
def getPosition(self):
return self.parentScannable.getPosition()[self.index]
def __str__(self):
return self.__repr__()
def __repr__(self):
# Get the name of this field
# (assume its an input field first and correct if wrong)
name = self.getInputNames()[0]
if name == 'value':
name = self.getExtraNames()[0]
parentName = self.parentScannable.getName()
return parentName + "." + name + " : " + str(self.getPosition())

View File

@@ -0,0 +1,62 @@
###
# Copyright 2008-2011 Diamond Light Source Ltd.
# This file is part of Diffcalc.
#
# Diffcalc is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Diffcalc is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Diffcalc. If not, see <http://www.gnu.org/licenses/>.
###
try:
from gda.device.scannable import PseudoDevice
except ImportError:
from diffcalc.gdasupport.minigda.scannable import \
ScannableBase as PseudoDevice
class ScannableGroup(PseudoDevice):
def __init__(self, name, motorList):
self.setName(name)
# Set input format
motorNames = []
for scn in motorList:
motorNames.append(scn.getName())
self.setInputNames(motorNames)
# Set output format
format = []
for motor in motorList:
format.append(motor.getOutputFormat()[0])
self.setOutputFormat(format)
self.__motors = motorList
def asynchronousMoveTo(self, position):
# if input has any Nones, then replace these with the current positions
if None in position:
position = list(position)
current = self.getPosition()
for idx, val in enumerate(position):
if val is None:
position[idx] = current[idx]
for scn, pos in zip(self.__motors, position):
scn.asynchronousMoveTo(pos)
def getPosition(self):
return [scn.getPosition() for scn in self.__motors]
def isBusy(self):
for scn in self.__motors:
if scn.isBusy():
return True
return False

View File

@@ -0,0 +1,126 @@
###
# Copyright 2008-2011 Diamond Light Source Ltd.
# This file is part of Diffcalc.
#
# Diffcalc is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Diffcalc is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Diffcalc. If not, see <http://www.gnu.org/licenses/>.
###
try:
from gda.device.scannable import ScannableMotionBase
except ImportError:
from diffcalc.gdasupport.minigda.scannable import \
ScannableBase as ScannableMotionBase
from diffcalc.util import getMessageFromException
# TODO: Split into a base class when making other scannables
class DiffractometerScannableGroup(ScannableMotionBase):
""""
Wraps up a scannableGroup of axis to tweak the way the resulting
object is displayed and to add a simulate move to method.
The scannable group should have the same geometry as that expected
by the diffractometer hardware geometry used in the diffraction
calculator.
The optional parameter slaveDriver can be used to provide a
slave_driver. This is useful for triggering a move of an incidental
axis whose position depends on that of the diffractometer, but whose
position need not be included in the DiffractometerScannableGroup
itself. This parameter is exposed as a field and can be set or
cleared to null at will without effecting the core calculation code.
"""
def __init__(self, name, diffcalc_module, scannableGroup,
slave_driver=None, hint_generator=None):
# if motorList is None, will create a dummy __group
self.diffcalc_module = diffcalc_module
self.__group = scannableGroup
self.slave_driver = slave_driver
self.setName(name)
self.hint_generator = hint_generator
def getInputNames(self):
return self.__group.getInputNames()
def getExtraNames(self):
if self.slave_driver is None:
return []
else:
return self.slave_driver.getScannableNames()
def getOutputFormat(self):
if self.slave_driver is None:
slave_formats = []
else:
slave_formats = self.slave_driver.getScannableNames()
return list(self.__group.getOutputFormat()) + slave_formats
def asynchronousMoveTo(self, position):
self.__group.asynchronousMoveTo(position)
if self.slave_driver is not None:
self.slave_driver.triggerAsynchronousMove(position)
def getPosition(self):
if self.slave_driver is None:
slave_positions = []
else:
slave_positions = self.slave_driver.getPositions()
return list(self.__group.getPosition()) + list(slave_positions)
def isBusy(self):
if self.slave_driver is None:
return self.__group.isBusy()
else:
return self.__group.isBusy() or self.slave_driver.isBusy()
def waitWhileBusy(self):
self.__group.waitWhileBusy()
if self.slave_driver is not None:
self.slave_driver.waitWhileBusy()
def simulateMoveTo(self, pos):
if len(pos) != len(self.getInputNames()):
raise ValueError('Wrong number of inputs')
try:
(hkl, params) = self.diffcalc_module.angles_to_hkl(pos)
except Exception, e:
return "Error: %s" % getMessageFromException(e)
width = max(len(k) for k in params)
lines = ([' ' + 'hkl'.rjust(width) + ' : % 9.4f %.4f %.4f' %
(hkl[0], hkl[1], hkl[2])])
lines[-1] = lines[-1] + '\n'
fmt = ' %' + str(width) + 's : % 9.4f'
for k in sorted(params):
lines.append(fmt % (k, params[k]))
return '\n'.join(lines)
def __repr__(self):
position = self.getPosition()
names = list(self.getInputNames()) + list(self.getExtraNames())
if self.hint_generator is None:
hint_list = [''] * len(self.getInputNames())
else:
hint_list = self.hint_generator()
lines = [self.name + ':']
width = max(len(k) for k in names)
fmt = ' %' + str(width) + 's : % 9.4f %s'
for name, pos, hint in zip(names, position, hint_list):
lines.append(fmt % (name, pos, hint))
lines[len(self.getInputNames())] += '\n'
return '\n'.join(lines)

View File

@@ -0,0 +1,135 @@
###
# Copyright 2008-2011 Diamond Light Source Ltd.
# This file is part of Diffcalc.
#
# Diffcalc is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Diffcalc is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Diffcalc. If not, see <http://www.gnu.org/licenses/>.
###
import platform
DEBUG = False
try:
from gda.device.scannable.scannablegroup import \
ScannableMotionWithScannableFieldsBase
except ImportError:
from diffcalc.gdasupport.minigda.scannable import \
ScannableMotionWithScannableFieldsBase
from diffcalc.util import getMessageFromException, DiffcalcException
class _DynamicDocstringMetaclass(type):
def _get_doc(self):
return Hkl.dynamic_docstring
__doc__ = property(_get_doc) # @ReservedAssignment
class Hkl(ScannableMotionWithScannableFieldsBase):
if platform.system() != 'Java':
__metaclass__ = _DynamicDocstringMetaclass # TODO: Removed to fix Jython
dynamic_docstring = 'Hkl Scannable'
def _get_doc(self):
return Hkl.dynamic_docstring
__doc__ = property(_get_doc) # @ReservedAssignment
def __init__(self, name, diffractometerObject, diffcalcObject,
virtualAnglesToReport=None):
self.diffhw = diffractometerObject
self._diffcalc = diffcalcObject
if type(virtualAnglesToReport) is str:
virtualAnglesToReport = (virtualAnglesToReport,)
self.vAngleNames = virtualAnglesToReport
self.setName(name)
self.setInputNames(['h', 'k', 'l'])
self.setOutputFormat(['%7.5f'] * 3)
if self.vAngleNames:
self.setExtraNames(self.vAngleNames)
self.setOutputFormat(['%7.5f'] * (3 + len(self.vAngleNames)))
self.completeInstantiation()
self.setAutoCompletePartialMoveToTargets(True)
self.dynamic_class_doc = 'Hkl Scannable xyz'
def rawAsynchronousMoveTo(self, hkl):
if len(hkl) != 3: raise ValueError('Hkl device expects three inputs')
try:
(pos, _) = self._diffcalc.hkl_to_angles(hkl[0], hkl[1], hkl[2])
except DiffcalcException, e:
if DEBUG:
raise
else:
raise DiffcalcException(e.message)
self.diffhw.asynchronousMoveTo(pos)
def rawGetPosition(self):
pos = self.diffhw.getPosition() # a tuple
(hkl , params) = self._diffcalc.angles_to_hkl(pos)
result = list(hkl)
if self.vAngleNames:
for vAngleName in self.vAngleNames:
result.append(params[vAngleName])
return result
def getFieldPosition(self, i):
return self.getPosition()[i]
def isBusy(self):
return self.diffhw.isBusy()
def waitWhileBusy(self):
return self.diffhw.waitWhileBusy()
def simulateMoveTo(self, hkl):
if type(hkl) not in (list, tuple):
raise ValueError('Hkl device expects three inputs')
if len(hkl) != 3:
raise ValueError('Hkl device expects three inputs')
(pos, params) = self._diffcalc.hkl_to_angles(hkl[0], hkl[1], hkl[2])
width = max(len(k) for k in (params.keys() + list(self.diffhw.getInputNames())))
fmt = ' %' + str(width) + 's : % 9.4f'
lines = [self.diffhw.getName() + ' would move to:']
for idx, name in enumerate(self.diffhw.getInputNames()):
lines.append(fmt % (name, pos[idx]))
lines[-1] = lines[-1] + '\n'
for k in sorted(params):
lines.append(fmt % (k, params[k]))
return '\n'.join(lines)
def __str__(self):
return self.__repr__()
def __repr__(self):
lines = ['hkl:']
pos = self.diffhw.getPosition()
try:
(hkl, params) = self._diffcalc.angles_to_hkl(pos)
except Exception, e:
return "<hkl: %s>" % getMessageFromException(e)
width = max(len(k) for k in params)
lines.append(' ' + 'hkl'.rjust(width) + ' : %9.4f %.4f %.4f' % (hkl[0], hkl[1], hkl[2]))
lines[-1] = lines[-1] + '\n'
fmt = ' %' + str(width) + 's : % 9.4f'
for k in sorted(params):
lines.append(fmt % (k, params[k]))
return '\n'.join(lines)

View File

@@ -0,0 +1,47 @@
###
# Copyright 2008-2011 Diamond Light Source Ltd.
# This file is part of Diffcalc.
#
# Diffcalc is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Diffcalc is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Diffcalc. If not, see <http://www.gnu.org/licenses/>.
###
try:
from gda.device.scannable import ScannableMotionBase
except ImportError:
from diffcalc.gdasupport.minigda.scannable import \
ScannableBase as ScannableMotionBase
class MockMotor(ScannableMotionBase):
def __init__(self, name='mock'):
self.pos = 0.0
self._busy = False
self.name = name
def asynchronousMoveTo(self, pos):
self._busy = True
self.pos = float(pos)
def getPosition(self):
return self.pos
def isBusy(self):
return self._busy
def makeNotBusy(self):
self._busy = False
def getOutputFormat(self):
return ['%f']

View File

@@ -0,0 +1,45 @@
###
# Copyright 2008-2011 Diamond Light Source Ltd.
# This file is part of Diffcalc.
#
# Diffcalc is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Diffcalc is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Diffcalc. If not, see <http://www.gnu.org/licenses/>.
###
try:
from gda.device.scannable import ScannableMotionBase
except ImportError:
from diffcalc.gdasupport.minigda.scannable import \
ScannableBase as ScannableMotionBase
class DiffractionCalculatorParameter(ScannableMotionBase):
def __init__(self, name, parameterName, parameter_manager):
self.parameter_manager = parameter_manager
self.parameterName = parameterName
self.setName(name)
self.setInputNames([parameterName])
self.setOutputFormat(['%5.5f'])
self.setLevel(3)
def asynchronousMoveTo(self, value):
self.parameter_manager.set_constraint(self.parameterName, value)
def getPosition(self):
return self.parameter_manager.get_constraint(self.parameterName)
def isBusy(self):
return False

View File

@@ -0,0 +1,21 @@
'''
Created on 7 May 2016
@author: walton
'''
from diffcalc.util import allnum
def sim(scn, hkl):
"""sim hkl scn -- simulates moving scannable (not all)
"""
if not isinstance(hkl, (tuple, list)):
raise TypeError
if not allnum(hkl):
raise TypeError()
try:
print scn.simulateMoveTo(hkl)
except AttributeError:
raise TypeError(
"The first argument does not support simulated moves")

View File

@@ -0,0 +1,139 @@
###
# Copyright 2008-2011 Diamond Light Source Ltd.
# This file is part of Diffcalc.
#
# Diffcalc is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Diffcalc is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Diffcalc. If not, see <http://www.gnu.org/licenses/>.
###
import time
from math import sqrt, pi, exp
try:
from gda.device.scannable import PseudoDevice
except ImportError:
from diffcalc.gdasupport.minigda.scannable import \
ScannableBase as PseudoDevice
from diffcalc.ub.crystal import CrystalUnderTest
from diffcalc.hkl.you.calc import youAnglesToHkl
from diffcalc.hkl.vlieg.calc import vliegAnglesToHkl
from diffcalc.hkl.you.geometry import calcCHI, calcPHI
TORAD = pi / 180
TODEG = 180 / pi
class Equation(object):
def __call__(self, dh, dk, dl):
raise Exception('Abstract')
def __str__(self):
"Abstract equation"
class Gaussian(Equation):
def __init__(self, variance):
self.variance = float(variance)
def __call__(self, dh, dk, dl):
dr_squared = dh * dh + dk * dk + dl * dl
return (1 / sqrt(2 * pi * self.variance) *
exp(-dr_squared / (2 * self.variance)))
class SimulatedCrystalCounter(PseudoDevice):
def __init__(self, name, diffractometerScannable, geometryPlugin,
wavelengthScannable, equation=Gaussian(.01), engine='you'):
self.setName(name)
self.setInputNames([name + '_count'])
self.setOutputFormat(['%7.5f'])
self.exposureTime = 1
self.pause = True
self.diffractometerScannable = diffractometerScannable
self.geometry = geometryPlugin
self.wavelengthScannable = wavelengthScannable
self.equation = equation
self.engine = engine
self.cut = None
self.UB = None
self.chiMissmount = 0.
self.phiMissmount = 0.
self.setCrystal('cubic', 1, 1, 1, 90, 90, 90)
def setCrystal(self, name, a, b, c, alpha, beta, gamma):
self.cut = CrystalUnderTest(name, a, b, c, alpha, beta, gamma)
self.calcUB()
def setChiMissmount(self, chi):
self.chiMissmount = chi
self.calcUB()
def setPhiMissmount(self, phi):
self.phiMissmount = phi
self.calcUB()
def calcUB(self):
CHI = calcCHI(self.chiMissmount * TORAD)
PHI = calcPHI(self.phiMissmount * TORAD)
self.UB = CHI * PHI * self.cut.B
def asynchronousMoveTo(self, exposureTime):
self.exposureTime = exposureTime
if self.pause:
time.sleep(exposureTime) # Should not technically block!
def getPosition(self):
h, k, l = self.getHkl()
dh, dk, dl = h - round(h), k - round(k), l - round(l)
count = self.equation(dh, dk, dl)
#return self.exposureTime, count*self.exposureTime
return count * self.exposureTime
def getHkl(self):
pos = self.geometry.physical_angles_to_internal_position(
self.diffractometerScannable.getPosition())
pos.changeToRadians()
wavelength = self.wavelengthScannable.getPosition()
if self.engine.lower() == 'vlieg':
return vliegAnglesToHkl(pos, wavelength, self.UB)
elif self.engine.lower() == 'you':
return youAnglesToHkl(pos, wavelength, self.UB)
else:
raise ValueError(self.engine)
def isBusy(self):
return False
def __str__(self):
return self.__repr__()
def __repr__(self):
s = 'simulated crystal detector: %s\n' % self.getName()
h, k, l = self.getHkl()
s += ' h : %f\n' % h
s += ' k : %f\n' % k
s += ' l : %f\n' % l
s += self.cut.__str__() + '\n'
s += "chi orientation: %s\n" % self.chiMissmount
s += "phi orientation: %s\n" % self.phiMissmount
ub = self.UB.tolist()
s += "UB:\n"
s += " % 18.13f% 18.13f% 18.12f\n" % (ub[0][0], ub[0][1], ub[0][2])
s += " % 18.13f% 18.13f% 18.12f\n" % (ub[1][0], ub[1][1], ub[1][2])
s += " % 18.13f% 18.13f% 18.12f\n" % (ub[2][0], ub[2][1], ub[2][2])
return s

View File

@@ -0,0 +1,109 @@
###
# Copyright 2008-2011 Diamond Light Source Ltd.
# This file is part of Diffcalc.
#
# Diffcalc is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Diffcalc is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Diffcalc. If not, see <http://www.gnu.org/licenses/>.
###
from math import pi, tan, sin, atan, cos, atan2
TORAD = pi / 180
TODEG = 180 / pi
class SlaveScannableDriver(object):
def __init__(self, scannables):
self.scannables = scannables
def isBusy(self):
for scn in self.scannables:
if scn.isBusy():
return True
return False
def waitWhileBusy(self):
for scn in self.scannables:
scn.waitWhileBusy()
def triggerAsynchronousMove(self, triggerPos):
nu = self.slaveFromTriggerPos(triggerPos)
for scn in self.scannables:
scn.asynchronousMoveTo(nu)
def getPosition(self):
return self.scannables[0].getPosition()
def slaveFromTriggerPos(self, triggerPos):
raise Exception("Abstract")
def getScannableNames(self):
return [scn.name for scn in self.scannables]
def getOutputFormat(self):
return [list(scn.outputFormat)[0] for scn in self.scannables]
def getPositions(self):
return [float(scn.getPosition()) for scn in self.scannables]
"""
Based on: Elias Vlieg, "A (2+3)-Type Surface Diffractometer: Mergence of the
z-axis and (2+2)-Type Geometries", J. Appl. Cryst. (1998). 31. 198-203
"""
class NuDriverForSixCirclePlugin(SlaveScannableDriver):
def slaveFromTriggerPos(self, triggerPos):
alpha, delta, gamma, _, _, _ = triggerPos
alpha = alpha * TORAD
delta = delta * TORAD
gamma = gamma * TORAD
### Equation16 RHS ###
rhs = -1 * tan(gamma - alpha) * sin(delta)
nu = atan(rhs) # -pi/2 <= nu <= pi/2
return nu * TODEG
class NuDriverForWillmottHorizontalGeometry(SlaveScannableDriver):
"""
Based on: Phillip Willmott, "Angle calculations for a (2+3)-type
diffractometer: focus on area detectors", J. Appl. Cryst. (2011). 44.
73-83
"""
def __init__(self, scannables, area_detector=False):
SlaveScannableDriver.__init__(self, scannables)
self.area_detector = area_detector
def slaveFromTriggerPos(self, triggerPos):
delta, gamma, omegah, _ = triggerPos
delta *= TORAD
gamma *= TORAD
omegah *= TORAD
if self.area_detector:
nu = atan2(sin(delta - omegah), tan(gamma)) # (66)
else:
top = -sin(gamma) * sin(omegah)
bot = (sin(omegah) * cos(gamma) * sin(delta) +
cos(omegah) * cos(delta))
nu = atan2(top, bot) # (61)
print 'nu:', nu * TODEG
return nu * TODEG

View File

@@ -0,0 +1,184 @@
###
# Copyright 2008-2011 Diamond Light Source Ltd.
# This file is part of Diffcalc.
#
# Diffcalc is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Diffcalc is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Diffcalc. If not, see <http://www.gnu.org/licenses/>.
###
import time
import threading
import socket
PORT = 4567
from gda.device.scannable import ScannableMotionWithScannableFieldsBaseTest
#import scannable.vrmlModelDriver
#reload(scannable.vrmlModelDriver);from scannable.vrmlModelDriver import \
# VrmlModelDriver, LinearProfile, MoveThread
#fc=VrmlModelDriver(
# 'fc',['alpha','delta','omega', 'chi','phi'], speed=30, host='diamrl5104')
#alpha = fc.alpha
#delta = fc.delta
#omega = fc.omega
#chi = fc.chi
#phi = fc.phi
def connect_to_socket(host, port):
print "Connecting to %s on port %d" % (host, port)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.connect((host, port))
print "Connected"
socketfile = sock.makefile('rw', 0)
return socketfile
class LinearProfile(object):
def __init__(self, v, t_accel, startList, endList):
assert len(startList) == len(endList)
self.v = float(v)
self.start = startList
self.end = endList
self.t_accel = t_accel
distances = [e - s for e, s in zip(self.end, self.start)]
max_distance = max([abs(d) for d in distances])
if max_distance == 0:
self.delta_time = 0
else:
self.delta_time = abs(max_distance / self.v)
self.speeds = [d / self.delta_time for d in distances]
self.start_time = time.time()
def getPosition(self):
if self.start_time is None:
return self.start
if not self.isMoving():
return self.end
t = abs(float(time.time() - self.start_time))
if t > self.delta_time:
# we are in the deceleration phase (i.e paused for now)
return self.end
return [s + v * t for s, v in zip(self.start, self.speeds)]
def isMoving(self):
return time.time() < self.start_time + self.delta_time + self.t_accel
class MoveThread(threading.Thread):
def __init__(self, profile, socketfile, axisNames):
threading.Thread.__init__(self)
self.profile = profile
self.socketfile = socketfile
self.axisNames = axisNames
def run(self):
while self.profile.isMoving():
self.update()
time.sleep(.1)
self.update()
def update(self):
pos = self.profile.getPosition()
d = dict(zip(map(str, self.axisNames), pos))
if self.socketfile:
self.socketfile.write(repr(d) + '\n')
class VrmlModelDriver(ScannableMotionWithScannableFieldsBaseTest):
def __init__(self, name, axes_names, host=None, speed=60, t_accel=.1,
format='%.3f'):
self.name = name
self.inputNames = list(axes_names)
self.extraNames = []
self.outputFormat = [format] * len(self.inputNames)
self.completeInstantiation()
self.__last_target = [0.] * len(self.inputNames)
self.verbose = False
self.move_thread = None
self.speed = speed
self.host = host
self.t_accel = t_accel
self.socketfile = None
if self.host:
try:
self.connect()
except socket.error:
print "Failed to connect to %s:%r" % (self.host, PORT)
print "Connect with: %s.connect()" % self.name
def connect(self):
self.socketfile = connect_to_socket(self.host, PORT)
self.rawAsynchronousMoveTo(self.__last_target)
def isBusy(self):
if self.move_thread is None:
return False
return self.move_thread.profile.isMoving()
def rawGetPosition(self):
if self.move_thread is None:
return self.__last_target
else:
return self.move_thread.profile.getPosition()
def rawAsynchronousMoveTo(self, targetList):
if self.isBusy():
raise Exception(self.name + ' is already moving')
if self.verbose:
print self.name + ".rawAsynchronousMoveTo(%r)" % targetList
for i, target in enumerate(targetList):
if target is None:
targetList[i] = self.__last_target[i]
profile = LinearProfile(
self.speed, self.t_accel, self.__last_target, targetList)
self.move_thread = MoveThread(
profile, self.socketfile, self.inputNames)
self.move_thread.start()
self.__last_target = targetList
def getFieldPosition(self, index):
return self.getPosition()[index]
def __del__(self):
self.socketfile.close()
#class TrapezoidProfile(object):
#
# def __init__(self, t_accel, v_max, delta_x):
# self.t_a = t_accel
# self.v_m = v_max
# self.delta_x = delta_x
#
# self.t_c = (self.X - self.v_m*self.t_a) / self.v_m
#
# def x(self, t):
# if self.t_c <=0:
# return self.__xshort(t)
# else:
# return self.__xlong(t)
#
# def __xshort(self, t):
# delta_t = 2 * sqrt(self.delta_x*self.t_a/self.v_m)
# if t <= .5*delta_t:
# return (.5*self.v_m/self.t_a) * t**2
# else:
# v_peak = (self.v_m/self.t_a) * .5*delta_t
# return (t-.5*delta_t)*v_peak - (t-.5*delta_t)**2 ####HERE, bugged
# self.delta_x/2

View File

@@ -0,0 +1,50 @@
###
# Copyright 2008-2011 Diamond Light Source Ltd.
# This file is part of Diffcalc.
#
# Diffcalc is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Diffcalc is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Diffcalc. If not, see <http://www.gnu.org/licenses/>.
###
try:
from gdascripts.pd.dummy_pds import DummyPD
except ImportError:
from diffcalc.gdasupport.minigda.scannable import DummyPD
class Wavelength(DummyPD):
def __init__(self, name, energyScannable,
energyScannableMultiplierToGetKeV=1):
self.energyScannable = energyScannable
self.energyScannableMultiplierToGetKeV = \
energyScannableMultiplierToGetKeV
DummyPD.__init__(self, name)
def asynchronousMoveTo(self, pos):
self.energyScannable.asynchronousMoveTo(
(12.39842 / pos) / self.energyScannableMultiplierToGetKeV)
def getPosition(self):
energy = self.energyScannable.getPosition()
if energy == 0:
raise Exception(
"The energy is 0, so no wavelength could be calculated.run_All()")
return 12.39842 / (energy * self.energyScannableMultiplierToGetKeV)
def isBusy(self):
return self.energyScannable.isBusy()
def waitWhileBusy(self):
return self.energyScannable.waitWhileBusy()

View File

@@ -0,0 +1,116 @@
from diffcalc.gdasupport.scannable.diffractometer import DiffractometerScannableGroup
from diffcalc.gdasupport.scannable.hkl import Hkl
from diffcalc.gdasupport.scannable.simulation import SimulatedCrystalCounter
from diffcalc.gdasupport.scannable.wavelength import Wavelength
from diffcalc.gdasupport.scannable.parameter import DiffractionCalculatorParameter
from diffcalc.dc import dcyou as _dc
from diffcalc.dc.help import format_command_help
reload(_dc)
from diffcalc.dc.dcyou import * # @UnusedWildImport
from diffcalc import settings
try:
import gda # @UnusedImport @UnresolvedImport
GDA = True
except:
GDA = False
if not GDA:
from diffcalc.gdasupport.minigda import command
_pos = command.Pos()
_scan = command.Scan(command.ScanDataPrinter())
def pos(*args):
"""
pos show position of all Scannables
pos scn show position of scn
pos scn targetmove scn to target (a number)
"""
return _pos(*args)
def scan(*args):
"""
scan scn start stop step {scn {target}} {det t}
"""
return _scan(*args)
from diffcalc.gdasupport.scannable.sim import sim # @UnusedImport
_scn_group = settings.axes_scannable_group
_diff_scn_name = settings.geometry.name # @UndefinedVariable
_energy_scannable = settings.energy_scannable
# Create diffractometer scannable
_diff_scn = DiffractometerScannableGroup(_diff_scn_name, _dc, _scn_group)
globals()[_diff_scn_name] = _diff_scn
# Create hkl scannables
hkl = Hkl('hkl', _scn_group, _dc)
h = hkl.h
k = hkl.k
l = hkl.l
Hkl.dynamic_docstring = format_command_help(hkl_commands_for_help) # must be on the class
ub.__doc__ = format_command_help(ub_commands_for_help)
_virtual_angles = ('theta', 'qaz', 'alpha', 'naz', 'tau', 'psi', 'beta')
hklverbose = Hkl('hklverbose', _scn_group, _dc, _virtual_angles)
# Create wavelength scannable
wl = Wavelength(
'wl', _energy_scannable, settings.energy_scannable_multiplier_to_get_KeV)
###GOBBO
#if not GDA:
# wl.asynchronousMoveTo(1) # Angstrom
_energy_scannable.level = 3
wl.level = 3
# Create simulated counter timer
ct = SimulatedCrystalCounter('ct', _scn_group, settings.geometry, wl)
ct.level = 10
# Create constraint scannables
def _create_constraint_scannable(con_name, scn_name=None):
if not scn_name:
scn_name = con_name
return DiffractionCalculatorParameter(
scn_name, con_name, _dc.constraint_manager)
# Detector constraints
def isconstrainable(name):
return not constraint_manager.is_constraint_fixed(name)
if isconstrainable('delta'): delta_con = _create_constraint_scannable('delta', 'delta_con')
if isconstrainable('gam'): gam_con = _create_constraint_scannable('gam', 'gam_con')
if isconstrainable('qaz'): qaz = _create_constraint_scannable('qaz')
if isconstrainable('naz'): naz = _create_constraint_scannable('naz')
# Reference constraints
alpha = _create_constraint_scannable('alpha')
beta = _create_constraint_scannable('beta')
psi = _create_constraint_scannable('psi')
a_eq_b = 'a_eq_b'
# Sample constraints
if isconstrainable('mu'): mu_con = _create_constraint_scannable('mu', 'mu_con')
if isconstrainable('eta'): eta_con = _create_constraint_scannable('eta', 'eta_con')
if isconstrainable('chi'): chi_con = _create_constraint_scannable('chi', 'chi_con')
if isconstrainable('phi'): phi_con = _create_constraint_scannable('phi', 'phi_con')
if isconstrainable('mu') and isconstrainable('gam'): mu_is_gam = 'mu_is_gam'
# Cleanup to allow "from gdasupport.you import *"
del DiffractometerScannableGroup, Hkl, SimulatedCrystalCounter
del Wavelength, DiffractionCalculatorParameter
# Cleanup other cruft
del format_command_help