First Aerotech scan works
This commit is contained in:
2107
pxiii_bec/config_saved.yaml
Normal file
2107
pxiii_bec/config_saved.yaml
Normal file
File diff suppressed because it is too large
Load Diff
2094
pxiii_bec/config_saved.yaml~
Normal file
2094
pxiii_bec/config_saved.yaml~
Normal file
File diff suppressed because it is too large
Load Diff
@@ -220,25 +220,25 @@ class AerotechAbrStage(Device):
|
||||
# Set the corresponding global variables
|
||||
if "scan_command" in d:
|
||||
self.scan_command.set(d['scan_command']).wait()
|
||||
if "var_1" in d:
|
||||
if "var_1" in d and d['var_1'] is not None:
|
||||
self._var_1.set(d['var_1']).wait()
|
||||
if "var_2" in d:
|
||||
if "var_2" in d and d['var_2'] is not None:
|
||||
self._var_2.set(d['var_2']).wait()
|
||||
if "var_3" in d:
|
||||
if "var_3" in d and d['var_3'] is not None:
|
||||
self._var_3.set(d['var_3']).wait()
|
||||
if "var_4" in d:
|
||||
if "var_4" in d and d['var_4'] is not None:
|
||||
self._var_4.set(d['var_4']).wait()
|
||||
if "var_5" in d:
|
||||
if "var_5" in d and d['var_5'] is not None:
|
||||
self._var_5.set(d['var_5']).wait()
|
||||
if "var_6" in d:
|
||||
if "var_6" in d and d['var_6'] is not None:
|
||||
self._var_6.set(d['var_6']).wait()
|
||||
if "var_7" in d:
|
||||
if "var_7" in d and d['var_7'] is not None:
|
||||
self._var_7.set(d['var_7']).wait()
|
||||
if "var_8" in d:
|
||||
if "var_8" in d and d['var_8'] is not None:
|
||||
self._var_8.set(d['var_8']).wait()
|
||||
if "var_9" in d:
|
||||
if "var_9" in d and d['var_9'] is not None:
|
||||
self._var_9.set(d['var_9']).wait()
|
||||
if "var_10" in d:
|
||||
if "var_10" in d and d['var_10'] is not None:
|
||||
self._var_10.set(d['var_10']).wait()
|
||||
|
||||
new = self.read_configuration()
|
||||
|
||||
@@ -1 +1 @@
|
||||
from .mx_measurements import MeasureStandardWedge
|
||||
from .mx_measurements import MeasureFastOmega
|
||||
@@ -41,21 +41,66 @@ class AbrCmd:
|
||||
SCAN_SASTT_V3 = 21
|
||||
|
||||
|
||||
class MxFlyscanBase(AsyncFlyScanBase):
|
||||
|
||||
class AerotechFlyscanBase(AsyncFlyScanBase):
|
||||
""" Base class for MX flyscans
|
||||
|
||||
I'd be quite easy to improve standardization.
|
||||
Low-level base class for standard scans at the PX beamlines at SLS. Theese scans use the
|
||||
A3200 rotation stage and the actual experiment is performed using an AeroBasic script
|
||||
controlled via global variables.
|
||||
"""
|
||||
scan_report_hint = "table"
|
||||
arg_input = {}
|
||||
arg_bundle_size = {"bundle": len(arg_input), "min": None, "max": None}
|
||||
|
||||
# Aerotech stage config
|
||||
abr_command = None
|
||||
abr_globals = {}
|
||||
abr_reset = False
|
||||
abr_raster_reset =False
|
||||
abr_complete = False
|
||||
abr_timeout = None
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
""" Just set num_pos=0 to avoid hanging
|
||||
""" Just set num_pos=0 to avoid hanging and override defaults if explicitly set from
|
||||
parameters.
|
||||
"""
|
||||
self.num_pos = 0
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
# Override if explicitly passed as parameter
|
||||
if "abr_command" in self.caller_kwargs:
|
||||
self.abr_command = self.caller_kwargs.get("abr_command")
|
||||
if "abr_globals" in self.caller_kwargs:
|
||||
self.abr_globals = self.caller_kwargs.get("abr_globals")
|
||||
if "abr_reset" in self.caller_kwargs:
|
||||
self.abr_reset = self.caller_kwargs.get("abr_reset")
|
||||
if "abr_raster_reset" in self.caller_kwargs:
|
||||
self.abr_raster_reset = self.caller_kwargs.get("abr_raster_reset")
|
||||
if "abr_complete" in self.caller_kwargs:
|
||||
self.abr_complete = self.caller_kwargs.get("abr_complete")
|
||||
if "abr_timeout" in self.caller_kwargs:
|
||||
self.abr_timeout = self.caller_kwargs.get("abr_timeout")
|
||||
|
||||
def pre_scan(self):
|
||||
# ToDo: Move roughly to start position
|
||||
|
||||
# Reset the scan engine
|
||||
if self.abr_reset:
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "reset")
|
||||
if self.abr_raster_reset:
|
||||
yield from self.stubs.command("abr", "raster_scan_done.set", 0)
|
||||
|
||||
# Configure the global variables
|
||||
d = {"scan_command" : self.abr_command}
|
||||
d.update(self.abr_globals)
|
||||
logger.info(d)
|
||||
# Configure
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
|
||||
|
||||
# Call super
|
||||
yield from super().pre_scan()
|
||||
|
||||
def stage(self):
|
||||
""" ToDo: Sot sure if we should call super() here as it'd stage the whole beamline"""
|
||||
return super().stage()
|
||||
@@ -65,10 +110,13 @@ class MxFlyscanBase(AsyncFlyScanBase):
|
||||
"""
|
||||
self.pointID = 0
|
||||
# Kick off the run
|
||||
yield from self.stubs.command("abr", "start_command.set", 1)
|
||||
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "start_command.set", 1)
|
||||
logger.info("Measurement launched on the ABR stage...")
|
||||
# ToDo: Wait for scan to finish
|
||||
|
||||
# Wait for grid scanner to finish
|
||||
if self.abr_complete and self.abr_timeout is not None:
|
||||
st = yield from self.stubs.send_rpc_and_wait("abr", "complete", self.abr_timeout)
|
||||
st.wait()
|
||||
|
||||
def cleanup(self):
|
||||
"""Set scan progress to 1 to finish the scan"""
|
||||
@@ -77,7 +125,7 @@ class MxFlyscanBase(AsyncFlyScanBase):
|
||||
|
||||
|
||||
|
||||
class MeasureStandardWedge(MxFlyscanBase):
|
||||
class MeasureStandardWedge(AerotechFlyscanBase):
|
||||
""" Standard wedge scan
|
||||
|
||||
Measure a standard continous line scan from `start` to `start` + `range`
|
||||
@@ -105,33 +153,25 @@ class MeasureStandardWedge(MxFlyscanBase):
|
||||
required_kwargs = ["start", "range", "scan_time"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
self.axis = ['omega']
|
||||
|
||||
self.scan_command = AbrCmd.MEASURE_STANDARD
|
||||
self.scan_start = self.caller_kwargs.get("start")
|
||||
self.scan_range = self.caller_kwargs.get("range")
|
||||
self.scan_scan_time = float(self.caller_kwargs.get("scan_time"))
|
||||
self.scan_ready_rate = float(self.caller_kwargs.get("ready_rate", 500))
|
||||
|
||||
def pre_scan(self):
|
||||
# ToDo: Move roughly to start position
|
||||
|
||||
# Configure the global variables
|
||||
d = {"scan_command" : self.scan_command,
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_start = parameter['kwargs'].get("start")
|
||||
self.scan_range = parameter['kwargs'].get("range")
|
||||
self.scan_scan_time = parameter['kwargs'].get("scan_time")
|
||||
self.scan_ready_rate = parameter['kwargs'].get("ready_rate", 500)
|
||||
self.abr_command = AbrCmd.MEASURE_STANDARD
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_start,
|
||||
"var_2" : self.scan_range,
|
||||
"var_3" : self.scan_scan_time,
|
||||
"var_4" : self.scan_ready_rate,
|
||||
}
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
|
||||
|
||||
# Call super
|
||||
yield from super().pre_scan()
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureScanSlits(MxFlyscanBase):
|
||||
class MeasureScanSlits(AerotechFlyscanBase):
|
||||
""" Slit scan
|
||||
|
||||
Measure a standard slit scan.
|
||||
@@ -156,31 +196,23 @@ class MeasureScanSlits(MxFlyscanBase):
|
||||
required_kwargs = ["delta_x", "delta_y", "velocity"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
self.axis = []
|
||||
|
||||
self.scan_command = AbrCmd.SLIT_SCAN
|
||||
self.scan_delta_x = self.caller_kwargs.get("delta_x")
|
||||
self.scan_delta_y = self.caller_kwargs.get("delta_y")
|
||||
self.scan_velocity = self.caller_kwargs.get("velocity")
|
||||
|
||||
def pre_scan(self):
|
||||
# ToDo: Move roughly to start position
|
||||
|
||||
# Configure the global variables
|
||||
d = {"scan_command" : self.scan_command,
|
||||
# Set parameters from meaningful inputs
|
||||
self.abr_command = AbrCmd.SLIT_SCAN
|
||||
self.scan_delta_x = parameter['kwargs'].get("delta_x")
|
||||
self.scan_delta_y = parameter['kwargs'].get("delta_y")
|
||||
self.scan_velocity = parameter['kwargs'].get("velocity")
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_delta_x,
|
||||
"var_2" : self.scan_delta_y,
|
||||
"var_3" : self.scan_velocity,
|
||||
}
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
|
||||
|
||||
# Call super
|
||||
yield from super().pre_scan()
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureRasterSimple(MxFlyscanBase):
|
||||
class MeasureRasterSimple(AerotechFlyscanBase):
|
||||
""" Simple raster scan
|
||||
|
||||
Measure a simplified raster scan, assuming the goniometer is currently
|
||||
@@ -210,23 +242,16 @@ class MeasureRasterSimple(MxFlyscanBase):
|
||||
required_kwargs = ["exp_time", "cell_width", "cell_height", "ncols", "nrows"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
self.axis = []
|
||||
|
||||
self.scan_command = AbrCmd.RASTER_SCAN_SIMPLE
|
||||
self.scan_exp_time = self.caller_kwargs.get("exp_time")
|
||||
self.scan_stepsize_x = self.caller_kwargs.get("cell_width")
|
||||
self.scan_stepsize_y = self.caller_kwargs.get("cell_height")
|
||||
self.scan_stepnum_x = int(self.caller_kwargs.get("ncols"))
|
||||
self.scan_stepnum_y = int(self.caller_kwargs.get("nrows"))
|
||||
|
||||
def pre_scan(self):
|
||||
# ToDo: Move roughly to start position
|
||||
|
||||
# Reset done flag
|
||||
yield from self.stubs.command("abr", "raster_scan_done.set", 0)
|
||||
# Configure the global variables
|
||||
d = {"scan_command" : self.scan_command,
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_exp_time = parameter['kwargs'].get("exp_time")
|
||||
self.scan_stepsize_x = parameter['kwargs'].get("cell_width")
|
||||
self.scan_stepsize_y = parameter['kwargs'].get("cell_height")
|
||||
self.scan_stepnum_x = parameter['kwargs'].get("ncols")
|
||||
self.scan_stepnum_y = parameter['kwargs'].get("nrows")
|
||||
self.abr_command = AbrCmd.RASTER_SCAN_SIMPLE
|
||||
self.abr_raster_reset = True
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_exp_time,
|
||||
"var_2" : self.scan_stepsize_x,
|
||||
"var_3" : self.scan_stepsize_y,
|
||||
@@ -236,17 +261,12 @@ class MeasureRasterSimple(MxFlyscanBase):
|
||||
"var_7" : 0,
|
||||
"var_8" : 0,
|
||||
}
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
|
||||
|
||||
# ToDo: This line was left out
|
||||
#self.raster._raster_num_rows = nrows
|
||||
|
||||
# Call super
|
||||
yield from super().pre_scan()
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureSASTT(MxFlyscanBase):
|
||||
class MeasureSASTT(AerotechFlyscanBase):
|
||||
""" Small angle scanning tensor tomography scan
|
||||
|
||||
Measure a Small Angle Scanning Tensor Tomography scan on the specified
|
||||
@@ -288,54 +308,41 @@ class MeasureSASTT(MxFlyscanBase):
|
||||
required_kwargs = ["exp_time", "cell_width", "cell_height", "ncols", "nrows"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
self.axis = []
|
||||
|
||||
self.scan_version = int(self.caller_kwargs.get("version", 1))
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_version = int(parameter['kwargs'].get("version", 1))
|
||||
if self.scan_version==1:
|
||||
self.scan_command = AbrCmd.SCAN_SASTT
|
||||
self.abr_command = AbrCmd.SCAN_SASTT
|
||||
if self.scan_version==2:
|
||||
self.scan_command = AbrCmd.SCAN_SASTT_V2
|
||||
self.abr_command = AbrCmd.SCAN_SASTT_V2
|
||||
if self.scan_version==3:
|
||||
self.scan_command = AbrCmd.SCAN_SASTT_V3
|
||||
self.abr_command = AbrCmd.SCAN_SASTT_V3
|
||||
else:
|
||||
raise RuntimeError(f"Non existing SAS-TT scan mode: {self.scan_version}")
|
||||
|
||||
self.scan_exp_time = self.caller_kwargs.get("exp_time")
|
||||
self.scan_stepsize_x = self.caller_kwargs.get("cell_width")
|
||||
self.scan_stepsize_y = self.caller_kwargs.get("cell_height")
|
||||
self.scan_stepnum_x = int(self.caller_kwargs.get("ncols"))
|
||||
self.scan_stepnum_y = int(self.caller_kwargs.get("nrows"))
|
||||
self.scan_even_tweak = self.caller_kwargs.get("even_tweak", 0)
|
||||
self.scan_odd_tweak = self.caller_kwargs.get("off_tweak", 0)
|
||||
|
||||
def pre_scan(self):
|
||||
# ToDo: Move roughly to start position
|
||||
|
||||
# Reset done flag
|
||||
yield from self.stubs.command("abr", "raster_scan_done.set", 0)
|
||||
# Configure the global variables
|
||||
d = {"scan_command" : self.scan_command,
|
||||
"var_1" : self.scan_exp_time,
|
||||
"var_2" : self.scan_stepsize_x,
|
||||
"var_3" : self.scan_stepsize_y,
|
||||
"var_4" : self.scan_stepnum_x,
|
||||
"var_5" : self.scan_stepnum_y,
|
||||
"var_6" : self.scan_even_tweak,
|
||||
"var_7" : self.scan_odd_tweak,
|
||||
"var_8" : 0,
|
||||
}
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
|
||||
|
||||
# ToDo: This line was left out
|
||||
# self.raster._raster_num_rows = nrows
|
||||
|
||||
# Call super
|
||||
yield from self.stubs.pre_scan()
|
||||
self.scan_exp_time = parameter['kwargs'].get("exp_time")
|
||||
self.scan_stepsize_x = parameter['kwargs'].get("cell_width")
|
||||
self.scan_stepsize_y = parameter['kwargs'].get("cell_height")
|
||||
self.scan_stepnum_x = parameter['kwargs'].get("ncols")
|
||||
self.scan_stepnum_y = parameter['kwargs'].get("nrows")
|
||||
self.scan_even_tweak = parameter['kwargs'].get("even_tweak", 0)
|
||||
self.scan_odd_tweak = parameter['kwargs'].get("odd_tweak", 0)
|
||||
self.abr_globals = {
|
||||
'var_1': self.scan_exp_time,
|
||||
'var_2': self.scan_stepsize_x,
|
||||
'var_3': self.scan_stepsize_y,
|
||||
'var_4': self.scan_stepnum_x,
|
||||
'var_5': self.scan_stepnum_y,
|
||||
'var_6': self.scan_even_tweak,
|
||||
'var_7': self.scan_odd_tweak,
|
||||
'var_8': 0,
|
||||
}
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureVerticalLine(MxFlyscanBase):
|
||||
class MeasureVerticalLine(AerotechFlyscanBase):
|
||||
""" Vertical line scan using the GMY motor
|
||||
|
||||
Simple fly scan that records a single vertical line.
|
||||
@@ -361,31 +368,23 @@ class MeasureVerticalLine(MxFlyscanBase):
|
||||
required_kwargs = ["cell_time", "cell_height", "cell_num"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
self.axis = ['gmy']
|
||||
|
||||
self.scan_command = AbrCmd.VERTICAL_LINE_SCAN
|
||||
self.scan_cell_time = self.caller_kwargs.get("cell_time")
|
||||
self.scan_stepsize_y = self.caller_kwargs.get("cell_height")
|
||||
self.scan_stepnum_y = int(self.caller_kwargs.get("cell_num"))
|
||||
|
||||
def pre_scan(self):
|
||||
# ToDo: Move roughly to start position
|
||||
|
||||
# Configure the global variables
|
||||
d = {"scan_command" : self.scan_command,
|
||||
"var_1" : self.scan_stepsize_y,
|
||||
"var_2" : self.scan_stepnum_y,
|
||||
"var_3" : self.scan_cell_time,
|
||||
}
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
|
||||
|
||||
# Call super
|
||||
yield from super().pre_scan()
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_cell_time = parameter['kwargs'].get("cell_time")
|
||||
self.scan_stepsize_y = parameter['kwargs'].get("cell_height")
|
||||
self.scan_stepnum_y = parameter['kwargs'].get("cell_num")
|
||||
self.abr_command = AbrCmd.VERTICAL_LINE_SCAN
|
||||
self.abr_globals = {
|
||||
'var_1': self.scan_cell_time,
|
||||
'var_2': self.scan_stepsize_y,
|
||||
'var_3': self.scan_stepnum_y,
|
||||
}
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureScreening(MxFlyscanBase):
|
||||
class MeasureScreening(AerotechFlyscanBase):
|
||||
""" Sample screening scan
|
||||
|
||||
The scan itself is executed by the scan service running on the Aerotech
|
||||
@@ -412,22 +411,16 @@ class MeasureScreening(MxFlyscanBase):
|
||||
required_kwargs = ["start", "range", "scan_time", "degrees", "frames"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
self.axis = ['gmy']
|
||||
|
||||
self.scan_command = AbrCmd.SCREENING
|
||||
self.scan_start = self.caller_kwargs.get("start")
|
||||
self.scan_range = self.caller_kwargs.get("range")
|
||||
self.scan_scan_time = self.caller_kwargs.get("scan_time")
|
||||
self.scan_degrees = self.caller_kwargs.get("degrees")
|
||||
self.scan_frames = self.caller_kwargs.get("frames")
|
||||
self.scan_delta = self.caller_kwargs.get("delta", 0.5)
|
||||
|
||||
def pre_scan(self):
|
||||
# ToDo: Move roughly to start position
|
||||
|
||||
# Configure the global variables
|
||||
d = {"scan_command" : self.scan_command,
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_start = parameter['kwargs'].get("start")
|
||||
self.scan_range = parameter['kwargs'].get("range")
|
||||
self.scan_scan_time = parameter['kwargs'].get("scan_time")
|
||||
self.scan_degrees = parameter['kwargs'].get("degrees")
|
||||
self.scan_frames = parameter['kwargs'].get("frames")
|
||||
self.scan_delta = parameter['kwargs'].get("delta", 0.5)
|
||||
self.abr_command = AbrCmd.SCREENING
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_start,
|
||||
"var_2" : self.scan_range,
|
||||
"var_3" : self.scan_scan_time,
|
||||
@@ -435,14 +428,12 @@ class MeasureScreening(MxFlyscanBase):
|
||||
"var_5" : self.scan_frames,
|
||||
"var_6" : self.scan_delta,
|
||||
}
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
|
||||
|
||||
# Call super
|
||||
yield from super().pre_scan()
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureFastOmega(MxFlyscanBase):
|
||||
class MeasureFastOmega(AerotechFlyscanBase):
|
||||
""" Fast omega scan
|
||||
|
||||
MEasures a fast rotational scan with the rotation stage (omega).
|
||||
@@ -469,33 +460,26 @@ class MeasureFastOmega(MxFlyscanBase):
|
||||
required_kwargs = ["start", "range", "scan_time"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
self.axis = ['omega']
|
||||
|
||||
self.scan_command = AbrCmd.SUPER_FAST_OMEGA
|
||||
self.scan_start = self.caller_kwargs.get("start")
|
||||
self.scan_range = self.caller_kwargs.get("range")
|
||||
self.scan_scan_time = self.caller_kwargs.get("scan_time")
|
||||
self.scan_rate = self.caller_kwargs.get("rate", 200)
|
||||
|
||||
def pre_scan(self):
|
||||
# ToDo: Move roughly to start position
|
||||
|
||||
# Configure the global variables
|
||||
d = {"scan_command" : self.scan_command,
|
||||
# Set parameters from meaningful inputs
|
||||
logger.info(parameter)
|
||||
self.scan_start = parameter['kwargs'].get("start")
|
||||
self.scan_range = parameter['kwargs'].get("range")
|
||||
self.scan_scan_time = parameter['kwargs'].get("scan_time")
|
||||
self.scan_rate = parameter['kwargs'].get("rate", 200)
|
||||
self.abr_command = AbrCmd.SUPER_FAST_OMEGA
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_start,
|
||||
"var_2" : self.scan_range,
|
||||
"var_3" : self.scan_scan_time,
|
||||
"var_4" : self.scan_rate,
|
||||
}
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
|
||||
|
||||
# Call super
|
||||
yield from self.stubs.pre_scan()
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureStillWedge(MxFlyscanBase):
|
||||
class MeasureStillWedge(AerotechFlyscanBase):
|
||||
""" Still wedge scan
|
||||
|
||||
The scan itself is executed by the scan service running on the Aerotech
|
||||
@@ -521,35 +505,27 @@ class MeasureStillWedge(MxFlyscanBase):
|
||||
required_kwargs = ["start", "range", "exp_time", "oscrange"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
self.axis = ['omega']
|
||||
|
||||
self.scan_command = AbrCmd.STILL_WEDGE
|
||||
self.scan_start = self.caller_kwargs.get("start")
|
||||
self.scan_range = self.caller_kwargs.get("range")
|
||||
self.scan_scan_time = self.caller_kwargs.get("scan_time")
|
||||
self.scan_oscrange = self.caller_kwargs.get("oscrange")
|
||||
self.scan_shutter_sleep = self.caller_kwargs.get("sleep_after_shutter_close", 0.01)
|
||||
|
||||
def pre_scan(self):
|
||||
# ToDo: Move roughly to start position
|
||||
|
||||
# Configure the global variables
|
||||
d = {"scan_command" : self.scan_command,
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_start = parameter['kwargs'].get("start")
|
||||
self.scan_range = parameter['kwargs'].get("range")
|
||||
self.scan_scan_time = parameter['kwargs'].get("scan_time")
|
||||
self.scan_oscrange = parameter['kwargs'].get("oscrange")
|
||||
self.scan_shutter_sleep = parameter['kwargs'].get("sleep_after_shutter_close", 0.01)
|
||||
self.abr_command = AbrCmd.STILL_WEDGE
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_start,
|
||||
"var_2" : self.scan_range,
|
||||
"var_3" : self.scan_scan_time,
|
||||
"var_4" : self.scan_oscrange,
|
||||
"var_5" : self.scan_shutter_sleep,
|
||||
}
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
|
||||
|
||||
# Call super
|
||||
yield from self.stubs.pre_scan()
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureStills(MxFlyscanBase):
|
||||
class MeasureStills(AerotechFlyscanBase):
|
||||
""" Still image sequence scan
|
||||
|
||||
Measures a series of still images without any motor movement.
|
||||
@@ -575,31 +551,23 @@ class MeasureStills(MxFlyscanBase):
|
||||
required_kwargs = ["exp_time", "nframes", "idle_time"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
self.axis = []
|
||||
|
||||
self.scan_command = AbrCmd.STILLS
|
||||
self.scan_exp_time = self.caller_kwargs.get("exp_time")
|
||||
self.scan_num_frames = int(self.caller_kwargs.get("nframes"))
|
||||
self.scan_idle_time = self.caller_kwargs.get("idle_time")
|
||||
|
||||
def pre_scan(self):
|
||||
# ToDo: Move roughly to start position
|
||||
|
||||
# Configure the global variables
|
||||
d = {"scan_command" : self.scan_command,
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_exp_time = parameter['kwargs'].get("exp_time")
|
||||
self.scan_num_frames = parameter['kwargs'].get("nframes")
|
||||
self.scan_idle_time = parameter['kwargs'].get("idle_time")
|
||||
self.abr_command = AbrCmd.STILLS
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_num_frames,
|
||||
"var_2" : self.scan_exp_time,
|
||||
"var_3" : self.scan_idle_time,
|
||||
}
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
|
||||
|
||||
# Call super
|
||||
yield from self.stubs.pre_scan()
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureSingleOscillation(MxFlyscanBase):
|
||||
class MeasureSingleOscillation(AerotechFlyscanBase):
|
||||
""" Single oscillation scan
|
||||
|
||||
The scan itself is executed by the scan service running on the Aerotech
|
||||
@@ -626,35 +594,27 @@ class MeasureSingleOscillation(MxFlyscanBase):
|
||||
required_kwargs = ["start", "range", "scan_time"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
self.axis = ['omega']
|
||||
|
||||
self.scan_command = AbrCmd.SINGLE_OSCILLATION
|
||||
self.scan_start = self.caller_kwargs.get("start")
|
||||
self.scan_range = self.caller_kwargs.get("range")
|
||||
self.scan_scan_time = self.caller_kwargs.get("scan_time")
|
||||
self.scan_delta = self.caller_kwargs.get("delta", 0)
|
||||
self.scan_settle = self.caller_kwargs.get("settle", 0.5)
|
||||
|
||||
def pre_scan(self):
|
||||
# ToDo: Move roughly to start position
|
||||
|
||||
# Configure the global variables
|
||||
d = {"scan_command" : self.scan_command,
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_start = parameter['kwargs'].get("start")
|
||||
self.scan_range = parameter['kwargs'].get("range")
|
||||
self.scan_scan_time = parameter['kwargs'].get("scan_time")
|
||||
self.scan_delta = parameter['kwargs'].get("delta", 0)
|
||||
self.scan_settle = parameter['kwargs'].get("settle", 0.5)
|
||||
self.abr_command = AbrCmd.SINGLE_OSCILLATION
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_start,
|
||||
"var_2" : self.scan_range,
|
||||
"var_3" : self.scan_scan_time,
|
||||
"var_4" : self.scan_delta,
|
||||
"var_5" : self.scan_settle,
|
||||
}
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
|
||||
|
||||
# Call super
|
||||
yield from self.stubs.pre_scan()
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureRepeatedOscillation(MxFlyscanBase):
|
||||
class MeasureRepeatedOscillation(AerotechFlyscanBase):
|
||||
""" Repeated oscillation scan
|
||||
|
||||
The scan itself is executed by the scan service running on the Aerotech
|
||||
@@ -681,35 +641,27 @@ class MeasureRepeatedOscillation(MxFlyscanBase):
|
||||
required_kwargs = ["nframes", "range", "scan_time"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
self.axis = ['omega']
|
||||
|
||||
self.scan_command = AbrCmd.REPEAT_SINGLE_OSCILLATION
|
||||
self.scan_num_frames = self.caller_kwargs.get("nframes")
|
||||
self.scan_range = self.caller_kwargs.get("range")
|
||||
self.scan_scan_time = self.caller_kwargs.get("scan_time")
|
||||
self.scan_sleep = self.caller_kwargs.get("sleep", 0)
|
||||
self.scan_delta = self.caller_kwargs.get("delta", 0.5)
|
||||
|
||||
def pre_scan(self):
|
||||
# ToDo: Move roughly to start position
|
||||
|
||||
# Configure the global variables
|
||||
d = {"scan_command" : self.scan_command,
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_num_frames = parameter['kwargs'].get("nframes")
|
||||
self.scan_range = parameter['kwargs'].get("range")
|
||||
self.scan_scan_time = parameter['kwargs'].get("scan_time")
|
||||
self.scan_sleep = parameter['kwargs'].get("sleep", 0)
|
||||
self.scan_delta = parameter['kwargs'].get("delta", 0.5)
|
||||
self.abr_command = AbrCmd.REPEAT_SINGLE_OSCILLATION
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_num_frames,
|
||||
"var_2" : self.scan_scan_time,
|
||||
"var_3" : self.scan_range,
|
||||
"var_4" : self.scan_sleep,
|
||||
"var_5" : self.scan_delta,
|
||||
}
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
|
||||
|
||||
# Call super
|
||||
yield from self.stubs.pre_scan()
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureMSOX(MxFlyscanBase):
|
||||
class MeasureMSOX(AerotechFlyscanBase):
|
||||
""" Standard MSOX scan
|
||||
|
||||
The scan itself is executed by the scan service running on the Aerotech
|
||||
@@ -736,24 +688,16 @@ class MeasureMSOX(MxFlyscanBase):
|
||||
required_kwargs = ["start", "range", "exp_time"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
self.axis = ['omega']
|
||||
|
||||
self.scan_command = AbrCmd.MSOX
|
||||
self.scan_start = self.caller_kwargs.get("start")
|
||||
self.scan_range = self.caller_kwargs.get("range")
|
||||
self.scan_exp_time = self.caller_kwargs.get("exp_time")
|
||||
self.scan_num_datasets = int(self.caller_kwargs.get("num_datasets", 1))
|
||||
self.scan_rest_time = self.caller_kwargs.get("rest_time", 0)
|
||||
|
||||
def pre_scan(self):
|
||||
# ToDo: Move roughly to start position
|
||||
|
||||
# Reset the scan engine
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "reset")
|
||||
|
||||
# Configure the global variables
|
||||
d = {"scan_command" : self.scan_command,
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_start = parameter['kwargs'].get("start")
|
||||
self.scan_range = parameter['kwargs'].get("range")
|
||||
self.scan_exp_time = parameter['kwargs'].get("exp_time")
|
||||
self.scan_num_datasets = parameter['kwargs'].get("num_datasets", 1)
|
||||
self.scan_rest_time = parameter['kwargs'].get("rest_time", 0)
|
||||
self.abr_command = AbrCmd.MSOX
|
||||
self.abr_reset = True
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_start,
|
||||
"var_2" : self.scan_range,
|
||||
"var_3" : self.scan_exp_time,
|
||||
@@ -765,14 +709,12 @@ class MeasureMSOX(MxFlyscanBase):
|
||||
"var_9" : 0,
|
||||
"var_10" : 0,
|
||||
}
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
|
||||
|
||||
# Call super
|
||||
yield from self.stubs.pre_scan()
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
|
||||
|
||||
class MeasureGrid(MxFlyscanBase):
|
||||
class MeasureGrid(AerotechFlyscanBase):
|
||||
""" General grid scan
|
||||
|
||||
Note: This was probably never used in its current form, because the
|
||||
@@ -799,27 +741,30 @@ class MeasureGrid(MxFlyscanBase):
|
||||
scan_name = "measure_raster"
|
||||
required_kwargs = ["positions", "ncols", "angle", "exp_time"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
self.axis = ['omega']
|
||||
# Default values
|
||||
scan_start_x = None
|
||||
scan_end_x = None
|
||||
scan_xstep = 0.010
|
||||
scan_ystep = 0.010
|
||||
scan_zstep = 0.010
|
||||
scan_gmy_step = 0.010
|
||||
|
||||
self.scan_command = AbrCmd.RASTER_SCAN
|
||||
self.scan_positions = self.caller_kwargs.get("positions")
|
||||
self.scan_stepnum_x = int(self.caller_kwargs.get("ncols"))
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
self.axis = ['omega']
|
||||
# Set parameters from meaningful inputs
|
||||
self.abr_command = AbrCmd.RASTER_SCAN
|
||||
self.abr_reset = True
|
||||
self.scan_positions = parameter['kwargs'].get("positions")
|
||||
self.scan_stepnum_x = parameter['kwargs'].get("ncols")
|
||||
self.scan_stepnum_y = self.scan_positions / self.scan_stepnum_x
|
||||
self.scan_angle = self.caller_kwargs.get("angle")
|
||||
self.scan_exp_time = self.caller_kwargs.get("exp_time")
|
||||
self.scan_angle = parameter['kwargs'].get("angle")
|
||||
self.scan_exp_time = parameter['kwargs'].get("exp_time")
|
||||
|
||||
if len(self.scan_positions)==1:
|
||||
raise RuntimeWarning("Raster scan with one cell makes no sense")
|
||||
|
||||
# Good practice: new members only in __init__
|
||||
self.scan_start_x = None
|
||||
self.scan_end_x = None
|
||||
self.scan_xstep = 0.010
|
||||
self.scan_ystep = 0.010
|
||||
self.scan_zstep = 0.010
|
||||
self.scan_gmy_step = 0.010
|
||||
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
def prepare_positions(self):
|
||||
# Calculate step sizes
|
||||
@@ -851,14 +796,8 @@ class MeasureGrid(MxFlyscanBase):
|
||||
# self._raster_current_row = 0
|
||||
# self._raster_num_rows = rows
|
||||
|
||||
def pre_scan(self):
|
||||
# ToDo: Move roughly to start position
|
||||
|
||||
# Reset the scan engine
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "reset")
|
||||
|
||||
# Configure the global variables
|
||||
d = {"scan_command" : self.scan_command,
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_omega_position,
|
||||
"var_2" : self.scan_start_x,
|
||||
"var_3" : self.scan_end_x,
|
||||
@@ -870,16 +809,10 @@ class MeasureGrid(MxFlyscanBase):
|
||||
"var_9" : self.scan_gmx_offset,
|
||||
"var_10" : self.scan_gmy_step,
|
||||
}
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
|
||||
|
||||
# Call super
|
||||
yield from self.stubs.pre_scan()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class MeasureGridStill(MxFlyscanBase):
|
||||
class MeasureGridStill(AerotechFlyscanBase):
|
||||
""" Still grid scan
|
||||
|
||||
Note: This was probably never used in its current form, because the
|
||||
@@ -908,27 +841,29 @@ class MeasureGridStill(MxFlyscanBase):
|
||||
scan_name = "measure_raster_still"
|
||||
required_kwargs = ["positions", "columns", "exp_time", "delay"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
self.axis = ['omega']
|
||||
# Default values
|
||||
scan_start_x = None
|
||||
scan_end_x = None
|
||||
scan_xstep = 0.010
|
||||
scan_ystep = 0.010
|
||||
scan_zstep = 0.010
|
||||
scan_gmy_step = 0.010
|
||||
|
||||
self.scan_command = AbrCmd.RASTER_SCAN_STILL
|
||||
self.scan_positions = self.caller_kwargs.get("positions")
|
||||
self.scan_stepnum_x = int(self.caller_kwargs.get("columns"))
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
self.axis = ['omega']
|
||||
# Set parameters from meaningful inputs
|
||||
self.abr_command = AbrCmd.RASTER_SCAN_STILL
|
||||
self.abr_reset = True
|
||||
self.scan_positions = parameter['kwargs'].get("positions")
|
||||
self.scan_stepnum_x = parameter['kwargs'].get("columns")
|
||||
self.scan_stepnum_y = self.scan_positions / self.scan_stepnum_x
|
||||
self.scan_exp_time = self.caller_kwargs.get("exp_time")
|
||||
self.scan_delay = self.caller_kwargs.get("delay")
|
||||
self.scan_exp_time = parameter['kwargs'].get("exp_time")
|
||||
self.scan_delay = parameter['kwargs'].get("delay")
|
||||
|
||||
if len(self.scan_positions)==1:
|
||||
raise RuntimeWarning("Raster scan with one cell makes no sense")
|
||||
|
||||
# Good practice: new members only in __init__
|
||||
self.scan_start_x = None
|
||||
self.scan_end_x = None
|
||||
self.scan_xstep = 0.010
|
||||
self.scan_ystep = 0.010
|
||||
self.scan_zstep = 0.010
|
||||
self.scan_gmy_step = 0.010
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
def prepare_positions(self):
|
||||
# Calculate step sizes
|
||||
@@ -960,14 +895,7 @@ class MeasureGridStill(MxFlyscanBase):
|
||||
# self._raster_current_row = 0
|
||||
# self._raster_num_rows = rows
|
||||
|
||||
def pre_scan(self):
|
||||
# ToDo: Move roughly to start position
|
||||
|
||||
# Reset the scan engine
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "reset")
|
||||
|
||||
# Configure the global variables
|
||||
d = {"scan_command" : self.scan_command,
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_omega_position,
|
||||
"var_2" : self.scan_start_x,
|
||||
"var_3" : self.scan_end_x,
|
||||
@@ -979,14 +907,10 @@ class MeasureGridStill(MxFlyscanBase):
|
||||
"var_9" : self.scan_gmx_offset,
|
||||
"var_10" : self.scan_gmy_step,
|
||||
}
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
|
||||
|
||||
# Call super
|
||||
yield from self.stubs.pre_scan()
|
||||
|
||||
|
||||
|
||||
class MeasureJungfrau(MxFlyscanBase):
|
||||
class MeasureJungfrau(AerotechFlyscanBase):
|
||||
""" Scan with the Jungfrau detector
|
||||
|
||||
The scan itself is executed by the scan service running on the Aerotech
|
||||
@@ -1015,24 +939,19 @@ class MeasureJungfrau(MxFlyscanBase):
|
||||
required_kwargs = ["columns", "rows", "width", "height", "exp_time"]
|
||||
|
||||
def __init__(self, *args, parameter: dict = None, **kwargs):
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
self.axis = ['omega']
|
||||
|
||||
self.scan_command = AbrCmd.JUNGFRAU
|
||||
self.scan_num_cols = self.caller_kwargs.get("columns")
|
||||
self.scan_num_rows = self.caller_kwargs.get("rows")
|
||||
self.scan_width = self.caller_kwargs.get("width")
|
||||
self.scan_height = self.caller_kwargs.get("height")
|
||||
self.scan_exp_time = self.caller_kwargs.get("exp_time")
|
||||
self.scan_sleep = self.caller_kwargs.get("sleep", 0.1)
|
||||
|
||||
self.scan_timeout = None if self.scan_sleep <= 0 else 5.0 + (self.scan_num_rows * self.scan_sleep) + (self.scan_num_cols * self.scan_num_rows * self.scan_exp_time)
|
||||
|
||||
def pre_scan(self):
|
||||
# ToDo: Move roughly to start position
|
||||
|
||||
# Configure the global variables
|
||||
d = {"scan_command" : self.scan_command,
|
||||
# Set parameters from meaningful inputs
|
||||
self.scan_num_cols = parameter['kwargs'].get("columns")
|
||||
self.scan_num_rows = parameter['kwargs'].get("rows")
|
||||
self.scan_width = parameter['kwargs'].get("width")
|
||||
self.scan_height = parameter['kwargs'].get("height")
|
||||
self.scan_exp_time = parameter['kwargs'].get("exp_time")
|
||||
self.scan_sleep = parameter['kwargs'].get("sleep", 0.1)
|
||||
self.abr_timeout = None if self.scan_sleep <= 0 else 5.0 + (self.scan_num_rows * self.scan_sleep) + (self.scan_num_cols * self.scan_num_rows * self.scan_exp_time)
|
||||
self.abr_command = AbrCmd.JUNGFRAU
|
||||
self.abr_reset = True
|
||||
self.abr_complete = True
|
||||
self.abr_globals = {
|
||||
"var_1" : self.scan_num_cols,
|
||||
"var_2" : self.scan_num_rows,
|
||||
"var_3" : self.scan_width,
|
||||
@@ -1041,23 +960,14 @@ class MeasureJungfrau(MxFlyscanBase):
|
||||
"var_6" : self.scan_sleep,
|
||||
"var_7" : 0,
|
||||
"var_8" : 0,
|
||||
}
|
||||
yield from self.stubs.send_rpc_and_wait("abr", "configure", d)
|
||||
|
||||
# Call super
|
||||
yield from self.stubs.pre_scan()
|
||||
}
|
||||
# Call base class
|
||||
super().__init__(parameter=parameter, **kwargs)
|
||||
|
||||
def scan_core(self):
|
||||
""" The actual scan logic comes here.
|
||||
"""
|
||||
self.pointID = 0
|
||||
# Kick off the run
|
||||
yield from self.stubs.command("abr", "start_command.set", 1)
|
||||
logger.info("Measurement launched on the ABR stage...")
|
||||
|
||||
# Wait for grid scanner to finish
|
||||
st = yield from self.stubs.send_rpc_and_wait("abr", "complete", self.scan_timeout)
|
||||
st.wait()
|
||||
yield from super().scan_core()
|
||||
|
||||
# Go back to direct mode
|
||||
st = yield from self.stubs.send_rpc_and_wait("abr", "set_axis_mode", "direct")
|
||||
|
||||
Reference in New Issue
Block a user